''' Author: your name Date: 2021-07-12 11:14:48 LastEditTime: 2021-07-20 17:11:57 LastEditors: Please set LastEditors Description: In User Settings Edit FilePath: \evm-store\tools\build_out\controllers\appi.py ''' import os import re import json import shutil from pprint import pprint from pathlib import Path import urllib from datetime import datetime from urllib import parse, request from urllib.parse import urlparse from application.app import db, config from models.device import DeviceModel from models.annex import AnnexModel from models.app import AppModel from models.user import UserModel from models.package import PackageModel from webcreator.log import logger from webcreator.utils import ThreadMaker from webcreator.utils.epk import EpkApp from webcreator.response import ResponseCode @ThreadMaker def update_download_information(ip, id): params = { 'ak': 'aZEAgYG8wKuLd6DS9BmCloGtfnGGkRMn', 'coor': 'bd09ll' } parameters = urllib.parse.urlencode(params) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko' } if ip != '127.0.0.1': params.update({'ip': ip}) parameters = urllib.parse.urlencode(params) url = 'http://api.map.baidu.com/location/ip' req = request.Request(url='%s%s%s' % (url, '?', parameters), headers=headers) ret = request.urlopen(req).read() jsonData = json.loads(ret) logger.info(jsonData) if (0 != jsonData['status']): return None pack = PackageModel.query.filter(PackageModel.id==id).one_or_none() if pack: pack.address=jsonData['address'] pack.ip=ip pack.remarks=json.dumps(jsonData['content'], ensure_ascii=False) db.session.commit() class BuildAppResource(object): def __init__(self): super().__init__() def post(self, params, files=[]): logger.info(params) user = UserModel.query.filter(UserModel.uuid==params.get("access_key")).one_or_none() if not user: return False, ResponseCode.USER_NOT_EXISTS print(user) params.update({ 'app_icon': params.get('app_icon', ''), 'create_by': user.id, 'create_at': datetime.now(), 'update_by': user.id, 'update_at': datetime.now(), }) props = dict() for p in params: if hasattr(AppModel, p): props[p] = params[p] # logger.info(props) # pprint(props) app = AppModel(**props) db.session.add(app) db.session.commit() dir_format = "{}-{}-{}".format(app.app_name, app.app_version, datetime.now().strftime("%Y%m%d%H%M%S")) target_dir = Path(config.EPK_DIR).joinpath(user.account).joinpath(dir_format) dest_dir = target_dir.joinpath("src") if not dest_dir.exists(): os.makedirs(dest_dir.resolve().as_posix()) for target_file in files: filename = os.path.basename(target_file) name, suffix = os.path.splitext(filename) name = re.sub(r"_\d{14}$", "", name) dst_file = dest_dir.joinpath(name + suffix) shutil.copy(os.path.normpath(target_file), dst_file.resolve().as_posix()) res = AnnexModel(app=app.id, title=filename, path=dst_file.relative_to(config.UPLOAD_ROOT_DIR).resolve().as_posix(), size=dst_file.stat().st_size, create_by=user.id, create_at=datetime.now(), update_by=user.id, update_at=datetime.now()) db.session.add(res) db.session.flush() db.session.commit() # 打包成EPK文件 app_info = {} params = { 'appName': app.app_name, 'appDir': dest_dir.resolve().as_posix(), 'appVersion': app.app_version, 'output': target_dir.resolve().as_posix() } if user.role == 1: params['algorithm'] = "h" epk = EpkApp(**params) app_info = epk.pack() app_info['md5'] = str(app_info['md5']) # 更新数据库对应文件路径 # 将文件拷贝过去后,需要重新更新数据库文件记录 epk_path = target_dir.relative_to(config.UPLOAD_ROOT_DIR).joinpath("{}.epk".format(app.app_name)).resolve().as_posix() result = PackageModel.query.filter(PackageModel.app==app.id).one_or_none() if result: result.app_path = epk_path result.app_info = app_info result.update_by = user result.update_at = datetime.now() db.session.commit() else: result = PackageModel(app=app.id, file_path=epk_path, package_info=json.dumps(app_info, ensure_ascii=False), app_version=params.get("app_version", '1.0.0'), source=2, create_by=user.id, create_at=datetime.now(), update_by=user.id, update_at=datetime.now()) db.session.add(result) db.session.commit() target_dir.joinpath("epk.json").write_text(json.dumps(app.to_dict(), ensure_ascii=False)) # with open(os.sep.join([target_dir, "epk.json"]), "w") as f: # json.dump(app.to_dict(), f) return { 'app_name': app.app_name, 'app_file': "{}.epk".format(app.app_name), 'app_url': epk_path }, ResponseCode.HTTP_SUCCESS buildAppResource = BuildAppResource() class AppReview(object): def __init__(self): super().__init__() def get(self, params, jwt={}): logger.info(params) user = UserModel.query.filter(UserModel.uuid==jwt.get("uuid")).one_or_none() if not user: return None, ResponseCode.USER_NOT_EXISTS app = AppModel.query.filter(AppModel.launcher=="yes", AppModel.create_by==user.id, AppModel.is_delete==False).one_or_none() if app: app.launcher = "no" db.session.commit() app = AppModel.query.filter(AppModel.uuid==params.get("app"), AppModel.create_by==user.id, AppModel.is_delete==False).one_or_none() if not app: return None, ResponseCode.APPLICATION_NOT_EXISTS app.launcher = "yes" app.update_at = datetime.now() app.update_by = user.id db.session.commit() return True, ResponseCode.HTTP_SUCCESS def getApp(self, params, jwt={}): app = AppModel.query.filter(AppModel.uuid==params.get("uuid"), AppModel.is_delete==False).one_or_none() if not app: return None, ResponseCode.APPLICATION_NOT_EXISTS return app, ResponseCode.HTTP_SUCCESS def checkAppVersion(self, params, jwt={}): app = AppModel.query.filter(AppModel.uuid==params.get("uuid"), AppModel.is_delete==False).one_or_none() if not app: return None, ResponseCode.APPLICATION_NOT_EXISTS pack = PackageModel.query.filter(PackageModel.app==app.id, PackageModel.is_delete==False).order_by(PackageModel.create_at.desc()).one_or_none() if not pack: return None, ResponseCode.HTTP_NOT_FOUND if pack != params.get("app_version"): return True, ResponseCode.HTTP_SUCCESS else: return False, ResponseCode.HTTP_SUCCESS def getLauncher(self, params, jwt={}): device = DeviceModel.query.filter(DeviceModel.imei==params.get("imei"), DeviceModel.is_delete==False).one_or_none() if not device: return None, ResponseCode.DEVICE_NOT_EXISTS app = AppModel.query.filter(AppModel.launcher=="yes", AppModel.create_by==device.create_by, AppModel.is_delete==False).one_or_none() if not app: return None, ResponseCode.APPLICATION_NOT_EXISTS # 根据app找到EPK文件,读取出二进制数据,返回 target_file = Path(config.UPLOAD_ROOT_DIR).joinpath(app.download_url) if not target_file.exists(): return None, ResponseCode.APPLICATION_NOT_EXISTS return target_file, ResponseCode.HTTP_SUCCESS def getAppList(self, params, jwt={}): filters = [AppModel.is_delete==False] logger.info(params) if params.get("review"): filters.append(AppModel.app_review==params.get("review")) if params.get("category"): filters.append(AppModel.category==params.get("category")) apps = AppModel.query.filter(*filters).all() if not apps: return None, ResponseCode.APPLICATION_NOT_EXISTS return apps, ResponseCode.HTTP_SUCCESS def post(self, app, review, jwt={}): user = UserModel.query.filter(UserModel.uuid==jwt.get("uuid")).one_or_none() if not user: return None, ResponseCode.USER_NOT_EXISTS app = AppModel.query.filter(AppModel.uuid==app).one_or_none() if not app: return None, ResponseCode.APPLICATION_NOT_EXISTS app.app_review = review app.update_at = datetime.now() app.update_by = user.id db.session.commit() return True, ResponseCode.HTTP_SUCCESS def getDownloadFile(self, params, jwt={}): # 流程如下: # 获取前端传来的应用UUID和IMEI # 根据应用UUID查找应用信息,找不到返回失败信息 # 如果应用ID是evue_launcher,则进行如下处理 # 遍历系统所有应用,将相关应用根据4个一组拼接到一个数组里面去 # 最后将这些信息导出为一个JSON文件 # 将evue_launcher.evue和evue_dock.evue以及相关资源文件进行打包 # 以上文件全部是复制处理,最后根据格式生成文件夹,将epk文件写入到这里 # 读取这个epk文件,以encoding='utf8'格式返回字节流 # 否则就是普通应用 # 查找出这个应用路径以及依赖文件 # 将这些文件进行打包 # 读取这个epk文件,以encoding='utf8'格式返回字节流 # 此次下载将生成一次下载记录 # 当前还没有校验前端传来的IMEI是否是合法的 # 根据IMEI查找设备,根据设备查找用户,根据用户查找应用 app = [] # 根据IMEI查找设备 # device = Device.select().where(imei=data.get("imei")).first() # logger.info(data) # if not device: # return False, "device not found" # if not device.create_by: # return False, "create user is null" if params.get("byId"): # 通过id下载应用 app = AppModel.query.filter(AppModel.uuid==params.get("uuid")).one_or_none() if not app: return False, ResponseCode.APPLICATION_NOT_EXISTS else: apps = AppModel.query.filter(AppModel.app_name==params.get("name")).order_by(AppModel.create_at.desc()).all() if len(apps) > 1: app = AppModel.query.filter(AppModel.app_name == params.get("name")).order_by(AppModel.create_at.desc()).one_or_none() if not app: return False, ResponseCode.APPLICATION_NOT_EXISTS epk_path = Path(config.UPLOAD_ROOT_DIR).joinpath(app.download_url) if not epk_path.exists(): return False, ResponseCode.HTTP_NOT_FOUND # pack = PackageModel(app=app.id, app_version=app.app_version, imei=params.get("imei")) # db.session.add(pack) # db.session.commit() # if pack: # update_download_information(params.get('real_ip', '127.0.0.1'), pack.id) return epk_path, ResponseCode.HTTP_NOT_FOUND appReview = AppReview()