gridftp_to_es.py (forked from WIPACrepo/monitoring-scripts)
#!/usr/bin/env python
"""
Read from gridftp transfer logs and write to elasticsearch
"""
from __future__ import print_function
import glob
import gzip
import decimal
from optparse import OptionParser
import logging
from functools import partial

# Command-line interface: the Elasticsearch address and target index name
# are options; the positional arguments are transfer log files (or globs).
parser = OptionParser('usage: %prog [options] transfer_files')
parser.add_option('-a', '--address', help='elasticsearch address')
parser.add_option('-n', '--indexname', default='gridftp',
                  help='index name (default gridftp)')
(options, args) = parser.parse_args()
if not args:
    parser.error('no gridftp transfer log files')

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(name)s : %(message)s')

def date_convert(d):
    """Convert a gridftp timestamp YYYYMMDDHHMMSS.ffffff to ISO 8601."""
    return d[:4]+'-'+d[4:6]+'-'+d[6:8]+'T'+d[8:10]+':'+d[10:12]+':'+d[12:]
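
# A worked example, following the slicing above:
#   date_convert('20120710063437.211360') -> '2012-07-10T06:34:37.211360'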

def read_from_file(filename):
    """Parse one transfer log, yielding a cleaned record per transfer."""
    with (gzip.open(filename) if filename.endswith('.gz') else open(filename)) as f:
        for line in f:
            try:
                # each line is whitespace-separated KEY=VALUE pairs
                data = {p.split('=', 1)[0]: p.split('=', 1)[1]
                        for p in line.split() if '=' in p}
                if data['NL.EVNT'] == 'PROG':
                    # skip in-progress markers; keep only completed transfers
                    continue
                data['start_date'] = date_convert(data['START'])
                data['end_date'] = date_convert(data['DATE'])
                data['DEST'] = data['DEST'].strip('[]')
                for k in ('NBYTES', 'BLOCK', 'BUFFER', 'STREAMS'):
                    data[k] = int(data[k])
                # Decimal keeps the microsecond precision a float
                # subtraction would lose on these long timestamps
                data['duration'] = float(decimal.Decimal(data['DATE'])
                                         - decimal.Decimal(data['START']))
                data['bandwidth_mbps'] = data['NBYTES'] * 8 / 1000000. / data['duration']
                # drop the raw fields that were transformed or are unwanted
                for k in ('START', 'DATE', 'PROG', 'NL.EVNT', 'VOLUME', 'CODE', 'TASKID'):
                    del data[k]
            except Exception:
                # malformed or unexpected line: dump what was parsed, move on
                print(data)
                continue
            yield data
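
# Illustrative input line (the field values here are invented, but the
# KEY=VALUE shape is what the parser above expects; every key deleted in
# the cleanup loop must be present or the record is skipped):
#   DATE=20120710063437.211360 HOST=gridftp.example.org
#   PROG=globus-gridftp-server NL.EVNT=FTP_INFO START=20120710063435.783578
#   USER=icecube FILE=/data/file.tar BUFFER=235104 BLOCK=262144
#   NBYTES=506059776 VOLUME=/data STREAMS=4 DEST=[192.0.2.10] TYPE=RETR
#   CODE=226 TASKID=none
# This would yield a record with ISO start/end dates, integer counters,
# a duration of about 1.43 s, and bandwidth_mbps of roughly 2800.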

def es_generator(entries):
    """Decorate each record with metadata for bulk indexing."""
    for data in entries:
        data['_index'] = options.indexname
        data['_type'] = 'transfer_log'
        data['_id'] = data['end_date'].replace('#', '-').replace('.', '-')
        yield data
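
# elasticsearch.helpers.bulk treats underscore-prefixed keys such as
# _index, _type and _id as action metadata and indexes the remaining keys
# as the document body. Using the end timestamp as _id makes re-imports of
# the same log idempotent; two transfers ending in the same microsecond
# would collide, which is presumably acceptable here.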

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Accept the address with or without an explicit scheme; default to http.
if not options.address:
    parser.error('no elasticsearch address given (-a/--address)')
prefix = 'http'
address = options.address
if '://' in address:
    prefix, address = address.split('://', 1)
url = '{}://{}'.format(prefix, address)
logging.info('connecting to ES at %s', url)
es = Elasticsearch(hosts=[url], timeout=5000)
# Pre-bind the bulk helper with its retry policy so the loop below only
# has to pass a generator of actions.
es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600)
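
# With these settings the bulk helper retries chunks the server rejects
# with 429 Too Many Requests, backing off exponentially from 2 s up to a
# cap of 3600 s between attempts.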

# Each argument may itself be a glob pattern; stream every matching file.
for path in args:
    for filename in glob.iglob(path):
        gen = es_generator(read_from_file(filename))
        success, _ = es_import(gen)
        logging.info('finished processing %s (%s records indexed)', filename, success)
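
# Example invocation (hypothetical host and log path; -a is the option
# defined above). Quoting the glob lets the script, not the shell, expand it:
#   ./gridftp_to_es.py -a localhost:9200 '/var/log/gridftp/*.log.gz'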