r'''
Author: your name
Date: 2021-06-29 19:33:41
LastEditTime: 2021-08-07 18:06:46
LastEditors: Please set LastEditors
Description: In User Settings Edit
FilePath: \evm-store\backend\view\monitor.py
'''

import functools
import json
import pprint
import signal
import time
import traceback
from datetime import datetime, timedelta
from typing import (
    Any,
)

import jwt
import tornado.escape
import tornado.ioloop
import tornado.web
from tornado import httputil
from tornado.web import RequestHandler, StaticFileHandler
from tornado.websocket import WebSocketHandler, WebSocketClosedError

from fullstack.log import logger
from app.signal_manager import signalManager

# Timestamp layout used for the query window and for report timestamps.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


class ObjectDict(dict):
    """Makes a dictionary behave like an object, with attribute-style access."""

    def __getattr__(self, name):
        """Return ``self[name]``; raise AttributeError when the key is absent."""
        try:
            return self[name]
        except KeyError:
            # Hide the internal KeyError: callers asked for an attribute.
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        """Store every attribute assignment as a dictionary item."""
        self[name] = value


class GracefulExit(SystemExit):
    """SystemExit subclass raised by the signal handler; exit code is 1."""

    code = 1


def raise_graceful_exit(*args):
    """Signal handler: stop the current IOLoop, then raise GracefulExit.

    ``args`` are whatever the ``signal`` module passes to a handler; they are
    only printed.
    """
    tornado.ioloop.IOLoop.current().stop()
    print("Gracefully shutdown", args)
    raise GracefulExit()


class BaseHandler(RequestHandler):
    """Base HTTP handler that works around JS cross-origin (CORS) requests."""

    def set_default_headers(self):
        """Set permissive CORS headers and a JSON content type on every response."""
        self.set_header('Access-Control-Allow-Origin', '*')
        self.set_header('Access-Control-Allow-Methods', 'POST, GET')
        self.set_header('Access-Control-Max-Age', 1000)
        self.set_header('Access-Control-Allow-Headers', '*')
        self.set_header('Content-type', 'application/json')


class WebsocketResponse(ObjectDict):
    """Dictionary-backed response envelope for websocket messages.

    Attribute assignments go through ``ObjectDict.__setattr__``, so the
    instance ends up with the keys ``type``, ``code``, ``message``, ``data``
    and ``traceback``, plus ``count`` when ``data`` is a list.
    """

    def __init__(self, type="Response", api_code=-1, message='fail', data=None, traceback=""):
        super(WebsocketResponse, self).__init__()
        self.type = type
        self.code = api_code
        self.message = message
        self.data = data
        self.traceback = traceback
        if isinstance(self.data, list):
            self.count = len(self.data)


def pushmessage(func):
    """Decorate a handler method so that its return value is sent to the client.

    The wrapped method must return either a falsy value (nothing is sent) or a
    ``(message, binary)`` pair. Dict messages (including WebsocketResponse) are
    JSON-encoded, strings are sent as-is and anything else is sent as its
    ``repr``. If the socket is already closed the error is logged and the
    handler's ``on_close`` is invoked. The wrapper itself returns None.
    """

    @functools.wraps(func)
    def send(*args, **kwargs):
        self = args[0]
        ret = func(*args, **kwargs)
        if ret:
            msg, binary = ret
            try:
                if isinstance(msg, (WebsocketResponse, dict)):
                    self.write_message(json.dumps(msg), binary)
                elif isinstance(msg, str):
                    self.write_message(msg, binary)
                else:
                    self.write_message(repr(msg), binary)
            except WebSocketClosedError as e:
                logger.error(e)
                traceback.print_exc()
                self.on_close()

    return send


class BaseWebsocket(WebSocketHandler):
    """Websocket handler that tracks open connections per handler class name."""

    # Shared registry: class name -> set of open handler instances.
    handlers = {}

    def check_origin(self, origin):
        """Log the origin and accept connections from any origin."""
        logger.info(origin)
        return True

    def open(self):
        """Register this connection in ``handlers`` under its class name."""
        # Design notes from the original author (not implemented in this method):
        # after a new connection joins, set a message-received timestamp and
        # look up which devices this user may view. When a message arrives,
        # walk the user list and compare the device IMEI with each user's IMEI
        # list; if a user's list contains the IMEI, broadcast to that user.
        className = self.__class__.__name__
        logger.warning("websocket of %s is opened" % className)
        if className not in self.handlers:
            self.handlers[className] = set()
        self.handlers[className].add(self)
        logger.info(self.handlers[className])
        pprint.pprint(self.handlers)

    @pushmessage
    def send(self, message, binary=False):
        """Send ``message`` to this client (delivery is done by ``pushmessage``)."""
        return message, binary

    def on_close(self):
        """Remove this connection from ``handlers`` if it is registered."""
        className = self.__class__.__name__
        logger.warning("websocket of %s is closed" % className)
        # Membership is checked first so a repeated on_close() call
        # (e.g. triggered from pushmessage) does not raise KeyError.
        if className in self.handlers and self in self.handlers[className]:
            self.handlers[className].remove(self)

    @classmethod
    def broadcastMessage(cls, message):
        """JSON-encode ``message`` and send it to every open handler of ``cls``."""
        className = cls.__name__
        message = json.dumps(message)
        if className in cls.handlers:
            # Iterate over a snapshot: send() may call on_close() on a dead
            # socket, which removes the handler from the live set and would
            # otherwise raise "Set changed size during iteration".
            for handler in tuple(cls.handlers[className]):
                # TODO: decide whether this user may view the device.
                handler.send(message)


# Establishes the connection to the web front end and sends status messages.
class NotifyHandler(BaseWebsocket):
    """Keeps the mapping between uuid, the handler itself (self) and IMEIs."""

    # List of dicts, per the original author's note:
    # [{'uuid': '', 'context': self, 'devices': ['imei', 'imei'],
    #   'ts': 'message-received timestamp'}]
    _clients = []

    def __init__(self, application: tornado.web.Application,
                 request: httputil.HTTPServerRequest, **kwargs: Any) -> None:
        super().__init__(application, request, **kwargs)

    def open(self):
        """Log the new connection.

        NOTE: BaseWebsocket.open() is not called here (the call was commented
        out in the original), so instances are not added to
        ``BaseWebsocket.handlers``.
        """
        # super(NotifyHandler, self).open()
        logger.info("new client connect......")

    def on_close(self):
        """Drop this connection's entry from ``_clients`` and log the disconnect."""
        # super(NotifyHandler, self).on_close()
        index = signalManager.actionCheckClient.emit(self)
        # NOTE(review): presumably emit() returns the index of this client in
        # _clients, or a negative value when not found -- confirm against the
        # slot connected to actionCheckClient. None is guarded so a missing
        # result cannot raise TypeError during close.
        if index is not None and index >= 0:
            del self._clients[index]
        logger.info("client disconnect")

    def on_message(self, message):
        """Decode an incoming JSON message, dispatch it and reply to the client.

        Replies with a JSON error envelope (code 401) on an invalid JWT
        signature and with code 500 on any other exception.
        """
        try:
            message = json.loads(message)
            message = signalManager.actionReceiveMessage.emit(self, message)
            if message:
                self.write_message(message)
            logger.info(self._clients)
        except jwt.exceptions.InvalidSignatureError as e:
            logger.error(e)
            self.write_message(json.dumps({
                'code': 401,
                'data': None,
                'msg': 'auth failed'
            }))
        except Exception as e:
            # Authentication failure raises an exception; do not remove(self)
            # here, otherwise on_close would fail later.
            # default=str: exception args may hold objects json cannot encode,
            # and the error reply itself must not raise.
            self.write_message(json.dumps({
                'code': 500,
                'data': e.args,
                'msg': "server error"
            }, default=str))
            logger.error(e)
            traceback.print_exc()
        logger.info(message)

    @classmethod
    def broadcastMessage(cls, message):
        """Hand ``message`` and the client list to ``actionBroadcateMessage``."""
        pprint.pprint(cls.handlers)
        pprint.pprint(message)
        logger.info(cls._clients)
        signalManager.actionBroadcateMessage.emit(cls._clients, message)


class DeviceMessageHandler(BaseHandler):
    """HTTP endpoint for querying (GET) and reporting (POST) monitor data."""

    def get(self):
        """Query monitor data for one watch within a time window.

        Query arguments: ``watch`` (required), ``category`` (default ``all``),
        ``start`` / ``end`` (epoch seconds). ``start`` defaults to ten minutes
        ago when absent or not all digits; ``end`` is passed through unchanged
        unless it is all digits. The result is written as a JSON envelope.
        """
        if not self.get_argument('watch', None):
            self.write(json.dumps({
                'code': 400,
                'msg': 'params error, watch can not be null'
            }))
            return

        try:
            watch = self.get_query_argument('watch')
            category = self.get_query_argument('category', 'all')
            start = self.get_query_argument('start', None)
            end = self.get_query_argument('end', None)

            if start and start.isdigit():
                start = time.strftime(_TIMESTAMP_FORMAT, time.localtime(int(start)))
            else:
                start = (datetime.now() - timedelta(minutes=10)).strftime(_TIMESTAMP_FORMAT)

            if end and end.isdigit():
                end = time.strftime(_TIMESTAMP_FORMAT, time.localtime(int(end)))

            data = {
                'watch': int(watch),
                'category': category,
                'start': start,
                'end': end
            }
            # Fixed: this used the stdlib ``signal`` module, which has no
            # actionGetMonitor, so every request ended in the 500 branch.
            result = signalManager.actionGetMonitor.emit(data)
            if result:
                self.write(json.dumps({
                    'code': 200,
                    'data': result,
                    'msg': 'success',
                    'type': 'array' if isinstance(result, list) else 'object'
                }))
            else:
                self.write(json.dumps({
                    'code': 204,
                    'data': None,
                    'msg': 'no data'
                }))
        except Exception as e:
            logger.error(e)
            traceback.print_exc()
            self.write(json.dumps({
                'code': 500,
                'data': None,
                'msg': 'server error'
            }))

    def post(self):
        """Accept a JSON monitor report, store it and broadcast it to clients.

        The request's remote IP, path and protocol are recorded under
        ``system`` before the report is emitted; ``type`` and a timestamp are
        added before it is broadcast. An empty body is logged and ignored.
        """
        if not self.request.body:
            logger.info(self.request.body)
            return None

        try:
            # Decoding happens inside the try so that a badly encoded body
            # gets the same JSON error reply as malformed JSON.
            data = json.loads(self.request.body.decode("utf-8"))
            logger.info(data)
            # setdefault (not get): when the report has no "system" section
            # the update must land in ``data``; with get() it went to a
            # throwaway dict and ``data['system']`` below raised KeyError.
            data.setdefault("system", {}).update({
                'host': self.request.remote_ip,
                'path': self.request.path,
                'protocol': self.request.protocol
            })

            signalManager.actionPostMonitor.emit(data)

            data['type'] = 'report'
            data['system'].update({
                'timestamp': datetime.now().strftime(_TIMESTAMP_FORMAT)
            })
            NotifyHandler.broadcastMessage(data)
            self.write(json.dumps({
                'code': 100,
                'message': 'success'
            }))
        except Exception as e:
            traceback.print_exc()
            logger.info(self.request.body)
            logger.error(e)
            self.write(json.dumps({
                'code': 500,
                'message': 'error'
            }))


# ========================> debug endpoint
class MainHandler(BaseHandler):
    """Debug endpoint mounted at ``/``."""

    def get(self, *args, **kwargs):
        """Print request details and reply with a hello-world JSON body."""
        print(args)
        print(kwargs)
        print(self.request.path)  # request path
        print(self.request.method)  # request method
        print(self.request.host)  # request host
        print(self.request.protocol)
        # self.get_query_argument('a', value)
        # self.get_body_argument()
        # self.request.files
        self.write(json.dumps({
            "msg": "Hello, world"
        }))

    def post(self):
        """Echo the JSON body, then emit and broadcast a canned sample report."""
        if not self.request.body:
            return None

        try:
            data = tornado.escape.json_decode(self.request.body)
            logger.info(data)
            self.write(json.dumps({
                'code': 100,
                'data': data,
                'msg': 'success'
            }))

            # Hard-coded sample report used for debugging.
            message = {
                'imei': '12345678900005',
                'type': 'report',
                'system': {'free_size': 0},
                'lvgl': {
                    'total_size': 5242880,
                    'free_cnt': 31,
                    'free_size': 1279664,
                    'free_biggest_size': 1205448,
                    'used_cnt': 832,
                    'used_pct': 76,
                    'frag_pct': 6
                },
                'evm': {'total_size': 2097152, 'free_size': 0, 'gc_usage': 50},
                'image': [
                    {
                        'uri': 'evue_launcher',
                        'length': 1043,
                        'png_total_count': 0,
                        'png_uncompressed_size': 0,
                        'png_file_size': 0
                    },
                    {
                        'uri': 'kdgs_1_storyList',
                        'length': 9608,
                        'png_total_count': 193,
                        'png_uncompressed_size': 370884,
                        'png_file_size': 209807
                    }
                ]
            }
            signalManager.actionPostMonitor.emit(message)
            # Author's note: broadcast should not be used here; this should be
            # sent point-to-point so that only accounts owning this device can
            # see the debug information.
            NotifyHandler.broadcastMessage(message)
        except Exception as e:
            logger.info(self.request.body)
            logger.error(e)
            traceback.print_exc()


# ========================> debug endpoint
class WatchHandler(BaseHandler):
    """Watch endpoint; GET currently always answers "no data"."""

    def get(self, *args, **kwargs):
        """Intended to return the watch list; currently replies with code 204."""
        print(args)
        print(kwargs)
        print(self.request.path)  # request path
        print(self.request.method)  # request method
        print(self.request.host)  # request host
        print(self.request.protocol)

        try:
            self.write(json.dumps({
                'code': 204,
                'data': None,
                'msg': 'no data'
            }))
        except Exception as e:
            logger.error(e)
            self.write(json.dumps({
                'code': 500,
                'data': None,
                'msg': 'server error'
            }))

    def post(self):
        """Decode the JSON request body and echo it back in a success envelope."""
        data = tornado.escape.json_decode(self.request.body)
        self.write(json.dumps({
            'code': 100,
            'data': data,
            'msg': 'success'
        }))


def make_app():
    """Build the Tornado application with the HTTP, websocket and static routes."""
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/api/v1/evm_store/monitor", DeviceMessageHandler),
        (r"/api/v1/evm_store/watch", WatchHandler),
        (r"/ws/v1/notify", NotifyHandler),
        (r"/dist/(.*)", StaticFileHandler, {
            "path": "dist"
        }),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(5001)
    # Stop the loop and exit on Ctrl-C or a termination request.
    signal.signal(signal.SIGINT, raise_graceful_exit)
    signal.signal(signal.SIGTERM, raise_graceful_exit)
    tornado.ioloop.IOLoop.current().start()