// import.js
// Imports the "dans-ma-rue" CSV dataset into an Elasticsearch index.
const config = require('config');
const csv = require('csv-parser');
const fs = require('fs');
const { Client } = require('@elastic/elasticsearch');
// Name of the Elasticsearch index this script works on, read from the
// `elasticsearch.index_name` configuration key.
const indexName = config.get('elasticsearch.index_name');
/**
 * Streams a semicolon-separated CSV file and yields its parsed rows grouped
 * into arrays, so the caller never has to hold the whole dataset in memory.
 *
 * @param {string} [filePath='dataset/dans-ma-rue.csv'] - Path of the CSV file to read.
 * @param {number} [chunkSize=20000] - Maximum number of rows per yielded array.
 * @yields {Object[]} Non-empty arrays holding at most `chunkSize` parsed rows.
 */
async function* generator(filePath = 'dataset/dans-ma-rue.csv', chunkSize = 20000) {
  const stream = fs.createReadStream(filePath)
    .pipe(csv({ separator: ';' }));
  let chunk = [];
  for await (const row of stream) {
    chunk.push(row);
    if (chunk.length >= chunkSize) {
      yield chunk;
      chunk = [];
    }
  }
  // Flush the trailing partial chunk, but never yield an empty one: when the
  // row count is zero or an exact multiple of `chunkSize` nothing is left, and
  // an empty chunk would turn into a bulk request with an empty body.
  if (chunk.length > 0) {
    yield chunk;
  }
}
/**
 * Converts one parsed CSV row into the document that gets indexed.
 *
 * @param {Object} data - Row keyed by the CSV column names.
 * @returns {Object} Document keyed by snake_case field names; `timestamp` is a
 *   `Date` built from `DATEDECL`, every other field is copied from the row.
 */
const itemToDocument = (data) => {
  const {
    'ANNEE DECLARATION': year,
    'MOIS DECLARATION': month,
    'CONSEIL DE QUARTIER': council,
  } = data;
  // Prefix a zero and keep the last two characters, e.g. "3" -> "03".
  const paddedMonth = `0${month}`.slice(-2);

  return {
    timestamp: new Date(data.DATEDECL),
    object_id: data.OBJECTID,
    annee_declaration: year,
    mois_declaration: month,
    mois_annee_declaration: `${paddedMonth}/${year}`,
    type: data.TYPE,
    sous_type: data.SOUSTYPE,
    code_postal: data.CODE_POSTAL,
    ville: data.VILLE,
    arrondissement: data.ARRONDISSEMENT,
    prefixe: data.PREFIXE,
    intervenant: data.INTERVENANT,
    conseil_de_quartier: council,
    location: data.geo_point_2d,
  };
};
/**
 * Builds the parameters of a bulk request that indexes every given row.
 *
 * @param {Object[]} anomalies - Parsed CSV rows.
 * @returns {{body: Object[]}} Bulk parameters whose `body` alternates an
 *   `index` action line and the matching document, in row order.
 */
function createBulkInsertQuery(anomalies) {
  const body = [];
  anomalies.forEach((row) => {
    const doc = itemToDocument(row);
    // The document's own object_id is used as the Elasticsearch _id.
    const action = { index: { _index: indexName, _type: '_doc', _id: doc.object_id } };
    body.push(action, doc);
  });
  return { body };
}
/**
 * Indexes one chunk of rows with a single bulk request and reports any
 * per-document failures on stderr.
 *
 * @param {Object} client - Elasticsearch client exposing `bulk`.
 * @param {Object[]} chunk - Parsed CSV rows to index.
 * @returns {Promise<void>} Resolves once the bulk request has completed;
 *   rejects if the request itself fails.
 */
async function sendBulk(client, chunk) {
  const query = createBulkInsertQuery(chunk);
  const response = await client.bulk(query);
  // Some client versions wrap the payload under `body`, others return it
  // directly; accept both shapes.
  const result = response && response.body ? response.body : response;
  // A bulk request can succeed as a whole while individual documents are
  // rejected; that is only visible through the `errors` flag of the payload.
  if (result && result.errors) {
    const failures = (result.items || [])
      .map((item) => Object.values(item)[0])
      .filter((outcome) => outcome && outcome.error);
    const firstError = failures.length > 0 ? JSON.stringify(failures[0].error) : 'unknown';
    // Reported rather than thrown so one bad document does not abort the import.
    console.error(
      `Bulk request: ${failures.length} of ${chunk.length} documents failed; first error: ${firstError}`
    );
  }
}
/**
 * Sends every chunk produced by `generator()` to Elasticsearch, one bulk
 * request at a time, logging the size of each chunk once it has been sent.
 *
 * @param {Object} client - Elasticsearch client handed to `sendBulk`.
 * @returns {Promise<void>} Resolves when all chunks have been sent; rejects
 *   with the first error raised while reading or sending.
 */
async function sendBulks(client) {
  // `for await...of` calls the generator's `return()` when the loop exits
  // early (for instance when a bulk request throws), giving the generator the
  // chance to release what it holds instead of staying suspended.
  for await (const chunk of generator()) {
    await sendBulk(client, chunk);
    console.log(`Sent bulk of size ${chunk.length}`);
  }
}
/**
 * Deletes the target index if it exists.
 *
 * @param {Object} client - Elasticsearch client exposing `indices.delete`.
 * @returns {Promise<void>} Resolves once the delete request has completed.
 */
async function deleteIndex(client) {
  // `ignore: [404]` turns "index not found" into a no-op, so the very first
  // import (when the index does not exist yet) does not fail here.
  await client.indices.delete({ index: indexName }, { ignore: [404] });
}
/**
 * Creates the target index.
 *
 * @param {Object} client - Elasticsearch client exposing `indices.create`.
 * @returns {Promise<void>} Resolves once the create request has completed.
 */
async function createIndex(client) {
  const params = { index: indexName };
  await client.indices.create(params);
}
/**
 * Declares the `location` field of the target index as a `geo_point`.
 *
 * @param {Object} client - Elasticsearch client exposing `indices.putMapping`.
 * @returns {Promise<void>} Resolves once the mapping request has completed.
 */
async function putMapping(client) {
  const properties = {
    location: { type: 'geo_point' },
  };
  await client.indices.putMapping({ index: indexName, body: { properties } });
}
/**
 * Runs the whole import: recreates the index, sets its mapping, then sends
 * the dataset in bulks.
 *
 * @returns {Promise<void>} Resolves after 'Finished' has been logged and the
 *   client has been closed; rejects with the first error raised by a step.
 */
async function run() {
  const client = new Client({ node: config.get('elasticsearch.uri') });
  try {
    await deleteIndex(client);
    await createIndex(client);
    await putMapping(client);
    await sendBulks(client);
    console.log('Finished');
  } finally {
    // Close the client whether the import succeeded or failed, so its
    // connections are released.
    await client.close();
  }
}
// Entry point: run the import and report a failure on stderr with a non-zero
// exit code, so shells and CI can tell a failed import from a successful one.
run().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});