Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/w/7.70/bugfix/BB-466/zookeeper-e…
Browse files Browse the repository at this point in the history
…xpired-session' into w/8.5/bugfix/BB-466/zookeeper-expired-session
  • Loading branch information
nicolas2bert committed Nov 22, 2023
2 parents 3df6fcd + 6cbd495 commit e5a4938
Show file tree
Hide file tree
Showing 19 changed files with 812 additions and 90 deletions.
7 changes: 3 additions & 4 deletions bin/ingestion.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const { reshapeExceptionError } = require('arsenal').errorUtils;
const IngestionPopulator = require('../lib/queuePopulator/IngestionPopulator');
const config = require('../lib/Config');
const { initManagement } = require('../lib/management/index');
const zookeeperWrapper = require('../lib/clients/zookeeper');
const ZookeeperManager = require('../lib/clients/ZookeeperManager');
const { zookeeperNamespace, zkStatePath } =
require('../extensions/ingestion/constants');
const { startProbeServer } = require('../lib/util/probe');
Expand Down Expand Up @@ -314,10 +314,9 @@ function initAndStart(zkClient) {
});
}

const zkClient = zookeeperWrapper.createClient(connectionString, {
const zkClient = new ZookeeperManager(connectionString, {
autoCreateNamespace,
});
zkClient.connect();
}, log);
zkClient.once('error', err => {
log.fatal('error connecting to zookeeper', {
error: err.message,
Expand Down
8 changes: 3 additions & 5 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const MongoClient = require('arsenal').storage

const BackbeatProducer = require('../../../lib/BackbeatProducer');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const zookeeperHelper = require('../../../lib/clients/zookeeper');
const ZookeeperManager = require('../../../lib/clients/ZookeeperManager');
const KafkaBacklogMetrics = require('../../../lib/KafkaBacklogMetrics');
const safeJsonParse = require('../util/safeJsonParse');
const { AccountIdCache } = require('../util/AccountIdCache');
Expand Down Expand Up @@ -620,9 +620,7 @@ class LifecycleConductor {
process.nextTick(cb);
return;
}
this._zkClient = zookeeperHelper.createClient(
this.zkConfig.connectionString, this.zkConfig);
this._zkClient.connect();
this._zkClient = new ZookeeperManager(this.zkConfig.connectionString, this.zkConfig, this.logger);
this._zkClient.once('error', cb);
this._zkClient.once('ready', () => {
// just in case there would be more 'error' events
Expand Down Expand Up @@ -675,7 +673,7 @@ class LifecycleConductor {
// just in case there would be more 'error' events emitted
this._kafkaBacklogMetrics.removeAllListeners('error');
this._kafkaBacklogMetrics.on('error', err => {
this._log.error('error from kafka topic metrics', {
this.logger.error('error from kafka topic metrics', {
error: err.message,
method: 'LifecycleConductor._initKafkaBacklogMetrics',
});
Expand Down
8 changes: 4 additions & 4 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const config = require('../../../lib/Config');
const { initManagement } = require('../../../lib/management/index');
const { applyBucketReplicationWorkflows } = require('../management');
const { reshapeExceptionError } = require('arsenal').errorUtils;
const zookeeper = require('../../../lib/clients/zookeeper');
const ZookeeperManager = require('../../../lib/clients/ZookeeperManager');
const { zookeeperNamespace, zkStatePath, zkReplayStatePath } =
require('../constants');

Expand Down Expand Up @@ -284,10 +284,10 @@ function initAndStart(zkClient) {
});
}

const zkClient = zookeeper.createClient(connectionString, {
const zkClient = new ZookeeperManager(connectionString, {
autoCreateNamespace,
});
zkClient.connect();
}, log);

zkClient.once('error', err => {
log.fatal('error connecting to zookeeper', {
error: err.message,
Expand Down
5 changes: 2 additions & 3 deletions lib/KafkaBacklogMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const zookeeper = require('node-zookeeper-client');
const Logger = require('werelogs').Logger;
const { errors, metrics } = require('arsenal');

const zookeeperHelper = require('./clients/zookeeper');
const ZookeeperManager = require('./clients/ZookeeperManager');
const { readUInt64BE } = require('./util/buffer');
const { promMetricNames } = require('./constants').kafkaBacklogMetrics;

Expand Down Expand Up @@ -58,8 +58,7 @@ class KafkaBacklogMetrics extends EventEmitter {
}

_initZookeeperClient() {
this._zookeeper = zookeeperHelper.createClient(this._zookeeperEndpoint);
this._zookeeper.connect();
this._zookeeper = new ZookeeperManager(this._zookeeperEndpoint, null, this._log);
this._zookeeper.on('error', err => {
this.emit('error', err);
});
Expand Down
7 changes: 3 additions & 4 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const Redis = require('ioredis');
const { errors } = require('arsenal');
const { RedisClient, StatsModel, ZenkoMetrics } = require('arsenal').metrics;

const zookeeper = require('../clients/zookeeper');
const ZookeeperManager = require('../clients/ZookeeperManager');
const BackbeatProducer = require('../BackbeatProducer');
const ObjectQueueEntry =
require('../../lib/models/ObjectQueueEntry');
Expand Down Expand Up @@ -1412,10 +1412,9 @@ class BackbeatAPI {

_setZookeeper(cb) {
const { connectionString, autoCreateNamespace } = this._zkConfig;
const zkClient = zookeeper.createClient(connectionString, {
const zkClient = new ZookeeperManager(connectionString, {
autoCreateNamespace,
});
zkClient.connect();
}, this._logger);

zkClient.once('error', cb);
zkClient.once('connected', () => {
Expand Down
2 changes: 1 addition & 1 deletion lib/api/Healthcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Healthcheck {
/**
* @constructor
* @param {object} repConfig - extensions.replication configs
* @param {node-zookeeper-client.Client} zkClient - zookeeper client
* @param {ZookeeperManager} zkClient - zookeeper client manager
* @param {BackbeatProducer} crrProducer - producer for CRR topic
* @param {BackbeatProducer} crrStatusProducer - CRR status producer
* @param {BackbeatProducer} metricProducer - producer for metric
Expand Down
Loading

0 comments on commit e5a4938

Please sign in to comment.