Commit

wip

williamlardier committed Oct 9, 2023
1 parent fc217cc commit 0878cc9
Showing 2 changed files with 145 additions and 30 deletions.
98 changes: 68 additions & 30 deletions CountItemsV2/CountItems.js
@@ -121,7 +121,9 @@ class CountItems {
// Initialize the ChangeStream
this.changeStreamListenDeletion();
let stop = false;
let startTime;
while (!stop) {
startTime = process.hrtime();
this.log.info('Starting a new round...');
await this.listAllBuckets();
this.log.info(`Found ${Object.keys(this.pool).length} buckets`);
@@ -137,7 +139,7 @@
const bucketInfo = this.pool[bucketName];
if (bucketInfo && !bucketInfo.ongoing) {
bucketInfo.ongoing = true;
-const promise = this.processBucket(bucketName, bucketInfo.doc.value.owner, bucketInfo.doc.value.locationConstraint, bucketInfo.first)
+const promise = this.processBucket(bucketName, bucketInfo.doc.value.ownerDisplayName, bucketInfo.doc.value.locationConstraint, bucketInfo.first)
.then((result) => {
bucketInfo.first = false;
this.consolidateResults(bucketName, result);
@@ -162,12 +164,9 @@
await this.setCheckPoints();
// then compute all metrics and save them
await this.aggregateResults();
this.log.info(`Round completed in ${process.hrtime(startTime)[0]}s. Restarting in 2 seconds...`);
// sleep 2 seconds before starting the next round
console.log(this.pool)
await new Promise(r => setTimeout(r, 2000));
// TODO how to detect object deletions that are not
// delete markers, as changestreams are scoped and
// there may be a lot of buckets?
}
}
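For reference, a minimal sketch of the round-timing pattern used above, assuming only Node's built-in process.hrtime(); the log message is illustrative:

    const startTime = process.hrtime();
    // ... one full round of bucket processing ...
    const [seconds, nanos] = process.hrtime(startTime); // elapsed since startTime
    console.log(`Round completed in ${seconds}s ${Math.round(nanos / 1e6)}ms`);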

@@ -358,6 +357,10 @@ class CountItems {
// Default to the current time
}

+// record, for the current bucket, the timestamp used by the
+// query filter (+1s) in the set of bulked checkpoints
+this.bulkedCheckpoints[bucketName] = lastSyncedTimestamp;

// Step 2: Set up the collection and checkpoint
// We get the current bucket status from the pool.
if (!this.pool[bucketName]) {
Expand Down Expand Up @@ -393,10 +396,12 @@ class CountItems {
$project: {
isMaster: {
$cond: [
-{ $and: [
-{ $eq: ["$value.versionId", null] },
-{ $eq: ["$value.isNull", false] }] },
-1, 0]
+{
+$and: [
+{ $eq: [{ $indexOfBytes: ["$_id", "\0"] }, -1] },
+{ $eq: ["$value.isNull", false] }]
+},
+1, 0]
},
isNull: {
$cond: [{ $eq: ["$value.isNull", true] }, 1, 0]
@@ -405,35 +410,39 @@
$cond: [{ $ne: [{ $indexOfBytes: ["$_id", "\0"] }, -1] }, 1, 0]
},
isMaster2: {
-$cond: [{ $and: [
-{ },
-{ $ne: ["$value.isNull", true] }] }, 1, 0]
+$cond: [{
+$and: [
+{},
+{ $ne: ["$value.isNull", true] }]
+}, 1, 0]
},
contentLength: "$value.content-length",
}
},
{
$group: {
_id: null,
-masterData: { $sum: { $multiply: [ { $add: ["$isMaster", "$isMaster2"] }, "$contentLength" ] } },
+masterData: { $sum: { $multiply: [{ $add: ["$isMaster", "$isMaster2"] }, "$contentLength"] } },
nullData: { $sum: { $multiply: ['$isNull', '$contentLength'] } },
versionData: { $sum: { $multiply: ['$isVersioned', '$contentLength'] } },
masterCount: { $sum: { $add: ["$isMaster", "$isMaster2"] } },
nullCount: { $sum: '$isNull' },
versionCount: { $sum: '$isVersioned' },
}
}
]);
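For context, a sketch of the key convention the pipeline above relies on, assuming the usual metadata layout where a versioned entry's _id is "<objectKey>\0<versionId>" while a master entry's _id contains no "\0" (the sample documents are hypothetical):

    // { _id: 'photo.jpg', value: { 'content-length': 10, isNull: false } }  -> master
    // { _id: 'photo.jpg\u0000v1', value: { 'content-length': 10 } }         -> version
    const isVersionKey = (id) => id.indexOf('\0') !== -1; // mirrors the $indexOfBytes test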

// wait until the aggregation is done
const result = await operation.toArray();
console.log(result)
const metrics = {
masterData: result?.[0]?.masterData || 0,
nullData: result?.[0]?.nullData || 0,
versionData: result?.[0]?.versionData || 0,
masterCount: result?.[0]?.masterCount || 0,
nullCount: result?.[0]?.nullCount || 0,
versionCount: result?.[0]?.versionCount || 0,
};
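For reference, the $group stage above emits at most one document, e.g. (illustrative values):

    // [{ _id: null, masterData: 123, nullData: 0, versionData: 456,
    //    masterCount: 2, nullCount: 0, versionCount: 3 }]
    // An empty collection yields [], hence the result?.[0]? fallbacks above.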

-// add to the set of bulked checkpoints, the current bucket name with
-// the filter used, +1s
-this.bulkedCheckpoints[bucketName] = lastSyncedTimestamp;
// return the computed metrics as a single object holding all the data
return resolve({
accountName,
@@ -463,27 +472,51 @@ class CountItems {
}

consolidateResults(bucketName, result) {
const updateMetrics = (target, source) => {
if (!target) return;
for (const key in source.metrics) {
target.metrics[key] = (target.metrics[key] || 0) + source.metrics[key];
}
};

if (!bucketName || !this.pool[bucketName]) {
-this.log.error('Bucket not found in pool', {
-bucketName,
-});
+this.log.error('Bucket not found in pool', { bucketName });
return;
}

if (!result) {
-this.log.error('No result provided', {
-bucketName,
-});
+this.log.error('No result provided', { bucketName });
return;
}

-// add each metric
-this.pool[bucketName].metrics = {
-masterData: (this.pool[bucketName].metrics.masterData || 0) + result.metrics.masterData,
-nullData: (this.pool[bucketName].metrics.nullData || 0) + result.metrics.nullData,
-versionData: (this.pool[bucketName].metrics.versionData || 0) + result.metrics.versionData,
-};
+updateMetrics(this.pool[bucketName], result);

const accountMetrics = {};
const locationMetrics = {};
const bucketMetrics = {};

for (const currentBucketName in this.pool) {
const bucketInfo = this.pool[currentBucketName];

if (!accountMetrics[bucketInfo.doc.value.ownerDisplayName]) {
accountMetrics[bucketInfo.doc.value.ownerDisplayName] = { metrics: {} };
}
updateMetrics(accountMetrics[bucketInfo.doc.value.ownerDisplayName], bucketInfo);

if (!locationMetrics[bucketInfo.doc.value.locationConstraint]) {
locationMetrics[bucketInfo.doc.value.locationConstraint] = { metrics: {} };
}
updateMetrics(locationMetrics[bucketInfo.doc.value.locationConstraint], bucketInfo);

if (!bucketMetrics[currentBucketName]) {
bucketMetrics[currentBucketName] = { metrics: {} };
}
updateMetrics(bucketMetrics[currentBucketName], bucketInfo);
}

console.log(JSON.stringify({ accountMetrics, locationMetrics, bucketMetrics }));
}
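A small worked example of the consolidation above, with a hypothetical two-bucket pool (values are illustrative only):

    // pool = {
    //   b1: { doc: { value: { ownerDisplayName: 'alice', locationConstraint: 'us-east-1' } },
    //         metrics: { masterData: 10, masterCount: 1 } },
    //   b2: { doc: { value: { ownerDisplayName: 'alice', locationConstraint: 'us-east-2' } },
    //         metrics: { masterData: 5, masterCount: 1 } },
    // }
    // After the loop: accountMetrics.alice.metrics -> { masterData: 15, masterCount: 2 },
    // while locationMetrics keeps one entry per locationConstraint and
    // bucketMetrics one entry per bucket.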

/**
* Detect objects that are deleted while the aggregations are running.
* The documents whose bucketName is in the pool (after the first successful run)
@@ -515,15 +548,19 @@
});
const size = change.updateDescription.updatedFields.value['content-length'];
let type;
let typeCount;
if (change.documentKey._id.indexOf('\0') !== -1) {
type = 'versionData';
typeCount = 'versionCount';
} else if (
!change.updateDescription.updatedFields.value.versionId ||
(!!change.updateDescription.updatedFields.value.versionId &&
!change.updateDescription.updatedFields.value.isNull)) {
type = 'masterData';
typeCount = 'masterCount';
} else {
type = 'nullData';
typeCount = 'nullCount';
}
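// Editor's summary of the classification above:
//   _id contains '\0'                        -> versionData / versionCount
//   no versionId, or versionId and !isNull   -> masterData / masterCount
//   otherwise (null version)                 -> nullData / nullCount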
// do not process the object if its last-modified date is after
// the current scan date.
@@ -532,12 +569,12 @@
return;
}
this.pool[change.ns.coll].metrics[type] = Math.max(0, this.pool[change.ns.coll].metrics[type] - size);
this.pool[change.ns.coll].metrics[typeCount] = Math.max(0, this.pool[change.ns.coll].metrics[typeCount] - 1);
});
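// Editor's note: hypothetical shape of an update event consumed above, based
// on MongoDB's change stream document format (field values are illustrative):
// {
//   operationType: 'update',
//   ns: { db: 'metadata', coll: 'my-bucket' },
//   documentKey: { _id: 'photo.jpg\u0000v1' },
//   updateDescription: { updatedFields: { value: { 'content-length': 10 } } },
// }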

// Listen for errors
watcher.on('error', (error) => {
this.log.error('Error in change stream', { error });
console.log(error);

// Close the errored change stream
watcher.close();
@@ -560,6 +597,7 @@
module.exports = CountItems;

// todo:
// - fix secondary optime check
// - detect object replacement
// - better way of detecting objects being deleted but not yet reflected in the computed metrics?
// - periodically flush all metrics of buckets
77 changes: 77 additions & 0 deletions conf/config.json
@@ -0,0 +1,77 @@
{
"backbeat": {
"host": "artesca-data-management-backbeat-api.zenko.svc.cluster.local",
"port": 80
},
"clusters": 1,
"dataClient": {
"host": "artesca-data-base-local-data.zenko.svc.cluster.local",
"port": 9991
},
"healthChecks": {
"allowFrom": [
"0.0.0.0/0"
]
},
"listenOn": [],
"log": {
"dumpLevel": "error",
"logLevel": "info"
},
"mongodb": {
"database": "s3utils",
"readPreference": "primary",
"replicaSetHosts": "localhost:27017",
"replicaSet": "rs0",
"shardCollections": true,
"writeConcern": "majority"
},
"overlayVersion": 1,
"port": 8000,
"recordLog": {
"enabled": true,
"recordLogName": "s3-recordlog"
},
"replicationEndpoints": [],
"replicationGroupId": "RG00001",
"requests": {
"extractClientIPFromHeader": "X-Forwarded-For",
"trustedProxyCIDRs": [
"0.0.0.0/0"
],
"viaProxy": false
},
"restEndpoints": {
"*.s3.w3.com": "us-east-1",
"*.zenko-cloudserver-replicator": "us-east-1",
"127.0.0.1": "us-east-1",
"artesca-data-connector-s3api.zenko.svc.cluster.local": "us-east-1",
"artesca-data-management-ui.zenko.svc.cluster.local": "us-east-1",
"localhost": "us-east-1",
"s3.amazonaws.com": "us-east-1",
"s3.w3.com": "us-east-1",
"ui.w3.com": "us-east-1",
"zenko-cloudserver-replicator": "us-east-1"
},
"vaultd": {
"host": "artesca-data-connector-vault-auth-api.zenko.svc.cluster.local",
"port": 80
},
"websiteEndpoints": [
"s3-website-us-east-1.amazonaws.com",
"s3-website.us-east-2.amazonaws.com",
"s3-website-us-west-1.amazonaws.com",
"s3-website-us-west-2.amazonaws.com",
"s3-website.ap-south-1.amazonaws.com",
"s3-website.ap-northeast-2.amazonaws.com",
"s3-website-ap-southeast-1.amazonaws.com",
"s3-website-ap-southeast-2.amazonaws.com",
"s3-website-ap-northeast-1.amazonaws.com",
"s3-website.eu-central-1.amazonaws.com",
"s3-website-eu-west-1.amazonaws.com",
"s3-website-sa-east-1.amazonaws.com",
"s3-website.localhost",
"s3-website.scality.test",
"zenkoazuretest.blob.core.windows.net"
]
}
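A minimal sketch of how the mongodb section of this config could be consumed, assuming the standard Node.js mongodb driver (v4+); the file path and the getDb helper are illustrative, not part of this commit:

    const { MongoClient } = require('mongodb');
    const config = require('./conf/config.json');

    const { replicaSetHosts, replicaSet, database, readPreference, writeConcern } = config.mongodb;
    const uri = `mongodb://${replicaSetHosts}/` +
        `?replicaSet=${replicaSet}&readPreference=${readPreference}&w=${writeConcern}`;

    async function getDb() {
        const client = await MongoClient.connect(uri);
        return client.db(database); // "s3utils" with the values above
    }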
