#!/usr/bin/env python # -*- coding: utf_8 -*- import copy import logging from datetime import datetime from pony.orm import select, desc from flask import request from model import fullStackDB from model.dictionary import Dict from model.user import User logger = logging.getLogger(__name__) class DictManager(object): def __init__(self): super(DictManager, self).__init__() def add(self, data): # 判断相同的类别和标签是否存在 result = Dict.get(label=data.get("label"), value=data.get("value"), category=data.get("category")) if result: return False, "dictionary has been exists." editor = User.get(id=request.current_user.get("id")) if not editor: return False, "current user is not exists" data.update({ 'create_by': editor, 'create_at': datetime.now(), 'update_by': editor, 'update_at': datetime.now(), }) result = fullStackDB.add(Dict, **data) return result, "add dictionary {}.".format("success" if result else "fail") def delete(self, uuid): editor = User.get(id=request.current_user.get("id")) if not editor: return False, "current user is not exists" result = Dict.get(uuid=uuid) if not result: return False, "dict does not exists" else: if result.is_system: return False, "dict can not delete" result = fullStackDB.update(Dict, { 'uuid': uuid }, is_delete=True, delete_at=datetime.now(), delete_by=editor) return result, "delete dictionary {}.".format("success" if result else "fail") def get(self, data): result = Dict.get(**data) if result: result = result.to_dict(only=["uuid", "label", "create_at", "update_at"]) return result, "get dictionary {}.".format("success" if result else "no data") def getList(self, data): if not data or len(data) <= 0: return False, 0, "parameters can not be null." temp = copy.deepcopy(data) if 'pagenum' in temp: temp.pop('pagenum') if 'pagesize' in temp: temp.pop('pagesize') if 'scope_type' in temp: temp.pop('scope_type') if 'is_category' in temp: temp.pop('is_category') if 'category' in temp and isinstance(data.get("category"), list): temp.pop("category") temp.setdefault("is_delete", False) if "scope_type" in data and data.get("scope_type") == "list": result = Dict.select().where(**temp).order_by(Dict.sort) temp = [] for item in result: if item.category in data.get("category"): temp.append(item.to_dict(only=["uuid", "label", "value", "category"])) return temp, len(temp), "get dictionary list {}.".format("success" if temp else "no data") elif "is_category" in data and data.get("is_category") == 1: result = select(d.category for d in Dict) # result = Dict.select_by_sql("SELECT DISTINCT(`category`) FROM `{}`".format(Dict._table_)) return list(result), len(result), "get select {}.".format("success" if result else "fail") result = Dict.select().where(**temp).order_by(desc(Dict.create_at)).page(data.get("pagenum", 1), data.get("pagesize", 10)) count = Dict.select().where(**temp).count() if result: temp = [] for item in result: t = item.to_dict(with_collections=True, related_objects=True,) t.update({ "create_at": item.create_at.strftime("%Y-%m-%d %H:%M:%S"), "update_at": item.update_at.strftime("%Y-%m-%d %H:%M:%S"), "create_by": item.create_by.to_dict(only=["uuid", "username"]), "update_by": item.update_by.to_dict(only=["uuid", "username"]) }) temp.append(t) result = temp return result, count, "get dictionary {}.".format("success" if result else "fail") def update(self, uuid, data): # 当参数为空时,直接返回错误 if len(data) <= 0 or (len(data.keys()) == 1 and "id" in data): return False, "parameters can not be null." # 查询请求者是否存在 editor = User.get(id=request.current_user.get("id")) if not editor: return False, "current user is not exists" result = fullStackDB.update(Dict, { 'uuid': uuid }, update_at=datetime.now(), update_by=editor, **data) return result, "update dictionary {}.".format("success" if result else "fail") dictManager = DictManager()