#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""JWT-based login/authentication helpers for Flask API endpoints."""

import logging
import traceback
from datetime import datetime, timedelta, timezone
from functools import wraps

import jwt
from flask import current_app, request

logger = logging.getLogger(__name__)


class Auth:
    """Issue and verify the JWT access tokens used by the API.

    Tokens are signed with HS256 using the Flask application's
    ``SECRET_KEY`` configuration value.
    """

    # Shared by encode and decode so the two sides cannot drift apart.
    ALGORITHM = 'HS256'
    TOKEN_ISSUER = 'bytecode'
    TOKEN_AUDIENCE = 'bytecode'
    TOKEN_LIFETIME = timedelta(hours=2)
    REQUIRED_CLAIMS = ('exp', 'iss', 'sub', 'aud', 'iat', 'data')

    @staticmethod
    def auth_required(func):
        """Decorator that guards an API view behind token authentication.

        When the app config flag ``LOGIN_DISABLED`` is truthy the view is
        called without any check. Otherwise the current request is passed
        to :meth:`identify`; on success the view runs, on failure the
        result of ``current_app.unauthorized(message)`` is returned.

        NOTE(review): ``unauthorized`` is not part of the stock
        ``flask.Flask`` API, so this presumably relies on a project-specific
        application subclass -- confirm.

        :param func: the view function to protect
        :return: the wrapped view function
        """
        @wraps(func)
        def inner(*args, **kwargs):
            if current_app.config.get('LOGIN_DISABLED'):
                return func(*args, **kwargs)

            ok, message = Auth().identify(request)
            if not ok:
                return current_app.unauthorized(message)
            return func(*args, **kwargs)

        return inner

    @staticmethod
    def encode_auth_token(user_id, username, uuid):
        """Create a signed authentication token for a user.

        :param user_id: identifier of the user; stored in ``data.id`` and,
            as a string, in the ``sub`` claim
        :param username: stored in ``data.username``
        :param uuid: stored in ``data.uuid``
        :return: the encoded JWT as ``str``
        """
        # Use an aware UTC timestamp: PyJWT interprets naive datetimes as
        # UTC, so a naive local ``datetime.now()`` shifts ``iat``/``exp``
        # by the host's UTC offset on any non-UTC machine.
        issued_at = datetime.now(tz=timezone.utc)
        payload = {
            'iat': issued_at,
            'exp': issued_at + Auth.TOKEN_LIFETIME,  # expires after two hours
            'iss': Auth.TOKEN_ISSUER,                # token issuer
            'aud': Auth.TOKEN_AUDIENCE,              # token audience
            # RFC 7519 defines ``sub`` as a string; recent PyJWT releases
            # reject a non-string subject when decoding.
            'sub': str(user_id),
            'data': {
                'id': user_id,
                'uuid': uuid,
                'username': username,
            },
        }
        token = jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm=Auth.ALGORITHM,
        )
        # Older PyJWT versions return bytes, which cannot be serialized as
        # a JSON value; normalize to str.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    @staticmethod
    def decode_auth_token(auth_token):
        """Verify a token and return its payload.

        The signature, expiry and audience are verified and every claim in
        ``REQUIRED_CLAIMS`` must be present. The ``iss`` claim is required
        to exist but its value is not compared against ``TOKEN_ISSUER``.

        This method never raises; failures are reported in the return value.

        :param auth_token: the encoded JWT
        :return: ``(True, payload_dict)`` on success, otherwise
            ``(False, error_message)``
        """
        try:
            payload = jwt.decode(
                auth_token,
                current_app.config['SECRET_KEY'],
                algorithms=[Auth.ALGORITHM],
                audience=Auth.TOKEN_AUDIENCE,
                options={'require': list(Auth.REQUIRED_CLAIMS)},
            )
        except jwt.InvalidTokenError as exc:
            # Expected for expired / tampered / malformed tokens: a stack
            # trace would only add noise.
            logger.warning('JWT rejected: %s', exc)
            return False, str(exc)
        except Exception as exc:
            # Anything else (e.g. missing SECRET_KEY) is a real fault.
            logger.exception('Unexpected error while decoding JWT')
            return False, str(exc)

        if payload:
            return True, payload
        return False, "payload invaild"

    def identify(self, request):
        """Authenticate a request from its ``Authorization`` header.

        The header may carry either the bare token or the standard
        ``Bearer <token>`` form. On success the token's ``data`` dict is
        attached to the request as ``request.current_user``.

        This method never raises; failures are reported in the return value.

        :param request: the Flask request object
        :return: ``(True, user_data_dict)`` on success, otherwise
            ``(False, error_message)``
        """
        try:
            auth_token = request.headers.get('Authorization')
            if not auth_token:
                return False, "authorization token can not be none"

            # Accept "Bearer <token>" as well as a raw token. A JWT never
            # contains a space, so raw tokens are left untouched.
            scheme, _, credentials = auth_token.partition(' ')
            credentials = credentials.strip()
            if credentials and scheme.lower() == 'bearer':
                auth_token = credentials

            ok, result = self.decode_auth_token(auth_token)
            if not ok:
                return False, result

            data = result.get('data') if isinstance(result, dict) else None
            # Require a real mapping: ``in`` on a string would do a
            # substring match and accept a malformed payload.
            if isinstance(data, dict) and 'id' in data and 'username' in data:
                request.current_user = data
                return True, data
            return False, "data invaild"
        except Exception as exc:
            logger.exception('Unexpected error while identifying request')
            return False, str(exc)