Skip to content

Commit

Permalink
fixups post review
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Dec 23, 2024
1 parent 22171f5 commit c502728
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 66 deletions.
56 changes: 26 additions & 30 deletions extensions/notification/configManager/MongoConfigManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { ZenkoMetrics } = require('arsenal').metrics;
const LRUCache = require('arsenal').algorithms
.cache.LRUCache;
const { errors } = require('arsenal');
const { promisify } = require('util');

const MongoClient = require('mongodb').MongoClient;

Expand Down Expand Up @@ -94,47 +95,42 @@ class MongoConfigManager extends BaseConfigManager {
* and retreives the metastore collection
* @param {Function} cb callback
* @returns {undefined}
*/
_setupMongoClient(cb) {
const mongoUrl = constructConnectionString(this._mongoConfig);
const client = new MongoClient(mongoUrl, {
replicaSet: this._mongoConfig.replicaSet,
useNewUrlParser: true,
readPreference: this._mongoConfig.readPreference,
});

return client.connect().then(client => {
*/
async _setupMongoClient(cb) {
const mongoUrl = constructConnectionString(this._mongoConfig);
try {
const client = await new MongoClient(mongoUrl, {
replicaSet: this._mongoConfig.replicaSet,
useNewUrlParser: true,
readPreference: this._mongoConfig.readPreference,
}).connect();
this._logger.debug('Connected to MongoDB', {
method: 'MongoConfigManager._setupMongoClient',
});

this._mongoClient = client.db(this._mongoConfig.database, {
ignoreUndefined: true,
});
this._metastore = this._mongoClient.collection(this._bucketMetastore);

try {
this._mongoClient = client.db(this._mongoConfig.database, {
ignoreUndefined: true,
this._mongoVersion = await promisify(getMongoVersion)(this._mongoClient);
} catch (err) {
this._logger.error('Could not get MongoDB version', {

Check warning on line 119 in extensions/notification/configManager/MongoConfigManager.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/notification/configManager/MongoConfigManager.js#L119

Added line #L119 was not covered by tests
method: 'MongoConfigManager._setupMongoClient',
error: err.message,
});
this._metastore = this._mongoClient.collection(this._bucketMetastore);
// get mongodb version
getMongoVersion(this._mongoClient, (err, version) => {
if (err) {
this._logger.error('Could not get MongoDB version', {
method: 'MongoConfigManager._setupMongoClient',
error: err.message,
});
return cb(err);
}
this._mongoVersion = version;
return cb();
});
return undefined;
} catch (error) {
return cb(error);
return cb(err);
}
}).catch(err => {

return cb();
} catch (err) {
this._logger.error('Could not connect to MongoDB', {
method: 'MongoConfigManager._setupMongoClient',
error: err.message,
});
return cb(err);
});
}
}

/**
Expand Down
5 changes: 2 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ class OplogPopulator {
*/
async _setupMongoClient() {
try {
let client = new MongoClient(this._mongoUrl, {
const client = await new MongoClient(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
readPreference: this._mongoConfig.readPreference,
});
client = await client.connect();
}).connect();
// connect to metadata DB
this._mongoClient = client.db(this._database, {
ignoreUndefined: true,
Expand Down
51 changes: 18 additions & 33 deletions tests/functional/ingestion/IngestionReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const http = require('http');
const kafka = require('node-rdkafka');
const { MetadataMock, mockLogs } = require('../utils/MetadataMock');
const MongoClient = require('mongodb').MongoClient;
const { promisify } = require('util');
const { setTimeout } = require('timers/promises');

const dummyLogger = require('../../utils/DummyLogger');
const dummyPensieveCredentials = require('./DummyPensieveCredentials.json');
Expand Down Expand Up @@ -113,22 +115,16 @@ describe('ingestion reader tests with mock', function fD() {
await client.connect();
db = client.db('metadata', { ignoreUndefined: true });
try {
await new Promise((resolve, reject) => {
kafkaAdminClient.createTopic({
topic,
num_partitions: 1, // eslint-disable-line camelcase
replication_factor: 1, // eslint-disable-line camelcase
}, err => {
if (err && err.code === 36) {
// if topic already exits.
return resolve();
}
return err ? reject(err) : resolve();
});
});
} catch (err) {
throw err;
}
const createTopic = promisify(kafkaAdminClient.createTopic).bind(kafkaAdminClient);
await createTopic({
topic,
num_partitions: 1, // eslint-disable-line camelcase
replication_factor: 1, // eslint-disable-line camelcase
}).catch(err => {
if (err.code !== 36) { // if topic does not already exist
throw err;
}
});
producer = new BackbeatProducer({
kafka: testConfig.kafka,
topic: testConfig.extensions.ingestion.topic,
Expand All @@ -140,7 +136,7 @@ describe('ingestion reader tests with mock', function fD() {
});

consumer.subscribe([testConfig.extensions.ingestion.topic]);
await new Promise(resolve => setTimeout(resolve, 2000));
await setTimeout(2000);
await db.createCollection('PENSIEVE');
const collection = db.collection('PENSIEVE');
await collection.insertOne(dummyPensieveCredentials);
Expand All @@ -154,28 +150,17 @@ describe('ingestion reader tests with mock', function fD() {
zkClient.once('error', reject);
zkClient.once('ready', resolve);
});
await new Promise((resolve, reject) => {
initManagement(testConfig, err => {
if (err) {
return reject(err);
}
return resolve();
});
});
await promisify(initManagement)(testConfig);
const metadataMock = new MetadataMock();
httpServer = http.createServer((req, res) => metadataMock.onRequest(req, res))
.listen(testPort);
} catch (err) {
throw err;
}
});

after(async () => {
await new Promise((resolve, reject) => {
httpServer.close(err => {
if (err) {
return reject(err);
}
return resolve();
});
});
await promisify(httpServer.close);
consumer.unsubscribe();
await db.collection('PENSIEVE').drop();
await client.close();
Expand Down

0 comments on commit c502728

Please sign in to comment.