dictionary_manager.py 4.75 KB
Newer Older
wanli's avatar
wanli committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
#!/usr/bin/env python
# -*- coding: utf_8 -*-

import copy
import time
import types
import json
import logging
import traceback
from datetime import datetime
from pony.orm import select, desc
from flask import request
from model import fullStackDB
from model.dictionary import Dict
from model.user import User
from utils import sql_filter

logger = logging.getLogger("DictManager")

class DictManager(object):
    def __init__(self):
        super(DictManager, self).__init__()

    def add(self, data):
        # 判断相同的类别和标签是否存在
        result = Dict.get(label=data.get("label"), value=data.get("value"), category=data.get("category"))
        if result:
            return False, "dictionary has been exists."

        editor = User.get(id=request.current_user.get("id"))
        if not editor:
            return False, "current user is not exists"

        data.update({
            'create_by': editor,
            'create_at': datetime.now(),
            'update_by': editor,
            'update_at': datetime.now(),
        })

        result = fullStackDB.add(Dict, **data)
        return result, "add dictionary {}.".format("success" if result else "fail")

    def delete(self, uuid):
        editor = User.get(id=request.current_user.get("id"))
        if not editor:
            return False, "current user is not exists"

        result = Dict.get(uuid=uuid)
        if not result:
            return False, "dict does not exists"
        else:
            if result.is_system:
                return False, "dict can not delete"

        result = fullStackDB.update(Dict, { 'uuid': uuid }, is_delete=True, delete_at=datetime.now(), delete_by=editor)
        return result, "delete dictionary {}.".format("success" if result else "fail")

    def get(self, data):
        result = Dict.get(**data)
        if result:
            result = result.to_dict(only=["uuid", "label", "create_at", "update_at"])
        return result, "get dictionary {}.".format("success" if result else "no data")

    def getList(self, data):
        if not data or len(data) <= 0:
            return False, 0, "parameters can not be null."

        temp = copy.deepcopy(data)
        if 'pagenum' in temp:
            temp.pop('pagenum')
        if 'pagesize' in temp:
            temp.pop('pagesize')
        if 'scope_type' in temp:
            temp.pop('scope_type')
        if 'is_category' in temp:
            temp.pop('is_category')
        if 'category' in temp and isinstance(data.get("category"), list):
            temp.pop("category")
        temp.setdefault("is_delete", False)

        if "scope_type" in data and data.get("scope_type") == "list":
            result = Dict.select().where(**temp).order_by(Dict.sort)
            temp = []
            for item in result:
                if item.category in data.get("category"):
                    temp.append(item.to_dict(only=["uuid", "label", "value", "category"]))
            return temp, len(temp), "get dictionary list {}.".format("success" if temp else "no data")
        elif "is_category" in data and data.get("is_category") == 1:
            result = select(d.category for d in Dict)
            # result = Dict.select_by_sql("SELECT DISTINCT(`category`) FROM `{}`".format(Dict._table_))
            return list(result), len(result), "get select {}.".format("success" if result else "fail")

        result = Dict.select().where(**temp).order_by(desc(Dict.create_at)).page(data.get("pagenum", 1), data.get("pagesize", 10))
        count = Dict.select().where(**temp).count()

        if result:
            temp = []
            for item in result:
                t = item.to_dict(with_collections=True, related_objects=True,)
                t.update({
                    "create_at": item.create_at.strftime("%Y-%m-%d %H:%M:%S"),
                    "update_at": item.update_at.strftime("%Y-%m-%d %H:%M:%S"),
                    "create_by": item.create_by.to_dict(only=["uuid", "username"]),
                    "update_by": item.update_by.to_dict(only=["uuid", "username"])
                })
                temp.append(t)
            result = temp

        return result, count, "get dictionary {}.".format("success" if result else "fail")

    def update(self, uuid, data):
        # 当参数为空时,直接返回错误
        if len(data) <= 0 or (len(data.keys()) == 1 and "id" in data):
            return False, "parameters can not be null."

        # 查询请求者是否存在
        editor = User.get(id=request.current_user.get("id"))
        if not editor:
            return False, "current user is not exists"

        result = fullStackDB.update(Dict, { 'uuid': uuid }, update_at=datetime.now(), update_by=editor, **data)
        return result, "update dictionary {}.".format("success" if result else "fail")

dictManager = DictManager()