-
Notifications
You must be signed in to change notification settings - Fork 0
/
clean_logs.py
66 lines (54 loc) · 2.11 KB
/
clean_logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import os
import datetime
import time
class DoFile():
# 获取某个磁盘路径里所有文件
def getFiles(self, strDir, isLoop, overDay):
files = []
if len(strDir) <= 0 or not os.path.exists(strDir):
return files
dirs = os.listdir(strDir)
for dir in dirs:
path = os.path.join(strDir, dir)
if (os.path.isfile(path) and path.find(".log") >= 0): # 是.log文件
if (self.compareFileTime(path, -overDay)):
files.append(path)
elif (os.path.isdir(path) and isLoop): # 是磁盘
files.extend(self.getFiles(path, isLoop, overDay))
else:
continue
return files
# 综合处理磁盘文件
def doFiles(self, clearDirs, isLoop=False, overDay=3):
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ":执行中...")
for dir in clearDirs:
files = self.getFiles(dir, isLoop, overDay)
print("{}查询出{}个文件".format(dir, len(files)))
self.clearFiles(files)
print("执行完毕...")
# 清除文本文件/目录
def clearFiles(self, files):
for file in files:
f = file[:file.find(".log") - 2]
if f.endswith("00:00"):
strcmd = "rm -rf {}".format(f)
self.exec_cmd(strcmd)
print(f)
# 执行脚本命令
def exec_cmd(self, strcmd):
os.system(strcmd)
# 获取文件创建时间
def getCreateFileTime(self, path):
return os.path.getctime(path)
# 时间戳转datetime
def TimeStampToTime(self, timestamp):
return datetime.datetime.utcfromtimestamp(timestamp)
# 比较当前时间与文件创建时间差值(天)
def compareFileTime(self, path, overDay):
comparTime = self.TimeStampToTime(self.getCreateFileTime(path))
now = datetime.datetime.utcnow() + datetime.timedelta(days=overDay)
return now > comparTime
if __name__ == '__main__':
clearDirs = ["/data/logs/airflow"]
doFile = DoFile()
doFile.doFiles(clearDirs, True, 45)