#!/usr/bin/env python # -*- coding: utf_8 -*- import logging import copy from datetime import datetime from pony.orm import * from model import fullStackDB from model.user import User from utils import md5_salt logger = logging.getLogger(__name__) class UserManager(object): ''' 用户管理的单例类 ''' def __init__(self): super(UserManager, self).__init__() def check(self, data): with db_session: user = User.get(uuid=data.get("uuid")) if not user: return False return True def add(self, uuid, data): ''' 添加用户 ''' # 判断账号是否存在 isExists = select(u for u in User if u.account == data['account'] and u.is_delete == False).exists() if isExists: return False, "user already exists" editor = User.get(id=uuid) if not editor: return False, "current user is not exists" if "username" not in data or not data.get("username"): data.update({ "username": data.get("account") }) # 密码加密 data['password'] = md5_salt(data['password']) data.update({ "create_by": editor.id, "create_at": datetime.now(), "update_by": editor.id, "update_at": datetime.now() }) # 添加用户时,是否考虑将该用户的过期时间加入预警 result = fullStackDB.add(User, **data) return result, "add user {}.".format("success" if result else "fail") def delete(self, uuid): ''' 删除用户 ''' with db_session: editor = User.get(uuid=uuid) if not editor: return False, "current user is not exists" result = User.get(uuid=uuid) if result: result.delete() commit() result = True return result, "delete user {}.".format("success" if result else "fail") def update(self, uuid, data): ''' 更新用户 ''' # 当参数为空时,直接返回错误 if len(data) <= 0 or (len(data.keys()) == 1 and "id" in data): return False, "parameters can not be null." with db_session: # 查询请求者是否存在 editor = User.get(uuid=uuid) if not editor: return False, "current user is not exists" if "password" in data: data["password"] = md5_salt(data['password']) user = User.get(uuid=uuid) if user: user.set(update_at=datetime.now(), update_by=editor.id, **data) result = user.to_dict(only=["account", "gender", "birthday", "phone", "email",]) if result.get("birthday"): result.update({ "birthday": result.get("birthday").strftime("%Y-%m-%d") }) return result, "update user success" else: return None, "user does not exists" def get(self, user): ''' 查询单用户 ''' result = User.get(id=user, is_delete=False) if result: temp = result.to_dict(with_collections=True, related_objects=True, only=["uuid", "username", "account", "role", "phone", "email", "gender", "create_at", "update_at"]) temp.update({ "create_at": result.create_at.strftime("%Y-%m-%d %H:%M:%S") if result.create_at else None, "update_at": result.update_at.strftime("%Y-%m-%d %H:%M:%S") if result.update_at else None, }) result = temp return result, "get user {}.".format("success" if result else "not found") def getList(self, data): ''' 查询多用户 ''' # 当参数为空时,直接返回错误 # if not data or len(data) <= 0: # return False, 0, "parameters can not be null." temp = copy.deepcopy(data) if 'pagenum' in temp: temp.pop('pagenum') if 'pagesize' in temp: temp.pop('pagesize') if 'scope_type' in temp: temp.pop('scope_type') temp.setdefault("is_delete", False) if "scope_type" in data and data.get("scope_type") == "list": result = User.select().where(**temp).order_by(desc(User.create_at)) temp = [] for item in result: t = item.to_dict(only=["id", "uuid", "username", "account"]) temp.append(t) return temp, len(temp), "get select {}.".format("success" if temp else "no data") logger.info(temp) result = User.select().where(**temp).order_by(desc(User.create_at)).page(data.get("pagenum", 1), data.get("pagesize", 10)) count = User.select().where(**temp).count() if result and len(result): temp = [] for item in result: t = item.to_dict(with_collections=True, related_objects=True, only=["uuid", "username", "account", "phone", "email", "gender", "create_at", "update_at", "remarks"]) t.update({ "email": "" if item.email == "user@example.com" else item.email, "create_at": item.create_at.strftime("%Y-%m-%d %H:%M:%S"), "update_at": item.update_at.strftime("%Y-%m-%d %H:%M:%S"), }) temp.append(t) result = temp return result, count, "get users {}.".format("success" if result else "no data") userManager = UserManager()