''' Author: your name Date: 2021-06-29 19:33:41 LastEditTime: 2021-07-05 18:56:38 LastEditors: Please set LastEditors Description: In User Settings Edit FilePath: \evm-store\backend\view\monitor.py ''' import tornado.ioloop import tornado.web from tornado import httputil from tornado.web import RequestHandler, StaticFileHandler from tornado.websocket import WebSocketHandler, WebSocketClosedError import json import signal import time import pprint import traceback import jwt from typing import ( Any,) from threading import Timer from datetime import datetime, timedelta from app import config from fullstack.log import logger from model.monitor import session, Device, User from controller.monitor import insert_data, get_monitor_list, get_watch_list def datetime2secs(mydate): return time.mktime(mydate.timetuple()) def secs2datetime(ts): return datetime.fromtimestamp(ts) class ObjectDict(dict): """ Makes a dictionary behave like an object, with attribute-style access. """ def __getattr__(self, name): try: return self[name] except KeyError: raise AttributeError(name) def __setattr__(self, name, value): self[name] = value class GracefulExit(SystemExit): code = 1 def raise_graceful_exit(*args): tornado.ioloop.IOLoop.current().stop() print("Gracefully shutdown", args) raise GracefulExit() class BaseHandler(RequestHandler): """解决JS跨域请求问题""" def set_default_headers(self): self.set_header('Access-Control-Allow-Origin', '*') self.set_header('Access-Control-Allow-Methods', 'POST, GET') self.set_header('Access-Control-Max-Age', 1000) self.set_header('Access-Control-Allow-Headers', '*') self.set_header('Content-type', 'application/json') class WebsocketResponse(ObjectDict): def __init__(self, type="Response", api_code=-1, message='fail', data=None, traceback=""): super(WebsocketResponse, self).__init__() self.type = type self.code = api_code self.message = message self.data = data self.traceback = traceback if isinstance(self.data, list): self.count = len(self.data) def pushmessage(func): def send(*agrs, **kwargs): self = agrs[0] ret = func(*agrs, **kwargs) if ret: msg, binary = ret try: if isinstance(msg, WebsocketResponse) or isinstance(msg, dict): self.write_message(json.dumps(msg), binary) elif isinstance(msg, str): self.write_message(msg, binary) else: self.write_message(repr(msg), binary) except WebSocketClosedError as e: logger.error(e) traceback.print_exc() self.on_close() return send class BaseWebsocket(WebSocketHandler): _clients = [] handlers = {} def open(self): # 新加入一个连接后,设置一个接收消息时间戳 # 同时查询改用可以查看哪些设备 # 当有消息到来时,遍历用户列表,将设备IMEI与每个用户的IMEI列表比对 # 如果用户列表有这个IMEI,则需要向该用户广播消息 # 列表 [里面存放字典] # 字典 { 'uuid': '', 'context': self, 'devices': ['imei', 'imei'], 'ts': '接收消息时间戳' } className = self.__class__.__name__ logger.warning("websocket of %s is opened" % className) if className not in self.handlers: self.handlers[className] = set() self.handlers[className].add(self) logger.info(self.handlers[className]) pprint.pprint(self.handlers) @pushmessage def send(self, message, binary=False): return message, binary def on_close(self): className = self.__class__.__name__ logger.warning("websocket of %s is closed" % className) if className in self.handlers and self in self.handlers[className]: # 更加健壮的处理是,这里需要增加一个self是否存在的判断 self.handlers[className].remove(self) for i, c in enumerate(self._clients): if id(self) == id(c.get("context")): del self._clients[i] def check_origin(self, origin): logger.info(origin) return True @classmethod def broadcastMessage(cls, message): # pprint.pprint(cls.handlers) pprint.pprint(message) print("=======>", cls._clients) if not message.get("imei"): return False for item in cls._clients: if message.get("imei") in item.get("devices", []): item.get("context").write_message(json.dumps(message)) # className = cls.__name__ # message = json.dumps(message) # if className in cls.handlers: # for handler in cls.handlers[className]: # # 判断用户能否查看该设备 # handler.send(message, binary) class NotifyHandler(BaseWebsocket): """ 建立与web前端的通信连接,发送状态信息报文 """ _timer = None def __init__(self, application: tornado.web.Application, request: httputil.HTTPServerRequest, **kwargs: Any) -> None: super().__init__(application, request, **kwargs) self.on_heartbeat() def open(self): super(NotifyHandler, self).open() def on_message(self, message): try: className = self.__class__.__name__ message = json.loads(message) # 判断消息类型 if message.get("type"): # 获取token值,检验正确与否,获取uuid payload = jwt.decode(message.get("token"), config['SECRET_KEY'], verify=True, algorithms=['HS256'], options={'require': ['exp', 'iss', 'sub', 'aud', 'iat', 'data']}, audience="bytecode") # 认证包,认证不通过,则剔除该连接 if message.get("type") == "auth": if not message.get("token"): self.write_message(json.dumps({ "code": 400, "data": None, "msg": "token can not be null" })) return user = session.query(User).filter(User.id == payload.get("data").get("id")).all() if not user: self.write_message(json.dumps({ "code": 400, "data": None, "msg": "user not exists" })) # self.close() return # 查询该用户可查看设备 devices = session.query(Device).filter(Device.create_by==payload.get("data").get("id")).all() if len(devices): self._clients.append({ 'uuid': payload.get("data").get("uuid"), 'context': self, 'devices': list(map(lambda d:d.imei, devices)), 'ts': int(time.time()) }) self.write_message(json.dumps({ 'code': 200, 'data': None, 'msg': 'auth passed' })) else: # 没有设备,是否断开连接 self.write_message(json.dumps({ 'code': 400, 'data': None, 'msg': 'no devices' })) # self.close() elif message.get("type") == "heartbeat": # 心跳包 # 收到心跳包消息,更新接收数据时间 for c in self._clients: if c.get("uuid") == payload.get("data").get("uuid"): c["ts"] = int(time.time()) self.write_message(json.dumps({ 'code': 200, 'data': None, 'msg': 'update session timestamp success' })) else: self.write_message(json.dumps({ 'code': 200, 'data': None, 'msg': 'unkonw message packet, disconnect by server' })) self.handlers[className].remove(self) except Exception as e: # 认证失败会导致触发异常,这里不能remove(self),否则会导致on_close方法报错 self.write_message(json.dumps({ 'code': 400, 'data': e.args, 'msg': "server error" })) logger.error(e) traceback.print_exc() logger.info(message) def on_heartbeat(self): # 心跳定时器,固定间隔扫描连接列表,当连接超时,主动剔除该连接 for i in range(len(self._clients) - 1, -1, -1): if int(time.time()) - self._clients[i].get("ts") > 5: # self._clients.pop(i) del self._clients[i] className = self.__class__.__name__ if self.handlers.get(className, None) and self in self.handlers[className]: self.handlers[className].remove(self) self._timer = Timer(1, self.on_heartbeat) self._timer.start() class MainHandler(BaseHandler): def get(self, *args, **kwargs): print("#############", args) print("/////////////", kwargs) print(self.request.path) # 请求路径 print(self.request.method) # 请求方法 print(self.request.host) # IP地址 print(self.request.protocol) # self.get_query_argument('a', value) # self.get_body_argument() # self.request.files self.write(json.dumps({ "msg": "Hello, world" })) def post(self): data = tornado.escape.json_decode(self.request.body) self.write(json.dumps({ 'code': 100, 'data': data, 'msg': 'success' })) message = {'imei': '12345678900005', 'type': 'report', 'system': {'free_size': 0}, 'lvgl': {'total_size': 5242880, 'free_cnt': 31, 'free_size': 1279664, 'free_biggest_size': 1205448, 'used_cnt': 832, 'used_pct': 76, 'frag_pct': 6}, 'evm': {'total_size': 2097152, 'free_size': 0, 'gc_usage': 50}, 'image': [{'uri': 'evue_launcher', 'length': 1043, 'png_total_count': 0, 'png_uncompressed_size': 0, 'png_file_size': 0}, {'uri': 'kdgs_1_storyList', 'length': 9608, 'png_total_count': 193, 'png_uncompressed_size': 370884, 'png_file_size': 209807}]} insert_data(message) # 这里不能使用广播,得点对点发送,有此设备的账号才能看到调试信息 NotifyHandler.broadcastMessage(message) class WatchHandler(BaseHandler): def get(self, *args, **kwargs): # 获取手表列表 print("#############", args) print("/////////////", kwargs) print(self.request.path) # 请求路径 print(self.request.method) # 请求方法 print(self.request.host) # IP地址 print(self.request.protocol) try: result = get_watch_list() if result: self.write(json.dumps({ 'code': 200, 'data': result, 'msg': 'success' })) else: self.write(json.dumps({ 'code': 204, 'data': None, 'msg': 'no data' })) except Exception as e: logger.error(e) self.write(json.dumps({ 'code': 500, 'data': None, 'msg': 'server error' })) def post(self): data = tornado.escape.json_decode(self.request.body) self.write(json.dumps({ 'code': 100, 'data': data, 'msg': 'success' })) class DeviceMessageHandler(BaseHandler): def get(self): if not self.get_argument('watch', None): self.write(json.dumps({ 'code': 400, 'msg': 'params error, watch can not be null' })) return try: watch = self.get_query_argument('watch') category = self.get_query_argument('category', 'all') start = self.get_query_argument('start', None) end = self.get_query_argument('end', None) if start and start.isdigit(): start = int(start) start = time.localtime(start) start = time.strftime("%Y-%m-%d %H:%M:%S", start) else: start = (datetime.now()-timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S") if end and end.isdigit(): end = time.localtime(int(end)) end = time.strftime("%Y-%m-%d %H:%M:%S", end) result = get_monitor_list(int(watch), category, start, end) if result: self.write(json.dumps({ 'code': 200, 'data': result, 'msg': 'success', 'type': 'array' if isinstance(result, list) else 'object' })) else: self.write(json.dumps({ 'code': 204, 'data': None, 'msg': 'no data' })) except Exception as e: logger.error(e) traceback.print_exc() self.write(json.dumps({ 'code': 500, 'data': None, 'msg': 'server error' })) def post(self): data = tornado.escape.json_decode(self.request.body) data.update({ 'request': { 'host': self.request.remote_ip, 'path': self.request.path, 'protocol': self.request.protocol } }) insert_data(data) data['type'] = 'report' data['request'].update({ 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) NotifyHandler.broadcastMessage(data) self.write(json.dumps({ 'code': 100, 'message': 'success' })) def make_app(): return tornado.web.Application([ (r"/", MainHandler), (r"/api/v1/evm_store/monitor", DeviceMessageHandler), (r"/api/v1/evm_store/watch", WatchHandler), (r"/ws/v1/notify", NotifyHandler), (r"/dist/(.*)", StaticFileHandler, { "path": "dist" }), ]) if __name__ == "__main__": app = make_app() app.listen(5001) signal.signal(signal.SIGINT, raise_graceful_exit) signal.signal(signal.SIGTERM, raise_graceful_exit) tornado.ioloop.IOLoop.current().start()