__init__.py 5.03 KB
Newer Older
wanli's avatar
wanli committed
1 2
#!/usr/bin/env python
# -*- coding: utf_8 -*-
wanli's avatar
wanli committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16

from typing import (
    Any,
    Optional,
    Dict,
    Mapping,
    List,
    Tuple,
    Match,
    Callable,
    Type,
    Sequence,
)

wanli's avatar
wanli committed
17 18 19 20 21 22 23 24 25 26 27 28
import shutil
import time
import re
import os
import json
import functools
import hashlib
import random
import string
import datetime
import threading
import decimal
29 30 31
import urllib
from urllib import parse, request
from urllib.parse import urlparse, urljoin, urlencode
wanli's avatar
wanli committed
32

wanli's avatar
wanli committed
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``decimal.Decimal`` values as floats.

    Pass it as ``cls`` to ``json.dumps``/``json.dump``. The conversion goes
    through ``float``, so values a binary float cannot represent exactly
    lose precision.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for *o*.

        ``Decimal`` instances become ``float``. Anything else is delegated
        to the next ``default`` in the MRO; ``json.JSONEncoder.default``
        raises ``TypeError`` for unsupported types.
        """
        if isinstance(o, decimal.Decimal):
            return float(o)
        # Return the delegate's value: when this class is combined with
        # another encoder that handles more types, dropping the result
        # would silently serialize those objects as ``null``.
        return super().default(o)

def ThreadMaker(f):
    """Decorator that runs *f* in a new thread each time it is called.

    The decorated callable returns the started ``threading.Thread`` rather
    than the result of *f*; call ``join()`` on it to wait for completion.
    The return value of *f* itself is not captured.
    """
    # Preserve f's name/docstring on the wrapper, as ``timing`` does.
    @functools.wraps(f)
    def runner(*args, **argv):
        t = threading.Thread(target=f, args=args, kwargs=argv)
        t.start()
        return t
    return runner

def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy directory *src* into *dst*, skipping ``.pyc`` files.

    Unlike ``shutil.copytree``, *dst* may already exist.

    Args:
        src: Path of the directory to copy.
        dst: Destination directory; created (with parents) when missing.
        symlinks: When true, symbolic links are recreated as links instead
            of copying what they point to.
        ignore: Optional container of entry names to skip at every level.
            It is membership-tested directly (it is not a callable as in
            ``shutil.copytree``).

    Raises:
        Exception: After every entry has been attempted, if any failed.
            ``args[0]`` is a list of ``(src, dst, reason)`` tuples.
        OSError: If *src* cannot be listed or *dst* cannot be created.
    """
    names = os.listdir(src)
    ignored_names = set() if ignore is None else ignore

    if not os.path.exists(dst):
        os.makedirs(dst)

    errors = []
    for name in names:
        # A plain suffix test: the previous regex had an unescaped dot and
        # also skipped names such as "notespyc".
        if name in ignored_names or name.endswith('.pyc'):
            continue

        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                os.symlink(os.readlink(srcname), dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore)
            else:
                shutil.copy2(srcname, dstname)
            # XXX What about devices, sockets etc.?
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
        except Exception as err:
            # The recursive call reports its failures as Exception(list);
            # merge those so we can continue with the other entries.
            nested = err.args[0] if err.args else None
            if isinstance(nested, list):
                errors.extend(nested)
            else:
                errors.append((srcname, dstname, str(err)))
    try:
        shutil.copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows; ignore only those
        # errors. (Naming ``WindowsError`` directly is a NameError on other
        # platforms.)
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Exception(errors)

def timing(f):
    """Decorator that prints the wall-clock duration of each call to *f*.

    The wrapper passes all arguments through and returns whatever *f*
    returns. Nothing is printed when *f* raises.
    """
    @functools.wraps(f)
    def inner(*args, **kwargs):
        startTime = time.time()
        # Keep the result so decorating a function does not turn its
        # return value into None.
        result = f(*args, **kwargs)
        print("[function]: %s  [finished in]: %fs" %
              (f.__name__, time.time()-startTime))
        return result
    return inner


def timeToSeconds(t, sep=":"):
    """Convert an ``H<sep>M<sep>S`` string into a total number of seconds.

    An empty string is read as ``"0:0:0"``. Only the first three fields
    are used; each must parse as an integer.
    """
    clock = "0:0:0" if t == "" else t
    fields = [int(piece) for piece in clock.split(sep)]
    hours, minutes, secs = fields[0], fields[1], fields[2]
    return hours * 60 * 60 + minutes * 60 + secs


def secondsToTime(seconds, sep=":"):
    """Format a number of seconds as ``H<sep>M<sep>S`` (no zero padding).

    Args:
        seconds: Duration in seconds; truncated to an integer.
        sep: Separator placed between the hour, minute and second fields.

    Returns:
        A string such as ``"1:2:3"`` for 3723 seconds. Hours are not
        wrapped at 24.
    """
    # Integer arithmetic via divmod: plain ``/`` is true division on
    # Python 3 and produced fractional hour/minute fields.
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    return sep.join(str(part) for part in (hours, minutes, secs))

115
def md5_encryption(s):
    """Return the hex MD5 digest of *s* prefixed with a fixed salt.

    The digest is computed over the UTF-8 bytes of ``"EhuqUkwV"`` followed
    by the UTF-8 bytes of *s*. Despite the name this is a one-way hash, not
    encryption.

    Args:
        s: Text to hash (a ``str``).

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    # NOTE(review): MD5 with a constant salt is weak for password storage;
    # confirm what this digest is used for before relying on it.
    digest = hashlib.md5("EhuqUkwV".encode("utf-8"))
    digest.update(s.encode('utf-8'))
    return digest.hexdigest()

def filter_dict(source: dict, rules_list: list) -> dict:
    """Return a new dict holding only the entries of *source* whose key is in *rules_list*.

    Args:
        source: Mapping to filter; it is not modified.
        rules_list: Keys to keep. Keys absent from *source* are ignored.

    Returns:
        A new ``dict`` in *source*'s iteration order.
    """
    # If ``source`` is very large, consider a lazy filter (a generator or
    # the built-in ``filter``) instead of materializing the whole result.
    # ``dict.update(k, v)`` is a TypeError, so build the result directly.
    return {key: value for key, value in source.items() if key in rules_list}

131 132 133 134 135 136 137 138 139 140 141 142 143 144
def get_location_by_ip(ip):
    """Look up *ip* through the Baidu Map IP-location HTTP endpoint.

    Args:
        ip: IP address string. For ``'127.0.0.1'`` the ``ip`` query
            parameter is omitted (presumably the service then resolves
            the caller's own address; confirm against the API docs).

    Returns:
        The decoded JSON response body.

    Raises:
        urllib.error.URLError: If the request fails.
        ValueError: If the response body is not valid JSON.
    """
    # NOTE(review): the 'ak' value looks like an API key committed to
    # source; consider moving it to configuration.
    params = { 'ak': 'aZEAgYG8wKuLd6DS9BmCloGtfnGGkRMn', 'coor': 'bd09ll' }
    if ip != '127.0.0.1':
        params['ip'] = ip
    headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko' }

    url = 'http://api.map.baidu.com/location/ip'
    parameters = urllib.parse.urlencode(params)
    req = request.Request(url='%s%s%s' % (url, '?', parameters), headers=headers)
    # Close the HTTP response deterministically instead of leaking it.
    with request.urlopen(req) as response:
        ret = response.read()
    return json.loads(ret)
wanli's avatar
wanli committed
145 146 147 148 149 150 151 152 153 154

def sql_filter(sql):
    """Return *sql* with a fixed set of punctuation characters removed.

    The stripped characters are: ``" \\ / * ' = - # ; < > + % $ ( ) ! @``.
    NOTE(review): character stripping is not a substitute for
    parameterized queries.
    """
    stripped_chars = r"[\"\\/*\'=\-#;<>+%$()!@]"
    cleaned = re.sub(stripped_chars, "", sql)
    return cleaned

def random_string(length=32):
    """Return a random string of ASCII letters and digits.

    Args:
        length: Number of characters to generate.

    Returns:
        A string of exactly *length* characters; characters may repeat.
    """
    alphabet = string.ascii_letters + string.digits
    # Sample with replacement: ``random.sample`` could never repeat a
    # character and raised ValueError once length exceeded the 62-character
    # alphabet.
    # NOTE(review): ``random`` is not cryptographically secure; use
    # ``secrets`` if these strings serve as tokens or passwords.
    return ''.join(random.choices(alphabet, k=length))

def get_days_before_datetime(dt, dayAgo):
    """Return the timestamp *dayAgo* days before *dt* as a formatted string.

    Args:
        dt: A ``datetime.datetime``, or a string in
            ``"%Y-%m-%d %H:%M:%S"`` format.
        dayAgo: Number of days to subtract (passed to ``timedelta(days=...)``).

    Returns:
        The shifted moment formatted as ``"%Y-%m-%d %H:%M:%S"``.

    Raises:
        ValueError: If *dt* is a string that does not match the format.
    """
    if not isinstance(dt, datetime.datetime):
        dt = datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")

    # Compute the shifted datetime first (without rebinding the parameter),
    # then render it back to the string format.
    earlier = dt - datetime.timedelta(days=dayAgo)
    return earlier.strftime("%Y-%m-%d %H:%M:%S")

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
class Klass:
    """Plain attribute container used by ``dict2obj``."""

    def __init__(self):
        pass

    def __repr__(self):
        return "%s(%r)" % (type(self).__name__, self.__dict__)

# Retained for backward compatibility with code that imports ``klass``;
# ``dict2obj`` no longer writes into this shared instance.
klass = Klass()

def dict2obj(dictionary):
    """Return a new ``Klass`` whose attributes are the items of *dictionary*.

    Each call builds a fresh object. Previously every call updated and
    returned the same module-level instance, so attributes from earlier
    calls leaked into later results.

    Args:
        dictionary: Mapping of attribute names to values.
    """
    obj = Klass()
    obj.__dict__.update(dictionary)
    return obj

class ObjectDict(Dict[str, Any]):
    """A ``dict`` whose keys can also be read and written as attributes.

    ``obj.name`` reads ``obj["name"]`` and ``obj.name = value`` stores
    ``obj["name"] = value``.
    """

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal attribute lookup fails; translate a
        # missing key into the AttributeError that callers of getattr expect.
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        return value

    def __setattr__(self, name: str, value: Any) -> None:
        # Attribute assignment is stored as a dictionary item.
        self.__setitem__(name, value)

wanli's avatar
wanli committed
183
if __name__ == "__main__":
    # Ad-hoc smoke checks, run only when this file is executed as a script.
    d = { 'a': 1, 'b': 2 }
    print(dict2obj(d))

    print(os.path.abspath(__file__))
    print(random_string(7))