import pprint import base64 from pathlib import Path import json import mimetypes from webcreator.log import logger from application.config import config from webcreator.response import ResponseCode disk_root = config.UPLOAD_ROOT_DIR logger.info(disk_root) class FileManager(object): def __init__(self) -> None: pass ''' @description: 根据前端传递的路径参数,获取该路径的目录以及文件信息,文件类别,可以看做是一个盘符 @param {*} self @return {*} ''' def initialize(self): ''' disks: { files: {driver: "local"}, images: {driver: "local"} } lang: "en" leftDisk: null rightDisk: null windowsConfig: 2 ''' result = { "disks": {}, "lang": "zh", "leftDisk": None, "rightDisk": None, "windowsConfig": 1 } # 这里需要过滤,有些目录只能管理员才能查看 p = Path(disk_root) for child in p.iterdir(): if child.is_dir(): result["disks"].update({ child.name: { "driver": "local" } }) return result, ResponseCode.HTTP_SUCCESS ''' @description: 获取当前类别的目录信息 @param {*} self @return {*} ''' def content(self, disk, target_path='/'): ''' 目录信息结构体: { basename: "docs" dirname: "" path: "docs" timestamp: 1556821372 type: "dir" }, { basename: "cars" dirname: "wallpapers" path: "wallpapers/cars" timestamp: 1544277291 type: "dir" } 文件信息结构体: { basename: "alfa.sql" dirname: "code" extension: "sql" filename: "alfa" path: "code/alfa.sql" size: 3208 timestamp: 1544277274 type: "file" } ''' target_path = Path(target_path) result = { "directories": [], "files": [] } disk_path = Path(disk_root).joinpath(disk) if not disk_path.exists(): return result target_path = disk_path.joinpath(target_path) if not target_path.exists(): return result for child in target_path.iterdir(): if child.is_dir(): result["directories"].append({ "basename": child.name, "dirname": child.parent.relative_to(disk_path).as_posix(), "path": child.resolve().relative_to(disk_path).as_posix(), "timestamp": int(child.stat().st_mtime), "type": "dir" }) else: result["files"].append({ "basename": child.name, "dirname": child.parent, "extension": child.suffix[1:], "filename": child.stem, "path": child.resolve().relative_to(disk_path).as_posix(), "size": child.stat().st_size, "timestamp": int(child.stat().st_mtime), "type": "file" }) with open("result.json", "w") as f: json.dump(result, f) f.seek(0) f.truncate() f.write(json.dumps(result, ensure_ascii=True)) pprint.pprint(result) return result, ResponseCode.HTTP_SUCCESS ''' @description: 获取目录结构树 @param {*} self @return {*} ''' def tree(self, disk, target_path="/"): ''' { basename: "trees" dirname: "wallpapers/nature" path: "wallpapers/nature/trees" props: { hasSubdirectories: false } timestamp: 1544277291 type: "dir" } ''' target_path = Path(target_path) result = [] rp = Path(disk_root) disk_path = rp / disk if not disk_path.exists(): return result temp_path = disk_path.joinpath(target_path) if not temp_path.exists(): return result p = Path(disk_path) for child in p.iterdir(): if child.is_dir(): result.append({ "basename": child.name, "dirname": child.parent.relative_to(rp).as_posix(), "path": child.relative_to(rp).as_posix(), "props": { "hasSubdirectories": True if child.iterdir() else False }, "timestamp": int(child.stat().st_mtime), "type": "dir" }) pprint.pprint(result) return result, ResponseCode.HTTP_SUCCESS def disk(self, disk): # select-disks print(disk) return True, ResponseCode.HTTP_SUCCESS def upload(self, disk): # select-disks print(disk) return True, ResponseCode.HTTP_SUCCESS def create_dir(self, disk, target_file): # create directory print(disk, target_file) return True, ResponseCode.HTTP_SUCCESS def create_file(self, disk, target_file, content): # create file print(disk, target_file, content) return True, ResponseCode.HTTP_SUCCESS def delete(self, disk): # delete file print(disk) return True, ResponseCode.HTTP_SUCCESS def copy(self, disk): # copy file print(disk) return True, ResponseCode.HTTP_SUCCESS def cut(self, disk): # cut file print(disk) return True, ResponseCode.HTTP_SUCCESS def paste(self, disk): # paste file print(disk) return True, ResponseCode.HTTP_SUCCESS def download(self, disk, target_file): # 获取文件内容 if not target_file: target_file = Path('result.json') else: target_file = Path(disk_root).joinpath(disk).joinpath(target_file) if not target_file.exists(): return False, ResponseCode.HTTP_NOT_FOUND # with open(target_file.resolve().as_posix(), "r", encoding="utf-8") as f: # data = f.read() # logger.info(data) mime = mimetypes.guess_type(target_file.resolve().as_posix())[0] content = target_file.read_text(encoding="utf-8") return (content, mime), ResponseCode.HTTP_SUCCESS def preview(self, disk, target_file): # 预览图片 if not target_file: target_file = Path('evue_photo.png') else: target_file = Path(disk_root).joinpath(disk).joinpath(target_file) if not target_file.exists(): return False, ResponseCode.HTTP_NOT_FOUND mime = mimetypes.guess_type(target_file.resolve().as_posix())[0] # mime = MimeTypes.guess_type(target_file.resolve().as_posix()) img_stream = target_file.read_bytes() # with open(target_file, 'rb') as img_f: # img_stream = img_f.read() # img_stream = base64.b64encode(img_stream).decode() return (img_stream, mime), ResponseCode.HTTP_SUCCESS fileManager = FileManager() if __name__ == "__main__": ''' python -m memory_profiler example.py ''' # from memory_profiler import profile # @profile # def test(): # pass result = fileManager.initialize() print("----->", result) result = fileManager.content("uploads", "evueapps/evm") print(">>>>>>", result) result = fileManager.tree("uploads", "evueapps/evm") print("=====>", result) result = fileManager.preview("uploads", "evueapps/evm") print("$$$$$>", result)