From e4b08bd0d01adb90aa09bcbc76d663ab18925f76 Mon Sep 17 00:00:00 2001 From: Icebob Date: Sat, 14 Jan 2023 17:45:42 +0100 Subject: [PATCH] Better error handling in event sending and receiving. Resolves #1105 --- dev/event-error.js | 57 +++++++++++++++++++++++++++++++ src/registry/event-catalog.js | 6 +++- src/service-broker.js | 63 ++++++++++++++++++++++++----------- src/transit.js | 11 +++++- test/unit/transit.spec.js | 9 ++++- 5 files changed, 124 insertions(+), 22 deletions(-) create mode 100644 dev/event-error.js diff --git a/dev/event-error.js b/dev/event-error.js new file mode 100644 index 000000000..04b9c6d69 --- /dev/null +++ b/dev/event-error.js @@ -0,0 +1,57 @@ +const { ServiceBroker } = require(".."); + +const broker1 = new ServiceBroker({ + nodeID: "node-1", + transporter: "NATS" +}); + +const broker2 = new ServiceBroker({ + nodeID: "node-2", + transporter: "NATS" + /*errorHandler(err, info) { + this.logger.warn("Error handled:", err); + }*/ +}); + +broker2.createService({ + name: "test", + + events: { + "test.event": [ + () => { + console.log("Called first handler"); + throw new Error("Error in event handler"); + }, + () => { + console.log("Called second handler"); + } + ] + }, + started() {} +}); + +broker1.createService({ + name: "events", + events: { + "test.event": () => { + console.log("Called third handler"); + } + } +}); + +(async function () { + await broker1.start(); + await broker2.start(); + + // broker1.transit.sendEvent = () => Promise.reject(new Error("Unable to send")); + + await broker1.waitForServices("test"); + + try { + await broker1.broadcast("test.event", { a: 5 }, { throwError: true }); + } catch (err) { + broker1.logger.warn("Catched error", err); + } + + broker1.repl(); +})(); diff --git a/src/registry/event-catalog.js b/src/registry/event-catalog.js index 3febe2f4c..f10e9ebbb 100644 --- a/src/registry/event-catalog.js +++ b/src/registry/event-catalog.js @@ -194,7 +194,11 @@ class EventCatalog { } }); - return this.broker.Promise.all(promises); + return this.broker.Promise.allSettled(promises).then(results => { + const err = results.find(r => r.status == "rejected"); + if (err) return this.broker.Promise.reject(err.reason); + return true; + }); } /** diff --git a/src/service-broker.js b/src/service-broker.js index 7189fb592..96c96c0d0 100644 --- a/src/service-broker.js +++ b/src/service-broker.js @@ -1451,8 +1451,6 @@ class ServiceBroker { promises.push(this.transit.sendEvent(newCtx)); }); } - - return this.Promise.all(promises); } else if (this.transit) { // Disabled balancer case let groups = opts.groups; @@ -1462,11 +1460,24 @@ class ServiceBroker { groups = this.getEventGroups(eventName); } - if (groups.length === 0) return this.Promise.resolve(); + if (groups.length === 0) return this.Promise.resolve(true); ctx.eventGroups = groups; - return this.transit.sendEvent(ctx); + promises.push(this.transit.sendEvent(ctx)); + } + + const p = this.Promise.allSettled(promises).then(results => { + const err = results.find(r => r.status == "rejected"); + if (err) return this.Promise.reject(err.reason); + return true; + }); + + if (opts.throwError) { + return p; } + return p.catch(() => { + // swallow the error. It's already logged. + }); } /** @@ -1523,20 +1534,31 @@ class ServiceBroker { const endpoints = this.registry.events.getAllEndpoints(eventName, groups); // Return here because balancer disabled, so we can't call the local services. - return this.Promise.all( - endpoints.map(ep => { - const newCtx = ctx.copy(ep); - newCtx.eventGroups = groups; - return this.transit.sendEvent(newCtx); - }) - ); + endpoints.forEach(ep => { + const newCtx = ctx.copy(ep); + newCtx.eventGroups = groups; + promises.push(this.transit.sendEvent(newCtx)); + }); } } - // Send to local services - promises.push(this.broadcastLocal(eventName, payload, opts)); + if (!this.options.disableBalancer) { + // Send to local services + promises.push(this.broadcastLocal(eventName, payload, opts)); + } + + const p = this.Promise.allSettled(promises).then(results => { + const err = results.find(r => r.status == "rejected"); + if (err) return this.Promise.reject(err.reason); + return true; + }); - return this.Promise.all(promises); + if (opts.throwError) { + return p; + } + return p.catch(() => { + // swallow the error. It's already logged. + }); } /** @@ -1569,7 +1591,13 @@ class ServiceBroker { ctx.eventType = "broadcastLocal"; ctx.eventGroups = opts.groups; - return this.emitLocalServices(ctx).catch(err => { + const p = this.emitLocalServices(ctx); + + if (opts.throwError) { + return p; + } + + return p.catch(err => { // Catch and log the error because it's a local event handler, not throwing further. this.logger.error(err); }); @@ -1706,10 +1734,7 @@ class ServiceBroker { * @memberof ServiceBroker */ emitLocalServices(ctx) { - return this.registry.events.emitLocalServices(ctx).catch(err => { - // Catch and log the error because it's a local event handler, not throwing further. - this.logger.error(err); - }); + return this.registry.events.emitLocalServices(ctx); } /** diff --git a/src/transit.js b/src/transit.js index 78a32e06a..cec534886 100644 --- a/src/transit.js +++ b/src/transit.js @@ -438,7 +438,14 @@ class Transit { ctx.nodeID = payload.sender; // ensure the eventHandler resolves true when the event was handled successfully - return this.broker.emitLocalServices(ctx).then(() => true); + return this.broker + .emitLocalServices(ctx) + .then(() => true) + .catch(err => { + this.logger.error(err); + + return false; + }); } /** @@ -1012,6 +1019,8 @@ class Transit { module: "transit", type: C.FAILED_SEND_EVENT_PACKET }); + + return Promise.reject(err); } ); } diff --git a/test/unit/transit.spec.js b/test/unit/transit.spec.js index 007d422bd..e5e4c0c7e 100644 --- a/test/unit/transit.spec.js +++ b/test/unit/transit.spec.js @@ -2576,7 +2576,14 @@ describe("Test Transit.sendEvent", () => { ); ctx.eventGroups = ["users", "mail"]; - await transit.sendEvent(ctx); + + expect.assertions(4); + try { + await transit.sendEvent(ctx); + } catch (err) { + expect(err).toBeInstanceOf(Error); + expect(err.message).toEqual("Error during failedSendEventPacket!"); + } expect(broker.broadcastLocal).toHaveBeenCalledTimes(1); expect(broker.broadcastLocal).toHaveBeenCalledWith("$transit.error", {