diff --git a/dev/client.js b/dev/client.js
index c649a3ae7..4fdb5886b 100644
--- a/dev/client.js
+++ b/dev/client.js
@@ -80,8 +80,8 @@ const broker = new ServiceBroker({
 	logger: console,
 	logLevel: "info",
 	middlewares: [
-		//Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
-		//Middlewares.Transmit.Compression(),
+		// Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
+		// Middlewares.Transmit.Compression()
 		//Middlewares.Debugging.TransitLogger({ logPacketData: false, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
 		//Middlewares.Debugging.ActionLogger({ logParams: true, logResponse: true, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
 		//require("./RedisHeartbeat")
diff --git a/dev/server.js b/dev/server.js
index a4f47c9a3..690b14758 100644
--- a/dev/server.js
+++ b/dev/server.js
@@ -63,8 +63,8 @@ const broker = new ServiceBroker({
 	logLevel: "info",
 
 	middlewares: [
-		//Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
-		//Middlewares.Transmit.Compression(),
+		// Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
+		// Middlewares.Transmit.Compression()
 		//Middlewares.Debugging.TransitLogger({ logPacketData: false, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
 		//Middlewares.Debugging.ActionLogger({ logPacketData: false, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
 		//require("./RedisHeartbeat")
diff --git a/src/transporters/kafka.js b/src/transporters/kafka.js
index 5408b5a7c..7141d0019 100644
--- a/src/transporters/kafka.js
+++ b/src/transporters/kafka.js
@@ -25,14 +25,7 @@ const toMoleculerLogLevel = level => {
 };
 
 /**
- * Lightweight transporter for Kafka
- *
- * For test:
- *   1. clone https://github.com/wurstmeister/kafka-docker.git repo
- *   2. follow instructions on https://github.com/wurstmeister/kafka-docker#pre-requisites
- *   3. start containers with Docker Compose
- *
- *      docker-compose -f docker-compose-single-broker.yml up -d
+ * Transporter for Kafka
  *
  * @class KafkaTransporter
  * @extends {Transporter}
@@ -47,7 +40,7 @@ class KafkaTransporter extends Transporter {
 	 */
 	constructor(opts) {
 		if (typeof opts === "string") {
-			opts = { brokers: opts.replace("kafka://", "") };
+			opts = { client: { brokers: [opts.replace("kafka://", "")] } };
 		} else if (opts == null) {
 			opts = {};
 		}
@@ -55,7 +48,11 @@
 		opts = defaultsDeep(opts, {
 			// KafkaClient options. More info: https://kafka.js.org/docs/configuration
 			client: {
-				brokers: Array.isArray(opts.brokers) ? opts.brokers : [opts.brokers],
+				brokers: Array.isArray(opts.brokers)
+					? opts.brokers
+					: opts.brokers
+						? [opts.brokers]
+						: null,
 				logLevel: 1,
 				logCreator:
 					logLevel =>
@@ -138,15 +135,15 @@
 	 */
 	async disconnect() {
 		if (this.admin) {
-			await this.admin.disconnect;
+			await this.admin.disconnect();
 			this.admin = null;
 		}
 		if (this.producer) {
-			await this.producer.disconnect;
+			await this.producer.disconnect();
 			this.producer = null;
 		}
 		if (this.consumer) {
-			await this.consumer.disconnect;
+			await this.consumer.disconnect();
 			this.consumer = null;
 		}
 	}
diff --git a/test/unit/transporters/index.spec.js b/test/unit/transporters/index.spec.js
index 1c4d8215f..8f6b4c1ab 100644
--- a/test/unit/transporters/index.spec.js
+++ b/test/unit/transporters/index.spec.js
@@ -119,18 +119,18 @@ describe("Test Transporter resolver", () => {
 	});
 
 	it("should resolve KafkaTransporter from connection string", () => {
-		let trans = Transporters.resolve("kafka://localhost:2181");
+		let trans = Transporters.resolve("kafka://localhost:9093");
 		expect(trans).toBeInstanceOf(Transporters.Kafka);
 		expect(trans.opts).toEqual({
-			host: "localhost:2181",
 			client: {
-				kafkaHost: "localhost:2181"
+				brokers: ["localhost:9093"],
+				logCreator: expect.any(Function),
+				logLevel: 1
 			},
-			consumer: {},
-			customPartitioner: undefined,
 			producer: {},
-			publish: {
-				attributes: 0,
+			consumer: {},
+			publish: {},
+			publishMessage: {
 				partition: 0
 			}
 		});
@@ -138,23 +138,23 @@
 	it("should resolve KafkaTransporter from obj", () => {
 		let options = {
-			host: "localhost:2181",
-			publish: {
+			client: { brokers: ["localhost:9093"] },
+			publishMessage: {
 				partition: 2
 			}
 		};
 
 		let trans = Transporters.resolve({ type: "Kafka", options });
 		expect(trans).toBeInstanceOf(Transporters.Kafka);
 		expect(trans.opts).toEqual({
-			host: "localhost:2181",
 			client: {
-				kafkaHost: "localhost:2181"
+				brokers: ["localhost:9093"],
+				logCreator: expect.any(Function),
+				logLevel: 1
 			},
-			consumer: {},
-			customPartitioner: undefined,
 			producer: {},
-			publish: {
-				attributes: 0,
+			consumer: {},
+			publish: {},
+			publishMessage: {
 				partition: 2
 			}
 		});
diff --git a/test/unit/transporters/kafka.spec.js b/test/unit/transporters/kafka.spec.js
index 09c8c57e7..bb8115e42 100644
--- a/test/unit/transporters/kafka.spec.js
+++ b/test/unit/transporters/kafka.spec.js
@@ -3,17 +3,40 @@ const Transit = require("../../../src/transit");
 const P = require("../../../src/packets");
 const C = require("../../../src/constants");
 
-jest.mock("kafka-node");
-
-let Kafka = require("kafka-node");
-const clientCallbacks = {};
-let clientCloseCB;
-Kafka.KafkaClient = jest.fn(() => {
-	return {
-		close: jest.fn(cb => (clientCloseCB = cb)),
-		callbacks: clientCallbacks
-	};
-});
+jest.mock("kafkajs");
+
+let Kafka = require("kafkajs");
+
+const FakeKafkaAdmin = {
+	connect: jest.fn(),
+	disconnect: jest.fn(),
+	createTopics: jest.fn()
+};
+
+const FakeKafkaProducer = {
+	connect: jest.fn(),
+	disconnect: jest.fn(),
+	send: jest.fn()
+};
+
+let consumerRunOpts = {};
+const FakeKafkaConsumer = {
+	connect: jest.fn(),
+	disconnect: jest.fn(),
+	subscribe: jest.fn(),
+	run: jest.fn(opts => {
+		consumerRunOpts = opts;
+	})
+};
+
+const FakeKafkaClient = {
+	admin: jest.fn(() => FakeKafkaAdmin),
+	producer: jest.fn(() => FakeKafkaProducer),
+	consumer: jest.fn(() => FakeKafkaConsumer)
+};
+
+Kafka.Kafka = jest.fn(() => FakeKafkaClient);
+/*
 const producerCallbacks = {};
 Kafka.Producer = jest.fn(() => {
 	return {
@@ -33,7 +56,7 @@ Kafka.ConsumerGroup = jest.fn(() => {
 		callbacks: groupCallbacks
 	};
 });
-
+*/
 const KafkaTransporter = require("../../../src/transporters/kafka");
 
 describe("Test KafkaTransporter constructor", () => {
@@ -41,17 +64,15 @@
 		let transporter = new KafkaTransporter();
 		expect(transporter).toBeDefined();
 		expect(transporter.opts).toEqual({
-			host: undefined,
 			client: {
-				noAckBatchOptions: undefined,
-				sslOptions: undefined,
-				zkOptions: undefined
+				brokers: null,
+				logCreator: expect.any(Function),
+				logLevel: 1
 			},
-			customPartitioner: undefined,
 			producer: {},
 			consumer: {},
-			publish: {
-				attributes: 0,
+			publish: {},
+			publishMessage: {
 				partition: 0
 			}
 		});
@@ -64,15 +85,15 @@
 	it("check constructor with string param", () => {
 		let transporter = new KafkaTransporter("localhost:9092");
 		expect(transporter.opts).toEqual({
-			host: "localhost:9092",
 			client: {
-				kafkaHost: "localhost:9092"
+				brokers: ["localhost:9092"],
+				logCreator: expect.any(Function),
+				logLevel: 1
 			},
-			customPartitioner: undefined,
 			producer: {},
 			consumer: {},
-			publish: {
-				attributes: 0,
+			publish: {},
+			publishMessage: {
 				partition: 0
 			}
 		});
@@ -80,22 +101,24 @@
 	it("check constructor with options", () => {
 		let opts = {
-			host: "localhost:9092",
-			publish: {
+			client: {
+				brokers: ["localhost:9092"]
+			},
+			publishMessage: {
 				partition: 1
 			}
 		};
 
 		let transporter = new KafkaTransporter(opts);
 		expect(transporter.opts).toEqual({
-			host: "localhost:9092",
 			client: {
-				kafkaHost: "localhost:9092"
+				brokers: ["localhost:9092"],
+				logCreator: expect.any(Function),
+				logLevel: 1
 			},
-			customPartitioner: undefined,
 			producer: {},
 			consumer: {},
-			publish: {
-				attributes: 0,
+			publish: {},
+			publishMessage: {
 				partition: 1
 			}
 		});
@@ -109,43 +132,57 @@ describe("Test KafkaTransporter connect & disconnect", () => {
 	let transporter;
 
 	beforeEach(() => {
-		transporter = new KafkaTransporter();
+		transporter = new KafkaTransporter({
+			client: {
+				some: "thing"
+			},
+			producer: {
+				extraProp: 7
+			}
+		});
 		transporter.init(transit, msgHandler);
 	});
 
-	it("check connect", () => {
-		let p = transporter.connect().then(() => {
-			expect(transporter.client).toBeDefined();
-			expect(transporter.producer).toBeDefined();
-			expect(transporter.producer.on).toHaveBeenCalledTimes(2);
-			expect(transporter.producer.on).toHaveBeenCalledWith("ready", expect.any(Function));
-			expect(transporter.producer.on).toHaveBeenCalledWith("error", expect.any(Function));
+	it("check connect", async () => {
+		await transporter.connect();
+		expect(transporter.client).toBeDefined();
+		expect(transporter.admin).toBeDefined();
+		expect(transporter.producer).toBeDefined();
+
+		expect(Kafka.Kafka).toHaveBeenCalledTimes(1);
+		expect(Kafka.Kafka).toHaveBeenCalledWith({
+			brokers: null,
+			logCreator: expect.any(Function),
+			logLevel: 1,
+			some: "thing"
 		});
 
-		transporter.producer.callbacks.ready();
-
-		return p;
+		expect(FakeKafkaClient.producer).toHaveBeenCalledTimes(1);
+		expect(FakeKafkaClient.producer).toHaveBeenCalledWith({
+			extraProp: 7
+		});
 	});
 
-	it("check connect - should broadcast error", () => {
+	it("check connect - should broadcast error", async () => {
 		broker.broadcastLocal = jest.fn();
 
-		let p = transporter.connect().catch(() => {
+		const origErr = new Error("Ups");
+		FakeKafkaAdmin.connect = jest.fn(() => Promise.reject(origErr));
+		try {
+			await transporter.connect();
+			expect(1).toBe(2);
+		} catch (err) {
+			expect(err).toBe(origErr);
 			expect(transporter.producer).toBeDefined();
 			expect(broker.broadcastLocal).toHaveBeenCalledTimes(1);
 			expect(broker.broadcastLocal).toHaveBeenNthCalledWith(1, "$transporter.error", {
-				error: new Error("Ups"),
+				error: origErr,
 				module: "transporter",
 				type: C.FAILED_PUBLISHER_ERROR
 			});
-		});
-
-		// Trigger an error
-		const error = new Error("Ups");
-		transporter.producer.callbacks.error(error);
-
-		return p;
+		}
+		FakeKafkaAdmin.connect = jest.fn();
 	});
 
 	it("check onConnected after connect", () => {
@@ -155,26 +192,20 @@
 		let p = transporter.connect().then(() => {
 			expect(transporter.onConnected).toHaveBeenCalledTimes(1);
 			expect(transporter.onConnected).toHaveBeenCalledWith();
 		});
-		transporter.producer.callbacks.ready();
-
 		return p;
 	});
 
-	it("check disconnect", () => {
-		let p = transporter.connect().then(() => {
-			let close = transporter.client.close;
-			let close2 = jest.fn(cb => cb());
-			transporter.consumer = {
-				close: close2
-			};
-			transporter.disconnect();
-			expect(close).toHaveBeenCalledTimes(1);
-			clientCloseCB();
-			expect(close2).toHaveBeenCalledTimes(1);
-		});
+	it("check disconnect", async () => {
+		await transporter.connect();
+		await transporter.makeSubscriptions([
+			{ cmd: "REQ", nodeID: "node" },
+			{ cmd: "RES", nodeID: "node" }
+		]);
+		await transporter.disconnect();
 
-		transporter.producer.callbacks.ready(); // Trigger the `resolve`
-		return p;
+		expect(FakeKafkaAdmin.disconnect).toHaveBeenCalledTimes(1);
+		expect(FakeKafkaProducer.disconnect).toHaveBeenCalledTimes(1);
+		expect(FakeKafkaConsumer.disconnect).toHaveBeenCalledTimes(1);
 	});
 });
@@ -182,105 +213,108 @@ describe("Test KafkaTransporter makeSubscriptions", () => {
 	let transporter;
 	let msgHandler;
 
-	beforeEach(() => {
+	beforeEach(async () => {
 		msgHandler = jest.fn();
-		transporter = new KafkaTransporter("kafka://kafka-server:1234");
+		transporter = new KafkaTransporter({
+			client: { brokers: ["kafka://kafka-server:1234"] },
+			consumer: { extraProp: 5 }
+		});
 		transporter.init(
 			new Transit(new ServiceBroker({ logger: false, namespace: "TEST", nodeID: "node-1" })),
 			msgHandler
 		);
-		let p = transporter.connect();
-		transporter.producer.callbacks.ready(); // Trigger the `resolve`
+		await transporter.connect();
 		transporter.incomingMessage = jest.fn();
-		return p;
+
+		transporter.admin.createTopics.mockClear();
 	});
 
-	it("check makeSubscriptions", () => {
-		transporter.producer.createTopics = jest.fn((topics, a, cb) => cb());
-		transporter.makeSubscriptions([
+	it("check makeSubscriptions", async () => {
+		FakeKafkaClient.consumer.mockClear();
+		FakeKafkaConsumer.connect.mockClear();
+
+		await transporter.makeSubscriptions([
 			{ cmd: "REQ", nodeID: "node" },
 			{ cmd: "RES", nodeID: "node" }
 		]);
-		expect(transporter.producer.createTopics).toHaveBeenCalledTimes(1);
-		expect(transporter.producer.createTopics).toHaveBeenCalledWith(
-			["MOL-TEST.REQ.node", "MOL-TEST.RES.node"],
-			true,
-			expect.any(Function)
-		);
+		expect(transporter.admin.createTopics).toHaveBeenCalledTimes(1);
+		expect(transporter.admin.createTopics).toHaveBeenCalledWith({
+			topics: [{ topic: "MOL-TEST.REQ.node" }, { topic: "MOL-TEST.RES.node" }]
+		});
 
-		expect(Kafka.ConsumerGroup).toHaveBeenCalledTimes(1);
-		expect(Kafka.ConsumerGroup).toHaveBeenCalledWith(
-			{
-				encoding: "buffer",
-				fromOffset: "latest",
-				groupId: transporter.broker.instanceID,
-				kafkaHost: "kafka-server:1234",
-				id: "default-kafka-consumer"
-			},
-			["MOL-TEST.REQ.node", "MOL-TEST.RES.node"]
-		);
+		expect(FakeKafkaClient.consumer).toHaveBeenCalledTimes(1);
+		expect(FakeKafkaClient.consumer).toHaveBeenCalledWith({
+			groupId: transporter.broker.instanceID,
+			extraProp: 5
+		});
+		expect(FakeKafkaConsumer.connect).toHaveBeenCalledTimes(1);
 		expect(transporter.consumer).toBeDefined();
-		expect(transporter.consumer.on).toHaveBeenCalledTimes(3);
-		expect(transporter.consumer.on).toHaveBeenCalledWith("error", expect.any(Function));
-		expect(transporter.consumer.on).toHaveBeenCalledWith("message", expect.any(Function));
-		expect(transporter.consumer.on).toHaveBeenCalledWith("connect", expect.any(Function));
-		transporter.consumer.callbacks.connect();
-
-		transporter.consumer.callbacks.message({
+		consumerRunOpts.eachMessage({
 			topic: "MOL.INFO.node-2",
-			value: '{ ver: "3" }'
+			message: {
+				value: '{ ver: "3" }'
+			}
 		});
 		expect(transporter.incomingMessage).toHaveBeenCalledTimes(1);
 		expect(transporter.incomingMessage).toHaveBeenCalledWith("INFO", '{ ver: "3" }');
 	});
 
-	it("check makeSubscriptions - should broadcast a producer error", () => {
+	it("check makeSubscriptions - should broadcast an error", async () => {
 		transporter.broker.broadcastLocal = jest.fn();
-		transporter.producer.createTopics = jest.fn((topics, a, cb) => cb(new Error("Ups!")));
-		const p = transporter
-			.makeSubscriptions([
+		const origErr = new Error("Ups");
+		transporter.admin.createTopics = jest.fn(() => Promise.reject(origErr));
+
+		try {
+			await transporter.makeSubscriptions([
 				{ cmd: "REQ", nodeID: "node" },
 				{ cmd: "RES", nodeID: "node" }
-			])
-			.then(() => transporter.consumer.callbacks.connect())
-			.catch(() => {
-				expect(transporter.broker.broadcastLocal).toHaveBeenCalledTimes(1);
-				expect(transporter.broker.broadcastLocal).toHaveBeenNthCalledWith(
-					1,
-					"$transporter.error",
-					{
-						error: new Error("Ups!"),
-						module: "transporter",
-						type: C.FAILED_TOPIC_CREATION
-					}
-				);
+			]);
+			expect(1).toBe(2);
+		} catch (err) {
+			expect(err).toBe(origErr);
+			expect(transporter.producer).toBeDefined();
+
+			expect(transporter.broker.broadcastLocal).toHaveBeenCalledTimes(1);
+			expect(transporter.broker.broadcastLocal).toHaveBeenCalledWith("$transporter.error", {
+				error: origErr,
+				module: "transporter",
+				type: C.FAILED_TOPIC_CREATION
 			});
+		}
 
-		return p;
+		transporter.admin.createTopics = jest.fn();
 	});
 
 	it("check makeSubscriptions - should broadcast a consumer error", async () => {
 		transporter.broker.broadcastLocal = jest.fn();
-		transporter.producer.createTopics = jest.fn((topics, a, cb) => cb());
-		transporter.makeSubscriptions([
-			{ cmd: "REQ", nodeID: "node" },
-			{ cmd: "RES", nodeID: "node" }
-		]);
+		const origErr = new Error("Ups");
+		FakeKafkaConsumer.run = jest.fn(() => {
+			throw origErr;
+		});
 
-		transporter.consumer.callbacks.error(new Error("Ups!"));
+		try {
+			await transporter.makeSubscriptions([
+				{ cmd: "REQ", nodeID: "node" },
+				{ cmd: "RES", nodeID: "node" }
+			]);
+			expect(1).toBe(2);
+		} catch (err) {
+			expect(err).toBe(origErr);
+			expect(transporter.broker.broadcastLocal).toHaveBeenCalledTimes(1);
+			expect(transporter.broker.broadcastLocal).toHaveBeenCalledWith("$transporter.error", {
+				error: origErr,
+				module: "transporter",
+				type: C.FAILED_CONSUMER_ERROR
+			});
+		}
 
-		expect(transporter.broker.broadcastLocal).toHaveBeenCalledTimes(1);
-		expect(transporter.broker.broadcastLocal).toHaveBeenCalledWith("$transporter.error", {
-			error: new Error("Ups!"),
-			module: "transporter",
-			type: C.FAILED_CONSUMER_ERROR
-		});
+		FakeKafkaConsumer.run = jest.fn();
 	});
 });
@@ -288,39 +322,60 @@ describe("Test KafkaTransporter subscribe & publish", () => {
 	let transporter;
 	let msgHandler;
 
-	beforeEach(() => {
+	beforeEach(async () => {
 		msgHandler = jest.fn();
-		transporter = new KafkaTransporter();
+		transporter = new KafkaTransporter({
+			client: { brokers: ["kafka://kafka-server:1234"] },
+			publish: { extraProp: 5 },
+			publishMessage: { partition: 2 }
+		});
 		transporter.init(
 			new Transit(new ServiceBroker({ logger: false, namespace: "TEST", nodeID: "node1" })),
 			msgHandler
 		);
 		transporter.serialize = jest.fn(() => Buffer.from("json data"));
-		let p = transporter.connect();
-		transporter.producer.callbacks.ready(); // Trigger the `resolve`
-		return p;
+		await transporter.connect();
 	});
 
-	it("check publish", () => {
+	it("check publish", async () => {
 		transporter.producer.send.mockClear();
 
 		const packet = new P.Packet(P.PACKET_INFO, "node2", { services: {} });
-		transporter.publish(packet);
+		await transporter.publish(packet);
 
 		expect(transporter.producer.send).toHaveBeenCalledTimes(1);
-		expect(transporter.producer.send).toHaveBeenCalledWith(
-			[
-				{
-					topic: "MOL-TEST.INFO.node2",
-					messages: [Buffer.from("json data")],
-					partition: 0,
-					attributes: 0
-				}
-			],
-			expect.any(Function)
-		);
+		expect(transporter.producer.send).toHaveBeenCalledWith({
+			topic: "MOL-TEST.INFO.node2",
+			messages: [{ value: Buffer.from("json data"), partition: 2 }],
+			extraProp: 5
+		});
 		expect(transporter.serialize).toHaveBeenCalledTimes(1);
 		expect(transporter.serialize).toHaveBeenCalledWith(packet);
 	});
+
+	it("check publish - should broadcast a publisher error", async () => {
+		transporter.broker.broadcastLocal = jest.fn();
+
+		const origErr = new Error("Ups");
+		FakeKafkaProducer.send = jest.fn(() => {
+			throw origErr;
+		});
+
+		try {
+			const packet = new P.Packet(P.PACKET_INFO, "node2", { services: {} });
+			await transporter.publish(packet);
+			expect(1).toBe(2);
+		} catch (err) {
+			expect(err).toBe(origErr);
+			expect(transporter.broker.broadcastLocal).toHaveBeenCalledTimes(1);
+			expect(transporter.broker.broadcastLocal).toHaveBeenCalledWith("$transporter.error", {
+				error: origErr,
+				module: "transporter",
+				type: C.FAILED_PUBLISHER_ERROR
+			});
+		}
+
+		FakeKafkaProducer.send = jest.fn();
+	});
 });
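With this patch the transporter takes kafkajs-style options (client.brokers, producer, consumer, publish, publishMessage) instead of the old kafka-node ones. Below is a minimal usage sketch matching the option shape exercised by the tests above; the broker address, nodeID and partition number are illustrative values, not defaults.

"use strict";

const { ServiceBroker } = require("moleculer");

// Minimal sketch, assuming the kafkajs-based transporter from this patch.
// "localhost:9093" and partition 2 are example values only.
const broker = new ServiceBroker({
	nodeID: "node-1",
	transporter: {
		type: "Kafka",
		options: {
			// kafkajs client options: https://kafka.js.org/docs/configuration
			client: { brokers: ["localhost:9093"] },
			producer: {},
			consumer: {},
			// extra fields merged into every producer.send() call
			publish: {},
			// extra fields merged into every published message (e.g. target partition)
			publishMessage: { partition: 2 }
		}
	}
});

broker.start();

A plain connection string such as "kafka://localhost:9093" also still resolves to the same option shape, as the resolver test above verifies.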