-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
123 lines (108 loc) · 3.17 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
'use strict';
var csv = require('csv');
var Q = require('q');
var async = require('async');
var merge = require('lodash/object/merge');
var path = require('path');
/**
 * Options handed to `csv.parse` for every stream that gets imported.
 */
var csvParseOptions = {
  // Use the first line as column names so each record is a keyed object
  columns: true,
  // Fields are separated by semicolons
  delimiter: ';',
  // Ask the parser to convert field values to native types
  auto_parse: true
};
module.exports = function importerFactory (sequelize, options) {
sequelize.import(path.join(__dirname, 'lib/model'));
var AddressDAO = sequelize.model('Address', {});
options = merge({
maxBulkCreate: 500,
// Does not work on SQLLite
updateOnDuplicate: false
}, options);
return {
/**
* syncStream()
*
* @params {Stream} stream | Stream which contains csv content
* @params {Callback} done
*/
syncStream: function (stream, done) {
var promises = [];
var ids = [];
var defer = Q.defer();
var handledCount = 0;
// Async cargo does not care about errors!
var error = null;
var cargo = async.cargo(function (datas, done) {
// Ignore processing if an error appear
if (error) {
return done();
}
var transaction = datas[0].transaction;
var records = datas.map(function (d) { return d.record; });
AddressDAO.bulkCreate(records, {
updateOnDuplicate: options.updateOnDuplicate,
transaction: transaction,
validate: true
})
.catch(function (err) {
error = err;
})
.then(done);
}, options.maxBulkCreate);
sequelize.transaction()
.then(function (t) {
stream
.pipe(csv.parse(csvParseOptions))
.pipe(csv.transform(function (record) {
return {
id: record.language + '-' + record.id,
language: record.language,
cityName: record.locality,
streetName: record.street,
postCode: record.postcode,
country: record.iso
};
}))
.on('data', function (record) {
if (record) {
ids.push(record.id);
cargo.push({ record: record, transaction: t }, function () {
defer.notify(handledCount++);
});
}
})
.on('end', function () {
var end = function () {
if (error) {
return t
.rollback()
.catch(defer.reject.bind(defer))
.then(function () {
defer.reject(error);
});
}
var promise = AddressDAO
.destroy({
where: {
$not: [
{ id: ids }
]
},
transaction: t
})
.then(t.commit.bind(t))
defer.resolve(promise);
};
if (cargo.length()) {
cargo.drain = end;
} else {
end();
}
});
});
if (done) {
defer.promise.nodeify(done);
}
return defer.promise;
}
};
};