monitor.py 14.4 KB
Newer Older
1 2 3
'''
Author: your name
Date: 2021-06-29 19:33:41
wanli's avatar
wanli committed
4
LastEditTime: 2021-07-24 01:15:03
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
LastEditors: Please set LastEditors
Description: In User Settings Edit
FilePath: \evm-store\backend\view\monitor.py
'''
import json
import signal
import time
import pprint
import traceback
import jwt
from typing import ( Any,)
from threading import Timer
from datetime import datetime, timedelta
import tornado.ioloop
import tornado.web
from tornado import httputil
from tornado.web import RequestHandler, StaticFileHandler
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from application.app import config
from webcreator.utils import ObjectDict
from webcreator.log import logger
from models.device import DeviceModel
from models.user import UserModel
from controllers.monitor import insert_data, get_monitor_list, get_watch_list

def datetime2secs(mydate):
    return time.mktime(mydate.timetuple())

def secs2datetime(ts):
    return datetime.fromtimestamp(ts)

class GracefulExit(SystemExit):
    code = 1

def raise_graceful_exit(*args):
    tornado.ioloop.IOLoop.current().stop()
    print("Gracefully shutdown", args)
    raise GracefulExit()

class BaseHandler(RequestHandler):
    """解决JS跨域请求问题"""
    def set_default_headers(self):
        self.set_header('Access-Control-Allow-Origin', '*')
        self.set_header('Access-Control-Allow-Methods', 'POST, GET')
        self.set_header('Access-Control-Max-Age', 1000)
        self.set_header('Access-Control-Allow-Headers', '*')
        self.set_header('Content-type', 'application/json')

class WebsocketResponse(ObjectDict):
    def __init__(self, type="Response", api_code=-1, message='fail', data=None, traceback=""):
        super(WebsocketResponse, self).__init__()
        self.type = type
        self.code = api_code
        self.message = message
        self.data = data
        self.traceback = traceback
        if isinstance(self.data, list):
            self.count = len(self.data)

def pushmessage(func):
    def send(*agrs, **kwargs):
        self = agrs[0]
        ret = func(*agrs, **kwargs)
        if ret:
            msg, binary = ret
            try:
                if isinstance(msg, WebsocketResponse) or isinstance(msg, dict):
                    self.write_message(json.dumps(msg), binary)
                elif isinstance(msg, str):
                    self.write_message(msg, binary)
                else:
                    self.write_message(repr(msg), binary)
            except WebSocketClosedError as e:
                logger.error(e)
                traceback.print_exc()
                self.on_close()
    return send

class BaseWebsocket(WebSocketHandler):
    _clients = []
    handlers = {}
    
    def open(self):
        # 新加入一个连接后,设置一个接收消息时间戳
        # 同时查询改用可以查看哪些设备
        # 当有消息到来时,遍历用户列表,将设备IMEI与每个用户的IMEI列表比对
        # 如果用户列表有这个IMEI,则需要向该用户广播消息

        # 列表 [里面存放字典]
        # 字典 { 'uuid': '', 'context': self, 'devices': ['imei', 'imei'], 'ts': '接收消息时间戳' }

        className = self.__class__.__name__
        logger.warning("websocket of %s is opened" % className)
        if className not in self.handlers:
            self.handlers[className] = set()
        self.handlers[className].add(self)
        logger.info(self.handlers[className])
        # pprint.pprint(self.handlers)

    @pushmessage
    def send(self, message, binary=False):
        return message, binary

    def on_close(self):
        className = self.__class__.__name__
        logger.warning("websocket of %s is closed" % className)
        if className in self.handlers and self in self.handlers[className]:
            # 更加健壮的处理是,这里需要增加一个self是否存在的判断
            self.handlers[className].remove(self)
            for i, c in enumerate(self._clients):
                if id(self) == id(c.get("context")):
                    del self._clients[i]

    def check_origin(self, origin):
        logger.info(origin)
        return True

    @classmethod
    def broadcastMessage(cls, message):
wanli's avatar
wanli committed
124
        pprint.pprint(message)
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
        print("=======>", cls._clients)

        if not message.get("imei"):
            return False

        for item in cls._clients:
            if message.get("imei") in item.get("devices", []):
                item.get("context").write_message(json.dumps(message))

        # className = cls.__name__
        # message = json.dumps(message)
        # if className in cls.handlers:
        #     for handler in cls.handlers[className]:
        #         # 判断用户能否查看该设备
        #         handler.send(message, binary)

class NotifyHandler(BaseWebsocket):
    """
        建立与web前端的通信连接,发送状态信息报文
    """
    _timer = None
    def __init__(self, application: tornado.web.Application, request: httputil.HTTPServerRequest, **kwargs: Any) -> None:
        super().__init__(application, request, **kwargs)
        self.on_heartbeat()

    def open(self):
        super(NotifyHandler, self).open()

    def on_message(self, message):
        try:
            className = self.__class__.__name__
            message = json.loads(message)
wanli's avatar
wanli committed
157
            logger.info(message)
158
            # 判断消息类型
wanli's avatar
wanli committed
159
            if message.get("type") and message.get("token"):
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
                # 获取token值,检验正确与否,获取uuid
                payload = jwt.decode(message.get("token"), config.JWT_SECRET_KEY, verify=True, algorithms=['HS256'])
                logger.info(payload)

                # 认证包,认证不通过,则剔除该连接
                if message.get("type") == "auth":
                    if not message.get("token"):
                        self.write_message(json.dumps({ "code": 400, "data": None, "msg": "token can not be null" }))
                        return

                    user = UserModel.query.filter(UserModel.uuid==payload.get("sub").get("uuid")).one_or_none()
                    if not user:
                        self.write_message(json.dumps({ "code": 400, "data": None, "msg": "user not exists" }))
                        # self.close()
                        return

176 177
                    logger.info(user.to_dict())

178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
                    # 查询该用户可查看设备
                    devices = DeviceModel.query.filter(DeviceModel.create_by==user.id).all()
                    if len(devices):
                        self._clients.append({
                            'uuid': payload.get("sub").get("uuid"),
                            'context': self,
                            'devices': list(map(lambda d:d.imei, devices)),
                            'ts': int(time.time())
                        })
                        self.write_message(json.dumps({ 'code': 200, 'data': None, 'msg': 'auth passed' }))
                    else: # 没有设备,是否断开连接
                        self.write_message(json.dumps({ 'code': 400, 'data': None, 'msg': 'no devices' }))
                        # self.close()
                elif message.get("type") == "heartbeat": # 心跳包
                    # 收到心跳包消息,更新接收数据时间
wanli's avatar
wanli committed
193
                    logger.info("////////////////////////")
194 195 196 197 198 199 200
                    for c in self._clients:
                        if c.get("uuid") == payload.get("sub").get("uuid"):
                            c["ts"] = int(time.time())
                            self.write_message(json.dumps({ 'code': 200, 'data': None, 'msg': 'update session timestamp success' }))
                else:
                    self.write_message(json.dumps({ 'code': 200, 'data': None, 'msg': 'unkonw message packet, disconnect by server' }))
                    self.handlers[className].remove(self)
wanli's avatar
wanli committed
201 202 203
        except jwt.exceptions.ExpiredSignatureError as e:
            logger.error(e)
            self.write_message(json.dumps({ 'code': 401, 'data': None, 'msg': 'auth fail' }))
204 205 206 207 208 209 210 211 212 213
        except Exception as e:
            # 认证失败会导致触发异常,这里不能remove(self),否则会导致on_close方法报错
            self.write_message(json.dumps({ 'code': 400, 'data': e.args, 'msg': "server error" }))
            logger.error(e)
            traceback.print_exc()
        logger.info(message)

    def on_heartbeat(self):
        # 心跳定时器,固定间隔扫描连接列表,当连接超时,主动剔除该连接
        for i in range(len(self._clients) - 1, -1, -1):
wanli's avatar
wanli committed
214 215
            if int(time.time()) - self._clients[i].get("ts") > 30:
                logger.info("################################################")
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
                # self._clients.pop(i)
                del self._clients[i]
                className = self.__class__.__name__
                if self.handlers.get(className, None) and self in self.handlers[className]:
                    self.handlers[className].remove(self)

        self._timer = Timer(1, self.on_heartbeat)
        self._timer.start()

class MainHandler(BaseHandler):
    def get(self, *args, **kwargs):
        print("#############", args)
        print("/////////////", kwargs)
        print(self.request.path)    # 请求路径
        print(self.request.method)  # 请求方法
        print(self.request.host)    # IP地址
        print(self.request.protocol)
        # self.get_query_argument('a', value)
        # self.get_body_argument()
        # self.request.files
        self.write(json.dumps({ "msg": "Hello, world" }))

    def post(self):
wanli's avatar
wanli committed
239 240
        if not self.request.body:
            return None
241

wanli's avatar
wanli committed
242 243 244 245 246 247 248 249 250 251 252 253 254
        try:
            data = tornado.escape.json_decode(self.request.body)
            logger.info(data)
            self.write(json.dumps({ 'code': 100, 'data': data, 'msg': 'success' }))

            message = {'imei': '12345678900005', 'type': 'report', 'system': {'free_size': 0}, 'lvgl': {'total_size': 5242880, 'free_cnt': 31, 'free_size': 1279664, 'free_biggest_size': 1205448, 'used_cnt': 832, 'used_pct': 76, 'frag_pct': 6}, 'evm': {'total_size': 2097152, 'free_size': 0, 'gc_usage': 50}, 'image': [{'uri': 'evue_launcher', 'length': 1043, 'png_total_count': 0, 'png_uncompressed_size': 0, 'png_file_size': 0}, {'uri': 'kdgs_1_storyList', 'length': 9608, 'png_total_count': 193, 'png_uncompressed_size': 370884, 'png_file_size': 209807}]}
            insert_data(message)
            # 这里不能使用广播,得点对点发送,有此设备的账号才能看到调试信息
            NotifyHandler.broadcastMessage(message)
        except Exception as e:
            logger.info(self.request.body)
            logger.error(e)
            traceback.print_exc()
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313

class WatchHandler(BaseHandler):
    def get(self, *args, **kwargs):
        # 获取手表列表
        print("#############", args)
        print("/////////////", kwargs)
        print(self.request.path)    # 请求路径
        print(self.request.method)  # 请求方法
        print(self.request.host)    # IP地址
        print(self.request.protocol)

        try:
            result = get_watch_list()
            if result:
                self.write(json.dumps({ 'code': 200, 'data': result, 'msg': 'success' }))
            else:
                self.write(json.dumps({ 'code': 204, 'data': None, 'msg': 'no data' }))
        except Exception as e:
            logger.error(e)
            self.write(json.dumps({ 'code': 500, 'data': None, 'msg': 'server error' }))

    def post(self):
        data = tornado.escape.json_decode(self.request.body)
        self.write(json.dumps({ 'code': 100, 'data': data, 'msg': 'success' }))

class DeviceMessageHandler(BaseHandler):
    def get(self):
        if not self.get_argument('watch', None):
            self.write(json.dumps({ 'code': 400, 'msg': 'params error, watch can not be null' }))
            return

        try:
            watch = self.get_query_argument('watch')
            category = self.get_query_argument('category', 'all')
            start = self.get_query_argument('start', None)
            end = self.get_query_argument('end', None)
            if start and start.isdigit():
                start = int(start)
                start = time.localtime(start)
                start = time.strftime("%Y-%m-%d %H:%M:%S", start)
            else:
                start = (datetime.now()-timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")

            if end and end.isdigit():
                end = time.localtime(int(end))
                end = time.strftime("%Y-%m-%d %H:%M:%S", end)

            result = get_monitor_list(int(watch), category, start, end)
            if result:
                self.write(json.dumps({ 'code': 200, 'data': result, 'msg': 'success', 'type': 'array' if isinstance(result, list) else 'object' }))
            else:
                self.write(json.dumps({ 'code': 204, 'data': None, 'msg': 'no data' }))
        except Exception as e:
            logger.error(e)
            traceback.print_exc()
            self.write(json.dumps({ 'code': 500, 'data': None, 'msg': 'server error' }))

    def post(self):
        if not self.request.body:
wanli's avatar
wanli committed
314 315
            logger.info(self.request.body)
            return None
316

wanli's avatar
wanli committed
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
        try:
            # data = tornado.escape.json_decode(self.request.body)
            data = str(self.request.body, encoding="ISO-8859-1")
            data = json.loads(data)
            logger.info(data)

            data['system'].update({ 
                'host': self.request.remote_ip,
                'path': self.request.path,
                'protocol': self.request.protocol
            })

            result = insert_data(data)

            data['type'] = 'report'
            data['system'].update({ 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") })
            NotifyHandler.broadcastMessage(data)
            self.write(json.dumps({ 'code': 100 if result else 400, 'message': 'success' if result else "fail" }))
        except Exception as e:
            logger.info(self.request.body)
            logger.error(e)
            traceback.print_exc()
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/api/v1/evm_store/monitor", DeviceMessageHandler),
        (r"/api/v1/evm_store/watch", WatchHandler),
        (r"/ws/v1/notify", NotifyHandler),
        (r"/dist/(.*)", StaticFileHandler, { "path": "dist" }),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(5001)

    signal.signal(signal.SIGINT, raise_graceful_exit)
    signal.signal(signal.SIGTERM, raise_graceful_exit)

    tornado.ioloop.IOLoop.current().start()