forked from DataDog/dd-agent
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgraphite.py
117 lines (88 loc) · 3.61 KB
/
graphite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import sys, os, re, struct
import logging
import cPickle as pickle
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
# Module-level logger. It is created before the import fallback below so the
# fallback branch can report which TCPServer implementation is being used.
log = logging.getLogger(__name__)

try:
    from tornado.netutil import TCPServer
except Exception:
    # The import above failed; fall back to the TCPServer bundled in the
    # project's compat package. The exception object itself is not needed,
    # so it is no longer bound (the old "except Exception, e" form is also
    # a syntax error on Python 3).
    log.warning("Tornado < 2.1.1 detected, using compatibility TCPServer")
    from compat.tornadotcpserver import TCPServer
class GraphiteServer(TCPServer):
    """TCP server that wraps every accepted stream in a GraphiteConnection."""

    def __init__(self, app, hostname, io_loop=None, ssl_options=None, **kwargs):
        """Store what connections need, then initialise the TCPServer base.

        app and hostname are kept on the instance and handed to each
        GraphiteConnection; io_loop, ssl_options and any extra keyword
        arguments are forwarded unchanged to TCPServer.__init__.
        """
        log.info('Graphite listener is started')
        self.hostname = hostname
        self.app = app
        server_options = dict(kwargs, io_loop=io_loop, ssl_options=ssl_options)
        TCPServer.__init__(self, **server_options)

    def handle_stream(self, stream, address):
        """Build a GraphiteConnection for a newly accepted client stream."""
        GraphiteConnection(stream, address, app=self.app, hostname=self.hostname)
class GraphiteConnection(object):
    """Read length-prefixed pickled datapoints from one client stream.

    Each message is a 4-byte header, unpacked with struct format "!L"
    (network byte order, unsigned), giving the size of the pickled payload
    that follows. The payload is iterated as (metric, datapoint) pairs,
    where datapoint[0] is used as the timestamp and datapoint[1] as the
    value. Every valid datapoint is handed to app.appendMetric().
    """

    def __init__(self, stream, address, app, hostname):
        """Register the stream callbacks and start waiting for a header.

        stream   -- object providing set_close_callback() and read_bytes()
        address  -- client address, used only in log messages
        app      -- object providing appendMetric(); receives the datapoints
        hostname -- host name reported for every metric (see _parseMetric)
        """
        log.debug('received a new connection from %s', address)
        self.app = app
        self.stream = stream
        self.address = address
        self.hostname = hostname
        self.stream.set_close_callback(self._on_close)
        self.stream.read_bytes(4, self._on_read_header)

    def _on_read_header(self, data):
        """Decode the 4-byte size header and request that many payload bytes."""
        try:
            size = struct.unpack("!L", data)[0]
            log.debug("Receiving a string of size:%s", size)
            # NOTE(review): size comes straight from the client and is not
            # capped here, so a header can announce up to 2**32 - 1 bytes.
            self.stream.read_bytes(size, self._on_read_line)
        except Exception as e:
            log.error(e)

    def _on_read_line(self, data):
        """Receive one complete payload and decode it."""
        log.debug('read a new line from %s', self.address)
        self._decode(data)

    def _on_close(self):
        """Log that the client closed the stream."""
        log.debug('client quit %s', self.address)

    def _parseMetric(self, metric):
        """Map a graphite metric name to a (metric, host, device) triple.

        Graphite does not impose a particular metric structure, so this is
        where you can insert logic to extract various bits out of the
        graphite metric name. For instance, if the hostname is in 4th
        position, you could use: host = components[3]

        By default the name is returned unchanged, host is the hostname
        given to this connection and device is "N/A". Returns
        (None, None, None) if the name cannot be split.
        """
        try:
            # The default mapping does not use the pieces; the split is the
            # customization hook described above and also rejects names that
            # have no split() method.
            components = metric.split('.')
            host = self.hostname
            device = "N/A"
            return metric, host, device
        except Exception:
            # Lazy logging arguments: formatting with "%" directly would
            # itself raise when the unparsable name is a tuple.
            log.exception("Unparsable metric: %s", metric)
            return None, None, None

    def _postMetric(self, name, host, device, datapoint):
        """Send one (timestamp, value) datapoint to the app as "graphite"."""
        ts = datapoint[0]
        value = datapoint[1]
        self.app.appendMetric("graphite", name, host, device, ts, value)

    def _processMetric(self, metric, datapoint):
        """Parse the metric name to fetch (host, metric, device) and
        send the datapoint to datadog"""
        log.debug("New metric: %s, values: %s", metric, datapoint)
        (metric, host, device) = self._parseMetric(metric)
        if metric is not None:
            self._postMetric(metric, host, device, datapoint)
            log.info("Posted metric: %s, host: %s, device: %s", metric, host, device)

    def _decode(self, data):
        """Unpickle one payload, post its datapoints, then await the next header.

        Entries that are not a (metric, datapoint) pair, or whose datapoint
        does not hold two float-convertible items, are logged and skipped.
        """
        try:
            # SECURITY NOTE(review): pickle.loads() on bytes read from the
            # network can execute arbitrary code. Left in place because the
            # wire format requires it; only expose this listener to trusted
            # senders.
            # list() makes a payload that is not iterable fail here, with
            # the other undecodable payloads, instead of in the loop below.
            datapoints = list(pickle.loads(data))
        except Exception:
            log.exception("Cannot decode grapite points")
            # NOTE(review): no further read is scheduled on this path, so
            # the connection stays idle after an undecodable payload.
            return

        for entry in datapoints:
            try:
                # Unpack inside the try so that one malformed entry is
                # skipped rather than aborting the loop and never
                # scheduling the next read.
                (metric, datapoint) = entry
                datapoint = (float(datapoint[0]), float(datapoint[1]))
            except Exception as e:
                log.error(e)
                continue

            self._processMetric(metric, datapoint)

        self.stream.read_bytes(4, self._on_read_header)
def start_graphite_listener(port, app=None, hostname=None):
    """Start a standalone Graphite listener on port and run the IOLoop.

    The previous body called GraphiteServer() with no arguments, which
    always raised TypeError because app and hostname are required.

    port     -- TCP port to listen on
    app      -- object providing appendMetric(); with the default None the
                server starts, but received datapoints cannot be forwarded
    hostname -- host name reported for metrics; defaults to this machine's
                name from socket.gethostname()

    This call does not return until the IOLoop is stopped.
    """
    if hostname is None:
        # Local import: socket is not among the file's top-level imports.
        import socket
        hostname = socket.gethostname()

    echo_server = GraphiteServer(app, hostname)
    echo_server.listen(port)
    IOLoop.instance().start()
# Script entry point: run a standalone listener on port 17124.
if __name__ == "__main__":
    start_graphite_listener(port=17124)