Skip to content

Commit

Permalink
Merge pull request #40 from Enraged-Dun-Cookie-Development-Team/feat-…
Browse files Browse the repository at this point in the history
…降低预测时候内存

⚡️ 尝试降低内存
  • Loading branch information
YoungHector authored Dec 1, 2024
2 parents 1231889 + df2988f commit 8c1b7ec
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 11 deletions.
4 changes: 2 additions & 2 deletions conf/auto_sche.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"daily_preprocess_time": {
"hour": 19,
"minute": 30,
"hour": 3,
"minute": 0,
"second": 0
},
"default_datasource_id": 1,
Expand Down
39 changes: 30 additions & 9 deletions src/_data_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ def daily_model_predict(self):
"""
每日更新模型全量预测结果
"""
messager.send_to_bot_shortcut('每日更新模型全量预测结果 开始内存:{}'.format(get_memory_usage()))
# 拆成24个小时的数据运行

# 加载模型
Expand Down Expand Up @@ -342,7 +343,9 @@ def limit_cpu(interval):

if cpu_usage > 20: # 如果CPU使用率超过40%
time.sleep(interval) # 暂停一小段时间
del cpu_usage
else:
del cpu_usage
break

start_time = time.time()
Expand All @@ -357,30 +360,39 @@ def limit_cpu(interval):
batch = X_list[i:i + batch_size]
batch_predictions = self.model.predict(batch)
if i % 100000 == 0:
gc.collect()
# gc.collect()
messager.send_to_bot_shortcut('预测中,批次{} 内存:{}'.format(i, get_memory_usage()))
limit_cpu(interval)

predictions.extend(batch_predictions)
if i == 0:
messager.send_to_bot_shortcut('预测结果第一批样例形状:')
messager.send_to_bot_shortcut(batch_predictions.shape)

del batch_predictions

del batch
del interval
del batch_size

stop_time = time.time()

messager.send_to_bot(
info_dict={'info': '{} '.format(datetime.datetime.now()) +
str({'模型预测消耗时间:': stop_time - start_time})})
del start_time
del stop_time

# predicted_result = self.model.predict_proba(X_list)[:, 1]

self._set_model_predicted_result_pool(X_list, predictions)
del predictions
del X_list
except Exception as e:
# 打印报错
messager.send_to_bot_shortcut('出现报错,详细信息为:')
messager.send_to_bot_shortcut(str(e))
del self._model_predicted_result_pool
gc.collect(2)
messager.send_to_bot_shortcut('最终内存:{}'.format(get_memory_usage()))

# 删除模型。
MODEL_DICT.model_dict.pop('decision_tree_model')
Expand Down Expand Up @@ -499,7 +511,7 @@ def _set_model_predicted_result_pool(self, X_list, predicted_result):

# X_list.to_csv('./tmp.csv', index=False)
# del X_list
gc.collect()
# gc.collect()

# snapshot2 = tracemalloc.take_snapshot()
# top_stats = snapshot2.compare_to(snapshot1, 'lineno') # statistics('lineno')
Expand All @@ -515,13 +527,13 @@ def _set_model_predicted_result_pool(self, X_list, predicted_result):
X_list.drop(columns=['year', 'month', 'day', 'hour', 'minute', 'second'], inplace=True)

# X_list = X_list[['datasource', 'datetime']]
gc.collect()
# gc.collect()
messager.send_to_bot_shortcut('完成时间戳转换')
messager.send_to_bot_shortcut('完成时间戳转换 内存:{}'.format(get_memory_usage()))
print(X_list.info(memory_usage='deep'))
X_list['predicted_y'] = np.array(predicted_result) > 0.99999
del predicted_result
gc.collect()
# gc.collect()

messager.send_to_bot_shortcut('将预测结果与特征完成拼接,完整形状为:')
messager.send_to_bot_shortcut(X_list.shape)
Expand All @@ -536,7 +548,7 @@ def _set_model_predicted_result_pool(self, X_list, predicted_result):
for i in range(1000):

if i % 10 == 0:
gc.collect()
# gc.collect()
messager.send_to_bot_shortcut('时间戳转换字符串批次{} 内存:{}'.format(i, get_memory_usage()))

start_index = i * batch_size
Expand All @@ -549,6 +561,9 @@ def _set_model_predicted_result_pool(self, X_list, predicted_result):
X_list.loc[start_index:end_index, 'datetime_str'] = X_list.loc[start_index:end_index,
'datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')
X_list.loc[start_index:end_index, 'datetime'] = None
del start_index
del end_index
del batch_size
# # 使用.dt.strftime()将日期时间对象格式化为字符串
# X_list['datetime_str'] = X_list['datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')

Expand All @@ -563,7 +578,7 @@ def _set_model_predicted_result_pool(self, X_list, predicted_result):
X_list = X_list[X_list['datasource'] < 33].reset_index(drop=True)
messager.send_to_bot_shortcut('完成datasource筛选')

gc.collect()
# gc.collect()

# debug
print('未来一天的预测结果')
Expand All @@ -576,7 +591,13 @@ def _set_model_predicted_result_pool(self, X_list, predicted_result):
# 旧:一次性存储一天所有数据
# self._model_predicted_result_pool = X_list
# 新:每次存储1小时的数据
self._model_predicted_result_pool.append(X_list)
# self._model_predicted_result_pool.append(X_list)
# TODO: 替换成redis写入
X_list.to_csv('./tmp.csv', index=False)
del X_list
gc.collect(2)

messager.send_to_bot_shortcut('启动时预测当天可能有饼的时间点数量 内存:{}'.format(get_memory_usage()))

def get_pending_datasources(self, end_time=None, time_window_seconds=None):

Expand Down
10 changes: 10 additions & 0 deletions src/auto_sche/model_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ def feature_combine(self):
cur_feature[:, 11] = MODEL_DICT['weekday_encoder'].transform([self._convert_date(*t[:3])])

X_list.append(cur_feature)
del cur_feature

del time_points
del time_points_nums
del datasource_num
del feature_num

# 组织成dataframe用于模型输入.
X_list = pd.DataFrame(np.concatenate(X_list))
Expand Down Expand Up @@ -96,6 +102,8 @@ def feature_of_time(self):
# 生成未来24小时的时间点,从凌晨4点开始,到凌晨4点结束(不包括)
end_time = scheduled_time + datetime.timedelta(hours=1)
time_points = self._generate_time_points(scheduled_time, end_time)
del scheduled_time
del end_time

return time_points

Expand All @@ -119,6 +127,7 @@ def _generate_time_points(self, start_time, end_time, interval=1):
current_time.second))

current_time += datetime.timedelta(seconds=interval)
del start_time, end_time, interval

return time_points

Expand All @@ -129,6 +138,7 @@ def _convert_date(year, month, day):
'''
date = datetime.date(int(year), int(month), int(day))
weekday = date.strftime("%A") # %A表示完整的星期名称(如Monday)
del date

return weekday

Expand Down
5 changes: 5 additions & 0 deletions src/auto_sche/post_processer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ def batch_predict(X, clf):
hour = (hour + 1) % 24

predict_time.append([hour, minute, second])
del hour, minute, second

df_time = pd.DataFrame(predict_time)
df_time.columns = ['pred_hour', 'pred_minute', 'pred_second']
print(df_time.shape)
print(X.shape)

del predict_time, X, raw_y
return pd.concat([X.reset_index(drop=True), df_time.reset_index(drop=True)], axis=1)


Expand Down Expand Up @@ -72,4 +75,6 @@ def delay_estimation(predict_time, actual_time, y_label=None):
total_seconds_pred = predict_time.apply(time_to_seconds, axis=1)
total_seconds_actual = actual_time.apply(time_to_seconds, axis=1)

del predict_time, actual_time, y_label

return (total_seconds_pred - total_seconds_actual + 86400) % 86400
1 change: 1 addition & 0 deletions src/schedular.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ def test_headtbeat():
# minute=AUTO_SCHE_CONFIG['DAILY_PREPROCESS_TIME']['MINUTE'])
# daily_scheduler.start()


# 蹲饼器健康情况监控
ioloop.PeriodicCallback(health_monitor.health_scan, 5000).start()

Expand Down

0 comments on commit 8c1b7ec

Please sign in to comment.