Skip to content

Commit

Permalink
fixups post reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Jan 7, 2025
1 parent 22171f5 commit 42913a5
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 62 deletions.
53 changes: 26 additions & 27 deletions extensions/notification/configManager/MongoConfigManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,50 +91,49 @@ class MongoConfigManager extends BaseConfigManager {

/**
* Connects to MongoDB using the MongoClientInterface
* and retreives the metastore collection
* @param {Function} cb callback
* and retrieves the metastore collection
* @param {Function} cb callback

Check failure on line 95 in extensions/notification/configManager/MongoConfigManager.js

View workflow job for this annotation

GitHub Actions / tests

Trailing spaces not allowed
* @returns {undefined}
*/
_setupMongoClient(cb) {
async _setupMongoClient(cb) {
const mongoUrl = constructConnectionString(this._mongoConfig);
const client = new MongoClient(mongoUrl, {
let client = new MongoClient(mongoUrl, {
replicaSet: this._mongoConfig.replicaSet,
useNewUrlParser: true,
readPreference: this._mongoConfig.readPreference,
});

return client.connect().then(client => {
try {
client = await client.connect();

this._logger.debug('Connected to MongoDB', {
method: 'MongoConfigManager._setupMongoClient',
});
try {
this._mongoClient = client.db(this._mongoConfig.database, {
ignoreUndefined: true,
});
this._metastore = this._mongoClient.collection(this._bucketMetastore);

this._mongoClient = client.db(this._mongoConfig.database, {
ignoreUndefined: true,
});
this._metastore = this._mongoClient.collection(this._bucketMetastore);
// get mongodb version
getMongoVersion(this._mongoClient, (err, version) => {
if (err) {
this._logger.error('Could not get MongoDB version', {
method: 'MongoConfigManager._setupMongoClient',
error: err.message,
});
return cb(err);
}
this._mongoVersion = version;
return cb();
});
return undefined;
} catch (error) {
return cb(error);
}
}).catch(err => {
getMongoVersion(this._mongoClient, (err, version) => {
if (err) {
this._logger.error('Could not get MongoDB version', {
method: 'MongoConfigManager._setupMongoClient',
error: err.message,
});
return cb(err);
}
this._mongoVersion = version;
return cb();
});
return undefined;
} catch (err) {
this._logger.error('Could not connect to MongoDB', {
method: 'MongoConfigManager._setupMongoClient',
error: err.message,
});
return cb(err);
});
}
}

/**
Expand Down
5 changes: 2 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ class OplogPopulator {
*/
async _setupMongoClient() {
try {
let client = new MongoClient(this._mongoUrl, {
const client = await new MongoClient(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
readPreference: this._mongoConfig.readPreference,
});
client = await client.connect();
}).connect();
// connect to metadata DB
this._mongoClient = client.db(this._database, {
ignoreUndefined: true,
Expand Down
2 changes: 1 addition & 1 deletion lib/util/LocationStatusManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class LocationStatusManager {
this._locationStatusColl.insertOne({
_id: location,
value: locationConfig.getValue(),
}).finally(next);
}).then(() => next()).catch(next);
}, err => {
if (err) {
this._logger.error('Could not add new locations', {
Expand Down
43 changes: 13 additions & 30 deletions tests/functional/ingestion/IngestionReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const http = require('http');
const kafka = require('node-rdkafka');
const { MetadataMock, mockLogs } = require('../utils/MetadataMock');
const MongoClient = require('mongodb').MongoClient;
const { promisify } = require('util');
const timers = require('timers/promises');

const dummyLogger = require('../../utils/DummyLogger');
const dummyPensieveCredentials = require('./DummyPensieveCredentials.json');
Expand Down Expand Up @@ -113,21 +115,16 @@ describe('ingestion reader tests with mock', function fD() {
await client.connect();
db = client.db('metadata', { ignoreUndefined: true });
try {
await new Promise((resolve, reject) => {
kafkaAdminClient.createTopic({
topic,
num_partitions: 1, // eslint-disable-line camelcase
replication_factor: 1, // eslint-disable-line camelcase
}, err => {
if (err && err.code === 36) {
// if topic already exits.
return resolve();
}
return err ? reject(err) : resolve();
});
const createTopic = promisify(kafkaAdminClient.createTopic).bind(kafkaAdminClient);
await createTopic({
topic,
num_partitions: 1, // eslint-disable-line camelcase
replication_factor: 1, // eslint-disable-line camelcase
});
} catch (err) {
throw err;
if (err.code !== 36) { // if topic does not already exist
throw err;
}
}
producer = new BackbeatProducer({
kafka: testConfig.kafka,
Expand All @@ -140,7 +137,7 @@ describe('ingestion reader tests with mock', function fD() {
});

consumer.subscribe([testConfig.extensions.ingestion.topic]);
await new Promise(resolve => setTimeout(resolve, 2000));
await timers.setTimeout(2000);
await db.createCollection('PENSIEVE');
const collection = db.collection('PENSIEVE');
await collection.insertOne(dummyPensieveCredentials);
Expand All @@ -154,28 +151,14 @@ describe('ingestion reader tests with mock', function fD() {
zkClient.once('error', reject);
zkClient.once('ready', resolve);
});
await new Promise((resolve, reject) => {
initManagement(testConfig, err => {
if (err) {
return reject(err);
}
return resolve();
});
});
await promisify(initManagement)(testConfig);
const metadataMock = new MetadataMock();
httpServer = http.createServer((req, res) => metadataMock.onRequest(req, res))
.listen(testPort);
});

after(async () => {
await new Promise((resolve, reject) => {
httpServer.close(err => {
if (err) {
return reject(err);
}
return resolve();
});
});
await promisify(httpServer.close.bind(httpServer))();
consumer.unsubscribe();
await db.collection('PENSIEVE').drop();
await client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ describe('MongoConfigManager ::', () => {
const getDbStub = sinon.stub().returns({
collection: getCollectionStub,
});
sinon.stub(MongoClient, 'connect').callsArgWith(2, null, {
sinon.stub(MongoClient.prototype, 'connect').resolves({
db: getDbStub,
});
manager._setupMongoClient(err => {
Expand Down

0 comments on commit 42913a5

Please sign in to comment.