#!/usr/bin/env python # -*- coding: utf_8 -*- import shutil import time import re import os import json import functools import hashlib import random import string import datetime import threading import decimal import uuid class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, decimal.Decimal): return float(o) super(DecimalEncoder, self).default(o) class ObjectDict(dict): """Makes a dictionary behave like an object, with attribute-style access. """ def __getattr__(self, name): try: return self[name] except KeyError: raise AttributeError(name) def __setattr__(self, name, value): self[name] = value def ThreadMaker(f): def runner(*args, **argv): t = threading.Thread(target=f, args=args, kwargs=argv) t.start() return t return runner def copytree(src, dst, symlinks=False, ignore=None): names = os.listdir(src) if ignore is not None: ignored_names = ignore else: ignored_names = set() if not os.path.exists(dst): os.makedirs(dst) errors = [] for name in names: if name in ignored_names: continue if re.match(r'.*?.pyc$', name): continue srcname = os.path.join(src, name) dstname = os.path.join(dst, name) try: if symlinks and os.path.islink(srcname): linkto = os.readlink(srcname) os.symlink(linkto, dstname) elif os.path.isdir(srcname): copytree(srcname, dstname, symlinks, ignore) else: shutil.copy2(srcname, dstname) # XXX What about devices, sockets etc.? except (IOError, os.error) as why: errors.append((srcname, dstname, str(why))) # catch the Error from the recursive copytree so that we can # continue with other files except Exception as e: errors.extend(e.args[0]) try: shutil.copystat(src, dst) except WindowsError: # can't copy file access times on Windows pass except OSError as why: errors.extend((src, dst, str(why))) if errors: raise Exception(errors) def timing(f): @functools.wraps(f) def inner(*args, **kwargs): startTime = time.time() f(*args, **kwargs) print("[function]: %s [finished in]: %fs" % (f.__name__, time.time()-startTime)) return inner def timeToSeconds(t, sep=":"): if t == "": t = "0:0:0" ts = [int(i) for i in t.split(sep)] return ts[0] * 60 * 60 + ts[1] * 60 + ts[2] def secondsToTime(seconds, sep=":"): h = seconds / 3600 m = seconds % 3600 / 60 s = (seconds - h * 3600 - m * 60) % 60 return sep.join([str(i) for i in [h, m, s]]) def writeFile(fpath, result): with open(fpath, 'w') as f: f.write(result) def md5_salt(s): md5 = hashlib.md5("EhuqUkwV".encode("utf-8")) md5.update(s.encode('utf-8')) return md5.hexdigest() def filter_dict(source: dict, rules_list: list): # 如果source中的词典数量过多,请使用itertools模块的ifilter。 它会返回一个迭代器,而不是立即用整个列表填充系统的内存 result = dict() # res = [d for d in source.keys() if d in rules_list] # res = list(filter(lambda d: d in rules_list, source.keys())) for k in source.keys(): if k in rules_list: result.update(k, source[k]) return result def sql_filter(sql): return re.sub(r"[\"\\/*\'=\-#;<>+%$()!@]", "", sql) def random_string(length=32): return ''.join(random.sample(string.ascii_letters + string.digits, length)) def get_days_before_datetime(dt, dayAgo): if not isinstance(dt, datetime.datetime): dt = datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S") # 先获得时间数组格式的日期 dayAgo = (dt - datetime.timedelta(days = dayAgo)) # 转换为其他字符串格式 return dayAgo.strftime("%Y-%m-%d %H:%M:%S") if __name__ == "__main__": print(os.path.abspath(__file__)) print(random_string(7))