-
Notifications
You must be signed in to change notification settings - Fork 0
/
models_library.py
132 lines (116 loc) · 4.32 KB
/
models_library.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from threading import Lock
import stats_mgr
class ModelsLibrary:
__instance = None
__threads_lock = Lock()
@staticmethod
def get_instance():
if ModelsLibrary.__instance is None:
ModelsLibrary()
return ModelsLibrary.__instance
def __init__(self):
ModelsLibrary.__threads_lock.acquire()
try:
if ModelsLibrary.__instance is not None:
raise Exception("This is a singleton class. Please use the get_instance() method.")
else:
ModelsLibrary.__instance = self
self._models = {}
self._anomaly_likelihood_detectors = {}
self._stats_mgr = stats_mgr.StatsMgr.get_instance(__file__)
finally:
ModelsLibrary.__threads_lock.release()
def get_model(self, model_name):
ModelsLibrary.__threads_lock.acquire()
res = None
try:
if model_name in self._models:
res = self._models[model_name]
finally:
ModelsLibrary.__threads_lock.release()
return res
def get_model_by_idx(self, model_idx):
ModelsLibrary.__threads_lock.acquire()
res = None
try:
if model_idx < len(self._models):
res = self._models.items()[model_idx]
finally:
ModelsLibrary.__threads_lock.release()
return res
def model_exists(self, model_name):
ModelsLibrary.__threads_lock.acquire()
try:
res = model_name in self._models
finally:
ModelsLibrary.__threads_lock.release()
return res
def get_models_count(self):
ModelsLibrary.__threads_lock.acquire()
try:
count = len(self._models)
finally:
ModelsLibrary.__threads_lock.release()
return count
def get_anomaly_calc(self, model_name):
ModelsLibrary.__threads_lock.acquire()
res = None
try:
if model_name in self._anomaly_likelihood_detectors:
res = self._anomaly_likelihood_detectors[model_name]
finally:
ModelsLibrary.__threads_lock.release()
return res
def get_anomaly_calc_by_idx(self, idx):
ModelsLibrary.__threads_lock.acquire()
res = None
try:
if idx < len(self._anomaly_likelihood_detectors):
res = self._anomaly_likelihood_detectors.items()[idx]
finally:
ModelsLibrary.__threads_lock.release()
return res
def anomaly_calc_exists(self, model_name):
ModelsLibrary.__threads_lock.acquire()
try:
res = model_name in self._anomaly_likelihood_detectors
finally:
ModelsLibrary.__threads_lock.release()
return res
def get_anomaly_calc_count(self):
ModelsLibrary.__threads_lock.acquire()
try:
count = len(self._anomaly_likelihood_detectors)
finally:
ModelsLibrary.__threads_lock.release()
return count
def add_anomaly_calc_for_metric(self, key, anomaly_calc):
ModelsLibrary.__threads_lock.acquire()
try:
self._anomaly_likelihood_detectors[key] = anomaly_calc
self._stats_mgr.set("anomaly_calculators_loaded", len(self._anomaly_likelihood_detectors))
finally:
ModelsLibrary.__threads_lock.release()
def add_model_for_metric(self, key, model):
ModelsLibrary.__threads_lock.acquire()
try:
self._models[key] = model
self._stats_mgr.set("models_loaded", len(self._models))
finally:
ModelsLibrary.__threads_lock.release()
def add_models_for_metric(self, key, model, anomaly_calc):
ModelsLibrary.__threads_lock.acquire()
try:
self._anomaly_likelihood_detectors[key] = anomaly_calc
self._models[key] = model
self._stats_mgr.set("anomaly_calculators_loaded", len(self._anomaly_likelihood_detectors))
self._stats_mgr.set("models_loaded", len(self._models))
finally:
ModelsLibrary.__threads_lock.release()
def get_metrics_loaded(self):
ModelsLibrary.__threads_lock.acquire()
try:
metrics_loaded = self._models.keys()
finally:
ModelsLibrary.__threads_lock.release()
return metrics_loaded