-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprometheus_exporter.py
127 lines (97 loc) · 2.69 KB
/
prometheus_exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from typing import Dict, Iterator
import time
import os
import prefect
import dateutil.parser
from flask import Flask, Response
from prometheus_client import generate_latest, REGISTRY, start_http_server
from prometheus_client.core import GaugeMetricFamily
from prometheus_client.samples import Sample
PREFECT_API_KEY=os.environ['PREFECT_API_KEY']
client = prefect.Client(api_key=PREFECT_API_KEY)
def _add_gauge_metric(metric, labels, value):
metric.samples.append(Sample(
metric.name, labels,
value,
None
))
def fetch_all(client: prefect.Client, query: str) -> Iterator[Dict]:
offset = 0
while True:
res = client.graphql(query, variables={'offset': offset})
yield res
count = res['data']['res']['aggregate']['count']
if count < 100:
break
offset += 100
def to_metrics(res: Dict):
for i in res['data']['res']['nodes']:
yield (
{
'state': i['state'],
'project': i['flow']['project']['name'],
'flow': i['flow']['name'],
**{f'param__{k}': str(v) for (k,v) in i['parameters'].items()}
},
dateutil.parser.parse(i['state_timestamp']).timestamp()
)
class PrefectCollector:
def describe(self):
return []
def collect(self):
flow_run_last_state = GaugeMetricFamily(
'prefect_flow_run_last_state_ts',
'Metric that captures timestamp of last Successful and Failed runs of each flow with each unique parameters set',
labels=['state', 'project', 'flow']
)
for res in fetch_all(client, """
query($offset: Int) {
res: flow_run_aggregate(
distinct_on: [flow_id, state, parameters]
where: {
state: {_in: ["Success", "Failed"]}
flow: {
archived: {_eq: false}
}
}
offset: $offset
order_by: [
{flow_id: asc}
{state: asc}
{parameters: asc}
{state_timestamp: desc_nulls_last}
]
) {
aggregate {
max {
state_timestamp
}
count
}
nodes {
flow {
name
project {
name
}
}
parameters
state
state_timestamp
}
}
}
"""):
for labels, value in to_metrics(res):
_add_gauge_metric(flow_run_last_state, labels, value)
yield flow_run_last_state
REGISTRY.register(PrefectCollector())
app = Flask(__name__)
@app.route('/')
def ok():
return 'ok'
@app.route('/metrics/')
def metrics():
return Response(generate_latest(), mimetype='text/plain')
if __name__ == "__main__":
app.run(host='0.0.0.0', port=8080)