-
Notifications
You must be signed in to change notification settings - Fork 163
/
Copy pathfileProcessingUtils.js
98 lines (85 loc) · 2.43 KB
/
fileProcessingUtils.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
let aws = require('aws-sdk');
let common = require('./common');
let async = require('async');
require('./constants');
/**
 * Build the AWS service clients used by this module and hand them to the caller.
 *
 * A DynamoDB client (API version 2012-08-10) and an S3 client (API version
 * 2006-03-01) are created for the requested region.
 *
 * @param {string} setRegion - region passed to both client constructors.
 * @param {function} callback - invoked synchronously as `callback(dynamoDB, s3)`.
 *   Note there is no leading error argument.
 */
function connect(setRegion, callback) {
    const dynamoOptions = { apiVersion: '2012-08-10', region: setRegion };
    const s3Options = { apiVersion: '2006-03-01', region: setRegion };

    const dynamoClient = new aws.DynamoDB(dynamoOptions);
    const s3Client = new aws.S3(s3Options);

    callback(dynamoClient, s3Client);
}
/**
 * Delete a file by delegating to `common.deleteFile` with a DynamoDB client
 * created for the given region.
 *
 * @param {string} setRegion - region used to create the client.
 * @param {*} file - file identifier, forwarded unchanged to `common.deleteFile`.
 * @param {function} callback - forwarded unchanged to `common.deleteFile`,
 *   which defines how and when it is called.
 */
function deleteFile(setRegion, file, callback) {
    connect(setRegion, (dynamoClient) => {
        common.deleteFile(dynamoClient, file, callback);
    });
}
exports.deleteFile = deleteFile;
/**
 * Reprocess a single file by delegating to `common.reprocessFile` with
 * DynamoDB and S3 clients created for the given region.
 *
 * @param {string} setRegion - region used to create the clients; also
 *   forwarded to `common.reprocessFile`.
 * @param {*} file - file identifier, forwarded unchanged (presumably in
 *   `bucket/key` form, as `reprocessS3Prefix` builds it; confirm against
 *   `common.reprocessFile`).
 * @param {function} callback - forwarded unchanged to `common.reprocessFile`.
 */
function reprocessFile(setRegion, file, callback) {
    connect(setRegion, (dynamoClient, s3Client) => {
        common.reprocessFile(dynamoClient, s3Client, setRegion, file, callback);
    });
}
exports.reprocessFile = reprocessFile;
/**
 * Request reprocessing of every object under an S3 prefix, optionally limited
 * to keys that match a regular expression.
 *
 * Objects are listed page by page with `s3.listObjectsV2`. For each page, every
 * matching key is passed to `common.reprocessFile` as `<bucket>/<key>`; the next
 * page is requested only after all of those calls have reported back.
 *
 * @param {string} setRegion - region used to create the clients; also
 *   forwarded to `common.reprocessFile`.
 * @param {string} bucket - bucket to list.
 * @param {string} prefix - key prefix to list under.
 * @param {string|RegExp} [regexFilter] - when truthy, only keys matching this
 *   pattern are reprocessed; when falsy, every listed key is reprocessed.
 * @param {function} callback - called as `callback(err, filesProcessed)`, where
 *   `filesProcessed` is the number of reprocess requests issued (it is supplied
 *   on error as well).
 * @throws {SyntaxError} synchronously, if `regexFilter` is not a valid pattern.
 */
function reprocessS3Prefix(setRegion, bucket, prefix, regexFilter, callback) {
    const matcher = regexFilter ? new RegExp(regexFilter) : undefined;
    let filesProcessed = 0;

    connect(setRegion, function(dynamoDB, s3) {
        let processing = true;
        const params = {
            Bucket : bucket,
            Prefix : prefix
        };

        // Handles one listed object: reprocess it if it passes the filter,
        // otherwise report completion straight away.
        function reprocessIfMatching(item, mapCallback) {
            if (matcher) {
                // A RegExp passed in with the g/y flag keeps state between
                // test() calls; reset it so every key is tested from the start.
                matcher.lastIndex = 0;
                if (!matcher.test(item.Key)) {
                    // The callback must fire for skipped keys too, otherwise
                    // the page never completes and the caller is never called.
                    mapCallback();
                    return;
                }
            }

            filesProcessed++;
            console.log(`Requesting reprocess of ${bucket}/${item.Key}`);
            common.reprocessFile(dynamoDB, s3, setRegion, `${bucket}/${item.Key}`, mapCallback);
        }

        // Lists one page and reprocesses its matching keys.
        function processNextPage(whilstCallback) {
            s3.listObjectsV2(params, function(err, data) {
                if (err) {
                    whilstCallback(err);
                    return;
                }

                async.map(data.Contents, reprocessIfMatching, function(mapErr) {
                    if (mapErr) {
                        whilstCallback(mapErr);
                        return;
                    }

                    if (data.IsTruncated === true) {
                        // more results remain: continue from where this page ended
                        params.ContinuationToken = data.NextContinuationToken;
                    } else {
                        // data wasn't truncated, so we have all the results
                        processing = false;
                    }
                    whilstCallback();
                });
            });
        }

        async.whilst(function(testCallback) {
            testCallback(null, processing);
        }, processNextPage, function(err) {
            callback(err, filesProcessed);
        });
    });
}
exports.reprocessS3Prefix = reprocessS3Prefix;
/**
 * Query the system for a file and its status by delegating to
 * `common.queryFile` with a DynamoDB client created for the given region.
 *
 * @param {string} setRegion - region used to create the client; also
 *   forwarded to `common.queryFile`.
 * @param {*} file - file identifier, forwarded unchanged to `common.queryFile`.
 * @param {function} callback - forwarded unchanged to `common.queryFile`,
 *   which defines how and when it is called.
 */
function queryFile(setRegion, file, callback) {
    connect(setRegion, (dynamoClient) => {
        common.queryFile(dynamoClient, setRegion, file, callback);
    });
}
exports.queryFile = queryFile;