-
Notifications
You must be signed in to change notification settings - Fork 1
/
gator.js
149 lines (132 loc) · 4.46 KB
/
gator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
var thrift = require('thrift')
, http = require('http')
, dgram = require('dgram')
, sys = require('sys')
, Hbase = require('./gen-nodejs/Hbase')
, ttypes = require('./gen-nodejs/Hbase_types')
, conf = require('./configure')
// In-memory counters waiting to be flushed, laid out as
//   counters[table][interval][bucketTimestamp][hbaseKey] = accumulated value
// `total` is the number of distinct leaf counters currently pending; flush()
// uses it to decide whether there is anything to write.
var counters = { total: 0 };

// Handles for the long-lived resources managed by the config 'set' handler:
// the periodic flush timer, the UDP ingest socket and the HTTP server.
var flusher = null;
var alligator = null;
var cocodrie = null;

// Declared with `var` so the assignment no longer creates an implicit global
// (which is a ReferenceError under strict mode). process.argv[2] is handed to
// Configure as-is -- presumably a configuration file path; confirm in ./configure.
var config = new conf.Configure(process.argv[2]);
// Reacts to every configuration change by (re)building the backend and the
// frontend so that they match the new settings.
//
// Backend (config.backend truthy):
//   - creates one HBase table per category, with a 'total' column family plus
//     one family per global and per-category interval;
//   - (re)binds the UDP socket `alligator` that accumulates hits in `counters`;
//   - (re)starts the periodic `flush` timer.
// Frontend (config.frontend truthy):
//   - (re)starts the HTTP server `cocodrie`.
// When a side is switched off, its resources are released.
//
// NOTE(review): the `config` parameter shadows the module-level `config`, and
// the UDP listener keeps the `config` of the event that created the socket. If
// Configure emits a fresh object on each 'set', category changes are only
// picked up when the port changes too -- confirm against ./configure.
// NOTE(review): `oldConfig` is dereferenced unconditionally; this assumes the
// emitter always passes an object, including on the first 'set' -- confirm.
// NOTE(review): `sys` is the legacy alias of `util`; confirm that sys.log and
// sys.error exist on the targeted Node.js version.
config.on('set', function (config, oldConfig) {
  if (config.backend) {
    var connection = thrift.createConnection(config.hbaseHost, config.hbasePort);
    // Listen for errors before any request is issued on the connection.
    connection.on('error', function (err) { sys.error(err); });
    var client = thrift.createClient(Hbase, connection);

    // Countdown of outstanding createTable calls; the connection is closed
    // once the last callback has fired.
    var categoryNum = Object.keys(config.categories).length;
    // With no categories no callback would ever run, so close right away
    // instead of leaking the connection.
    if (categoryNum === 0) connection.end();

    var j;
    for (var i in config.categories) {
      var columnFamilies = [];
      columnFamilies.push(new ttypes.ColumnDescriptor({name: 'total'}));
      var category = config.categories[i];
      // NOTE(review): this replaces any counters still pending for the table
      // while counters.total is left untouched -- confirm whether unflushed
      // hits should survive a configuration reload.
      counters[category.table] = {};
      for (j = 0; j < config.intervals.length; j++) {
        columnFamilies.push(new ttypes.ColumnDescriptor({name: config.intervals[j].toString()}));
        counters[category.table][config.intervals[j].toString()] = {};
      }
      if (category.intervals) {
        for (j = 0; j < category.intervals.length; j++) {
          columnFamilies.push(new ttypes.ColumnDescriptor({name: category.intervals[j].toString()}));
          counters[category.table][category.intervals[j].toString()] = {};
        }
      }
      client.createTable(category.table, columnFamilies, function (err, data) {
        if (data) sys.log(data);
        if (err) sys.log(err);
        if (--categoryNum == 0) connection.end();
      });
    }

    // A changed port requires a new socket.
    if (config.alligatorPort != oldConfig.alligatorPort && alligator !== null) {
      alligator.close();
      alligator = null;
    }
    if (alligator === null) {
      alligator = dgram.createSocket('udp4', function (msg) {
        // Datagrams are untrusted input: a malformed one must be dropped
        // rather than throw inside the socket listener.
        var hit;
        try {
          hit = JSON.parse(msg);
        } catch (e) {
          sys.log('dropping malformed datagram: ' + e.message);
          return;
        }
        if (!hit || !Array.isArray(hit.keys)) return;
        var category = config.categories[hit.category];
        if (!category) return;
        var table = category.table;
        // Work on a copy; every prefix of the key list gets its own counter
        // (a+b+c, then a+b, then a).
        var keys = hit.keys.slice(0);
        while (keys.length) {
          var hbaseKey = keys.join('+');
          for (var interval in counters[table]) {
            // Start of the bucket of width `interval` containing the hit.
            var ts = Math.floor(hit.timestamp / interval) * interval;
            if (typeof counters[table][interval][ts] == 'undefined')
              counters[table][interval][ts] = {};
            if (typeof counters[table][interval][ts][hbaseKey] == 'undefined') {
              counters.total++;
              counters[table][interval][ts][hbaseKey] = 0;
            }
            counters[table][interval][ts][hbaseKey] += hit.value;
          }
          keys.pop();
        }
      });
      alligator.bind(config.alligatorPort);
    }

    if (flusher === null || config.flushInt != oldConfig.flushInt) {
      clearInterval(flusher);
      flush();
      flusher = setInterval(flush, config.flushInt);
    }
  } else if (oldConfig.backend) {
    clearInterval(flusher);
    // Reset the handle so that re-enabling the backend with an unchanged
    // flushInt starts a new timer (the check above relies on `null`).
    flusher = null;
    flush();
    alligator.close();
    alligator = null;
  }

  if (config.frontend) {
    // A changed port requires a new server.
    if (config.cocodriePort != oldConfig.cocodriePort && cocodrie !== null) {
      cocodrie.close();
      cocodrie = null;
    }
    if (cocodrie === null) {
      cocodrie = http.createServer(function (request, response) {
        response.writeHead(200, {'Content-Type': 'text/plain'});
        // TODO: the lookup itself is not implemented; `from`, `to` and
        // `target` are never read (presumably the intended query window and
        // counter name -- confirm with the author).
        var from = Date.parse('-1 day') / 1000;
        var to = (new Date()).getTime() / 1000;
        var target;
        // Finish the response so that clients are not left hanging.
        response.end();
      });
      cocodrie.listen(config.cocodriePort);
    }
  } else if (oldConfig.frontend) {
    cocodrie.close();
    cocodrie = null;
  }
});
/**
 * Writes every pending counter to HBase and resets the in-memory state.
 *
 * The counters are snapshotted (deep copy) and cleared first, so hits that
 * arrive while the increments are in flight go into fresh buckets. Each leaf
 * counter becomes one `atomicIncrement(table, key, 'interval:bucketTs', value)`
 * call; the Thrift connection is closed once the last callback has fired.
 *
 * Does nothing when `counters.total` is 0.
 *
 * NOTE(review): `sys` is the legacy alias of `util`; confirm that sys.log and
 * sys.error exist on the targeted Node.js version.
 */
var flush = function () {
  if (!counters.total) return;

  var flushers = clone(counters);
  var table, interval, ts, key;

  // Empty every interval bucket. The `total` entry is a number, not a table.
  for (table in counters) {
    if (typeof counters[table] != 'object') continue;
    for (interval in counters[table]) counters[table][interval] = {};
  }
  counters.total = 0;

  // Collect the increments first so the number of outstanding calls is known
  // before any of them is issued.
  var increments = [];
  for (table in flushers) {
    if (typeof flushers[table] != 'object') continue;
    for (interval in flushers[table]) {
      for (ts in flushers[table][interval]) {
        for (key in flushers[table][interval][ts]) {
          increments.push({
            table: table,
            key: key,
            column: interval + ':' + ts,
            value: flushers[table][interval][ts][key]
          });
        }
      }
    }
  }
  // Nothing to send: do not open a connection that nobody would close.
  if (increments.length === 0) return;

  var connection = thrift.createConnection(config.hbaseHost, config.hbasePort);
  connection.on('error', function (err) { sys.error(err); });
  var client = thrift.createClient(Hbase, connection);

  // Counts exactly the calls issued below. The previous code started from
  // `total` and incremented again per call, so it never got back to 0 and the
  // connection was never ended.
  var counterNum = increments.length;
  var onIncrement = function (err, data) {
    if (data) sys.log(data);
    if (err) sys.log(err);
    if (--counterNum == 0) connection.end();
  };

  for (var n = 0; n < increments.length; n++) {
    var inc = increments[n];
    client.atomicIncrement(inc.table, inc.key, inc.column, inc.value, onIncrement);
  }
};
/**
 * Returns a deep copy of an object/array tree.
 *
 * Arrays are copied into arrays, every other object into a plain object.
 * Enumerable properties are walked with `for...in` (so inherited enumerable
 * ones are included); nested objects are cloned recursively, primitives,
 * functions and `null` are copied by value/reference.
 *
 * A property named 'clone' is skipped -- presumably to ignore a `clone`
 * method patched onto a prototype; confirm before relying on it.
 *
 * @param {Object|Array} oldObj value to copy
 * @returns {Object|Array} a structure sharing no nested objects with oldObj
 */
var clone = function (oldObj) {
  var newObj = (oldObj instanceof Array) ? [] : {};
  // The loop variable must be local: an undeclared one is a single global
  // shared by every recursion level and a ReferenceError in strict mode.
  for (var key in oldObj) {
    if (key == 'clone') continue;
    var value = oldObj[key];
    if (value && typeof value == "object")
      newObj[key] = clone(value);
    else
      newObj[key] = value;
  }
  return newObj;
};