#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""SQLAlchemy models and the shared database session for the monitor tables.

Importing this module creates the SQLite engine, creates any missing tables
and instantiates a module-level ``session``.
"""
import json
import uuid
from datetime import datetime

from sqlalchemy import create_engine
from sqlalchemy import func, Column, Integer, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, class_mapper

from app.setting import config

engine = create_engine(
    'sqlite:///{}?check_same_thread=False'.format(config.get("DATABASE")),
    echo=False,
)
Base = declarative_base()


def get_current_datetime():
    """Return the current local time as a naive ``datetime``."""
    return datetime.now()


def gen_id():
    """Return a new random UUID4 as a 32-character hex string."""
    return uuid.uuid4().hex


def object_to_dict(obj):
    """Convert a mapped object into a plain dict keyed by column key.

    ``datetime`` values are converted to ISO-8601 strings so that the result
    can be passed to ``json.dumps``; every other value is returned unchanged.
    """
    result = {}
    for column in class_mapper(obj.__class__).columns:
        value = getattr(obj, column.key)
        if isinstance(value, datetime):
            value = value.isoformat()
        result[column.key] = value
    return result


def _auto_timestamp_column():
    """Build a DateTime column that is filled on INSERT and refreshed on UPDATE."""
    # NOTE(review): because of ``onupdate``, columns named ``create_at`` that use
    # this factory are also overwritten on every UPDATE -- confirm this is intended.
    return Column(
        DateTime(timezone=True),
        default=get_current_datetime,
        server_default=func.now(),
        onupdate=func.now(),
    )


class _DictMixin(object):
    """Provide ``to_dict`` for mapped classes."""

    def to_dict(self):
        """Return a dict mapping each table column name to its raw value."""
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}


class WatchTest(Base):
    """Row of the ``monitor_watch_test`` table."""

    __tablename__ = 'monitor_watch_test'

    id = Column(Integer, primary_key=True, autoincrement=True)
    imei = Column(String)
    create_at = _auto_timestamp_column()


class Watch(_DictMixin, Base):
    """Row of the ``monitor_watch`` table."""

    __tablename__ = 'monitor_watch'

    id = Column(Integer, primary_key=True, autoincrement=True)
    imei = Column(String)
    create_at = _auto_timestamp_column()


class Request(_DictMixin, Base):
    """Row of the ``monitor_request`` table."""

    __tablename__ = 'monitor_request'

    id = Column(Integer, primary_key=True, autoincrement=True)
    watch = Column(Integer)  # watch ID
    host = Column(String)
    path = Column(String)
    protocol = Column(String)
    create_at = _auto_timestamp_column()


class System(_DictMixin, Base):
    """Row of the ``monitor_system`` table."""

    __tablename__ = 'monitor_system'

    id = Column(Integer, primary_key=True, autoincrement=True)
    watch = Column(Integer)  # watch ID
    free_size = Column(Integer)  # unit: bytes
    timestamp = _auto_timestamp_column()


class Lvgl(_DictMixin, Base):
    """Row of the ``monitor_lvgl`` table."""

    __tablename__ = 'monitor_lvgl'

    id = Column(Integer, primary_key=True, autoincrement=True)
    watch = Column(Integer)  # watch ID
    total_size = Column(Integer)  # unit: bytes
    free_cnt = Column(Integer)
    free_size = Column(Integer)
    free_biggest_size = Column(Integer)
    used_cnt = Column(Integer)
    used_pct = Column(Integer)
    frag_pct = Column(Integer)
    timestamp = _auto_timestamp_column()


class Evm(_DictMixin, Base):
    """Row of the ``monitor_evm`` table."""

    __tablename__ = 'monitor_evm'

    id = Column(Integer, primary_key=True, autoincrement=True)
    watch = Column(Integer)  # watch ID
    # total_size = Column(Integer)  # unit: bytes
    # free_size = Column(Integer)
    # gc_usage = Column(Integer)
    heap_map_size = Column(Integer)
    heap_total_size = Column(Integer)
    heap_used_size = Column(Integer)
    stack_total_size = Column(Integer)
    stack_used_size = Column(Integer)
    timestamp = _auto_timestamp_column()

    def to_json(self):
        """Return this row serialised as a JSON object string.

        Goes through ``object_to_dict`` so the ``timestamp`` datetime is
        emitted as an ISO-8601 string, which ``json.dumps`` can encode.
        """
        return json.dumps(object_to_dict(self))


class Image(_DictMixin, Base):
    """Row of the ``monitor_image`` table."""

    __tablename__ = 'monitor_image'

    id = Column(Integer, primary_key=True, autoincrement=True)
    watch = Column(Integer)  # watch ID
    uri = Column(String(50))
    length = Column(Integer)
    png_uncompressed_size = Column(Integer)
    png_total_count = Column(Integer)
    png_file_size = Column(Integer)
    timestamp = _auto_timestamp_column()


class User(Base):
    """Row of the ``<TABLE_PREFIX>user`` table."""

    __tablename__ = "{}".format(config['TABLE_PREFIX']) + "user"

    id = Column(Integer, primary_key=True, autoincrement=True)
    uuid = Column(String(64), default=gen_id, index=True)
    account = Column(String(256))


class Device(_DictMixin, Base):
    """Row of the ``<TABLE_PREFIX>device`` table."""

    __tablename__ = "{}".format(config['TABLE_PREFIX']) + "device"

    id = Column(Integer, primary_key=True, autoincrement=True)
    uuid = Column(String(64), default=gen_id, index=True)
    name = Column(String, default="")
    imei = Column(String)
    desc = Column(String, default="")
    type = Column(String, default="watch")
    # NOTE(review): create_at and delete_at are both filled at INSERT time and
    # refreshed on every UPDATE, exactly like update_at -- confirm intended.
    create_at = Column(DateTime(timezone=True), default=get_current_datetime, onupdate=func.now())
    create_by = Column(Integer, default=None, nullable=True)
    update_at = Column(DateTime(timezone=True), default=get_current_datetime, onupdate=func.now())
    update_by = Column(Integer, default=None)
    delete_at = Column(DateTime(timezone=True), default=get_current_datetime, onupdate=func.now())
    delete_by = Column(Integer, default=None)
    is_delete = Column(Boolean, default=False)
    sort = Column(Integer, default=1)
    remarks = Column(String, default="")


# Create any tables that do not exist yet, using the engine defined above.
Base.metadata.create_all(engine, checkfirst=True)
Session = sessionmaker(bind=engine)
# Module-level Session instance, created at import time.
session = Session()