-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlib_file.py
169 lines (145 loc) · 4.91 KB
/
lib_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
2021.03.27 检查文件路径是否存在,并创建路径
2019.09.10 save/load dictionary to/as json
2019.08.25 pickle save/load object.
集成了常用的文件操作函数
"""
import os
import shutil
import pickle
import json
import lxml.etree as ET
import pandas as pd
def check_and_creat_dir(file_url):
'''
判断文件是否存在,文件路径不存在则创建文件夹
:param file_url: 文件路径,包含文件名
:return:
'''
file_gang_list = file_url.split('/')
if len(file_gang_list)>1:
[fname,fename] = os.path.split(file_url)
print(fname,fename)
if not os.path.exists(fname):
os.makedirs(fname)
else:
return None
#还可以直接创建空文件
else:
os.makedirs(file_url)
########################## pickle ##########################################
def pickle_save(obj,file):
check_and_creat_dir(file)
with open(file,'wb') as f:
pickle.dump(obj,f)
def pickle_load(file):
with open(file,'rb') as f:
obj = pickle.load(f)
return obj
def read_csv2dict(f,key,value):
""" read CSV, 并指定两列作为键和值
"""
df = read_file2df(f)
return dict(zip(df[key], df[value]))
def read_file2df(f,encoding='utf-8'):
"""read CSV or EXCEL file
有时会由于编码的问题无法工作
"""
if f.endswith('.csv'):
return pd.read_csv(f,encoding=encoding)
elif f.endswith('.xls') or f.endswith('.xlsx'):
return pd.read_excel(f,encoding=encoding)
else:
print('Unsupported file format!')
return None
def read_XML2df(f,encoding='utf-8'):
""" 针对欧洲铁路车站XML格式编的代码
"""
tree = ET.parse(f)
stations = tree.findall('station')
temp = []
for s in stations:
temp.append([s.attrib['name'],s.attrib['lon'],s.attrib['lat'], s.attrib['country'], s.attrib['UTC']])
return pd.DataFrame(temp, columns=['name','lon','lat','country','time_zone'])
def get_LocIndex(df, key='name', lon='lon', lat='lat'):
""" 创建一个 name:[lon,lat] 的字典
"""
dict_NameLocation = {} # name:location
temp_locs = df.set_index(key).T.to_dict()
for name in temp_locs:
if name in dict_NameLocation:
print('Warning. Key is not unique!')
dict_NameLocation[name] = (temp_locs[name][lon],temp_locs[name][lat])
return dict_NameLocation
##############################################################################
def move_file(in_file,out_file):
"""
剪切文件
in_file:输入文件名(完整路径的)
out_file:输出文件名(完整路径的)
"""
s=out_file.split(os.sep)
out_path = ""
for i in s[:-1]:
out_path+=i+os.sep
if not os.path.exists(out_path):
os.makedirs(out_path)
shutil.move(in_file,out_file)
def copy_file(in_file,out_file):
"""
复制文件
in_file:输入文件名(完整路径的)
out_file:输出文件名(完整路径的)
"""
s=out_file.split(os.sep)
# 生成路径
out_path = ""
for i in s[:-1]:
out_path+=i+os.sep
if not os.path.exists(out_path):
os.makedirs(out_path)
# 复制文件
shutil.copy(in_file,out_file)
def list_all_files(rootdir, type=None):
"""
列出该目录下的所有文件
以列表形式输出所有文件
"""
_files = []
lst = os.listdir(rootdir) #列出文件夹下所有的目录与文件
for i in range(0,len(lst)):
path = os.path.join(rootdir,lst[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
if type:
_files = [f for f in _files if f.endswith(type)]
return _files
def find_corresponding(f,files):
"""
f: 输入的文件名(可以只是文件名,也可以是包含完整路径的文件名)
files: 待查找的文件列表
从文件列表中寻找文件f,输出找到的文件路径
"""
short_name = f.split(os.sep)[-1]
for f_new in files:
if short_name in f_new.split(os.sep):
return f_new
print('No corresponding file:',f)
return None
######################### json ################################################
def json_save(test_dict, file,encoding='utf-8'):
json_str = json.dumps(test_dict, indent=4,ensure_ascii =False)
with open(file, 'w',encoding=encoding) as json_file:
json_file.write(json_str)
def json_load(file,encoding='utf-8'):
with open(file, 'r',encoding=encoding) as f:
#print("Load str file from {}".format(file))
str1 = f.read()
r = json.loads(str1)
return r
########################### DataFrame与字典 ##########################################
def creat_dict_from_CSV(f,key,value):
df = pd.read_csv(f)
return dict(zip(df[key], df[value]))