dictionary_manager.py 4.66 KB
Newer Older
wanli's avatar
wanli committed
1 2 3 4 5 6 7 8 9 10 11 12
#!/usr/bin/env python
# -*- coding: utf_8 -*-

import copy
import logging
from datetime import datetime
from pony.orm import select, desc
from flask import request
from model import fullStackDB
from model.dictionary import Dict
from model.user import User

wanli's avatar
wanli committed
13
logger = logging.getLogger(__name__)
wanli's avatar
wanli committed
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120

class DictManager(object):
    def __init__(self):
        super(DictManager, self).__init__()

    def add(self, data):
        # 判断相同的类别和标签是否存在
        result = Dict.get(label=data.get("label"), value=data.get("value"), category=data.get("category"))
        if result:
            return False, "dictionary has been exists."

        editor = User.get(id=request.current_user.get("id"))
        if not editor:
            return False, "current user is not exists"

        data.update({
            'create_by': editor,
            'create_at': datetime.now(),
            'update_by': editor,
            'update_at': datetime.now(),
        })

        result = fullStackDB.add(Dict, **data)
        return result, "add dictionary {}.".format("success" if result else "fail")

    def delete(self, uuid):
        editor = User.get(id=request.current_user.get("id"))
        if not editor:
            return False, "current user is not exists"

        result = Dict.get(uuid=uuid)
        if not result:
            return False, "dict does not exists"
        else:
            if result.is_system:
                return False, "dict can not delete"

        result = fullStackDB.update(Dict, { 'uuid': uuid }, is_delete=True, delete_at=datetime.now(), delete_by=editor)
        return result, "delete dictionary {}.".format("success" if result else "fail")

    def get(self, data):
        result = Dict.get(**data)
        if result:
            result = result.to_dict(only=["uuid", "label", "create_at", "update_at"])
        return result, "get dictionary {}.".format("success" if result else "no data")

    def getList(self, data):
        if not data or len(data) <= 0:
            return False, 0, "parameters can not be null."

        temp = copy.deepcopy(data)
        if 'pagenum' in temp:
            temp.pop('pagenum')
        if 'pagesize' in temp:
            temp.pop('pagesize')
        if 'scope_type' in temp:
            temp.pop('scope_type')
        if 'is_category' in temp:
            temp.pop('is_category')
        if 'category' in temp and isinstance(data.get("category"), list):
            temp.pop("category")
        temp.setdefault("is_delete", False)

        if "scope_type" in data and data.get("scope_type") == "list":
            result = Dict.select().where(**temp).order_by(Dict.sort)
            temp = []
            for item in result:
                if item.category in data.get("category"):
                    temp.append(item.to_dict(only=["uuid", "label", "value", "category"]))
            return temp, len(temp), "get dictionary list {}.".format("success" if temp else "no data")
        elif "is_category" in data and data.get("is_category") == 1:
            result = select(d.category for d in Dict)
            # result = Dict.select_by_sql("SELECT DISTINCT(`category`) FROM `{}`".format(Dict._table_))
            return list(result), len(result), "get select {}.".format("success" if result else "fail")

        result = Dict.select().where(**temp).order_by(desc(Dict.create_at)).page(data.get("pagenum", 1), data.get("pagesize", 10))
        count = Dict.select().where(**temp).count()

        if result:
            temp = []
            for item in result:
                t = item.to_dict(with_collections=True, related_objects=True,)
                t.update({
                    "create_at": item.create_at.strftime("%Y-%m-%d %H:%M:%S"),
                    "update_at": item.update_at.strftime("%Y-%m-%d %H:%M:%S"),
                    "create_by": item.create_by.to_dict(only=["uuid", "username"]),
                    "update_by": item.update_by.to_dict(only=["uuid", "username"])
                })
                temp.append(t)
            result = temp

        return result, count, "get dictionary {}.".format("success" if result else "fail")

    def update(self, uuid, data):
        # 当参数为空时,直接返回错误
        if len(data) <= 0 or (len(data.keys()) == 1 and "id" in data):
            return False, "parameters can not be null."

        # 查询请求者是否存在
        editor = User.get(id=request.current_user.get("id"))
        if not editor:
            return False, "current user is not exists"

        result = fullStackDB.update(Dict, { 'uuid': uuid }, update_at=datetime.now(), update_by=editor, **data)
        return result, "update dictionary {}.".format("success" if result else "fail")

dictManager = DictManager()