From 0878cc9de9eca759467b2058c72fd744380c1b45 Mon Sep 17 00:00:00 2001
From: williamlardier
Date: Mon, 9 Oct 2023 18:49:23 +0200
Subject: [PATCH] CountItemsV2: track per-type object counts and bulk checkpoint updates

---
 CountItemsV2/CountItems.js | 98 ++++++++++++++++++++++++++------------
 conf/config.json           | 77 ++++++++++++++++++++++++++++++
 2 files changed, 145 insertions(+), 30 deletions(-)
 create mode 100644 conf/config.json

diff --git a/CountItemsV2/CountItems.js b/CountItemsV2/CountItems.js
index 339e06cf..aef5599e 100644
--- a/CountItemsV2/CountItems.js
+++ b/CountItemsV2/CountItems.js
@@ -121,7 +121,9 @@ class CountItems {
         // Initialize the ChangeStream
         this.changeStreamListenDeletion();
         let stop = false;
+        let startTime;
         while (!stop) {
+            startTime = process.hrtime();
             this.log.info('Starting a new round...');
             await this.listAllBuckets();
             this.log.info(`Found ${Object.keys(this.pool).length} buckets`);
@@ -137,7 +139,7 @@ class CountItems {
             const bucketInfo = this.pool[bucketName];
             if (bucketInfo && !bucketInfo.ongoing) {
                 bucketInfo.ongoing = true;
-                const promise = this.processBucket(bucketName, bucketInfo.doc.value.owner, bucketInfo.doc.value.locationConstraint, bucketInfo.first)
+                const promise = this.processBucket(bucketName, bucketInfo.doc.value.ownerDisplayName, bucketInfo.doc.value.locationConstraint, bucketInfo.first)
                     .then((result) => {
                         bucketInfo.first = false;
                         this.consolidateResults(bucketName, result);
@@ -162,12 +164,9 @@ class CountItems {
         await this.setCheckPoints();
         // then compute all metrics and save them
         await this.aggregateResults();
+        this.log.info(`Round completed in ${process.hrtime(startTime)[0]}s. Restarting in 2 seconds...`);
         // await sleep 2 seconds
-        console.log(this.pool)
         await new Promise(r => setTimeout(r, 2000));
-        // TODO how to detect object deletion that are not
-        // delete markers, as changestreams are scoped and
-        // there may be a lot of buckets?
         }
     }
 
@@ -358,6 +357,10 @@ class CountItems {
             // Default to the current time
         }
 
+        // Record this bucket in the set of bulked checkpoints, together with
+        // the timestamp filter used for this scan, plus one second.
+        this.bulkedCheckpoints[bucketName] = lastSyncedTimestamp;
+
         // Step 2: Setup collection and checkpoint
         // We get the current bucket status from the pool.
         if (!this.pool[bucketName]) {
@@ -393,10 +396,12 @@ class CountItems {
                 $project: {
                     isMaster: {
                         $cond: [
-                            { $and: [
-                                { $eq: ["$value.versionId", null] },
-                                { $eq: ["$value.isNull", false] }] },
-                            1, 0]
+                            {
+                                $and: [
+                                    { $eq: [{ $indexOfBytes: ["$_id", "\0"] }, -1] },
+                                    { $eq: ["$value.isNull", false] }]
+                            },
+                            1, 0]
                     },
                     isNull: {
                         $cond: [{ $eq: ["$value.isNull", true] }, 1, 0]
@@ -405,9 +410,11 @@ class CountItems {
                     },
                     isVersioned: {
                         $cond: [{ $ne: [{ $indexOfBytes: ["$_id", "\0"] }, -1] }, 1, 0]
                     },
                     isMaster2: {
-                        $cond: [{ $and: [
-                            { },
-                            { $ne: ["$value.isNull", true] }] }, 1, 0]
+                        $cond: [{
+                            $and: [
+                                {},
+                                { $ne: ["$value.isNull", true] }]
+                        }, 1, 0]
                     },
                     contentLength: "$value.content-length",
                 }
@@ -415,25 +422,27 @@ class CountItems {
             {
                 $group: {
                     _id: null,
-                    masterData: { $sum: { $multiply: [ { $add: ["$isMaster", "$isMaster2"] }, "$contentLength" ] } },
+                    masterData: { $sum: { $multiply: [{ $add: ["$isMaster", "$isMaster2"] }, "$contentLength"] } },
                     nullData: { $sum: { $multiply: ['$isNull', '$contentLength'] } },
                     versionData: { $sum: { $multiply: ['$isVersioned', '$contentLength'] } },
+                    masterCount: { $sum: { $add: ["$isMaster", "$isMaster2"] } },
+                    nullCount: { $sum: '$isNull' },
+                    versionCount: { $sum: '$isVersioned' },
                 }
             }
         ]);
         // wait till the aggregation is done
         const result = await operation.toArray();
-        console.log(result)
         const metrics = {
             masterData: result?.[0]?.masterData || 0,
             nullData: result?.[0]?.nullData || 0,
             versionData: result?.[0]?.versionData || 0,
+            masterCount: result?.[0]?.masterCount || 0,
+            nullCount: result?.[0]?.nullCount || 0,
+            versionCount: result?.[0]?.versionCount || 0,
         };
-        // add to the set of bulked checkpoints, the current bucket name with
-        // the filter used, +1s
-        this.bulkedCheckpoints[bucketName] = lastSyncedTimestamp;
         // return the computed metrics as a single object holding all the data
         return resolve({
             accountName,
@@ -463,27 +472,51 @@ class CountItems {
     }
 
     consolidateResults(bucketName, result) {
+        const updateMetrics = (target, source) => {
+            if (!target) return;
+            for (const key in source.metrics) {
+                target.metrics[key] = (target.metrics[key] || 0) + source.metrics[key];
+            }
+        };
+
         if (!bucketName || !this.pool[bucketName]) {
-            this.log.error('Bucket not found in pool', {
-                bucketName,
-            });
+            this.log.error('Bucket not found in pool', { bucketName });
             return;
         }
         if (!result) {
-            this.log.error('No result provided', {
-                bucketName,
-            });
+            this.log.error('No result provided', { bucketName });
             return;
         }
-        // add each metric
-        this.pool[bucketName].metrics = {
-            masterData: (this.pool[bucketName].metrics.masterData || 0) + result.metrics.masterData,
-            nullData: (this.pool[bucketName].metrics.nullData || 0) + result.metrics.nullData,
-            versionData: (this.pool[bucketName].metrics.versionData || 0) + result.metrics.versionData,
-        };
+        updateMetrics(this.pool[bucketName], result);
+
+        const accountMetrics = {};
+        const locationMetrics = {};
+        const bucketMetrics = {};
+
+        for (const currentBucketName in this.pool) {
+            const bucketInfo = this.pool[currentBucketName];
+
+            if (!accountMetrics[bucketInfo.doc.value.ownerDisplayName]) {
+                accountMetrics[bucketInfo.doc.value.ownerDisplayName] = { metrics: {} };
+            }
+            updateMetrics(accountMetrics[bucketInfo.doc.value.ownerDisplayName], bucketInfo);
+
+            if (!locationMetrics[bucketInfo.doc.value.locationConstraint]) {
+                locationMetrics[bucketInfo.doc.value.locationConstraint] = { metrics: {} };
+            }
+            updateMetrics(locationMetrics[bucketInfo.doc.value.locationConstraint], bucketInfo);
+
+            if (!bucketMetrics[currentBucketName]) {
+                bucketMetrics[currentBucketName] = { metrics: {} };
+            }
+            updateMetrics(bucketMetrics[currentBucketName], bucketInfo);
+        }
+
+        console.log(JSON.stringify({ accountMetrics, locationMetrics, bucketMetrics }));
     }
 
+
     /**
      * Detect objects that are deleted while the aggregations are running.
      * The documents whose bucketName is in the pool (after the first successful run)
@@ -515,15 +548,19 @@ class CountItems {
             });
             const size = change.updateDescription.updatedFields.value['content-length'];
             let type;
+            let typeCount;
             if (change.documentKey._id.indexOf('\0') !== -1) {
                 type = 'versionData';
+                typeCount = 'versionCount';
             } else if (
                 !change.updateDescription.updatedFields.value.versionId ||
                 (!!change.updateDescription.updatedFields.value.versionId &&
                     !change.updateDescription.updatedFields.value.isNull)) {
                 type = 'masterData';
+                typeCount = 'masterCount';
             } else {
                 type = 'nullData';
+                typeCount = 'nullCount';
             }
             // do not process object if last modified date is after the current
             // scan date.
@@ -532,12 +569,12 @@ class CountItems {
                 return;
             }
             this.pool[change.ns.coll].metrics[type] = Math.max(0, this.pool[change.ns.coll].metrics[type] - size);
+            this.pool[change.ns.coll].metrics[typeCount] = Math.max(0, this.pool[change.ns.coll].metrics[typeCount] - 1);
         });
 
         // Listen for errors
         watcher.on('error', (error) => {
             this.log.error('Error in change stream', { error });
-            console.log(error);
 
             // Close the errored change stream
             watcher.close();
@@ -560,6 +597,7 @@ class CountItems {
 module.exports = CountItems;
 
 // todo:
+// - fix secondary optime check
 // - detect object replacement
 // - better way of detecting objects being deleted but not yet in the computed metrics?
 // - periodically flush all metrics of buckets
\ No newline at end of file
diff --git a/conf/config.json b/conf/config.json
new file mode 100644
index 00000000..efd7e143
--- /dev/null
+++ b/conf/config.json
@@ -0,0 +1,77 @@
+{
+    "backbeat": {
+        "host": "artesca-data-management-backbeat-api.zenko.svc.cluster.local",
+        "port": 80
+    },
+    "clusters": 1,
+    "dataClient": {
+        "host": "artesca-data-base-local-data.zenko.svc.cluster.local",
+        "port": 9991
+    },
+    "healthChecks": {
+        "allowFrom": [
+            "0.0.0.0/0"
+        ]
+    },
+    "listenOn": [],
+    "log": {
+        "dumpLevel": "error",
+        "logLevel": "info"
+    },
+    "mongodb": {
+        "database": "s3utils",
+        "readPreference": "primary",
+        "replicaSetHosts": "localhost:27017",
+        "replicaSet": "rs0",
+        "shardCollections": true,
+        "writeConcern": "majority"
+    },
+    "overlayVersion": 1,
+    "port": 8000,
+    "recordLog": {
+        "enabled": true,
+        "recordLogName": "s3-recordlog"
+    },
+    "replicationEndpoints": [],
+    "replicationGroupId": "RG00001",
+    "requests": {
+        "extractClientIPFromHeader": "X-Forwarded-For",
+        "trustedProxyCIDRs": [
+            "0.0.0.0/0"
+        ],
+        "viaProxy": false
+    },
+    "restEndpoints": {
+        "*.s3.w3.com": "us-east-1",
+        "*.zenko-cloudserver-replicator": "us-east-1",
+        "127.0.0.1": "us-east-1",
+        "artesca-data-connector-s3api.zenko.svc.cluster.local": "us-east-1",
+        "artesca-data-management-ui.zenko.svc.cluster.local": "us-east-1",
+        "localhost": "us-east-1",
+        "s3.amazonaws.com": "us-east-1",
+        "s3.w3.com": "us-east-1",
+        "ui.w3.com": "us-east-1",
+        "zenko-cloudserver-replicator": "us-east-1"
+    },
+    "vaultd": {
+        "host": "artesca-data-connector-vault-auth-api.zenko.svc.cluster.local",
+        "port": 80
+    },
+    "websiteEndpoints": [
+        "s3-website-us-east-1.amazonaws.com",
+        "s3-website.us-east-2.amazonaws.com",
+        "s3-website-us-west-1.amazonaws.com",
+        "s3-website-us-west-2.amazonaws.com",
+        "s3-website.ap-south-1.amazonaws.com",
+        "s3-website.ap-northeast-2.amazonaws.com",
+        "s3-website-ap-southeast-1.amazonaws.com",
+        "s3-website-ap-southeast-2.amazonaws.com",
+        "s3-website-ap-northeast-1.amazonaws.com",
+        "s3-website.eu-central-1.amazonaws.com",
+        "s3-website-eu-west-1.amazonaws.com",
+        "s3-website-sa-east-1.amazonaws.com",
+        "s3-website.localhost",
+        "s3-website.scality.test",
+        "zenkoazuretest.blob.core.windows.net"
+    ]
+}
\ No newline at end of file
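
For reference, the new $project stage and the change-stream deletion handler rely on the same key convention: an _id containing a "\0" byte is a version entry (the layout stores versions as "<objectKey>\0<versionId>"), an _id without "\0" whose value.isNull is true is a null-version entry, and anything else is a master entry. Below is a minimal standalone sketch of that classification and of the six data/count metrics the $group stage produces. The sample documents and values are illustrative assumptions, not code from the patch, and the single master test stands in for the pipeline's isMaster/isMaster2 pair.

'use strict';

// Hypothetical metadata documents in the assumed layout: a version entry
// embeds "\0" between the object key and the version id; a null-version
// entry has no "\0" and value.isNull === true.
const sampleDocs = [
    { _id: 'photo.jpg', value: { isNull: false, 'content-length': 100 } },
    { _id: 'photo.jpg\u0000v1', value: { isNull: false, 'content-length': 100 } },
    { _id: 'notes.txt', value: { isNull: true, 'content-length': 50 } },
];

// The decision the aggregation encodes with $indexOfBytes/$cond and the
// change-stream handler encodes with indexOf('\0').
function classify(doc) {
    if (doc._id.indexOf('\u0000') !== -1) {
        return 'version';
    }
    return doc.value.isNull === true ? 'null' : 'master';
}

// Accumulate the same six metrics as the $group stage.
const metrics = {
    masterData: 0, nullData: 0, versionData: 0,
    masterCount: 0, nullCount: 0, versionCount: 0,
};
for (const doc of sampleDocs) {
    const type = classify(doc);
    metrics[`${type}Data`] += doc.value['content-length'];
    metrics[`${type}Count`] += 1;
}

console.log(metrics);
// -> { masterData: 100, nullData: 50, versionData: 100,
//      masterCount: 1, nullCount: 1, versionCount: 1 }

A deletion observed by the change stream then reverses exactly one Data/Count pair, which is what the new metrics[typeCount] decrement in changeStreamListenDeletion does.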