#!/usr/bin/env python # -*- coding: utf_8 -*- import os import re import sys import traceback import tempfile import shutil import base64 import logging from flask import Blueprint, request, redirect, url_for, json from app.setting import config if ((3, 0) <= sys.version_info <= (3, 10)): from hashlib import md5 as fmd5 elif ((2, 0) <= sys.version_info <= (2, 10)): import md5 fmd5 = md5.new logger = logging.getLogger("filesApi") file_api = Blueprint("file_api", __name__, url_prefix="/api/v1/file") FileStoragePath = os.path.join(config.get("UPLOAD_PATH"), config.get("NETDISC")) def ajaxCheckAcess(path): realpath = os.path.realpath(path) if not realpath.startswith(FileStoragePath): return False return True def checkPath(path): if not path: return False, {"code": -1, "data": {}, "message": "[%s] arg missed!" % path} fpath = os.path.abspath(os.sep.join([os.path.abspath(FileStoragePath), path])) if not ajaxCheckAcess(fpath): return False, {"code": -1, "data": {}, "message": "You have no access to [%s]!" % fpath} if not os.path.exists(fpath): return False, {"code": -1, "data": {}, "message": "[%s] is not existed!" % fpath} return True, os.path.abspath(fpath) def saveToFile(saveFile, content): try: tfn = tempfile.mktemp() tf = open(tfn, 'wb+') tf.write(content) tf.close() os.rename(tfn, saveFile) return True except Exception as e: traceback.format_exc() logger.error(str(e)) return False def getFileInfo(infofile): return { "isfile": os.path.isfile(infofile), "isdir": os.path.isdir(infofile), "size": os.path.getsize(infofile), "atime": os.path.getatime(infofile), "mtime": os.path.getmtime(infofile), "ctime": os.path.getctime(infofile), "name": os.path.basename(infofile) } # 参数 {"path":"/xxx/xxx.png"} # path 需要下载的文件名 @file_api.route("/download", methods=["POST"]) def download(): try: obj = json.loads(request.data) isAcessed, path = checkPath(obj["path"]) if not isAcessed: return { "code": -1, "data": {}, "message": "invaild access" } if not os.path.isfile(path): return {"code": -1, "data": {}, "message": "Path [%s] is not a valid file!" % path} with open(path, "rb") as f: content = base64.b64encode(f.read()) md5code = fmd5(content).hexdigest() return { "code": 0, "data": { "content": content, "md5": md5code, "filename": os.path.basename(path) }, "message": "download file [%s] successfully!" % obj['path'] } except Exception as e: return {"code": -1, "data": {}, "message": "upload file [%s] failed!\n %s" % (obj['path'], repr(e))} # 参数 {"filename":"xxx.jpg","path":"/upload/","content":"xxxxxxx","md5":"xxxxxxxxxxxxxx"} # filename 保存的文件名,不带路径 # path 保存的路径 # content 文件的内容,采用base64编码 # md5 文件的MD5校验值 @file_api.route("/upload", methods=["POST"]) def upload(): try: obj = json.loads(request.data) if not obj['filename'] or not obj['content']: return {"code": -1, "data": {}, "message": "filename、path、content and md5 is not completed"} saveFile = os.path.normpath(os.sep.join([FileStoragePath, obj['filename']])) if not ajaxCheckAcess(saveFile): return {"code": -1, "data": {}, "message": "You have no access to [%s]!" % saveFile} if os.path.exists(saveFile): return {"code": -1, "data": {}, "message": "File [%s] is existed! Please remove firstly" % saveFile} fr = base64.b64decode(obj['content']) md5code = fmd5(obj['content']).hexdigest() if obj['md5'] != md5code: return {"code": -1, "data": {}, "message": "File md5 [%s] != [%s]" % (obj['md5'], md5code)} if saveToFile(saveFile, fr): return {"code": 0, "data": {}, "message": "upload file [%s] successfully!" % obj['filename']} else: return {"code": -1, "data": {}, "message": "upload file [%s] failed!" % obj['filename']} except Exception as e: logger.error(str(e)) binfile = request.files.get("binfile") obj = {} obj['filename'] = binfile.filename obj['content'] = binfile.stream.read() saveFile = os.path.normpath(os.sep.join([FileStoragePath, obj['filename']])) with open(saveFile, 'wb') as f: f.write(obj['content']) return {"code": 0, "data": {}, "message": "upload file by ElementUI[%s] successfully!" % obj['filename']} # 参数 {"path":"/xxx/xxx.png"} # path 需要删除的文件名或者目录名 @file_api.route("/remove", methods=["POST"]) def delete(): obj = json.loads(request.data) isAcessed, path = checkPath(obj["path"]) if not isAcessed: return { "code": -1, "data": {}, "message": "invaild access" } try: if os.path.isfile(path): os.remove(path) return {"code": 0, "data": {}, "message": "delete file [%s] successfully!" % path} elif os.path.isdir(path): os.rmdir(path) return {"code": 0, "data": {}, "message": "delete dir [%s] successfully!" % path} else: return {"code": 0, "data": {}, "message": "Path [%s] is not a valid file or path!" % path} except Exception as e: return {"code": -1, "data": {}, "message": repr(e)} # 参数 {"path":"/"} # path 路径 @file_api.route("getDirectoryList", methods=["POST"]) def dirlist(): obj = json.loads(request.data) isAcessed, path = checkPath(obj["path"]) if not isAcessed: return { "code": -1, "data": {}, "message": "invaild access" } result = [] for p in os.listdir(path): result.append(getFileInfo(os.path.join(FileStoragePath, p))) return {"code": 0, "data": result, "message": "Get [%s] successfully!" % path} # 参数 {"path":"/xe/xxx.png"} # file 需求获取MD5的文件 @file_api.route("/getFileMD5", methods=["POST"]) def filemd5(): obj = json.loads(request.data) isAcessed, path = checkPath(obj["path"]) if not isAcessed: return { "code": -1, "data": {}, "message": "invaild access" } if not os.path.isfile(path): return {"code": -1, "data": {}, "message": "Path [%s] is not a valid file!" % path} with open(path, "rb") as f: filemd5 = fmd5(f.read()).hexdigest() return {"code": 0, "data": filemd5, "message": "Get md5 of [%s] successfully!" % path} # 参数 {"path":"/xe/xxx.png"} # file 需要获取信息的文件或目录 @file_api.route("/getFileInfo", methods=["POST"]) def fileinfo(): obj = json.loads(request.data) isAcessed, path = checkPath(obj["path"]) if not isAcessed: return { "code": -1, "data": {}, "message": "invaild access" } if not os.path.isfile(path): return {"code": -1, "data": {}, "message": "Path [%s] is not a valid file!" % path} return {"code": 0, "result": getFileInfo(path), "message": "Get md5 of [%s] successfully!" % path}