#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""Small general-purpose helpers: JSON encoding, dict/thread/timing utilities,
directory copying, time formatting, hashing and random strings."""
import shutil
import time
import re
import os
import json
import functools
import hashlib
import random
import string
import datetime
import threading
import decimal
import uuid

from app.setting import config


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``decimal.Decimal`` values as floats."""

    def default(self, o):
        """Return a JSON-serializable replacement for *o*.

        ``Decimal`` instances are converted with ``float()``; every other
        type is delegated to the base class.
        """
        if isinstance(o, decimal.Decimal):
            return float(o)
        # Return the base-class result explicitly instead of falling through
        # to an implicit ``None``.
        return super(DecimalEncoder, self).default(o)


class ObjectDict(dict):
    """Makes a dictionary behave like an object, with attribute-style access.
    """

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            # Missing keys must surface as AttributeError so that getattr()
            # with a default and hasattr() behave normally.
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value


def ThreadMaker(f):
    """Decorator that runs *f* in a new thread on every call.

    The decorated callable starts a ``threading.Thread`` targeting *f* with
    the given arguments and returns the started thread object (not the
    result of *f*).
    """
    @functools.wraps(f)
    def runner(*args, **argv):
        t = threading.Thread(target=f, args=args, kwargs=argv)
        t.start()
        return t
    return runner


def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy the directory *src* into *dst*, skipping ``.pyc`` files.

    Args:
        src: Source directory.
        dst: Destination directory; created if it does not exist.
        symlinks: If true, symbolic links are recreated as links instead of
            copying what they point to.
        ignore: Optional collection of entry names to skip at every level
            (tested with ``name in ignore``).

    Raises:
        Exception: After the whole tree has been attempted, if anything
            failed. ``args[0]`` is a list of ``(src, dst, reason)`` tuples.
    """
    names = os.listdir(src)
    ignored_names = ignore if ignore is not None else set()

    if not os.path.exists(dst):
        os.makedirs(dst)

    errors = []
    for name in names:
        # Plain suffix test: the old regex had an unescaped dot and also
        # skipped names such as "foopyc".
        if name in ignored_names or name.endswith(".pyc"):
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                os.symlink(os.readlink(srcname), dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore)
            else:
                shutil.copy2(srcname, dstname)
            # XXX What about devices, sockets etc.?
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
        except Exception as e:
            # The recursive call reports its failures as Exception(list);
            # merge those so we can continue with the other files. Anything
            # else is recorded as a single failure for this entry.
            if e.args and isinstance(e.args[0], list):
                errors.extend(e.args[0])
            else:
                errors.append((srcname, dstname, str(e)))
    try:
        shutil.copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows; ignore only those
        # failures. (Referencing WindowsError directly raises NameError on
        # other platforms.)
        if getattr(why, "winerror", None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Exception(errors)


def timing(f):
    """Decorator that prints how long each call to *f* took.

    The wrapped function's return value is passed through to the caller.
    """
    @functools.wraps(f)
    def inner(*args, **kwargs):
        startTime = time.time()
        result = f(*args, **kwargs)
        print("[function]: %s [finished in]: %fs" % (f.__name__, time.time()-startTime))
        return result
    return inner


def timeToSeconds(t, sep=":"):
    """Convert an ``H<sep>M<sep>S`` string to a number of seconds.

    An empty string is treated as ``"0:0:0"``. Only the first three fields
    are used.
    """
    if t == "":
        t = "0:0:0"
    ts = [int(i) for i in t.split(sep)]
    return ts[0] * 60 * 60 + ts[1] * 60 + ts[2]


def secondsToTime(seconds, sep=":"):
    """Convert a number of seconds to an ``H<sep>M<sep>S`` string.

    *seconds* is truncated to an integer first; the fields are not
    zero-padded (``3661`` -> ``"1:1:1"``).
    """
    # Integer arithmetic: "/" is true division on Python 3 and would yield
    # fractional hours and minutes.
    h, rest = divmod(int(seconds), 3600)
    m, s = divmod(rest, 60)
    return sep.join(str(i) for i in (h, m, s))


def writeFile(fpath, result):
    """Write the string *result* to *fpath*, replacing any existing content."""
    with open(fpath, 'w') as f:
        f.write(result)


def md5_salt(s):
    """Return the hex MD5 digest of ``config['MD5_SALT']`` followed by *s*.

    Both values are encoded as UTF-8 before hashing.
    """
    md5 = hashlib.md5(config['MD5_SALT'].encode("utf-8"))
    md5.update(s.encode('utf-8'))
    return md5.hexdigest()


def filter_dict(source: dict, rules_list: list):
    """Return a new dict holding only the items of *source* whose key is in
    *rules_list*.

    NOTE: for a very large *source*, a lazy filter that yields items one at
    a time avoids building the whole result in memory.
    """
    return {k: v for k, v in source.items() if k in rules_list}


def sql_filter(sql):
    """Return *sql* with a fixed set of punctuation characters removed.

    NOTE(review): stripping characters is not a reliable defence against SQL
    injection; prefer parameterized queries wherever the driver supports them.
    """
    return re.sub(r"[\"\\/*\'=\-#;<>+%$()!@]", "", sql)


def random_string(length=32):
    """Return a random string of *length* ASCII letters and digits.

    Characters are drawn with replacement, so any length is supported
    (sampling without replacement failed beyond 62 characters).

    NOTE(review): this uses the non-cryptographic ``random`` module; if the
    result is used as a secret or token, switch to ``secrets``.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choices(alphabet, k=length))


def get_days_before_datetime(dt, dayAgo):
    """Return the timestamp *dayAgo* days before *dt* as a string.

    Args:
        dt: A ``datetime.datetime``, or a string in ``%Y-%m-%d %H:%M:%S``
            format.
        dayAgo: Number of days to subtract.

    Returns:
        The earlier moment formatted as ``%Y-%m-%d %H:%M:%S``.
    """
    if not isinstance(dt, datetime.datetime):
        dt = datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")
    # Shift the parsed datetime back, then format it as a string again.
    earlier = dt - datetime.timedelta(days=dayAgo)
    return earlier.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    print(random_string(7))