diff --git a/extensions/notification/configManager/MongoConfigManager.js b/extensions/notification/configManager/MongoConfigManager.js index ce26da086..2ec679f68 100644 --- a/extensions/notification/configManager/MongoConfigManager.js +++ b/extensions/notification/configManager/MongoConfigManager.js @@ -5,6 +5,7 @@ const { ZenkoMetrics } = require('arsenal').metrics; const LRUCache = require('arsenal').algorithms .cache.LRUCache; const { errors } = require('arsenal'); +const { promisify } = require('util'); const MongoClient = require('mongodb').MongoClient; @@ -94,47 +95,42 @@ class MongoConfigManager extends BaseConfigManager { * and retreives the metastore collection * @param {Function} cb callback * @returns {undefined} - */ - _setupMongoClient(cb) { - const mongoUrl = constructConnectionString(this._mongoConfig); - const client = new MongoClient(mongoUrl, { - replicaSet: this._mongoConfig.replicaSet, - useNewUrlParser: true, - readPreference: this._mongoConfig.readPreference, - }); - - return client.connect().then(client => { + */ + async _setupMongoClient(cb) { + const mongoUrl = constructConnectionString(this._mongoConfig); + try { + const client = await new MongoClient(mongoUrl, { + replicaSet: this._mongoConfig.replicaSet, + useNewUrlParser: true, + readPreference: this._mongoConfig.readPreference, + }).connect(); this._logger.debug('Connected to MongoDB', { method: 'MongoConfigManager._setupMongoClient', }); + + this._mongoClient = client.db(this._mongoConfig.database, { + ignoreUndefined: true, + }); + this._metastore = this._mongoClient.collection(this._bucketMetastore); + try { - this._mongoClient = client.db(this._mongoConfig.database, { - ignoreUndefined: true, + this._mongoVersion = await promisify(getMongoVersion)(this._mongoClient); + } catch (err) { + this._logger.error('Could not get MongoDB version', { + method: 'MongoConfigManager._setupMongoClient', + error: err.message, }); - this._metastore = this._mongoClient.collection(this._bucketMetastore); - // get mongodb version - getMongoVersion(this._mongoClient, (err, version) => { - if (err) { - this._logger.error('Could not get MongoDB version', { - method: 'MongoConfigManager._setupMongoClient', - error: err.message, - }); - return cb(err); - } - this._mongoVersion = version; - return cb(); - }); - return undefined; - } catch (error) { - return cb(error); + return cb(err); } - }).catch(err => { + + return cb(); + } catch (err) { this._logger.error('Could not connect to MongoDB', { method: 'MongoConfigManager._setupMongoClient', error: err.message, }); return cb(err); - }); + } } /** diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index 43829ac8d..f359afa31 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -83,13 +83,12 @@ class OplogPopulator { */ async _setupMongoClient() { try { - let client = new MongoClient(this._mongoUrl, { + const client = await new MongoClient(this._mongoUrl, { replicaSet: this._replicaSet, useNewUrlParser: true, useUnifiedTopology: true, readPreference: this._mongoConfig.readPreference, - }); - client = await client.connect(); + }).connect(); // connect to metadata DB this._mongoClient = client.db(this._database, { ignoreUndefined: true, diff --git a/tests/functional/ingestion/IngestionReader.js b/tests/functional/ingestion/IngestionReader.js index 373959a19..968a8adb6 100644 --- a/tests/functional/ingestion/IngestionReader.js +++ b/tests/functional/ingestion/IngestionReader.js @@ -4,6 +4,8 @@ const http = require('http'); const kafka = require('node-rdkafka'); const { MetadataMock, mockLogs } = require('../utils/MetadataMock'); const MongoClient = require('mongodb').MongoClient; +const { promisify } = require('util'); +const { setTimeout } = require('timers/promises'); const dummyLogger = require('../../utils/DummyLogger'); const dummyPensieveCredentials = require('./DummyPensieveCredentials.json'); @@ -113,22 +115,16 @@ describe('ingestion reader tests with mock', function fD() { await client.connect(); db = client.db('metadata', { ignoreUndefined: true }); try { - await new Promise((resolve, reject) => { - kafkaAdminClient.createTopic({ - topic, - num_partitions: 1, // eslint-disable-line camelcase - replication_factor: 1, // eslint-disable-line camelcase - }, err => { - if (err && err.code === 36) { - // if topic already exits. - return resolve(); - } - return err ? reject(err) : resolve(); - }); - }); - } catch (err) { - throw err; - } + const createTopic = promisify(kafkaAdminClient.createTopic).bind(kafkaAdminClient); + await createTopic({ + topic, + num_partitions: 1, // eslint-disable-line camelcase + replication_factor: 1, // eslint-disable-line camelcase + }).catch(err => { + if (err.code !== 36) { // if topic does not already exist + throw err; + } + }); producer = new BackbeatProducer({ kafka: testConfig.kafka, topic: testConfig.extensions.ingestion.topic, @@ -140,7 +136,7 @@ describe('ingestion reader tests with mock', function fD() { }); consumer.subscribe([testConfig.extensions.ingestion.topic]); - await new Promise(resolve => setTimeout(resolve, 2000)); + await setTimeout(2000); await db.createCollection('PENSIEVE'); const collection = db.collection('PENSIEVE'); await collection.insertOne(dummyPensieveCredentials); @@ -154,28 +150,17 @@ describe('ingestion reader tests with mock', function fD() { zkClient.once('error', reject); zkClient.once('ready', resolve); }); - await new Promise((resolve, reject) => { - initManagement(testConfig, err => { - if (err) { - return reject(err); - } - return resolve(); - }); - }); + await promisify(initManagement)(testConfig); const metadataMock = new MetadataMock(); httpServer = http.createServer((req, res) => metadataMock.onRequest(req, res)) .listen(testPort); + } catch (err) { + throw err; + } }); after(async () => { - await new Promise((resolve, reject) => { - httpServer.close(err => { - if (err) { - return reject(err); - } - return resolve(); - }); - }); + await promisify(httpServer.close); consumer.unsubscribe(); await db.collection('PENSIEVE').drop(); await client.close();