Skip to content

Commit

Permalink
Better error handling in event sending and receiving. Resolves #1105
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Jan 14, 2023
1 parent 317e68b commit e4b08bd
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 22 deletions.
57 changes: 57 additions & 0 deletions dev/event-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const { ServiceBroker } = require("..");

const broker1 = new ServiceBroker({
nodeID: "node-1",
transporter: "NATS"
});

const broker2 = new ServiceBroker({
nodeID: "node-2",
transporter: "NATS"
/*errorHandler(err, info) {
this.logger.warn("Error handled:", err);
}*/
});

broker2.createService({
name: "test",

events: {
"test.event": [
() => {
console.log("Called first handler");
throw new Error("Error in event handler");
},
() => {
console.log("Called second handler");
}
]
},
started() {}
});

broker1.createService({
name: "events",
events: {
"test.event": () => {
console.log("Called third handler");
}
}
});

(async function () {
await broker1.start();
await broker2.start();

// broker1.transit.sendEvent = () => Promise.reject(new Error("Unable to send"));

await broker1.waitForServices("test");

try {
await broker1.broadcast("test.event", { a: 5 }, { throwError: true });
} catch (err) {
broker1.logger.warn("Catched error", err);
}

broker1.repl();
})();
6 changes: 5 additions & 1 deletion src/registry/event-catalog.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ class EventCatalog {
}
});

return this.broker.Promise.all(promises);
return this.broker.Promise.allSettled(promises).then(results => {
const err = results.find(r => r.status == "rejected");
if (err) return this.broker.Promise.reject(err.reason);
return true;
});
}

/**
Expand Down
63 changes: 44 additions & 19 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -1451,8 +1451,6 @@ class ServiceBroker {
promises.push(this.transit.sendEvent(newCtx));
});
}

return this.Promise.all(promises);
} else if (this.transit) {
// Disabled balancer case
let groups = opts.groups;
Expand All @@ -1462,11 +1460,24 @@ class ServiceBroker {
groups = this.getEventGroups(eventName);
}

if (groups.length === 0) return this.Promise.resolve();
if (groups.length === 0) return this.Promise.resolve(true);

ctx.eventGroups = groups;
return this.transit.sendEvent(ctx);
promises.push(this.transit.sendEvent(ctx));
}

const p = this.Promise.allSettled(promises).then(results => {
const err = results.find(r => r.status == "rejected");
if (err) return this.Promise.reject(err.reason);
return true;
});

if (opts.throwError) {
return p;
}
return p.catch(() => {
// swallow the error. It's already logged.
});
}

/**
Expand Down Expand Up @@ -1523,20 +1534,31 @@ class ServiceBroker {
const endpoints = this.registry.events.getAllEndpoints(eventName, groups);

// Return here because balancer disabled, so we can't call the local services.
return this.Promise.all(
endpoints.map(ep => {
const newCtx = ctx.copy(ep);
newCtx.eventGroups = groups;
return this.transit.sendEvent(newCtx);
})
);
endpoints.forEach(ep => {
const newCtx = ctx.copy(ep);
newCtx.eventGroups = groups;
promises.push(this.transit.sendEvent(newCtx));
});
}
}

// Send to local services
promises.push(this.broadcastLocal(eventName, payload, opts));
if (!this.options.disableBalancer) {
// Send to local services
promises.push(this.broadcastLocal(eventName, payload, opts));
}

const p = this.Promise.allSettled(promises).then(results => {
const err = results.find(r => r.status == "rejected");
if (err) return this.Promise.reject(err.reason);
return true;
});

return this.Promise.all(promises);
if (opts.throwError) {
return p;
}
return p.catch(() => {
// swallow the error. It's already logged.
});
}

/**
Expand Down Expand Up @@ -1569,7 +1591,13 @@ class ServiceBroker {
ctx.eventType = "broadcastLocal";
ctx.eventGroups = opts.groups;

return this.emitLocalServices(ctx).catch(err => {
const p = this.emitLocalServices(ctx);

if (opts.throwError) {
return p;
}

return p.catch(err => {
// Catch and log the error because it's a local event handler, not throwing further.
this.logger.error(err);
});
Expand Down Expand Up @@ -1706,10 +1734,7 @@ class ServiceBroker {
* @memberof ServiceBroker
*/
emitLocalServices(ctx) {
return this.registry.events.emitLocalServices(ctx).catch(err => {
// Catch and log the error because it's a local event handler, not throwing further.
this.logger.error(err);
});
return this.registry.events.emitLocalServices(ctx);
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,14 @@ class Transit {
ctx.nodeID = payload.sender;

// ensure the eventHandler resolves true when the event was handled successfully
return this.broker.emitLocalServices(ctx).then(() => true);
return this.broker
.emitLocalServices(ctx)
.then(() => true)
.catch(err => {
this.logger.error(err);

return false;
});
}

/**
Expand Down Expand Up @@ -1012,6 +1019,8 @@ class Transit {
module: "transit",
type: C.FAILED_SEND_EVENT_PACKET
});

return Promise.reject(err);
}
);
}
Expand Down
9 changes: 8 additions & 1 deletion test/unit/transit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2576,7 +2576,14 @@ describe("Test Transit.sendEvent", () => {
);

ctx.eventGroups = ["users", "mail"];
await transit.sendEvent(ctx);

expect.assertions(4);
try {
await transit.sendEvent(ctx);
} catch (err) {
expect(err).toBeInstanceOf(Error);
expect(err.message).toEqual("Error during failedSendEventPacket!");
}

expect(broker.broadcastLocal).toHaveBeenCalledTimes(1);
expect(broker.broadcastLocal).toHaveBeenCalledWith("$transit.error", {
Expand Down

0 comments on commit e4b08bd

Please sign in to comment.