-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtapdata.js
136 lines (129 loc) · 3.84 KB
/
tapdata.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/**
* @author lg<[email protected]>
* @date 3/31/19
* @description
*/
const request = require('request');
// const appConfig = require('./config');
const EventEmitter = require('events');
const eventEmitter = new EventEmitter();
const cluster = require('cluster');
const Conf = require('conf');
const config = new Conf();
let token = null;
const getToken = function (cb) {
cb = cb || function () { };
if (token) {
cb(token);
} else {
request.post({
url: config.get('tapDataServer.url') + '/api/users/generatetoken',
form: {
accesscode: config.get('tapDataServer.accessCode')
}
}, (err, response, body) => {
if (err) {
console.error('Get access token error', err);
cb(false);
} else if (response.statusCode === 200) {
let result = JSON.parse(body);
token = result.id;
cb(token);
console.log('Get access token success,', body);
if (result.ttl)
setTimeout(() => {
token = null;
}, (result.ttl - 10000) * 1000) // 提前10分钟获取token
} else {
console.error('Get access token error,', body);
cb(false)
}
})
}
},
removeToken = function () {
token = null;
};
/**
* 检查是否启用 load schema 功能
*/
const checkEnableLoadSchemaFeature = function (cb) {
getToken(token => {
let params = {
'filter[where][worker_type][in][0]': 'connector',
'filter[where][worker_type][in][1]': 'transformer',
'filter[where][ping_time][gte]': new Date().getTime() - 60000
};
request.get(config.get('tapDataServer.url') + '/api/Workers?access_token=' + token,
{ qs: params, json: true }, function (err, response, body) {
if (err) {
console.error('get connector worker process fail.', err);
cb(false);
} else if (response.statusCode === 401 || response.statusCode === 403) {
console.error('Access token Expired');
removeToken();
} else if (response.statusCode === 200) {
if (body && body.length > 0) {
console.log('exists process connector or transformer, disable load schema feature');
cb(false);
} else {
console.log('not exists process connector or transformer, enable load schema feature');
cb(true);
}
} else {
console.error('get connector worker process error: \n', body);
cb(false);
}
});
});
};
// const settingCache = {};
const loadNewSettings = function () {
getToken(token => {
if (token) {
let params = {
filter: '{"where":{"category" :"ApiServer"}}',
access_token: token
};
request.get(
config.get('tapDataServer.url') + '/api/Settings',
{ qs: params, json: true },
(err, response, body) => {
if (err) {
console.error('get settings from backend fail.', err);
} else if (response.statusCode === 401 || response.statusCode === 403) {
console.error('Access token Expired');
removeToken();
} else if (response.statusCode === 200) {
if (body && body.length > 0) {
body.forEach(setting => {
let key = setting.key;
let oldVal = config.get(key); //settingCache[key] || appConfig[key];
let newVal = setting.value;
if (String(oldVal) !== String(newVal)) { // setting changed
eventEmitter.emit(key + ':changed', newVal, oldVal);
//appConfig[key] = newVal;
config.set(key, newVal);
// settingCache[key] = newVal;
}
});
} else {
console.log('not found api server settings from backend.');
}
} else {
console.error('get settings from backend fail.', err);
}
}
);
}
});
};
if (cluster.isMaster) {
console.log('check backend settings change in main process.');
// load new settings for api server.
setInterval(loadNewSettings, 5000);
}
module.exports = eventEmitter;
module.exports.getToken = getToken;
module.exports.removeToken = removeToken;
module.exports.checkEnableLoadSchemaFeature = checkEnableLoadSchemaFeature;