#-*- coding: UTF-8 -*- #!/usr/bin/python import os import sys import fs import struct import json from collections import OrderedDict import zlib import pprint import hashlib from ctypes import * import platform current_abspath = os.path.dirname(os.path.realpath(__file__)) if platform.system() == 'Windows': pDll = CDLL(os.sep.join([current_abspath, "lib", "eheatshrink.dll"])) elif platform.system() == 'Linux': pDll = CDLL(os.sep.join([current_abspath, "lib", "libeheatshrink.so"])) pDll.ecompress_size.restype = c_uint32 pDll.ecompress_size.argtypes = [c_void_p, c_uint32] pDll.ecompress.restype = POINTER(c_uint8) pDll.ecompress.argtypes = [c_void_p, c_uint32] def heatshrink_compress(buf:bytes, level:int): count = len(buf) size = pDll.ecompress_size(buf, count) pDll.ecompress.restype = POINTER(c_uint8) pDll.ecompress.argtypes = [c_void_p, c_uint32] ret = pDll.ecompress(buf, count) arr = bytearray(size) i = 0 while i < size: arr[i] = ret[i] i = i+1 return arr def str_to_hex(s): return ' '.join([hex(ord(c)).replace('0x', '') for c in s]) def hex_to_str(s): return ''.join([chr(i) for i in [int(b, 16) for b in s.split(' ')]]) def str_to_bin(s): return ' '.join([bin(ord(c)).replace('0b', '') for c in s]) def bin_to_str(s): return ''.join([chr(i) for i in [int(b, 2) for b in s.split(' ')]]) class EpkApp(object): def __init__(self, appName, appDir, algorithm='zlib',appVersion="1.0", output="epks"): super(EpkApp, self).__init__() self._appName = appName self._appDir = os.path.abspath(appDir) self.algorithm = algorithm self._appVersion = appVersion self._appCRCCode = None self._files = [] self._infoPath = os.sep.join([self._appDir, "%s.json" % self._appName]) self._epksDir = output if not os.path.exists(self._epksDir): fs.open_fs(os.getcwd()).makedirs(output) self._epkName = os.sep.join([self._epksDir, "%s.epk" % self._appName]) def compress(self): if self.algorithm == 'h': return heatshrink_compress return zlib.compress def epkInfo(self): epkInfo = OrderedDict({ "appName": self._appName, "appVersion": self._appVersion, "files": self.fileinfos(self._appDir), }) infocontent = json.dumps(epkInfo) with open(self._infoPath, "w", encoding='utf-8') as f: f.write(infocontent) return epkInfo def fileinfos(self, path): path = os.path.abspath(path) home_fs = fs.open_fs(path) files = [] for jspath in home_fs.glob('*', namespaces=['details']): fpath = "C:/%s" % jspath.info.name fname = jspath.info.name fsize = jspath.info.size fbasename, fext = os.path.splitext(jspath.info.name) if fext in ["", ".exe", ".dll", ".nv", ".conf"]: continue finfo = { "path": fpath, "name": fname, "size": fsize, "basename": fbasename, "ext": fext } if self._infoPath == os.sep.join([path, fname]): files.insert(0, finfo) else: files.append(finfo) if fext == ".evue": self.fileMD5(finfo) return files def header(self, epk_start=0xAAAA, md5_offset=0, file_count=0): bytes_header = struct.pack("<HLH", epk_start, md5_offset, file_count) return bytes_header def fileMD5(self, info): md5path = os.sep.join([self._appDir, "%s.md5" % info["basename"]]) fpath = os.sep.join([self._appDir, info["name"]]) with open(fpath, "rb") as f: filecontent = f.read() newmd5 = self.md5(filecontent) with open(md5path, "wb") as f: f.write(newmd5) return newmd5 def sign(self, content): ret = b"" for i in range(int(len(content) / 2 )): ret += struct.pack("<B", int("0x%s" % (content[i*2:i*2+2]), 16)) ret = ret + b'EVM is NB ++!' return ret def md5(self, filecontent): newmd5 = '' content = filecontent for i in range(3): md5 = hashlib.md5() #获取一个md5加密算法对象 md5.update(content) #指定需要加密的字符串 newmd5 = md5.hexdigest() #获取加密后的16进制字符串 content = self.sign(newmd5) ret = b"" for i in range(int(len(newmd5) / 2 )): ret += struct.pack("<B", int("0x%s" % (newmd5[i*2:i*2+2]), 16)) return ret def packFile(self, info, level=9): fname = info["name"] fpath = os.sep.join([self._appDir, fname]) fext = info["ext"] fileBytes = b"" if fext == "md5": fileBytes += struct.pack("<B", 1) else: fileBytes += struct.pack("<B", 2) _name = fname + "\0" fileBytes += struct.pack("<B", len(_name)) fileBytes += struct.pack("<%ds" % len(_name), fname.encode("utf-8")) with open(fpath, "rb") as fc: fileContentBytes = fc.read() fileBytes += struct.pack("<L", len(fileContentBytes)) if fext == "md5": fileCompressBytes = fileContentBytes else: fileCompressBytes = self.compress()(fileContentBytes, level) fileBytes += struct.pack("<L", len(fileCompressBytes)) fileBytes += fileCompressBytes return fileBytes def pack(self, level=9): for i in range(10): infos = self.epkInfo() # infos = self.epkInfo() # infos = self.epkInfo() epkFileBytes = b"" epkFileContentBytes = b"" file_count = len(infos["files"]) with open(self._epkName, "wb") as f: for info in infos["files"]: epkFileContentBytes += self.packFile(info) epkFileContentLength = len(epkFileContentBytes) epkFileBytes += self.header(md5_offset= 8 + epkFileContentLength, file_count=file_count) epkFileBytes += epkFileContentBytes epkmd5Bytes = self.md5(epkFileBytes) epkFileBytes += struct.pack("<H", len(epkmd5Bytes)) epkFileBytes += epkmd5Bytes crcBytes = zlib.crc32(epkFileBytes) epkFileBytes += struct.pack("<L", crcBytes) f.write(epkFileBytes) ret = { "epkfile": self._epkName, "epk_filecontent_size": epkFileContentLength, "md5_offset": 10 + epkFileContentLength, "file_count": file_count, "md5_length": len(epkmd5Bytes), "md5": epkmd5Bytes, "raw_crc": hex(crcBytes), "compress_level": level, "buff_length": len(epkFileBytes) } pprint.pprint(ret) return ret def main(path, appName, algorithm): epk = EpkApp(appName, path, algorithm) epk.pack() if __name__ == '__main__': main(sys.argv[1], sys.argv[2], sys.argv[3])