# epk_2.0.py 7.12 KB
#-*- coding: UTF-8 -*- 
#!/usr/bin/python

import os
import sys
import fs
import struct
import json
from collections import OrderedDict
import zlib
import pprint
import hashlib

from ctypes import *
import platform

# Directory that contains this script; the native library is looked up in
# its "lib" sub-directory.
current_abspath = os.path.dirname(os.path.realpath(__file__))

# Load the heatshrink shared library that matches the host operating system.
_system = platform.system()
if _system == 'Windows':
    pDll = CDLL(os.sep.join([current_abspath, "lib", "eheatshrink.dll"]))
elif _system == 'Linux':
    pDll = CDLL(os.sep.join([current_abspath, "lib", "libeheatshrink.so"]))
else:
    # Without this branch `pDll` stayed unbound and the prototype setup below
    # failed with an opaque NameError.
    raise OSError(
        "no eheatshrink library is bundled for platform %r" % _system)

# ecompress_size(buf, len) -> number of bytes ecompress() produces.
pDll.ecompress_size.restype = c_uint32
pDll.ecompress_size.argtypes = [c_void_p, c_uint32]

# ecompress(buf, len) -> pointer to the compressed bytes.
pDll.ecompress.restype = POINTER(c_uint8)
pDll.ecompress.argtypes = [c_void_p, c_uint32]

def heatshrink_compress(buf:bytes, level:int):
    """Compress *buf* with the native heatshrink library.

    Args:
        buf: raw bytes to compress.
        level: accepted so the call shape matches ``zlib.compress(data,
            level)``; it is not passed to the native library.

    Returns:
        A ``bytearray`` holding the ``ecompress_size(buf)`` bytes that
        ``ecompress(buf)`` produced.
    """
    count = len(buf)
    # The compressed length is reported by a separate library call.
    size = pDll.ecompress_size(buf, count)
    # The ecompress prototype (restype/argtypes) is declared once at module
    # load, so it is not repeated on every call.
    ret = pDll.ecompress(buf, count)
    # Slicing the pointer copies `size` bytes in one step instead of a
    # per-byte Python loop.
    # NOTE(review): nothing here releases the buffer returned by ecompress();
    # confirm whether the library expects the caller to free it.
    return bytearray(ret[:size])

def str_to_hex(s):
    """Return the code point of each character of *s* as lowercase hex, space separated."""
    return ' '.join(format(ord(ch), 'x') for ch in s)

def hex_to_str(s):
    """Decode a space-separated list of hex code points back into a string.

    Args:
        s: text such as ``"41 42"`` (the format produced by ``str_to_hex``).

    Returns:
        The decoded string; ``""`` for an empty input, so that the empty
        string round-trips.

    Raises:
        ValueError: if a token is not a valid hexadecimal number.
    """
    if not s:
        # "".split(' ') yields [''], which int() cannot parse.
        return ''
    return ''.join(chr(int(token, 16)) for token in s.split(' '))

def str_to_bin(s):
    """Return the code point of each character of *s* in binary, space separated."""
    return ' '.join(format(ord(ch), 'b') for ch in s)

def bin_to_str(s):
    """Decode a space-separated list of binary code points back into a string.

    Args:
        s: text such as ``"1000001 1000010"`` (the format produced by
            ``str_to_bin``).

    Returns:
        The decoded string; ``""`` for an empty input, so that the empty
        string round-trips.

    Raises:
        ValueError: if a token is not a valid binary number.
    """
    if not s:
        # "".split(' ') yields [''], which int() cannot parse.
        return ''
    return ''.join(chr(int(token, 2)) for token in s.split(' '))


class EpkApp(object):
    """Pack the files of an application directory into one ``.epk`` archive.

    Archive layout written by :meth:`pack` (all integers little-endian):

    * 8-byte header ``<HLH``: start marker, md5 offset, file count
    * one record per file (see :meth:`packFile`)
    * ``<H`` digest length followed by the digest from :meth:`md5`
    * ``<L`` CRC32 of everything that precedes it
    """

    def __init__(self, appName, appDir, algorithm='zlib',appVersion="1.0", output="epks"):
        """Remember the packaging parameters and make sure *output* exists.

        Args:
            appName: base name used for ``<appName>.json`` and ``<appName>.epk``.
            appDir: directory whose files are packed.
            algorithm: ``'h'`` selects heatshrink, anything else zlib.
            appVersion: version string stored in the info JSON.
            output: directory the ``.epk`` file is written to.
        """
        super(EpkApp, self).__init__()

        self._appName = appName
        self._appDir = os.path.abspath(appDir)
        self.algorithm = algorithm
        self._appVersion = appVersion
        self._appCRCCode = None
        self._files = []
        self._infoPath = os.sep.join([self._appDir, "%s.json" % self._appName])
        self._epksDir = output
        if not os.path.exists(self._epksDir):
            # Create the directory at the very path the archive is written
            # to, so an absolute `output` is created where it points rather
            # than being re-rooted under the current working directory.
            os.makedirs(self._epksDir, exist_ok=True)
        self._epkName = os.sep.join([self._epksDir, "%s.epk" % self._appName])

    def compress(self):
        """Return the compression callable, invoked as ``fn(data, level)``."""
        if self.algorithm == 'h':
            return heatshrink_compress
        return zlib.compress

    def epkInfo(self):
        """Build the app description, write it to ``<appName>.json`` and return it.

        Returns:
            An ``OrderedDict`` with the keys ``appName``, ``appVersion`` and
            ``files`` (the list produced by :meth:`fileinfos`).
        """
        epkInfo = OrderedDict({
            "appName": self._appName,
            "appVersion": self._appVersion,
            "files": self.fileinfos(self._appDir),
        })

        with open(self._infoPath, "w", encoding='utf-8') as f:
            f.write(json.dumps(epkInfo))

        return epkInfo

    def fileinfos(self, path):
        """List the packable top-level entries of *path*.

        Entries without an extension and ``.exe``/``.dll``/``.nv``/``.conf``
        entries are skipped. The app's own info JSON is placed first. For
        every ``.evue`` entry a sibling ``.md5`` file is (re)written via
        :meth:`fileMD5`.

        Returns:
            A list of dicts with the keys ``path``, ``name``, ``size``,
            ``basename`` and ``ext`` (``ext`` keeps its leading dot).
        """
        path = os.path.abspath(path)
        files = []
        # The context manager closes the filesystem handle once the listing
        # has been consumed.
        with fs.open_fs(path) as home_fs:
            for entry in home_fs.glob('*', namespaces=['details']):
                fname = entry.info.name
                fsize = entry.info.size
                fbasename, fext = os.path.splitext(fname)

                if fext in ["", ".exe", ".dll", ".nv", ".conf"]:
                    continue

                finfo = {
                    "path": "C:/%s" % fname,
                    "name": fname,
                    "size": fsize,
                    "basename": fbasename,
                    "ext": fext
                }

                # Keep the info JSON at the front of the list.
                if self._infoPath == os.sep.join([path, fname]):
                    files.insert(0, finfo)
                else:
                    files.append(finfo)

                if fext == ".evue":
                    self.fileMD5(finfo)

        return files

    def header(self, epk_start=0xAAAA, md5_offset=0, file_count=0):
        """Return the 8-byte archive header packed as little-endian ``<HLH``."""
        return struct.pack("<HLH", epk_start, md5_offset, file_count)

    def fileMD5(self, info):
        """Write ``<basename>.md5`` next to the file described by *info*.

        Args:
            info: dict with at least ``name`` and ``basename``.

        Returns:
            The digest bytes that were written (see :meth:`md5`).
        """
        md5path = os.sep.join([self._appDir, "%s.md5" % info["basename"]])
        fpath = os.sep.join([self._appDir, info["name"]])
        with open(fpath, "rb") as f:
            filecontent = f.read()

        newmd5 = self.md5(filecontent)
        with open(md5path, "wb") as f:
            f.write(newmd5)

        return newmd5

    def sign(self, content):
        """Convert a hex string to raw bytes and append the fixed trailer.

        Only whole pairs of hex digits are converted; a trailing odd digit
        is ignored.
        """
        whole_pairs = len(content) // 2 * 2
        return bytes.fromhex(content[:whole_pairs]) + b'EVM is NB ++!'

    def md5(self, filecontent):
        """Return the 16-byte digest of three chained MD5 rounds.

        Round one hashes *filecontent*; each following round hashes the
        previous round's digest run through :meth:`sign`.
        """
        content = filecontent
        digest = b""
        for _ in range(3):
            hasher = hashlib.md5(content)
            digest = hasher.digest()
            content = self.sign(hasher.hexdigest())
        return digest

    def packFile(self, info, level=9):
        """Serialise one file of the app directory into an archive record.

        Record layout (little-endian): ``<B`` type, ``<B`` name length,
        NUL-terminated UTF-8 name, ``<L`` original size, ``<L`` stored size,
        stored bytes.

        Args:
            info: dict with at least ``name`` and ``ext``.
            level: compression level handed to the compressor.

        Returns:
            The record as ``bytes``.
        """
        fname = info["name"]
        fpath = os.sep.join([self._appDir, fname])
        # NOTE(review): fileinfos() yields extensions with a leading dot
        # (".md5"), so this comparison is false for every entry it produces
        # and such files are stored compressed with type 2. Left unchanged
        # because altering it would change the archive format - confirm
        # against the reader.
        is_md5 = info["ext"] == "md5"

        with open(fpath, "rb") as fc:
            fileContentBytes = fc.read()

        if is_md5:
            fileCompressBytes = fileContentBytes
        else:
            fileCompressBytes = self.compress()(fileContentBytes, level)

        # Measure the encoded name in bytes: counting characters truncated
        # non-ASCII names and dropped their NUL terminator.
        name_bytes = fname.encode("utf-8") + b"\0"

        return b"".join([
            struct.pack("<B", 1 if is_md5 else 2),
            struct.pack("<B", len(name_bytes)),
            name_bytes,
            struct.pack("<L", len(fileContentBytes)),
            struct.pack("<L", len(fileCompressBytes)),
            fileCompressBytes,
        ])

    def pack(self, level=9):
        """Build the ``.epk`` archive, write it and return a summary dict.

        Args:
            level: compression level applied to every packed file.

        Returns:
            A dict describing the archive (path, sizes, digest, CRC); the
            same dict is pretty-printed to stdout.
        """
        # epkInfo() rewrites <appName>.json and fileinfos() creates .md5 files
        # while it scans, so one pass can miss files it has just created.
        # NOTE(review): the fixed count of 10 looks like a way to let the
        # listing settle - confirm before reducing it.
        for _ in range(10):
            infos = self.epkInfo()

        file_count = len(infos["files"])
        # Forward `level` so the reported compress_level is the one used.
        epkFileContentBytes = b"".join(
            self.packFile(info, level) for info in infos["files"])
        epkFileContentLength = len(epkFileContentBytes)

        epkFileBytes = self.header(md5_offset= 8 + epkFileContentLength, file_count=file_count)
        epkFileBytes += epkFileContentBytes
        epkmd5Bytes = self.md5(epkFileBytes)
        epkFileBytes += struct.pack("<H", len(epkmd5Bytes))
        epkFileBytes += epkmd5Bytes
        crcBytes = zlib.crc32(epkFileBytes)
        epkFileBytes += struct.pack("<L", crcBytes)

        # Open the output only once the archive is complete, so a failure
        # while packing does not leave a truncated file behind.
        with open(self._epkName, "wb") as f:
            f.write(epkFileBytes)

        ret = {
            "epkfile": self._epkName,
            "epk_filecontent_size": epkFileContentLength,
            # NOTE(review): the header stores 8 + content length while this
            # summary reports 10 + content length; kept as found.
            "md5_offset": 10 + epkFileContentLength,
            "file_count": file_count,
            "md5_length": len(epkmd5Bytes),
            "md5": epkmd5Bytes,
            "raw_crc": hex(crcBytes),
            "compress_level": level,
            "buff_length": len(epkFileBytes)
        }

        pprint.pprint(ret)
        return ret


def main(path, appName, algorithm):
    """Pack the application found in *path* into an ``.epk`` archive.

    Args:
        path: directory holding the application files.
        appName: base name of the application.
        algorithm: compression selector handed to ``EpkApp``.
    """
    EpkApp(appName, path, algorithm).pack()


if __name__ == '__main__':
    # Command line: <appDir> <appName> [algorithm]
    if len(sys.argv) < 3:
        # Report the expected arguments instead of failing with IndexError.
        sys.exit("usage: %s <appDir> <appName> [algorithm]" % sys.argv[0])
    # The algorithm is optional; 'zlib' is also EpkApp's own default.
    main(sys.argv[1], sys.argv[2], sys.argv[3] if len(sys.argv) > 3 else 'zlib')