#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""HTTP API blueprint: file upload, bytecode build, font generation,
C header parsing and app-store lookups."""

import os
import json
import shutil
import subprocess
import traceback
import uuid
import time
import zipfile
import chardet
from pathlib import Path
from datetime import datetime
from flask import Blueprint, request, json
from werkzeug.utils import secure_filename
from app import config, signalManager
from fullstack.log import logger
from fullstack.login import Auth
from fullstack.response import ResponseCode, response_result
from fullstack.validation import validate_schema
from schema.api import UpdatePasswordSchema, ApplicationBuildSchema, ConvertString, GenerateFont
import sys
sys.path.append("..")
from utils import vbuild
from utils.evuefonttool import EvueFontTool
from utils.mkromfs import mkromfs
sys.path.append("../..")
from evm_module_tools.code import generate_from_list, compress_files
from evm_module_tools.cpp_header_parse import parse_header_file, update_mark_type, get_mark_type_list

api = Blueprint("api", __name__, url_prefix="/api/v1/%s" % config['NAME'])
logger.info("/api/v1/%s" % config['NAME'])

# Relative paths below are resolved against the process working directory,
# so no handler in this module may call os.chdir().
API_DATA_FILE = "./apiData.json"
TYPE_CONFIG_FILE = "typeconfig.json"
COMPILER_PATH = "./utils/executable"


def _upload_root() -> Path:
    """Return the configured upload directory as a resolved path."""
    return Path(config.get("UPLOAD_PATH")).resolve()


def _relative_to_upload(path) -> str:
    """Return *path* relative to the upload directory, in POSIX form.

    Both sides are resolved first so a relative or symlinked UPLOAD_PATH
    does not make ``relative_to`` raise ValueError.
    """
    return Path(path).resolve().relative_to(_upload_root()).as_posix()


def _is_inside(base, target) -> bool:
    """Return True when *target* resolves to *base* or a path below it."""
    try:
        Path(target).resolve().relative_to(Path(base).resolve())
    except ValueError:
        return False
    return True


def _safe_filename(filename) -> str:
    """Strip directory components from a client-supplied file name.

    Unlike ``secure_filename`` this keeps non-ASCII characters. Returns an
    empty string when nothing usable is left (empty name, "." or "..").
    """
    name = os.path.basename((filename or "").replace("\\", "/"))
    return "" if name in ("", ".", "..") else name


def _load_api_data() -> dict:
    """Read and decode the app-store data file."""
    with open(API_DATA_FILE, "r", encoding="utf-8") as f:
        return json.loads(f.read())


def _load_type_config(config_file: str):
    """Load the built-in type mapping file.

    Returns a ``(typeconfig, encoding)`` tuple; *typeconfig* is ``{}`` when
    the file is missing or does not contain a JSON object.
    """
    typeconfig = {}
    encode_type = "utf-8"
    if os.path.exists(config_file):
        encode_type = get_encoding(config_file)
        with open(config_file, "r", encoding=encode_type) as f:
            typeconfig = json.loads(f.read())
    if not isinstance(typeconfig, dict):
        typeconfig = {}
    return typeconfig, encode_type


def _dump_parse_result(result, source: Path, sources: list) -> None:
    """Write the JSON dumps of one parsed file next to *source*.

    Appends the upload-relative paths of the "analyze", "generate" and plain
    JSON files to *sources*. Only the last two are written here; the
    "analyze" file is presumably produced by parse_header_file itself
    (TODO confirm).
    """
    out_dir = source.parent
    sources.append(_relative_to_upload(out_dir.joinpath("analyze.{}.json".format(source.name))))

    pretty = out_dir.joinpath("generate.{}.json".format(source.name))
    with open(pretty.resolve().as_posix(), "w+", encoding="utf-8") as f:
        f.write(json.dumps(result, indent=2))
    sources.append(_relative_to_upload(pretty))

    compact = out_dir.joinpath("{}.json".format(source.name))
    with open(compact.resolve().as_posix(), "w+", encoding="utf-8") as f:
        f.write(json.dumps(result))
    sources.append(_relative_to_upload(compact))


def _generate_and_compress(result_list, module_type, output_dir: Path, archive_dir: Path, sources: list):
    """Generate code from the parse results and archive it.

    Appends the upload-relative "c", "h" and "j" paths of every generated
    entry to *sources* and returns whatever compress_files returns.
    """
    path_list = generate_from_list(result_list, module_type, output_dir.resolve().as_posix())
    archive = compress_files(path_list, archive_dir.resolve().as_posix())
    for item in path_list:
        for key in ("c", "h", "j"):
            sources.append(_relative_to_upload(item[key]))
    return archive


def get_encoding(file: str):
    """Detect the text encoding of *file* from its raw bytes with chardet."""
    with open(file, 'rb') as f:
        data = f.read()
    return chardet.detect(data)['encoding']


def get_program_lang(file: Path):
    """Map a file suffix to a language name; unknown suffixes map to "c"."""
    suffix = file.suffix
    lang = {
        # Path.suffix includes the dot, so every entry needs one.
        'c': [".c", ".cpp", ".h", ".hpp"],
        "javascript": [".js"],
        "json": [".json"],
        "python": [".py"]
    }
    for key, suffixes in lang.items():
        if suffix in suffixes:
            return key
    return "c"


@api.route("/system/getFileContent", methods=["POST"])
def get_file_content():
    """Return the text content of a file below the upload directory.

    Expects a JSON body with a "file" path relative to UPLOAD_PATH. Paths
    that escape the upload directory are reported as not found.
    """
    params = request.json
    file_name = params.get("file", None) if isinstance(params, dict) else None
    if not file_name or not isinstance(file_name, str):
        return response_result(ResponseCode.REQUEST_ERROR, msg="parameter file is null")

    upload_root = Path(config.get("UPLOAD_PATH"))
    fpath = upload_root.joinpath(file_name)
    # Reject traversal ("../x", absolute paths) and anything that is not a regular file.
    if not _is_inside(upload_root, fpath) or not fpath.is_file():
        return response_result(ResponseCode.REQUEST_ERROR, msg="file not found")

    real_path = fpath.resolve().as_posix()
    encode_type = get_encoding(real_path)
    with open(real_path, "r", encoding=encode_type) as f:
        result = f.read()

    return response_result(ResponseCode.OK, data=result, lang=get_program_lang(fpath))


@api.route("/evm", methods=['GET', 'POST'])
def hello_evm():
    """Debug endpoint: print the incoming request and echo parts of it as JSON."""
    def check(p):
        return getattr(request, p, None)

    print(request.method)
    print(request.headers)
    print(request.data)
    print(request.stream.read())
    print(request.get_data())

    if request.method == "GET":
        result = {"args": request.args}
    else:
        result = {
            'args': check('args'),
            'json': check('json'),
            'data': check('values'),
            'form': check('form')
        }

    time.sleep(5)
    return json.dumps(result)


@api.route("/store", methods=['GET', 'POST'])
def get_store():
    """Return the app list and category list, optionally filtered by ?category=."""
    store = _load_api_data()
    app_list = store.get("appList", [])

    category = request.args.get("category") if request.args else None
    if category:
        app_list = [item for item in app_list if item.get("category") == category]

    result = {
        'appList': app_list,
        'categoryList': store.get("categoryList", [])
    }
    return response_result(ResponseCode.OK, data=result)


# The view takes a ``uuid`` argument, so the rule must declare the matching
# URL variable; without it Flask calls the view with no arguments.
@api.route("/store/app/<uuid>", methods=['GET', 'POST'])
def get_store_app(uuid):
    """Return the appList entry whose "uuid" equals *uuid* ({} when absent)."""
    logger.info(uuid)
    store = _load_api_data()
    res = {}
    for item in store.get("appList", []):
        if item.get("uuid") == uuid:
            res = item
            break
    return response_result(ResponseCode.OK, data=res)


@api.route("/store/appInfo/<uuid>", methods=['GET', 'POST'])
def get_app_info(uuid):
    """Return the downloadList entry whose "apkId" equals *uuid* (None when absent)."""
    store = _load_api_data()
    res = None
    for item in store.get("downloadList", []):
        if item.get("apkId") == uuid:
            res = item
            break
    return response_result(ResponseCode.OK, data=res)


@api.route("/store/downloadEpk/<uuid>", methods=['GET', 'POST'])
def download_epk(uuid):
    """Return the download entry for *uuid* plus a status flag and timestamp."""
    # TODO: decide from the application id and version whether an update exists.
    store = _load_api_data()
    res = {}
    for item in store.get("downloadList", []):
        if item.get("apkId") == uuid:
            res.update(item)
            break
    res.update({'status': 0, 'time': int(time.time())})
    return response_result(ResponseCode.OK, data=res)


@api.route("/opqcp", methods=['POST'])
def action_opqcp():
    """Forward the JSON body to the actionOpqcp signal and return its message."""
    params = request.json
    result, message = signalManager.actionOpqcp.emit(params)
    return response_result(ResponseCode.OK, msg=message)


@api.route("/build", methods=['POST'])
def action_build():
    """Compile uploaded .evue/.js/.css/.hml files and return a zip of the output.

    Each upload is stored in a fresh directory below UPLOAD_PATH/BYTECODE_DIR.
    .evue files are first split by vbuild.render into .hml/.css/.js parts.
    Every resulting file is passed to the external compiler, and the
    "<name>.bc" files are zipped. The zip path relative to UPLOAD_PATH is
    returned.
    """
    uploads = request.files.getlist('binfile')
    if not uploads:
        return response_result(ResponseCode.REQUEST_ERROR, msg="upload file is null")

    target_path = Path(config.get("UPLOAD_PATH")).joinpath(config.get("BYTECODE_DIR"), uuid.uuid1().hex)
    target_path.mkdir(parents=True, exist_ok=True)

    dst_files = []
    for f in uploads:
        # Never let a client-supplied name select a directory.
        name = _safe_filename(f.filename)
        if not name:
            continue
        target = target_path.joinpath(name)
        if target.suffix not in [".evue", ".js", ".css", ".hml"]:
            continue

        with open(target.resolve().as_posix(), "wb+") as fd:
            fd.write(f.stream.read())

        files = []
        if target.suffix == ".evue":
            content = vbuild.render(target.resolve().as_posix())
            if content:
                files = [
                    (target.parent.joinpath("{}.hml".format(target.stem)).resolve().as_posix(), content.html),
                    (target.parent.joinpath("{}.css".format(target.stem)).resolve().as_posix(), content.style),
                    (target.parent.joinpath("{}.js".format(target.stem)).resolve().as_posix(), content.script)
                ]
        else:
            # True marks "already on disk, compile as is".
            files = [(target.resolve().as_posix(), True)]

        for file, text in files:
            if not text:
                continue
            if not isinstance(text, bool):
                with open(file, "w+") as fd:
                    fd.write(text)
            # Argument list without a shell: the file name comes from the client.
            subprocess.call([COMPILER_PATH, "-c", file])
            source = Path(file)
            compiled = source.parent.joinpath("{}.{}".format(source.name, "bc"))
            if not compiled.exists():
                logger.error("compile failed, no output for %s" % source.name)
                return response_result(ResponseCode.SERVER_ERROR, msg="compile [%s] failed" % source.name)
            dst_files.append(compiled)

    if not dst_files:
        return response_result(ResponseCode.SERVER_ERROR)

    zip_filepath = target_path.joinpath("{}.zip".format(target_path.name)).resolve().as_posix()
    with zipfile.ZipFile(zip_filepath, 'w') as z:
        for file in dst_files:
            z.write(file.resolve().as_posix(), arcname=file.name)

    return response_result(ResponseCode.OK, data={
        'url': _relative_to_upload(zip_filepath),
        'filename': os.path.basename(zip_filepath)
    })


@api.route("/monitor", methods=['GET', 'POST'])
def action_monitor():
    """Debug endpoint: print the JSON, raw and form payload of the request."""
    # silent=True: a request without a JSON body must not abort the handler.
    payload = request.get_json(silent=True)
    print(payload)
    print(request.data)
    print(request.form)
    print(type(payload))
    return response_result(ResponseCode.OK)


@api.route("/updatePassword", methods=['POST'])
@validate_schema(UpdatePasswordSchema)
@Auth.auth_required
def update_password():
    """Change the current user's password through the actionUpdatePassword signal."""
    result, message = signalManager.actionUpdatePassword.emit(request.current_user.get("id"), request.schema_data)
    if result:
        return response_result(ResponseCode.OK, data=result, msg=message)
    return response_result(ResponseCode.NOTHING_CHANGE, msg=message)


@api.route("/upload", methods=['POST'])
def upload_file():
    """Store the uploaded "binfile" in a timestamped directory below UPLOAD_PATH/TEMP_DIR.

    The stored name is "<stem>_<timestamp><ext>". The response carries the
    original file name, the stored size and the upload-relative path.
    """
    try:
        binfile = request.files.get("binfile")
        if not binfile:
            return response_result(ResponseCode.REQUEST_ERROR, msg="upload field name error")

        original_name = binfile.filename
        # Drop any directory part so the file cannot be written outside savePath.
        base_name = _safe_filename(original_name)
        if not base_name:
            return response_result(ResponseCode.REQUEST_ERROR, msg="invalid file name")

        content = binfile.stream.read()
        dtNowString = datetime.now().strftime("%Y%m%d%H%M%S%f")

        stem, ext = os.path.splitext(base_name)
        filename = stem + "_{}".format(dtNowString) + ext
        relative_path = os.sep.join([config.get("TEMP_DIR"), dtNowString])
        savePath = os.path.normpath(os.sep.join([config.get("UPLOAD_PATH"), relative_path]))
        saveFile = os.path.normpath(os.sep.join([savePath, filename]))

        os.makedirs(savePath, exist_ok=True)
        with open(saveFile, 'wb') as f:
            f.write(content)

        result = {
            "uuid": str(uuid.uuid4()),  # unique attachment id
            "filename": original_name,  # attachment name as sent by the client
            "filesize": os.path.getsize(saveFile),  # attachment size in bytes
            "filepath": os.sep.join([relative_path, filename]).replace("\\", "/"),  # stored path
        }
        return response_result(ResponseCode.OK, data=result, msg="upload file [%s] successfully!" % original_name)
    except Exception as e:
        traceback.print_exc()
        logger.error(str(e))
        return response_result(ResponseCode.SERVER_ERROR, msg=str(e))


@api.route("/system/updateDatabase", methods=['GET'])
def update_db():
    """Return 16 freshly generated uuid1 strings."""
    result = [str(uuid.uuid1()) for _ in range(16)]
    return response_result(ResponseCode.OK, data=result)


@api.route("/system/generateFont", methods=['POST'])
def generate_font():
    """Generate a font image from an uploaded font file.

    Form fields: "sizes" (comma-separated integers), "text", optional
    "filename" (default "fonts.bin"); file field: "font". The font is kept in
    a temporary directory that is always removed afterwards. The output is
    written below UPLOAD_PATH/TEMP_DIR and its upload-relative URL returned.
    """
    if not request.form:
        return response_result(ResponseCode.PARAMETER_ERROR, msg="form can not be null")

    sizes = request.form.get("sizes", None)
    if not sizes:
        return response_result(ResponseCode.PARAMETER_ERROR, msg="param sizes can not be null")

    font = request.files.get("font", None)
    font_name = _safe_filename(font.filename) if font else ""
    if not font or not font_name:
        return response_result(ResponseCode.PARAMETER_ERROR, msg="param font can not be null")

    # Validate everything before touching the disk so nothing is left behind.
    text_dict = request.form.get("text", None)
    if not text_dict:
        return response_result(ResponseCode.PARAMETER_ERROR, msg="param text can not be null")

    try:
        font_sizes = [int(s) for s in sizes.split(",")]
    except ValueError:
        return response_result(ResponseCode.PARAMETER_ERROR, msg="param sizes must be comma separated integers")

    filename = _safe_filename(request.form.get("filename", "fonts.bin")) or "fonts.bin"

    random_str = uuid.uuid4().hex
    temp_dir = Path(os.path.dirname(os.path.dirname(__file__))).joinpath("temp").joinpath(random_str)
    os.makedirs(temp_dir.resolve().as_posix(), exist_ok=True)

    try:
        fontSavePath = temp_dir.joinpath(font_name).resolve().as_posix()
        font.save(fontSavePath)

        target_dir = Path(config.get("UPLOAD_PATH")).joinpath(config.get("TEMP_DIR")).joinpath(datetime.now().strftime("%Y%m%d%H%M%S%f"))
        target_dir.mkdir(parents=True, exist_ok=True)

        evueFontTool = EvueFontTool(fontSavePath, target_dir.resolve().as_posix(), sizes=font_sizes)
        evueFontTool.dump(text_dict)

        mkromfs(target_dir.resolve().as_posix(), "/", filename)
    finally:
        # Remove by absolute path; changing the working directory here would
        # break every relative path used by the other handlers.
        shutil.rmtree(temp_dir.resolve().as_posix(), ignore_errors=True)

    return response_result(ResponseCode.OK, data={
        'filename': filename,
        'url': _relative_to_upload(target_dir.joinpath(filename)),
    })


@api.route("/system/parseCHeader", methods=["POST"])
def parse_header_files():
    """Parse uploaded C/C++ files and return an archive of generated code.

    Uploads ("binfile") with a .h/.hpp/.c/.cpp suffix are stored in a new
    directory below UPLOAD_PATH/TEMP_DIR and parsed one by one. When the
    parser reports undefined types, they are returned together with the
    directory name so the client can annotate them. Otherwise the generated
    files are archived and the archive's upload-relative URL is returned.
    """
    params = request.form
    if not params:
        params = {}

    module_type = params.get("type", None)
    if module_type and module_type not in ["evm", "cffi"]:
        return response_result(ResponseCode.REQUEST_ERROR, msg="type value is invaild")

    uploads = request.files.getlist('binfile')
    if not uploads:
        return response_result(ResponseCode.REQUEST_ERROR, msg="nothing upload")

    target_path = Path(config.get("UPLOAD_PATH")).joinpath(config.get("TEMP_DIR"), uuid.uuid1().hex)
    target_path.mkdir(parents=True, exist_ok=True)

    files = []
    source_file_list = []
    for f in uploads:
        name = _safe_filename(f.filename)
        if not name:
            continue
        target = target_path.joinpath(name)
        if target.suffix not in [".h", ".hpp", ".c", ".cpp"]:
            continue
        with open(target.resolve().as_posix(), "wb+") as fd:
            fd.write(f.stream.read())
        files.append(target)
        source_file_list.append(_relative_to_upload(target))

    # Load the built-in type mapping and hand it to the parser.
    typeconfig, _ = _load_type_config(TYPE_CONFIG_FILE)
    if typeconfig:
        update_mark_type(typeconfig)

    try:
        undefined_type = []
        error_tips = []
        result_list = []
        for file in files:
            result = parse_header_file(file.resolve().as_posix(), undefined_type, error_tips, target_path.resolve().as_posix())
            if result is None:
                continue
            _dump_parse_result(result, file, source_file_list)
            # The custom name is applied after the dumps, so it is not written to disk.
            if params.get("name", None) is not None and len(files) <= 1:
                result["name"] = params.get("name")
            result_list.append(result)

        target_dir = target_path.parent
        target_file = None
        if result_list:
            if not module_type:
                return response_result(ResponseCode.REQUEST_ERROR, msg="type can not be null")
            target_file = _generate_and_compress(result_list, module_type, target_path, target_dir, source_file_list)

        if undefined_type:
            # User-defined types were found; the client has to annotate them.
            return response_result(ResponseCode.REQUEST_ERROR, data={
                'undefined_type': undefined_type,
                'dir': target_path.name
            }, msg=error_tips)

        if target_file is None:
            return response_result(ResponseCode.REQUEST_ERROR, msg="no header file could be parsed")

        archive = Path(_relative_to_upload(target_file))
        data = {
            "file": archive.name,
            "url": archive.as_posix(),
            "parent": target_path.relative_to(target_dir.parent).as_posix(),
            "sources": source_file_list
        }
        return response_result(ResponseCode.OK, data=data)
    except Exception as e:
        traceback.print_exc()
        logger.error(str(e))
        return response_result(ResponseCode.SERVER_ERROR, msg=str(e))


@api.route("/system/processParse", methods=['POST'])
def process_parse():
    """Re-parse a previously uploaded directory with extra type annotations.

    JSON body: "dir" (directory name below UPLOAD_PATH/TEMP_DIR), "module"
    ("evm" or "cffi"), optional "type" (dict merged into the type mapping
    file) and optional "name".
    """
    params = request.json
    if not isinstance(params, dict) or not params:
        return response_result(ResponseCode.PARAMETER_NULL)

    module_type = params.get("module", None)
    if module_type and module_type not in ["evm", "cffi"]:
        return response_result(ResponseCode.REQUEST_ERROR, msg="module value is invaild")

    dir_name = params.get("dir", None)
    if not dir_name or not isinstance(dir_name, str):
        return response_result(ResponseCode.PARAMETER_NULL)

    temp_root = Path(config.get("UPLOAD_PATH")).joinpath(config.get("TEMP_DIR"))
    parse_dir = temp_root.joinpath(dir_name)
    # "dir" comes from the client: it must stay below the temp directory.
    if not _is_inside(temp_root, parse_dir) or not parse_dir.exists():
        return response_result(ResponseCode.INVAILD_REQUEST, msg="target directory not exists")

    # Merge the client's annotations into the built-in mapping and persist it.
    typeconfig, encode_type = _load_type_config(TYPE_CONFIG_FILE)
    extra_types = params.get("type", None)
    if isinstance(extra_types, dict) and extra_types:
        typeconfig.update(extra_types)
    if typeconfig:
        conf = update_mark_type(typeconfig)
        with open(TYPE_CONFIG_FILE, "w", encoding=encode_type) as f:
            f.write(json.dumps(conf))

    try:
        undefined_type = []
        error_tips = []
        result_list = []
        source_file_list = []
        for root, dirs, files in os.walk(parse_dir.resolve().as_posix()):
            for file in files:
                # os.walk yields plain strings; build a Path before using .name.
                source = Path(root).joinpath(file)
                if not source.exists():
                    continue
                result = parse_header_file(source.resolve().as_posix(), undefined_type, error_tips, root)
                if result is not None:
                    _dump_parse_result(result, source, source_file_list)
                    if params.get("name", None) and len(files) <= 1:
                        result["name"] = params.get("name")
                    result_list.append(result)
                # NOTE(review): the original nesting level of this append is not
                # recoverable from the collapsed source; confirm it is per file.
                source_file_list.append(file)

        target_file = None
        if result_list:
            if not module_type:
                return response_result(ResponseCode.REQUEST_ERROR, msg="module can not be null")
            archive_dir = Path(config.get("UPLOAD_PATH")).joinpath(config.get("UPLOAD_DIR"))
            target_file = _generate_and_compress(result_list, module_type, parse_dir, archive_dir, source_file_list)

        if undefined_type:
            return response_result(ResponseCode.REQUEST_ERROR, data={
                'undefined_type': undefined_type,
                'dir': dir_name
            }, msg=error_tips)

        if target_file is None:
            return response_result(ResponseCode.REQUEST_ERROR, msg="no header file could be parsed")

        archive = Path(_relative_to_upload(target_file))
        return response_result(ResponseCode.OK, data={
            "file": archive.name,
            "url": archive.as_posix(),
            # Upload-relative parse directory, matching /system/parseCHeader.
            "parent": _relative_to_upload(parse_dir),
            "sources": source_file_list
        })
    except Exception as e:
        traceback.print_exc()
        logger.error(str(e))
        return response_result(ResponseCode.SERVER_ERROR, msg=str(e))


@api.route("/system/getMarkType", methods=['GET'])
def get_mark_type():
    """Return the list produced by get_mark_type_list()."""
    data = get_mark_type_list()
    return response_result(ResponseCode.OK, data=data)


@api.route("/system/updateMarkType", methods=['POST'])
def modify_mark_type():
    """Pass the JSON body to update_mark_type() and return its result."""
    params = request.json
    data = update_mark_type(params)
    return response_result(ResponseCode.OK, data=data)


@api.route("/system/convertString", methods=['POST'])
@validate_schema(ConvertString)
def convert_string():
    """Run the actionGetConvertString signal on the validated request data."""
    result = signalManager.actionGetConvertString.emit(request.schema_data)
    return response_result(ResponseCode.OK, data=result)


@api.route("/application/build", methods=["post"])
@validate_schema(ApplicationBuildSchema)
def application_build():
    """Save the uploaded "binfiles" and hand them to the actionApplicationBuild signal.

    Files are stored below UPLOAD_PATH/TEMP_DIR/<access_key>/<timestamp>.
    """
    try:
        if request.method != 'POST' or 'binfiles' not in request.files:
            return response_result(ResponseCode.REQUEST_ERROR, msg="files can not be null")

        data = request.schema_data
        dt = datetime.now().strftime("%Y%m%d%H%M%S")
        upload_path = os.sep.join([config["UPLOAD_PATH"], config["TEMP_DIR"], str(data['access_key']), dt])
        os.makedirs(upload_path, exist_ok=True)

        files = []
        for f in request.files.getlist('binfiles'):
            filename = secure_filename(f.filename)
            if not filename:
                # secure_filename can strip a name down to nothing; skip it
                # instead of trying to save onto the directory itself.
                continue
            file_path = os.sep.join([upload_path, filename])
            f.save(file_path)
            files.append(file_path)

        result, message = signalManager.actionApplicationBuild.emit(files, data)
        if result:
            return response_result(ResponseCode.OK, data=result, msg=message)
        return response_result(ResponseCode.REQUEST_ERROR, msg=message)
    except Exception as e:
        traceback.print_exc()
        logger.error(str(e))
        return response_result(ResponseCode.SERVER_ERROR, msg=str(e))