import pprint import base64 import shutil from pathlib import Path import json import mimetypes from models.user import UserModel from webcreator.log import logger from application.config import config from webcreator.response import ResponseCode disk_root = config.UPLOAD_ROOT_DIR class FileManager(object): def __init__(self) -> None: pass ''' @description: 根据前端传递的路径参数,获取该路径的目录以及文件信息,文件类别,可以看做是一个盘符 @param {*} self @return {*} ''' def initialize(self, path, jwt): ''' disks: { files: {driver: "local"}, images: {driver: "local"} } lang: "en" leftDisk: null rightDisk: null windowsConfig: 2 ''' result = { "disks": {}, "lang": "zh", "leftDisk": None, "rightDisk": None, "windowsConfig": 1 } # 这里需要过滤,有些目录只能管理员才能查看 user = UserModel.query.filter(UserModel.uuid==jwt.get("uuid")).one_or_none() if user.role == 1: p = Path(disk_root) for child in p.iterdir(): if child.is_dir(): result["disks"].update({ child.name: { "driver": "local" } }) else: result["disks"].update({ "epks": { "driver": "local" } }) return result, ResponseCode.HTTP_SUCCESS ''' @description: 获取当前类别的目录信息 @param {*} self @return {*} ''' def content(self, disk, target_path='.', root="."): ''' 目录信息结构体: { basename: "docs" dirname: "" path: "docs" timestamp: 1556821372 type: "dir" }, { basename: "cars" dirname: "wallpapers" path: "wallpapers/cars" timestamp: 1544277291 type: "dir" } 文件信息结构体: { basename: "alfa.sql" dirname: "code" extension: "sql" filename: "alfa" path: "code/alfa.sql" size: 3208 timestamp: 1544277274 type: "file" } ''' result = { "directories": [], "files": [] } if target_path != None and target_path.find(disk) > -1: disk_path = Path(disk_root) else: disk_path = Path(disk_root).joinpath(disk) if not disk_path.exists(): logger.info(disk_path) return None, ResponseCode.DIRECTORY_NOT_EXISTS if target_path == None: target_path = '' target_path = Path(target_path) target_path = disk_path.joinpath(target_path) if not target_path.exists(): logger.info(target_path) return None, ResponseCode.DIRECTORY_NOT_EXISTS for child in target_path.iterdir(): if child.resolve().as_posix().find(root) < 0: continue if child.is_dir(): result["directories"].append({ "basename": child.name, "dirname": child.parent.relative_to(disk_path).as_posix(), "path": child.resolve().relative_to(disk_path).as_posix(), "timestamp": int(child.stat().st_mtime), "type": "dir" }) else: result["files"].append({ "basename": child.name, "dirname": child.parent.relative_to(disk_path).as_posix(), "extension": child.suffix[1:], "filename": child.stem, "path": child.resolve().relative_to(disk_path).as_posix(), "size": child.stat().st_size, "timestamp": int(child.stat().st_mtime), "type": "file" }) with open("result.json", "w") as f: json.dump(result, f) f.seek(0) f.truncate() f.write(json.dumps(result, ensure_ascii=True)) # pprint.pprint(result) if len(result["directories"]) or len(result["files"]): return result, ResponseCode.HTTP_SUCCESS else: return None, ResponseCode.HTTP_NO_DATA ''' @description: 获取目录结构树 @param {*} self @return {*} ''' def tree(self, disk, target_path=".", root="."): ''' { basename: "trees" dirname: "wallpapers/nature" path: "wallpapers/nature/trees" props: { hasSubdirectories: false } timestamp: 1544277291 type: "dir" } ''' if target_path == None: target_path = '' target_path = Path(target_path) result = [] rp = Path(disk_root) disk_path = rp / disk if not disk_path.exists(): return result, ResponseCode.DIRECTORY_NOT_EXISTS temp_path = disk_path.joinpath(target_path) if not temp_path.exists(): temp_path = Path(disk_root).joinpath(target_path) if not temp_path.exists(): return result, ResponseCode.DIRECTORY_NOT_EXISTS # p = Path(disk_path) for child in temp_path.iterdir(): if child.is_dir() and child.resolve().name.find(root) > -1: result.append({ "basename": child.name, "dirname": child.parent.relative_to(rp).as_posix(), "path": child.relative_to(rp).as_posix(), "props": { "hasSubdirectories": True if child.iterdir() else False }, "timestamp": int(child.stat().st_mtime), "type": "dir" }) # pprint.pprint(result) # logger.info(result) return result, ResponseCode.HTTP_SUCCESS def disk(self, disk): # select-disks print(disk) return True, ResponseCode.HTTP_SUCCESS def update_file(self, disk, path, file): # update file # 判断文件是否存在 target_dir = Path(disk_root).joinpath(disk).joinpath(path) if not target_dir.is_dir(): return False, ResponseCode.HTTP_INVAILD_REQUEST if not target_dir.exists(): target_dir.mkdir() target_file = target_dir.joinpath(file.filename) if target_file.exists(): if not target_file.is_file(): return False, ResponseCode.HTTP_INVAILD_REQUEST target_file.unlink() file.save(target_file.resolve().as_posix()) return True, ResponseCode.HTTP_SUCCESS def upload(self, disk, path, overwrite, fileList): # upload target_dir = Path(disk_root).joinpath(disk).joinpath(path) if not target_dir.exists(): target_dir.mkdir() for file in fileList: target_file = target_dir.joinpath(file.filename) if target_file.exists() and overwrite == 0: target_file.unlink() elif target_file.exists(): continue file.save(target_file.resolve().as_posix()) return True, ResponseCode.HTTP_SUCCESS def create_dir(self, disk, path, name): # create directory print(disk, path, name) target_dir = Path(disk_root).joinpath(disk).joinpath(path).joinpath(name) if not target_dir.exists(): target_dir.mkdir() return True, ResponseCode.HTTP_SUCCESS def create_file(self, disk, path, name): # create file print(disk, disk, path, name) target_dir = Path(disk_root).joinpath(disk).joinpath(path) if not target_dir.exists(): target_dir.mkdir() target_file = target_dir.joinpath(name) if target_file.exists(): return False, ResponseCode.FILE_EXISTS target_file.touch() return True, ResponseCode.HTTP_SUCCESS def delete(self, disk, items): # delete file print(disk, items, type(items)) for item in items: target = Path(disk_root).joinpath(disk).joinpath(item.get("path")) if not target.exists(): return False, ResponseCode.APPLICATION_NOT_EXISTS if target.is_dir(): target.mkdir() elif target.is_file(): target.unlink() return True, ResponseCode.HTTP_SUCCESS def copy(self, disk): # copy file # 无动作,前端缓存相关变量值 print(disk) return True, ResponseCode.HTTP_SUCCESS def cut(self, disk): # cut file # 复制源文件路径 # 无动作,前端缓存相关变量值 print(disk) return True, ResponseCode.HTTP_SUCCESS def paste(self, disk, path, clipboard): # paste file ''' directories: [] disk: "files" files: ["code/config.ini"] type: "copy" directories: [] disk: "files" files: ["code/alfa.sql"] type: "cut" ''' # 粘贴到当前目录,然后删除源文件 # 需要区分是复制操作还是剪贴操作 print(disk, path, clipboard) target_dir = Path(disk_root).joinpath(disk).joinpath(path) for dir in clipboard.get("directories", []): source_dir = Path(disk_root).joinpath(disk).joinpath(dir) if not source_dir.exists(): continue if target_dir.joinpath(dir).exists(): continue shutil.copytree(source_dir.resolve().as_posix(), target_dir.resolve().as_posix()) if clipboard.get("type") == "cut": source_dir.rmdir() for file in clipboard.get("files", []): source_file = Path(disk_root).joinpath(disk).joinpath(file) if not source_file.exists(): continue if target_dir.joinpath(file).exists(): continue shutil.copyfile(source_file.resolve().as_posix(), target_dir.resolve().as_posix()) if clipboard.get("type") == "cut": source_dir.unlink() return True, ResponseCode.HTTP_SUCCESS def download(self, disk, target_file): # 获取文件内容 if not target_file: target_file = Path('result.json') else: target_file = Path(disk_root).joinpath(disk).joinpath(target_file) if not target_file.exists(): return False, ResponseCode.HTTP_NOT_FOUND # with open(target_file.resolve().as_posix(), "r", encoding="utf-8") as f: # data = f.read() # print(data) mime = mimetypes.guess_type(target_file.resolve().as_posix())[0] content = target_file.read_text(encoding="utf-8") if mime == "application/json": content = json.dumps(content) return (content, mime), ResponseCode.HTTP_SUCCESS def preview(self, disk, target_file): # 预览图片 if not target_file: target_file = Path('evue_photo.png') else: target_file = Path(disk_root).joinpath(disk).joinpath(target_file) if not target_file.exists(): return False, ResponseCode.HTTP_NOT_FOUND mime = mimetypes.guess_type(target_file.resolve().as_posix())[0] # mime = MimeTypes.guess_type(target_file.resolve().as_posix()) img_stream = target_file.read_bytes() # with open(target_file, 'rb') as img_f: # img_stream = img_f.read() # img_stream = base64.b64encode(img_stream).decode() return (img_stream, mime), ResponseCode.HTTP_SUCCESS fileManager = FileManager() if __name__ == "__main__": ''' python -m memory_profiler example.py ''' # from memory_profiler import profile # @profile # def test(): # pass result = fileManager.initialize() print("----->", result) result = fileManager.content("uploads", "evueapps/evm") print(">>>>>>", result) result = fileManager.tree("uploads", "evueapps/evm") print("=====>", result) result = fileManager.preview("uploads", "evueapps/evm") print("$$$$$>", result)