-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflusher.py
45 lines (33 loc) · 878 Bytes
/
flusher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
from os import (
environ,
)
import redis
from src.ss_instrumentation import (
RedisMetricStorage,
SSInstrumentation,
)
def flush(event, context):
    """Flush the meters held in Redis through ``SSInstrumentation``.

    The ``(event, context)`` signature looks like an AWS Lambda handler
    (NOTE(review): confirm against the deployment config); neither argument
    is read by this function.

    Configuration is taken from environment variables:

    * ``REDIS_HOST`` -- required.
    * ``REDIS_PORT`` -- optional, defaults to ``"6379"``.
    * ``REDIS_DB`` -- optional, defaults to ``"0"``.
    * ``AWS_METRIC_NAMESPACE`` -- required.
    * ``AWS_LOGGING_REGION`` -- required.

    Returns:
        A dict with ``"statusCode"`` 200 and a JSON string ``"body"``.

    Raises:
        KeyError: if a required environment variable is missing.
        ValueError: if ``REDIS_PORT`` or ``REDIS_DB`` is not an integer.
        Any exception raised by ``flush_meters()`` propagates unchanged.
    """
    client = redis.Redis(
        host=environ["REDIS_HOST"],
        port=int(environ.get("REDIS_PORT", "6379")),
        db=int(environ.get("REDIS_DB", "0")),
    )
    storage = RedisMetricStorage(client)
    config = {
        "AWS_METRIC_NAMESPACE": environ["AWS_METRIC_NAMESPACE"],
        "AWS_LOGGING_REGION": environ["AWS_LOGGING_REGION"],
    }
    instr = SSInstrumentation(config, storage)

    print("About to flush meters")
    # perf_counter() is monotonic; time.time() follows the wall clock and can
    # jump (e.g. NTP adjustment), which would report a wrong or negative
    # duration.
    started = time.perf_counter()
    instr.flush_meters()
    elapsed = time.perf_counter() - started
    print("Flushed in %f seconds" % elapsed)

    return {
        "statusCode": 200,
        "body": '{"STATUS": "OK"}',
    }
if __name__ == "__main__":
    # Run a single flush when executed as a script; flush() does not read
    # either argument, so both are passed as None.
    flush(event=None, context=None)