#!/usr/bin/env python
# -*- coding: utf_8 -*-
import copy
import time
import types
import json
import logging
import traceback
from datetime import datetime

from pony.orm import select, desc
from flask import request

from model import fullStackDB
from model.dictionary import Dict
from model.user import User
from utils import sql_filter

logger = logging.getLogger("DictManager")


class DictManager(object):
    """Service-layer operations (add / delete / get / list / update) on Dict rows.

    Every method returns a tuple whose last element is a human-readable
    message; see the individual methods for the exact shape.
    """

    # Request keys that steer getList itself and must never reach the
    # ORM filter as column conditions.
    _CONTROL_KEYS = ("pagenum", "pagesize", "scope_type", "is_category")

    def __init__(self):
        super(DictManager, self).__init__()

    def _current_editor(self):
        """Return the User matching the requester's id, or None if not found.

        NOTE(review): relies on ``request.current_user`` being a dict-like
        object attached to the Flask request elsewhere (presumably by an
        auth hook) -- confirm against the application setup.
        """
        return User.get(id=request.current_user.get("id"))

    def add(self, data):
        """Create a dictionary entry from ``data``.

        :param data: dict of Dict column values; ``label``, ``value`` and
            ``category`` are used for the duplicate check.
        :return: ``(result, message)`` where ``result`` is ``False`` on a
            validation failure, otherwise whatever ``fullStackDB.add``
            returns.
        """
        # Reject an entry whose label, value and category all match an
        # existing row.
        duplicate = Dict.get(label=data.get("label"),
                             value=data.get("value"),
                             category=data.get("category"))
        if duplicate:
            return False, "dictionary has been exists."

        editor = self._current_editor()
        if not editor:
            return False, "current user is not exists"

        # Work on a copy so the caller's dict is not modified, and use a
        # single timestamp so create_at and update_at are identical.
        now = datetime.now()
        record = dict(data)
        record.update({
            'create_by': editor,
            'create_at': now,
            'update_by': editor,
            'update_at': now,
        })
        result = fullStackDB.add(Dict, **record)
        return result, "add dictionary {}.".format("success" if result else "fail")

    def delete(self, uuid):
        """Soft-delete the dictionary entry identified by ``uuid``.

        Entries flagged ``is_system`` are refused. The row is not removed;
        ``is_delete``, ``delete_at`` and ``delete_by`` are set instead.

        :return: ``(result, message)``; ``result`` is ``False`` on a
            validation failure, otherwise whatever ``fullStackDB.update``
            returns.
        """
        editor = self._current_editor()
        if not editor:
            return False, "current user is not exists"

        target = Dict.get(uuid=uuid)
        if not target:
            return False, "dict does not exists"
        if target.is_system:
            return False, "dict can not delete"

        result = fullStackDB.update(
            Dict,
            {'uuid': uuid},
            is_delete=True,
            delete_at=datetime.now(),
            delete_by=editor,
        )
        return result, "delete dictionary {}.".format("success" if result else "fail")

    def get(self, data):
        """Fetch a single entry matching the keyword filters in ``data``.

        :param data: dict passed as keyword arguments to ``Dict.get``.
        :return: ``(entry, message)`` where ``entry`` is a dict with the
            keys ``uuid``, ``label``, ``create_at`` and ``update_at``, or
            the falsy lookup result when nothing matched.
        """
        result = Dict.get(**data)
        if result:
            result = result.to_dict(only=["uuid", "label", "create_at", "update_at"])
        return result, "get dictionary {}.".format("success" if result else "no data")

    def getList(self, data):
        """List dictionary entries in one of three modes chosen by ``data``.

        * ``scope_type == "list"``: every matching entry ordered by
          ``Dict.sort``, reduced to uuid/label/value/category. When
          ``category`` is a list, only entries whose category is in that
          list are kept.
        * ``is_category == 1``: the categories selected from Dict.
        * otherwise: one page (``pagenum``/``pagesize``, defaults 1/10) of
          full entries ordered by ``create_at`` descending.

        Remaining keys of ``data`` are used as equality filters, with
        ``is_delete`` defaulting to ``False``.

        :return: ``(items, count, message)``; ``(False, 0, message)`` when
            ``data`` is empty.
        """
        if not data:
            return False, 0, "parameters can not be null."

        requested = data.get("category")
        category_is_list = isinstance(requested, list)

        # Build the ORM filter: drop the control keys, and drop a list-valued
        # category because it cannot be expressed as an equality condition.
        filters = copy.deepcopy(data)
        for key in self._CONTROL_KEYS:
            filters.pop(key, None)
        if category_is_list:
            filters.pop("category", None)
        filters.setdefault("is_delete", False)

        if data.get("scope_type") == "list":
            query = Dict.select().where(**filters).order_by(Dict.sort)
            fields = ["uuid", "label", "value", "category"]
            # Only a list-valued category needs filtering here; a scalar
            # category is already enforced by the query filter, and a missing
            # one means "no category restriction" (this used to raise
            # TypeError on `in None`).
            items = [
                item.to_dict(only=fields)
                for item in query
                if not category_is_list or item.category in requested
            ]
            return items, len(items), "get dictionary list {}.".format(
                "success" if items else "no data")

        if data.get("is_category") == 1:
            names = list(select(d.category for d in Dict))
            return names, len(names), "get select {}.".format(
                "success" if names else "fail")

        page = Dict.select().where(**filters).order_by(desc(Dict.create_at)).page(
            data.get("pagenum", 1), data.get("pagesize", 10))
        # Total number of matching rows, independent of paging.
        count = Dict.select().where(**filters).count()

        # Always hand back a plain list (empty when the page has no rows)
        # rather than the ORM result object.
        rows = []
        for item in page:
            row = item.to_dict(with_collections=True, related_objects=True)
            row.update({
                "create_at": item.create_at.strftime("%Y-%m-%d %H:%M:%S"),
                "update_at": item.update_at.strftime("%Y-%m-%d %H:%M:%S"),
                "create_by": item.create_by.to_dict(only=["uuid", "username"]),
                "update_by": item.update_by.to_dict(only=["uuid", "username"]),
            })
            rows.append(row)
        return rows, count, "get dictionary {}.".format("success" if rows else "fail")

    def update(self, uuid, data):
        """Update the entry identified by ``uuid`` with the values in ``data``.

        ``update_at`` and ``update_by`` are stamped automatically.

        :param data: dict of column values to change; rejected when empty,
            ``None``, or containing nothing but ``id``.
        :return: ``(result, message)``; ``result`` is ``False`` on a
            validation failure, otherwise whatever ``fullStackDB.update``
            returns.
        """
        # Nothing to update: fail fast instead of touching the database.
        if not data or (len(data) == 1 and "id" in data):
            return False, "parameters can not be null."

        # Make sure the requester exists before recording them as editor.
        editor = self._current_editor()
        if not editor:
            return False, "current user is not exists"

        # NOTE(review): a caller-supplied ``update_at``/``update_by`` key in
        # ``data`` would collide with the keywords below -- confirm callers
        # never send them.
        result = fullStackDB.update(
            Dict,
            {'uuid': uuid},
            update_at=datetime.now(),
            update_by=editor,
            **data
        )
        return result, "update dictionary {}.".format("success" if result else "fail")


dictManager = DictManager()