Skip to content

Commit

Permalink
update etcd service: handle end and error events
Browse files Browse the repository at this point in the history
  • Loading branch information
miaowing committed Jan 15, 2020
1 parent c99ca5b commit 7a821d5
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions packages/service/etcd-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ export class EtcdService implements IService, OnModuleInit, OnModuleDestroy {
private readonly serviceCallbackMaps: Map<string, ((nodes: IServiceNode[]) => void)[]> = new Map();
private readonly servicesCallbacks: ((services: string[]) => void)[] = [];

private watcherWrapper: WatcherWrapper = { connected: false, watcher: null };
private watcher: Watcher;
private timer: NodeJS.Timeout;

constructor(
private readonly client: IEtcd,
Expand All @@ -45,7 +46,6 @@ export class EtcdService implements IService, OnModuleInit, OnModuleDestroy {
}

public async onModuleInit(): Promise<void> {
this.checkServiceWatcher();
await this.initServices();
await this.initServicesWatcher();
}
Expand Down Expand Up @@ -156,55 +156,51 @@ export class EtcdService implements IService, OnModuleInit, OnModuleDestroy {
}
}

private checkServiceWatcher(immediate?: boolean) {
setTimeout(async () => {
if (this.watcherWrapper.watcher && !this.watcherWrapper.connected) {
private recreateServiceWatcher(immediate?: boolean) {
if (this.timer) {
clearTimeout(this.timer);
}
this.timer = setTimeout(async () => {
if (this.watcher) {
try {
await this.watcherWrapper.watcher.cancel();
this.watcherWrapper.watcher = null;
await this.watcher.cancel();
this.watcher = null;
} catch (e) {
this.recreateServiceWatcher();
this.logger.warn(`Cancel the service watcher fail`);
}
try {
await this.initServicesWatcher();
} catch (e) {
this.recreateServiceWatcher();
this.logger.error('Service watcher created error.', e);
}

this.logger.log('Service watcher recreate succeed.');

this.checkServiceWatcher(false);
}
}, immediate ? 0 : 60000);
}

private async initServicesWatcher() {
this.watcherWrapper.watcher = await this.client.namespace(this.namespace).watch().prefix('').create();
this.watcherWrapper.connected = true;
this.watcherWrapper.watcher.on('connected', () => {
this.watcherWrapper.connected = true;
this.watcher = await this.client.namespace(this.namespace).watch().prefix('').create();
this.watcher.on('connected', () => {
this.logger.log('Service watcher connected');
});
this.watcherWrapper.watcher.on('disconnected', () => {
this.watcherWrapper.connected = false;
this.watcher.on('disconnected', () => {
this.logger.log('Service watcher disconnected');
});
this.watcherWrapper.watcher.on('connecting', () => {
this.watcher.on('connecting', () => {
this.logger.log('Service watcher connecting...');
});
this.watcherWrapper.watcher.on('end', async () => {
this.watcher.on('end', async () => {
this.logger.error('Service watcher unexpected end and will recreate soon');

this.watcherWrapper.connected = false;
this.checkServiceWatcher(true);
this.recreateServiceWatcher(true);
});
this.watcherWrapper.watcher.on('error', async e => {
this.watcher.on('error', async e => {
this.logger.error('Service watcher occur unexpected error and will recreate soon', e.stack);

this.watcherWrapper.connected = false;
this.checkServiceWatcher(true);
this.recreateServiceWatcher(true);
});
this.watcherWrapper.watcher.on('data', (res) => {
this.watcher.on('data', (res) => {
res.events.forEach(evt => {
const key = evt.kv.key.toString();
const chunks = key.split('__');
Expand Down

0 comments on commit 7a821d5

Please sign in to comment.