webscoket.py 3.25 KB
Newer Older
wanli's avatar
wanli committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
#!/usr/bin/env python
# -*- coding: utf_8 -*-
import websocket
from flask import json
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from utils import ObjectDict
from .log import logger


class WebsocketResponse(ObjectDict):
    def __init__(self, type="Response", api_code=-1, message='fail', data=None, traceback=""):
        super(WebsocketResponse, self).__init__()
        self.type = type
        self.code = api_code
        self.message = message
        self.data = data
        self.traceback = traceback
        if isinstance(self.data, list):
            self.count = len(self.data)


def pushmessage(func):
    def send(*agrs, **kwargs):
        self = agrs[0]
        ret = func(*agrs, **kwargs)
        if ret:
            msg, binary = ret
            try:
                if isinstance(msg, WebsocketResponse) or isinstance(msg, dict):
                    self.write_message(json.dumps(msg), binary)
                elif isinstance(msg, str) or isinstance(msg, str):
                    self.write_message(msg, binary)
                else:
                    self.write_message(repr(msg), binary)
            except WebSocketClosedError as e:
                logger.error(e)
                self.on_close()
    return send

class BaseWebsocket(WebSocketHandler):

    handlers = {}

    def open(self):
        logger.warn("websocket of %s is opened", repr(self))
        className = self.__class__.__name__
        if className not in self.handlers:
            self.handlers[className] = set()
        self.handlers[className].add(self)

    @pushmessage
    def send(self, message, binary=False):
        return message, binary

    def on_close(self):
        logger.warn("websocket of %s is closed", repr(self))
        className = self.__class__.__name__
        if className in self.handlers:
            self.handlers[className].remove(self)

    def check_origin(self, origin):
        return True

    @classmethod
    def boardcastMessage(cls, message, binary=False):
        className = cls.__name__
        if className in cls.handlers:
            for handler in cls.handlers[className]:
                handler.send(message, binary)


class NotifyHandler(BaseWebsocket):
    """
        建立与web前端的通信连接,发送状态信息报文
    """

    def open(self):
        super(NotifyHandler, self).open()

    def on_message(self, message):
        pass


class ThreadNotifyHandler(BaseWebsocket):
    """
        建立与tornado主线程与子线程之间的通信连接
    """
    def open(self):
        super(ThreadNotifyHandler, self).open()

    def on_message(self, message):
        NotifyHandler.boardcastMessage(message)


class Wsclient(object):
    """
        用于和ThreadNotifyHandler建立websocket连接的客户端
        使用方式:在子线程中建立到达threadnotify路由的websocket客户端如
        wsclient = Wsclient()
        wsclient.send(WebsocketResponse("dashboard", 0, "success", data=cache))
    """
    def __init__(self):
        super(Wsclient, self).__init__()
        self.wsclient = websocket.WebSocket()
        self.wsclient.connect("ws://localhost:7777/ws/api/v1/threadnotify")

    def send(self, msg):
        if isinstance(msg, dict):
            msg = json.dumps(msg)
        self.wsclient.send(msg)