Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
E
evm-store
Project overview
Project overview
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wanli
evm-store
Commits
daeea215
Commit
daeea215
authored
Jul 15, 2021
by
wanli
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
3922979c
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
462 additions
and
81 deletions
+462
-81
tools/build_out/views/__init__.20210714181414.py
tools/build_out/views/__init__.20210714181414.py
+0
-66
tools/build_out/views/__init__.py
tools/build_out/views/__init__.py
+18
-1
tools/build_out/views/file.py
tools/build_out/views/file.py
+172
-0
tools/build_out/webcreator/utils/__init__.py
tools/build_out/webcreator/utils/__init__.py
+136
-7
tools/resources/webcreator/utils/__init__.py
tools/resources/webcreator/utils/__init__.py
+136
-7
No files found.
tools/build_out/views/__init__.20210714181414.py
deleted
100644 → 0
View file @
3922979c
#!/usr/bin/env python
# -*- coding: utf_8 -*-
from
flask
import
Blueprint
from
flask_restful
import
Api
from
.
import
area
from
.
import
app
from
.
import
package
from
.
import
user
from
.
import
login
from
.
import
device
from
.
import
annex
from
.
import
file
from
.
import
monitorWatch
from
.
import
monitorSystem
from
.
import
monitorLvgl
from
.
import
monitorImage
from
.
import
monitorEvm
api_v1
=
Blueprint
(
'api_v1'
,
__name__
)
api
=
Api
(
api_v1
)
api
.
add_resource
(
area
.
AreaResource
,
'/area/<string:uuid>'
)
api
.
add_resource
(
area
.
AreaResourceList
,
'/area'
)
api
.
add_resource
(
app
.
AppResource
,
'/app/<string:uuid>'
)
api
.
add_resource
(
app
.
AppResourceList
,
'/app'
)
api
.
add_resource
(
package
.
PackageResource
,
'/package/<string:uuid>'
)
api
.
add_resource
(
package
.
PackageResourceList
,
'/package'
)
api
.
add_resource
(
user
.
UserResource
,
'/user/<string:uuid>'
)
api
.
add_resource
(
user
.
UserResourceList
,
'/user'
)
api
.
add_resource
(
login
.
LoginResource
,
'/login/<string:uuid>'
)
api
.
add_resource
(
login
.
LoginResourceList
,
'/login'
)
api
.
add_resource
(
device
.
DeviceResource
,
'/device/<string:uuid>'
)
api
.
add_resource
(
device
.
DeviceResourceList
,
'/device'
)
api
.
add_resource
(
annex
.
AnnexResource
,
'/annex/<string:uuid>'
)
api
.
add_resource
(
annex
.
AnnexResourceList
,
'/annex'
)
api
.
add_resource
(
file
.
FileInit
,
"/file-manager/initialize"
)
api
.
add_resource
(
file
.
FileContent
,
"/file-manager/content"
)
api
.
add_resource
(
file
.
FileDisk
,
"/file-manager/disk"
)
api
.
add_resource
(
file
.
FileTree
,
"/file-manager/tree"
)
api
.
add_resource
(
file
.
FileDownload
,
"/file-manager/download"
)
api
.
add_resource
(
file
.
FilePrview
,
"/file-manager/preview"
)
api
.
add_resource
(
monitorWatch
.
MonitorWatchResource
,
'/monitorWatch/<string:uuid>'
)
api
.
add_resource
(
monitorWatch
.
MonitorWatchResourceList
,
'/monitorWatch'
)
api
.
add_resource
(
monitorSystem
.
MonitorSystemResource
,
'/monitorSystem/<string:uuid>'
)
api
.
add_resource
(
monitorSystem
.
MonitorSystemResourceList
,
'/monitorSystem'
)
api
.
add_resource
(
monitorLvgl
.
MonitorLvglResource
,
'/monitorLvgl/<string:uuid>'
)
api
.
add_resource
(
monitorLvgl
.
MonitorLvglResourceList
,
'/monitorLvgl'
)
api
.
add_resource
(
monitorImage
.
MonitorImageResource
,
'/monitorImage/<string:uuid>'
)
api
.
add_resource
(
monitorImage
.
MonitorImageResourceList
,
'/monitorImage'
)
api
.
add_resource
(
monitorEvm
.
MonitorEvmResource
,
'/monitorEvm/<string:uuid>'
)
api
.
add_resource
(
monitorEvm
.
MonitorEvmResourceList
,
'/monitorEvm'
)
tools/build_out/views/__init__.py
View file @
daeea215
# -*- coding: utf-8 -*-
'''
Author: your name
Date: 2021-07-15 03:22:19
LastEditTime: 2021-07-15 09:12:48
LastEditors: your name
Description: In User Settings Edit
FilePath:
\
evm-store
\t
ools
\b
uild_out
\v
iews
\
__init__.py
'''
#!/usr/bin/env python
# -*- coding: utf_8 -*-
from
flask
import
Blueprint
from
flask
import
Blueprint
from
flask_restful
import
Api
from
flask_restful
import
Api
...
@@ -9,6 +18,7 @@ from . import user
...
@@ -9,6 +18,7 @@ from . import user
from
.
import
login
from
.
import
login
from
.
import
device
from
.
import
device
from
.
import
annex
from
.
import
annex
from
.
import
file
from
.
import
monitorWatch
from
.
import
monitorWatch
from
.
import
monitorSystem
from
.
import
monitorSystem
from
.
import
monitorLvgl
from
.
import
monitorLvgl
...
@@ -41,6 +51,13 @@ api.add_resource(device.DeviceResourceList, '/device')
...
@@ -41,6 +51,13 @@ api.add_resource(device.DeviceResourceList, '/device')
api
.
add_resource
(
annex
.
AnnexResource
,
'/annex/<string:uuid>'
)
api
.
add_resource
(
annex
.
AnnexResource
,
'/annex/<string:uuid>'
)
api
.
add_resource
(
annex
.
AnnexResourceList
,
'/annex'
)
api
.
add_resource
(
annex
.
AnnexResourceList
,
'/annex'
)
api
.
add_resource
(
file
.
FileInit
,
"/file-manager/initialize"
)
api
.
add_resource
(
file
.
FileContent
,
"/file-manager/content"
)
api
.
add_resource
(
file
.
FileDisk
,
"/file-manager/disk"
)
api
.
add_resource
(
file
.
FileTree
,
"/file-manager/tree"
)
api
.
add_resource
(
file
.
FileDownload
,
"/file-manager/download"
)
api
.
add_resource
(
file
.
FilePrview
,
"/file-manager/preview"
)
api
.
add_resource
(
monitorWatch
.
MonitorWatchResource
,
'/monitorWatch/<string:uuid>'
)
api
.
add_resource
(
monitorWatch
.
MonitorWatchResource
,
'/monitorWatch/<string:uuid>'
)
api
.
add_resource
(
monitorWatch
.
MonitorWatchResourceList
,
'/monitorWatch'
)
api
.
add_resource
(
monitorWatch
.
MonitorWatchResourceList
,
'/monitorWatch'
)
...
...
tools/build_out/views/file.py
0 → 100644
View file @
daeea215
'''
Author: your name
Date: 2021-07-09 12:39:40
LastEditTime: 2021-07-15 09:16:01
LastEditors: Please set LastEditors
Description: In User Settings Edit
FilePath:
\
evm-store
\t
ools
\b
uild_out
\v
iews
\f
ile.py
'''
from
flask
import
current_app
,
jsonify
,
request
from
flask_restful
import
Resource
from
flask_restful.reqparse
import
RequestParser
from
flask_jwt_extended
import
(
jwt_required
,
get_jwt_identity
)
from
application.signal_manager
import
signalManager
from
models.login
import
postLoginSchema
,
getListLoginSchema
,
getListLoginsSchema
,
getLoginSchema
from
webcreator.log
import
logger
from
webcreator.response
import
ResponseCode
,
response_result
class
FileInit
(
Resource
):
def
__init__
(
self
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
self
.
parser
=
RequestParser
()
def
get
(
self
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser.add_argument("page", type=int, location="args", default=1)
# self.parser.add_argument("pageSize", type=int, location="args", default=15)
# args = self.parser.parse_args()
try
:
result
,
message
=
signalManager
.
actionGetFileInit
.
emit
()
json_dumps
=
getListLoginSchema
.
dump
(
result
)
if
result
:
return
response_result
(
ResponseCode
.
HTTP_SUCCESS
,
data
=
result
,
count
=
message
)
return
response_result
(
ResponseCode
.
HTTP_NO_DATA
)
except
Exception
as
e
:
current_app
.
logger
.
error
(
e
)
return
response_result
(
ResponseCode
.
DB_ERROR
)
class
FileContent
(
Resource
):
def
__init__
(
self
):
pass
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser = RequestParser()
@
jwt_required
(
locations
=
[
"headers"
])
def
get
(
self
,
uuid
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser.add_argument("page", type=int, location="args", default=1)
# self.parser.add_argument("pageSize", type=int, location="args", default=15)
# args = self.parser.parse_args()
try
:
json_payload
=
request
.
json
print
(
"========>"
,
uuid
,
json_payload
)
data
=
getLoginSchema
.
load
(
json_payload
)
result
=
signalManager
.
actionGetLogin
.
emit
(
uuid
,
data
)
if
result
[
0
]:
json_dumps
=
getLoginSchema
.
dump
(
result
[
1
])
return
response_result
(
ResponseCode
.
OK
,
data
=
json_dumps
)
return
response_result
(
ResponseCode
.
NO_DATA
)
except
Exception
as
e
:
current_app
.
logger
.
error
(
e
)
return
response_result
(
ResponseCode
.
DB_ERROR
)
class
FileDisk
(
Resource
):
def
__init__
(
self
):
pass
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser = RequestParser()
def
get
(
self
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser.add_argument("page", type=int, location="args", default=1)
# self.parser.add_argument("pageSize", type=int, location="args", default=15)
# args = self.parser.parse_args()
try
:
json_payload
=
request
.
json
logger
.
warn
(
json_payload
)
data
=
getListLoginSchema
.
load
(
json_payload
)
result
=
signalManager
.
actionGetListLogin
.
emit
(
data
)
json_dumps
=
getListLoginSchema
.
dump
(
result
)
if
result
[
0
]:
json_dumps
=
getListLoginsSchema
.
dump
(
result
[
1
])
logger
.
warn
(
json_dumps
)
return
response_result
(
ResponseCode
.
OK
,
data
=
json_dumps
,
count
=
result
[
2
])
return
response_result
(
ResponseCode
.
REQUEST_ERROR
)
except
Exception
as
e
:
current_app
.
logger
.
error
(
e
)
return
response_result
(
ResponseCode
.
DB_ERROR
)
class
FileTree
(
Resource
):
def
__init__
(
self
):
pass
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser = RequestParser()
@
jwt_required
(
locations
=
[
"headers"
])
def
get
(
self
,
uuid
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser.add_argument("page", type=int, location="args", default=1)
# self.parser.add_argument("pageSize", type=int, location="args", default=15)
# args = self.parser.parse_args()
try
:
json_payload
=
request
.
json
print
(
"========>"
,
uuid
,
json_payload
)
data
=
getLoginSchema
.
load
(
json_payload
)
result
=
signalManager
.
actionGetLogin
.
emit
(
uuid
,
data
)
if
result
[
0
]:
json_dumps
=
getLoginSchema
.
dump
(
result
[
1
])
return
response_result
(
ResponseCode
.
OK
,
data
=
json_dumps
)
return
response_result
(
ResponseCode
.
NO_DATA
)
except
Exception
as
e
:
current_app
.
logger
.
error
(
e
)
return
response_result
(
ResponseCode
.
DB_ERROR
)
class
FileDownload
(
Resource
):
def
__init__
(
self
):
pass
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser = RequestParser()
def
get
(
self
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser.add_argument("page", type=int, location="args", default=1)
# self.parser.add_argument("pageSize", type=int, location="args", default=15)
# args = self.parser.parse_args()
try
:
json_payload
=
request
.
json
logger
.
warn
(
json_payload
)
data
=
getListLoginSchema
.
load
(
json_payload
)
result
=
signalManager
.
actionGetListLogin
.
emit
(
data
)
json_dumps
=
getListLoginSchema
.
dump
(
result
)
if
result
[
0
]:
json_dumps
=
getListLoginsSchema
.
dump
(
result
[
1
])
logger
.
warn
(
json_dumps
)
return
response_result
(
ResponseCode
.
OK
,
data
=
json_dumps
,
count
=
result
[
2
])
return
response_result
(
ResponseCode
.
REQUEST_ERROR
)
except
Exception
as
e
:
current_app
.
logger
.
error
(
e
)
return
response_result
(
ResponseCode
.
DB_ERROR
)
class
FilePrview
(
Resource
):
def
__init__
(
self
):
pass
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser = RequestParser()
@
jwt_required
(
locations
=
[
"headers"
])
def
get
(
self
,
uuid
):
# 特殊参数,即不是从json获取参数的接口,可以将这个注释打开
# self.parser.add_argument("page", type=int, location="args", default=1)
# self.parser.add_argument("pageSize", type=int, location="args", default=15)
# args = self.parser.parse_args()
try
:
json_payload
=
request
.
json
print
(
"========>"
,
uuid
,
json_payload
)
data
=
getLoginSchema
.
load
(
json_payload
)
result
=
signalManager
.
actionGetLogin
.
emit
(
uuid
,
data
)
if
result
[
0
]:
json_dumps
=
getLoginSchema
.
dump
(
result
[
1
])
return
response_result
(
ResponseCode
.
OK
,
data
=
json_dumps
)
return
response_result
(
ResponseCode
.
NO_DATA
)
except
Exception
as
e
:
current_app
.
logger
.
error
(
e
)
return
response_result
(
ResponseCode
.
DB_ERROR
)
tools/build_out/webcreator/utils/__init__.py
View file @
daeea215
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf_8 -*-
from
typing
import
(
from
typing
import
(
Any
,
Any
,
...
@@ -13,6 +14,19 @@ from typing import (
...
@@ -13,6 +14,19 @@ from typing import (
Sequence
,
Sequence
,
)
)
import
shutil
import
time
import
re
import
os
import
json
import
functools
import
hashlib
import
random
import
string
import
datetime
import
threading
import
decimal
class
Klass
:
class
Klass
:
def
__init__
(
self
):
def
__init__
(
self
):
pass
pass
...
@@ -23,18 +37,133 @@ def dict2obj(dictionary):
...
@@ -23,18 +37,133 @@ def dict2obj(dictionary):
klass
.
__dict__
.
update
(
dictionary
)
klass
.
__dict__
.
update
(
dictionary
)
return
klass
return
klass
class
ObjectDict
(
Dict
[
str
,
Any
]):
class
DecimalEncoder
(
json
.
JSONEncoder
):
"""Makes a dictionary behave like an object, with attribute-style access."""
def
default
(
self
,
o
):
if
isinstance
(
o
,
decimal
.
Decimal
):
return
float
(
o
)
super
(
DecimalEncoder
,
self
)
.
default
(
o
)
class
ObjectDict
(
dict
):
"""Makes a dictionary behave like an object, with attribute-style access.
"""
def
__getattr__
(
self
,
name
:
str
)
->
Any
:
def
__getattr__
(
self
,
name
)
:
try
:
try
:
return
self
[
name
]
return
self
[
name
]
except
KeyError
:
except
KeyError
:
raise
AttributeError
(
name
)
raise
AttributeError
(
name
)
def
__setattr__
(
self
,
name
:
str
,
value
:
Any
)
->
None
:
def
__setattr__
(
self
,
name
,
value
)
:
self
[
name
]
=
value
self
[
name
]
=
value
def
ThreadMaker
(
f
):
def
runner
(
*
args
,
**
argv
):
t
=
threading
.
Thread
(
target
=
f
,
args
=
args
,
kwargs
=
argv
)
t
.
start
()
return
t
return
runner
def
copytree
(
src
,
dst
,
symlinks
=
False
,
ignore
=
None
):
names
=
os
.
listdir
(
src
)
if
ignore
is
not
None
:
ignored_names
=
ignore
else
:
ignored_names
=
set
()
if
not
os
.
path
.
exists
(
dst
):
os
.
makedirs
(
dst
)
errors
=
[]
for
name
in
names
:
if
name
in
ignored_names
:
continue
if
re
.
match
(
r'.*?.pyc$'
,
name
):
continue
srcname
=
os
.
path
.
join
(
src
,
name
)
dstname
=
os
.
path
.
join
(
dst
,
name
)
try
:
if
symlinks
and
os
.
path
.
islink
(
srcname
):
linkto
=
os
.
readlink
(
srcname
)
os
.
symlink
(
linkto
,
dstname
)
elif
os
.
path
.
isdir
(
srcname
):
copytree
(
srcname
,
dstname
,
symlinks
,
ignore
)
else
:
shutil
.
copy2
(
srcname
,
dstname
)
# XXX What about devices, sockets etc.?
except
(
IOError
,
os
.
error
)
as
why
:
errors
.
append
((
srcname
,
dstname
,
str
(
why
)))
# catch the Error from the recursive copytree so that we can
# continue with other files
except
Exception
as
e
:
errors
.
extend
(
e
.
args
[
0
])
try
:
shutil
.
copystat
(
src
,
dst
)
except
WindowsError
:
# can't copy file access times on Windows
pass
except
OSError
as
why
:
errors
.
extend
((
src
,
dst
,
str
(
why
)))
if
errors
:
raise
Exception
(
errors
)
def
timing
(
f
):
@
functools
.
wraps
(
f
)
def
inner
(
*
args
,
**
kwargs
):
startTime
=
time
.
time
()
f
(
*
args
,
**
kwargs
)
print
(
"[function]:
%
s [finished in]:
%
fs"
%
(
f
.
__name__
,
time
.
time
()
-
startTime
))
return
inner
def
timeToSeconds
(
t
,
sep
=
":"
):
if
t
==
""
:
t
=
"0:0:0"
ts
=
[
int
(
i
)
for
i
in
t
.
split
(
sep
)]
return
ts
[
0
]
*
60
*
60
+
ts
[
1
]
*
60
+
ts
[
2
]
def
secondsToTime
(
seconds
,
sep
=
":"
):
h
=
seconds
/
3600
m
=
seconds
%
3600
/
60
s
=
(
seconds
-
h
*
3600
-
m
*
60
)
%
60
return
sep
.
join
([
str
(
i
)
for
i
in
[
h
,
m
,
s
]])
def
md5_salt
(
s
):
md5
=
hashlib
.
md5
(
"EhuqUkwV"
.
encode
(
"utf-8"
))
md5
.
update
(
s
.
encode
(
'utf-8'
))
return
md5
.
hexdigest
()
def
filter_dict
(
source
:
dict
,
rules_list
:
list
):
# 如果source中的词典数量过多,请使用itertools模块的ifilter。 它会返回一个迭代器,而不是立即用整个列表填充系统的内存
result
=
dict
()
# res = [d for d in source.keys() if d in rules_list]
# res = list(filter(lambda d: d in rules_list, source.keys()))
for
k
in
source
.
keys
():
if
k
in
rules_list
:
result
.
update
(
k
,
source
[
k
])
return
result
def
sql_filter
(
sql
):
return
re
.
sub
(
r"[\"\\/*\'=\-#;<>+
%
$()!@]"
,
""
,
sql
)
def
random_string
(
length
=
32
):
return
''
.
join
(
random
.
sample
(
string
.
ascii_letters
+
string
.
digits
,
length
))
def
get_days_before_datetime
(
dt
,
dayAgo
):
if
not
isinstance
(
dt
,
datetime
.
datetime
):
dt
=
datetime
.
datetime
.
strptime
(
dt
,
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
# 先获得时间数组格式的日期
dayAgo
=
(
dt
-
datetime
.
timedelta
(
days
=
dayAgo
))
# 转换为其他字符串格式
return
dayAgo
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
d
=
{
'a'
:
1
,
'b'
:
2
}
print
(
os
.
path
.
abspath
(
__file__
))
print
(
dict2obj
(
d
))
print
(
random_string
(
7
))
\ No newline at end of file
\ No newline at end of file
tools/resources/webcreator/utils/__init__.py
View file @
daeea215
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf_8 -*-
from
typing
import
(
from
typing
import
(
Any
,
Any
,
...
@@ -13,6 +14,19 @@ from typing import (
...
@@ -13,6 +14,19 @@ from typing import (
Sequence
,
Sequence
,
)
)
import
shutil
import
time
import
re
import
os
import
json
import
functools
import
hashlib
import
random
import
string
import
datetime
import
threading
import
decimal
class
Klass
:
class
Klass
:
def
__init__
(
self
):
def
__init__
(
self
):
pass
pass
...
@@ -23,18 +37,133 @@ def dict2obj(dictionary):
...
@@ -23,18 +37,133 @@ def dict2obj(dictionary):
klass
.
__dict__
.
update
(
dictionary
)
klass
.
__dict__
.
update
(
dictionary
)
return
klass
return
klass
class
ObjectDict
(
Dict
[
str
,
Any
]):
class
DecimalEncoder
(
json
.
JSONEncoder
):
"""Makes a dictionary behave like an object, with attribute-style access."""
def
default
(
self
,
o
):
if
isinstance
(
o
,
decimal
.
Decimal
):
return
float
(
o
)
super
(
DecimalEncoder
,
self
)
.
default
(
o
)
class
ObjectDict
(
dict
):
"""Makes a dictionary behave like an object, with attribute-style access.
"""
def
__getattr__
(
self
,
name
:
str
)
->
Any
:
def
__getattr__
(
self
,
name
)
:
try
:
try
:
return
self
[
name
]
return
self
[
name
]
except
KeyError
:
except
KeyError
:
raise
AttributeError
(
name
)
raise
AttributeError
(
name
)
def
__setattr__
(
self
,
name
:
str
,
value
:
Any
)
->
None
:
def
__setattr__
(
self
,
name
,
value
)
:
self
[
name
]
=
value
self
[
name
]
=
value
def
ThreadMaker
(
f
):
def
runner
(
*
args
,
**
argv
):
t
=
threading
.
Thread
(
target
=
f
,
args
=
args
,
kwargs
=
argv
)
t
.
start
()
return
t
return
runner
def
copytree
(
src
,
dst
,
symlinks
=
False
,
ignore
=
None
):
names
=
os
.
listdir
(
src
)
if
ignore
is
not
None
:
ignored_names
=
ignore
else
:
ignored_names
=
set
()
if
not
os
.
path
.
exists
(
dst
):
os
.
makedirs
(
dst
)
errors
=
[]
for
name
in
names
:
if
name
in
ignored_names
:
continue
if
re
.
match
(
r'.*?.pyc$'
,
name
):
continue
srcname
=
os
.
path
.
join
(
src
,
name
)
dstname
=
os
.
path
.
join
(
dst
,
name
)
try
:
if
symlinks
and
os
.
path
.
islink
(
srcname
):
linkto
=
os
.
readlink
(
srcname
)
os
.
symlink
(
linkto
,
dstname
)
elif
os
.
path
.
isdir
(
srcname
):
copytree
(
srcname
,
dstname
,
symlinks
,
ignore
)
else
:
shutil
.
copy2
(
srcname
,
dstname
)
# XXX What about devices, sockets etc.?
except
(
IOError
,
os
.
error
)
as
why
:
errors
.
append
((
srcname
,
dstname
,
str
(
why
)))
# catch the Error from the recursive copytree so that we can
# continue with other files
except
Exception
as
e
:
errors
.
extend
(
e
.
args
[
0
])
try
:
shutil
.
copystat
(
src
,
dst
)
except
WindowsError
:
# can't copy file access times on Windows
pass
except
OSError
as
why
:
errors
.
extend
((
src
,
dst
,
str
(
why
)))
if
errors
:
raise
Exception
(
errors
)
def
timing
(
f
):
@
functools
.
wraps
(
f
)
def
inner
(
*
args
,
**
kwargs
):
startTime
=
time
.
time
()
f
(
*
args
,
**
kwargs
)
print
(
"[function]:
%
s [finished in]:
%
fs"
%
(
f
.
__name__
,
time
.
time
()
-
startTime
))
return
inner
def
timeToSeconds
(
t
,
sep
=
":"
):
if
t
==
""
:
t
=
"0:0:0"
ts
=
[
int
(
i
)
for
i
in
t
.
split
(
sep
)]
return
ts
[
0
]
*
60
*
60
+
ts
[
1
]
*
60
+
ts
[
2
]
def
secondsToTime
(
seconds
,
sep
=
":"
):
h
=
seconds
/
3600
m
=
seconds
%
3600
/
60
s
=
(
seconds
-
h
*
3600
-
m
*
60
)
%
60
return
sep
.
join
([
str
(
i
)
for
i
in
[
h
,
m
,
s
]])
def
md5_salt
(
s
):
md5
=
hashlib
.
md5
(
"EhuqUkwV"
.
encode
(
"utf-8"
))
md5
.
update
(
s
.
encode
(
'utf-8'
))
return
md5
.
hexdigest
()
def
filter_dict
(
source
:
dict
,
rules_list
:
list
):
# 如果source中的词典数量过多,请使用itertools模块的ifilter。 它会返回一个迭代器,而不是立即用整个列表填充系统的内存
result
=
dict
()
# res = [d for d in source.keys() if d in rules_list]
# res = list(filter(lambda d: d in rules_list, source.keys()))
for
k
in
source
.
keys
():
if
k
in
rules_list
:
result
.
update
(
k
,
source
[
k
])
return
result
def
sql_filter
(
sql
):
return
re
.
sub
(
r"[\"\\/*\'=\-#;<>+
%
$()!@]"
,
""
,
sql
)
def
random_string
(
length
=
32
):
return
''
.
join
(
random
.
sample
(
string
.
ascii_letters
+
string
.
digits
,
length
))
def
get_days_before_datetime
(
dt
,
dayAgo
):
if
not
isinstance
(
dt
,
datetime
.
datetime
):
dt
=
datetime
.
datetime
.
strptime
(
dt
,
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
# 先获得时间数组格式的日期
dayAgo
=
(
dt
-
datetime
.
timedelta
(
days
=
dayAgo
))
# 转换为其他字符串格式
return
dayAgo
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
d
=
{
'a'
:
1
,
'b'
:
2
}
print
(
os
.
path
.
abspath
(
__file__
))
print
(
dict2obj
(
d
))
print
(
random_string
(
7
))
\ No newline at end of file
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment