#!/usr/bin/env python # -*- coding: utf_8 -*- import os import copy import json import logging import urllib from pathlib import Path from urllib import parse, request from urllib.parse import urlparse from datetime import datetime from pony.orm import * from app.setting import config from model.download import AppDownload from model.device import Device from model.apps import Apps from model.user import User from utils import ThreadMaker logger = logging.getLogger(__name__) @ThreadMaker def update_download_information(ip, id): params = { 'ak': 'aZEAgYG8wKuLd6DS9BmCloGtfnGGkRMn', 'coor': 'bd09ll' } parameters = urllib.parse.urlencode(params) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko' } if ip != '127.0.0.1': params.update({'ip': ip}) parameters = urllib.parse.urlencode(params) url = 'http://api.map.baidu.com/location/ip' req = request.Request(url='%s%s%s' % (url, '?', parameters), headers=headers) ret = request.urlopen(req).read() jsonData = json.loads(ret) logger.info(jsonData) if (0 != jsonData['status']): return None with db_session: down = AppDownload.get(id=id) if down: down.set(address=jsonData['address'], ip=ip, remarks=json.dumps(jsonData['content'], ensure_ascii=False)) commit() def convert_url_to_local_path(url): parsed_result = urlparse(url) target_file = os.sep.join([config.get("UPLOAD_PATH").replace('\\', '/'), parsed_result.path]) target_file = os.path.normpath(target_file) return target_file class DownloadManager(object): def __init__(self): super(DownloadManager, self).__init__() def add(self, user, data): with db_session: app = Apps.get(id=data.get("app"), is_delete=False) if not app: return False, "app does not found" editor = User.get(id=user) if not editor: return False, "current user is not exists" data.update({ 'create_by': editor, 'create_at': datetime.now(), 'update_by': editor, 'update_at': datetime.now(), }) result = AppDownload(app=app, imei=data.get("imei"), **data) commit() return result, "add dictionary {}.".format("success" if result else "fail") def delete(self, user, uuid): with db_session: editor = User.get(id=user) if not editor: return False, "current user is not exists" result = AppDownload.get(uuid=uuid) if not result: return False, "dict does not exists" result.set(is_delete=True, delete_at=datetime.now(), delete_by=editor) commit() return result, "delete dictionary {}.".format("success" if result else "fail") def get(self, data): # 流程如下: # 获取前端传来的应用UUID和IMEI # 根据应用UUID查找应用信息,找不到返回失败信息 # 如果应用ID是evue_launcher,则进行如下处理 # 遍历系统所有应用,将相关应用根据4个一组拼接到一个数组里面去 # 最后将这些信息导出为一个JSON文件 # 将evue_launcher.evue和evue_dock.evue以及相关资源文件进行打包 # 以上文件全部是复制处理,最后根据格式生成文件夹,将epk文件写入到这里 # 读取这个epk文件,以encoding='utf8'格式返回字节流 # 否则就是普通应用 # 查找出这个应用路径以及依赖文件 # 将这些文件进行打包 # 读取这个epk文件,以encoding='utf8'格式返回字节流 # 此次下载将生成一次下载记录 # 当前还没有校验前端传来的IMEI是否是合法的 # 根据IMEI查找设备,根据设备查找用户,根据用户查找应用 with db_session: app = [] # 根据IMEI查找设备 # device = Device.select().where(imei=data.get("imei")).first() # logger.info(data) # if not device: # return False, "device not found" # if not device.create_by: # return False, "create user is null" if data.get("byId"): # 通过id下载应用 app = Apps.get(uuid=data.get("uuid")) if not app: return False, "app not found" else: app = Apps.select(app_name=data.get("name")).order_by(desc(Apps.create_at)) if len(app) > 1: app = Apps.select(lambda p: p.app_name == data.get("name") and p.create_by.account == 'evm').order_by(desc(Apps.create_at)).first() if not app: return False, "app not found" epk_path = "" if app.app_build_log: epk_path = os.path.normpath(os.sep.join([config.get("UPLOAD_PATH"), app.app_build_log.app_path])) epk_path = Path(epk_path) if not epk_path.exists(): return False, "epk file not found" down = AppDownload(app=app, imei=data.get("imei")) commit() if down: update_download_information(data.get('real_ip', '127.0.0.1'), down.id) return epk_path, "get dictionary {}.".format("success" if epk_path else "no data") def getList(self, user, data): if not data or len(data) <= 0: return False, 0, "parameters can not be null." temp = copy.deepcopy(data) if 'pagenum' in temp: temp.pop('pagenum') if 'pagesize' in temp: temp.pop('pagesize') if 'scope_type' in temp: temp.pop('scope_type') temp.setdefault("is_delete", False) with db_session: if "scope_type" in data and data.get("scope_type") == "list": result = AppDownload.select().where(**temp).order_by(AppDownload.sort) temp = [] for item in result: temp.append(item.to_dict(only=["uuid"])) return temp, len(temp), "get dictionary list {}.".format("success" if temp else "no data") result = AppDownload.select().where(**temp).order_by(desc(AppDownload.download_at)).page(data.get("pagenum", 1), data.get("pagesize", 10)) count = AppDownload.select().where(**temp).count() if result: temp = [] for item in result: t = item.to_dict(with_collections=True, related_objects=True) t.update({ "app": item.app.to_dict(exclude=["create_by", "update_by", "delete_by", "is_delete"]), "download_at": item.download_at.strftime("%Y-%m-%d %H:%M:%S") }) temp.append(t) result = temp return result, count, "get dictionary {}.".format("success" if result else "fail") def update(self, user, uuid, data): # 当参数为空时,直接返回错误 if len(data) <= 0 or (len(data.keys()) == 1 and "id" in data): return False, "parameters can not be null." with db_session: # 查询请求者是否存在 editor = User.get(id=user) if not editor: return False, "current user is not exists" result = AppDownload.get(uuid=uuid) if not result: return False, "app does not found" result.set(update_at=datetime.now(), update_by=editor, **data) commit() return result, "update dictionary {}.".format("success" if result else "fail") downloadManager = DownloadManager()