-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstats_updater.py
165 lines (136 loc) · 6.09 KB
/
stats_updater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
class StatsUpdater:
    """Periodically regenerate statistics by running the data-processor and
    data-analysis scripts on an APScheduler background scheduler.

    The pipeline is: ``data-processor.py`` turns ``data/results.csv`` into
    ``data/processed_data.csv``, then ``data-analysis.py`` renders output into
    ``static/images/stats``.
    """

    def __init__(self, update_interval: int = 300):  # 300 seconds = 5 minutes
        """
        Initialize the statistics updater.

        Args:
            update_interval: Time between updates in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # Attach a stream handler only if none is configured yet, so that
        # creating several updaters does not duplicate every log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)

        self.scheduler = BackgroundScheduler()
        self.update_interval = update_interval

        # Paths used by the processing pipeline (container layout under /app).
        self.base_path = Path('/app')
        self.data_processor = self.base_path / 'data-processor.py'
        self.analysis_script = self.base_path / 'data-analysis.py'
        self.static_path = self.base_path / 'static' / 'images' / 'stats'
        self.data_path = self.base_path / 'data'

        # Create output directories up front so the child scripts can write.
        self.static_path.mkdir(parents=True, exist_ok=True)
        self.data_path.mkdir(parents=True, exist_ok=True)

    def run_data_processor(self) -> bool:
        """Run the data processor script to prepare data for analysis.

        Returns:
            True on success; False if the input file is missing or the
            subprocess fails.
        """
        try:
            self.logger.info("Running data processor...")
            # Prepare paths for data processor arguments.
            results_path = self.data_path / 'results.csv'
            config_path = self.base_path / 'config.json'
            metadata_dir = self.base_path / 'static' / 'images'
            output_path = self.data_path / 'processed_data.csv'

            self.logger.info(f"Results path: {results_path}")
            self.logger.info(f"Results file exists: {results_path.exists()}")
            if not results_path.exists():
                self.logger.error("Results file not found")
                return False

            # sys.executable runs the same interpreter as this process,
            # instead of whatever 'python' happens to resolve to on PATH.
            cmd = [
                sys.executable, str(self.data_processor),
                '--results', str(results_path),
                '--config', str(config_path),
                '--metadata-dir', str(metadata_dir),
                '--output', str(output_path)
            ]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True  # raises CalledProcessError on nonzero exit
            )
            self.logger.info("Data processor completed successfully")
            if result.stdout:
                self.logger.info(f"Data processor output: {result.stdout}")
            return True
        except subprocess.CalledProcessError as e:
            self.logger.error(f"Error running data processor: {e}")
            self.logger.error(f"Data processor stderr: {e.stderr}")
            return False
        except Exception as e:
            # logger.exception records the full traceback automatically.
            self.logger.exception(f"Unexpected error in data processor: {e}")
            return False

    def update_statistics(self):
        """Run the data processor and analysis scripts to update statistics."""
        try:
            self.logger.info("Starting statistics update...")
            start_time = datetime.now()

            # Run the data processor first; without its output there is
            # nothing for the analysis step to consume.
            if not self.run_data_processor():
                self.logger.error("Data processing failed, skipping analysis")
                return

            processed_data_path = self.data_path / 'processed_data.csv'
            self.logger.info(f"Checking processed data exists: {processed_data_path.exists()}")

            # The analysis script locates its data directory via DATA_DIR.
            env = os.environ.copy()
            env['DATA_DIR'] = str(self.data_path)

            # Same interpreter as this process (see run_data_processor).
            cmd = [
                sys.executable,
                str(self.analysis_script),
                '--input', str(processed_data_path),
                '--output-dir', str(self.static_path)
            ]
            self.logger.info(f"Running analysis command: {' '.join(cmd)}")
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
                cwd=str(self.base_path),
                env=env
            )
            if result.stdout:
                self.logger.info(f"Analysis output: {result.stdout}")

            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"Statistics update completed in {duration:.2f} seconds")
        except subprocess.CalledProcessError as e:
            self.logger.error(f"Error running analysis script: {e}")
            self.logger.error(f"Script stderr: {e.stderr}")
        except Exception as e:
            # logger.exception records the full traceback automatically.
            self.logger.exception(f"Unexpected error during statistics update: {e}")

    def start(self):
        """Start the periodic statistics updates.

        Runs one update immediately, then schedules recurring updates every
        ``update_interval`` seconds.
        """
        # Run once immediately so fresh output exists before the first tick.
        self.update_statistics()

        # Schedule periodic updates; replace_existing makes start() idempotent
        # with respect to the job id.
        self.scheduler.add_job(
            self.update_statistics,
            trigger=IntervalTrigger(seconds=self.update_interval),
            id='stats_update',
            name='Update Statistics',
            replace_existing=True
        )
        self.scheduler.start()
        self.logger.info(f"Started statistics updater (interval: {self.update_interval} seconds)")

    def stop(self):
        """Stop the periodic statistics updates."""
        # Guard: shutdown() raises SchedulerNotRunningError if the scheduler
        # was never started (or stop() is called twice).
        if self.scheduler.running:
            self.scheduler.shutdown()
        self.logger.info("Stopped statistics updater")