-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdcollector.py
76 lines (57 loc) · 2.07 KB
/
dcollector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from multiprocessing import Value, Pipe
import os
import signal
import sys
from collector.worker import Director, Collector
from lib.daemon import Daemon
from lib.logger import get_logger
from lib.load_conf import load_conf
class CollectorDaemon(Daemon):
    """Supervisor daemon for the collector worker processes.

    ``run`` creates ``cfg['workers_num']`` ``Collector`` processes and one
    ``Director`` process, starts them, then sleeps until SIGTERM clears the
    shared ``_is_running`` flag, and finally joins every child.
    """

    def __init__(self, cfg):
        """Store the configuration and initialise the base daemon.

        :param cfg: mapping read by this class for the keys ``pid_path``,
            ``workers_num``, ``rabbit_cfg`` and ``rabbit_snoopy_charges_cfg``;
            the whole mapping is also handed to the ``Director``.
        """
        self._cfg = cfg
        Daemon.__init__(self, self._cfg['pid_path'])
        self._logger = get_logger('snoopy-collector')

    def stop(self, *args, **kargs):
        """Request shutdown by clearing the shared running flag.

        Registered as the SIGTERM handler in ``run``; the ``(signum, frame)``
        arguments passed by the signal machinery are accepted and ignored.
        The flag only exists once ``run`` has created it.
        """
        self._is_running.value = False

    def run(self):
        """Start all child processes and wait until shutdown is requested.

        Any ``Exception`` raised along the way is logged as
        ``'General failure'`` and not re-raised.
        """
        try:
            # The flag must exist before the handler is installed: ``stop``
            # dereferences it, so a SIGTERM arriving in between would
            # otherwise fail with AttributeError inside the handler.
            self._is_running = Value('b', True)
            signal.signal(signal.SIGTERM, self.stop)

            self._processes = []
            self._collector_pipes = []
            for proc_num in range(self._cfg['workers_num']):
                # One pipe per collector: this side keeps ``p`` for the
                # director, the collector process receives ``c``.
                p, c = Pipe()
                self._collector_pipes.append(p)
                self._processes.append(
                    Collector(
                        name='snoopy-collector-col',
                        pipe=c,
                        proc_num=proc_num,
                        rabbit_cfg=self._cfg['rabbit_cfg'],
                        rabbit_sc_cfg=self._cfg['rabbit_snoopy_charges_cfg'],
                        is_running=self._is_running
                    )
                )
            # The director is appended last, so it is started and joined
            # after all collectors.
            self._processes.append(
                Director(
                    name='snoopy-collector-dir',
                    pipes=self._collector_pipes,
                    cfg=self._cfg,
                    is_running=self._is_running
                )
            )

            for proc in self._processes:
                proc.start()
            self._logger.info('I\'m ready!')

            while self._is_running.value:
                try:
                    # Sleep until any signal is delivered, then re-check
                    # the flag.
                    signal.pause()
                except (Exception, KeyboardInterrupt):
                    # Best-effort wait: an error raised while paused (for
                    # example by another signal handler, or Ctrl-C) must not
                    # end supervision. SystemExit is deliberately let through.
                    continue

            for proc in self._processes:
                proc.join()
            self._logger.info('Finalised shutdown.')
        except Exception:
            self._logger.exception('General failure')
if __name__ == '__main__':
    # The configuration file is looked up under conf/ next to the first
    # entry of sys.path, made absolute.
    base_dir = os.path.abspath(sys.path[0])
    conf_path = '%s/conf/collector.json' % (base_dir)
    conf = load_conf(conf_path)

    # Build the daemon from the loaded configuration and launch it.
    d = CollectorDaemon(conf)
    d.start()