diff --git a/docs/pages/function-lifecycle-events.md b/docs/pages/function-lifecycle-events.md index d47f8f87b..7e4a1bd25 100644 --- a/docs/pages/function-lifecycle-events.md +++ b/docs/pages/function-lifecycle-events.md @@ -22,12 +22,14 @@ Functions take a single argument, which is the event received from a broker or a |channel|The name of the channel/topic from which the event was read. |serverName|The name of the server/broker from which the event was received. -Functions may return an object to tell Glee what to do next. For instance, the following example greets the user back: +Functions may return an object to tell Glee what to do next. For instance, the following example sends a greeting message to `development` server: ```js /* onHello.js */ export default async function (event) { return { - reply: [{ + send: [{ + server: 'developement', + channel: 'greets', payload: 'Greetings! How is your day going?' }] } @@ -37,7 +39,7 @@ export default async function (event) { |Attribute|Type|Description| |---|---|---| |send|array<[OutboundMessage](#anatomy-of-an-outbound-message)>|A list of outbound messages to send when the processing of the inbound event has finished. All clients subscribed to the given channel/topic will receive the message. -|reply|array<[OutboundMessage](#anatomy-of-an-outbound-message)>|A list of outbound messages to send as a reply when the processing of the inbound event has finished. This is useful when the target of your message is the sender of the inbound event. Note, however, that this only works when you're running Glee as a server. For example, using `reply` when receiving a WebSocket message is fine and the reply will exclusively go to the client that sent the message. However, if you're receiving a message from an MQTT broker, `reply` will work exactly the same way as `send` above, and will send the message to all the clients subscribed to the given channel/topic. + ##### Anatomy of an outbound message |Attribute|Type|Description| |---|---|---| diff --git a/docs/pages/glee-template.md b/docs/pages/glee-template.md index ec0db7a50..8fea93cfc 100644 --- a/docs/pages/glee-template.md +++ b/docs/pages/glee-template.md @@ -24,9 +24,6 @@ operations: action: receive channel: $ref: '#/channels/hello' - reply: - channel: - $ref: "#/channels/hello" SendHello: action: send channel: @@ -73,9 +70,6 @@ operations: action: receive channel: $ref: '#/channels/hello' - reply: - channel: - $ref: "#/channels/hello" sendHello: action: send channel: @@ -84,8 +78,8 @@ operations: The channels section defines the communication channels available in the API. In this case, there's a channel named "hello". This channel supports both sending and receiving. -The `receive` action indicates that messages received on the `hello` channel should follow the structure defined in the hello message component. Under this action, `reply` which is in a request-reply operation, contains the payload on `onHello.js` function. -The `send` action specifies that the operation with ID `sendHello` is used for sending messages to the `hello` channel. The message structure is referenced from the hello message component. +The `receive` action indicates that messages received on the `hello` channel should follow the structure defined in the hello message component. +The `send` action specifies that the operation with ID `sendHello` is used for sending messages to the `hello` channel. The message structure is referenced from the hello message component. The message payload is going to be validated against the `sendHello` operation message schemas. Next is the `payload` property under `hello` message component which is used to understand how the event should look like when publishing to that channel: diff --git a/docs/pages/index.md b/docs/pages/index.md index ef5638eec..bc1b1d540 100644 --- a/docs/pages/index.md +++ b/docs/pages/index.md @@ -82,9 +82,6 @@ operations: action: receive channel: $ref: '#/channels/greet' - reply: - channel: - $ref: '#/channels/greet' sendGreet: action: send channel: @@ -127,8 +124,10 @@ export default async function (event) { response = `Good Evening ${name}` } return { - reply: [ + send: [ { + server: "websockets", + channel: "greet" payload: response, }, ], diff --git a/docs/pages/installation.md b/docs/pages/installation.md index b3d587e39..e06de1bf7 100644 --- a/docs/pages/installation.md +++ b/docs/pages/installation.md @@ -102,9 +102,6 @@ operations: action: receive channel: $ref: '#/channels/hello' - reply: - channel: - $ref: "#/channels/hello" SendHello: action: send channel: @@ -121,7 +118,9 @@ Create an operation function `onHello.js` inside `myapp/functions`: ```js export default async function (event) { return { - reply: [{ + send: [{ + server: "websockets", + channel: "hello", payload: `Hello from Glee! You said: "${event.payload}".` }] } diff --git a/examples/dummy/README.md b/examples/dummy/README.md index 33d864865..4f32e5801 100644 --- a/examples/dummy/README.md +++ b/examples/dummy/README.md @@ -1,3 +1,51 @@ -# Dummy +## Introduction -This is a dummy example mainly used to test functionalities. \ No newline at end of file +This project is an example websocket-based application that simulates email sending and forwards messages to a Mosquitto server. It's a great example of real-time data handling and integration with MQTT protocol. + +## Getting Started + +### Installation + +To set up the project, follow these simple steps: + +1. Clone the Glee repository to your local machine. +2. Navigate to the project directory (examples/dummy). +3. Run the following command to install all the required dependencies: + + ```bash + npm i + ``` + +### Running the Project + +After installing the dependencies, you can start the project by running: + +```bash +npm run dev +``` + +This will start the development server on `localhost:3005`. + +## Making a WebSocket Connection + +To interact with the WebSocket server, you can use a WebSocket client like [websocat](https://github.com/vi/websocat). Here's how you can connect and send a message: + +1. Open a terminal and connect to the WebSocket server using the following command: + + ```bash + websocat ws://localhost:3005/user/signedup + ``` + +2. Once connected, you can send a message in JSON format. For example: + + ```json + {"displayName": "John Doe", "email": "ffdd@ff.com"} + ``` + +## Behind the Scenes + +When you send a message: + +- The `receiveUserSignedUp` function is triggered. +- The application simulates sending an email to the provided email address. +- It then forwards the message to the Mosquitto server at `test.mosquitto.org`. \ No newline at end of file diff --git a/examples/dummy/asyncapi.yaml b/examples/dummy/asyncapi.yaml index 4f55a4ea6..35e78fbf9 100644 --- a/examples/dummy/asyncapi.yaml +++ b/examples/dummy/asyncapi.yaml @@ -17,15 +17,12 @@ servers: bindingVersion: 0.2.0 websockets: host: 'localhost:3005' - pathname: /ws protocol: ws channels: - user/signedup: - address: user/signedup + userSignedUp: + address: /user/signedup messages: - onUserSignedUp.message: - $ref: '#/components/messages/UserSignedUp' - subscribe.message: + UserSignedUp: $ref: '#/components/messages/UserSignedUp' bindings: ws: @@ -43,35 +40,35 @@ channels: my-custom-header: type: string const: custom value - server/announce: + serverAnnounce: address: server/announce messages: - subscribe.message: + ServerAnnounce: $ref: '#/components/messages/ServerAnnounce' operations: - onUserSignedUp: + recieveUserSignedUp: action: receive channel: - $ref: '#/channels/user~1signedup' + $ref: '#/channels/userSignedUp' bindings: mqtt: qos: 2 retain: true bindingVersion: 0.2.0 messages: - - $ref: '#/components/messages/UserSignedUp' - user/signedup.subscribe: + - $ref: '#/channels/userSignedUp/messages/UserSignedUp' + sendSignedUpUser: action: send channel: - $ref: '#/channels/user~1signedup' + $ref: '#/channels/userSignedUp' messages: - - $ref: '#/components/messages/UserSignedUp' - server/announce.subscribe: + - $ref: '#/channels/userSignedUp/messages/UserSignedUp' + sendServerAnnounce: action: send channel: - $ref: '#/channels/server~1announce' + $ref: '#/channels/serverAnnounce' messages: - - $ref: '#/components/messages/ServerAnnounce' + - $ref: '#/channels/serverAnnounce/messages/ServerAnnounce' components: securitySchemes: userAndPassword: diff --git a/examples/dummy/functions/onUserSignedUp.ts b/examples/dummy/functions/recieveUserSignedUp.ts similarity index 71% rename from examples/dummy/functions/onUserSignedUp.ts rename to examples/dummy/functions/recieveUserSignedUp.ts index 867040cf6..63506c26c 100644 --- a/examples/dummy/functions/onUserSignedUp.ts +++ b/examples/dummy/functions/recieveUserSignedUp.ts @@ -1,9 +1,12 @@ export default async function (event) { const user: any = event.payload console.log(`${user.displayName} has recently signed up. Sending an email to ${user.email}.`) + // Send an email to the user here + return { send: [{ - server: 'websockets', + server: 'mosquitto', + channel: 'userSignedUp', payload: event.payload, }] } diff --git a/examples/dummy/glee.config.js b/examples/dummy/glee.config.js index 41ee8ad12..f0e8ab180 100644 --- a/examples/dummy/glee.config.js +++ b/examples/dummy/glee.config.js @@ -3,12 +3,6 @@ import fs from 'fs' export default async function () { return { - glee: { - logs: { - incoming: 'channel-only', - outgoing: 'channel-only' - } - }, docs: { enabled: false }, @@ -21,14 +15,5 @@ export default async function () { } } } - // websocket: { - // httpServer: customServer, - // adapter: 'native', // Default. Can also be 'socket.io' or a reference to a custom adapter. - // }, - // cluster: { - // adapter: 'redis', - // name: 'gleeCluster', - // url: 'redis://localhost:6379' - // } } } diff --git a/examples/dummy/lifecycle/announceServer.ts b/examples/dummy/lifecycle/announceServer.ts index d758fcac3..8b7ad8d35 100644 --- a/examples/dummy/lifecycle/announceServer.ts +++ b/examples/dummy/lifecycle/announceServer.ts @@ -2,7 +2,7 @@ export default async function () { return { send: [{ server: 'mosquitto', - channel: 'server/announce', + channel: 'serverAnnounce', payload: { id: process.env.SERVER_ID || String(Date.now()), } diff --git a/examples/dummy/package-lock.json b/examples/dummy/package-lock.json index 3d19035ec..20656b4e6 100644 --- a/examples/dummy/package-lock.json +++ b/examples/dummy/package-lock.json @@ -14,14 +14,14 @@ }, "../..": { "name": "@asyncapi/glee", - "version": "0.30.0", + "version": "0.33.4", "license": "Apache-2.0", "dependencies": { - "@asyncapi/generator": "^1.15.0", - "@asyncapi/html-template": "^1.0.0", - "@asyncapi/markdown-template": "^1.4.0", - "@asyncapi/parser": "^3.0.0-next-major-spec.12", - "@types/jest": "^27.4.0", + "@asyncapi/generator": "^1.16.0", + "@asyncapi/html-template": "^2.0.0", + "@asyncapi/markdown-template": "^1.5.0", + "@asyncapi/parser": "^3.0.2", + "@types/jest": "^29.5.11", "@types/qs": "^6.9.7", "ajv": "^6.12.6", "async": "^3.2.0", @@ -65,15 +65,15 @@ "@typescript-eslint/parser": "^5.9.0", "all-contributors-cli": "^6.14.2", "eslint": "^8.6.0", - "eslint-plugin-jest": "^23.8.2", + "eslint-plugin-jest": "^27.6.0", "eslint-plugin-sonarjs": "^0.19.0", "fs-extra": "^10.1.0", - "jest": "^27.4.7", - "jest-extended": "^1.2.0", + "jest": "^29.7.0", + "jest-extended": "^4.0.2", "jsdoc-to-markdown": "^5.0.3", "markdown-toc": "^1.2.0", "rimraf": "^3.0.2", - "ts-jest": "^27.1.2", + "ts-jest": "^29.1.1", "tsc-watch": "^4.5.0", "typedoc": "^0.23.28", "typedoc-plugin-markdown": "^3.11.8", @@ -13082,14 +13082,14 @@ "@asyncapi/glee": { "version": "file:../..", "requires": { - "@asyncapi/generator": "^1.15.0", - "@asyncapi/html-template": "^1.0.0", - "@asyncapi/markdown-template": "^1.4.0", - "@asyncapi/parser": "^3.0.0-next-major-spec.12", + "@asyncapi/generator": "^1.16.0", + "@asyncapi/html-template": "^2.0.0", + "@asyncapi/markdown-template": "^1.5.0", + "@asyncapi/parser": "^3.0.2", "@tsconfig/node14": "^1.0.1", "@types/async": "^3.2.11", "@types/debug": "^4.1.7", - "@types/jest": "^27.4.0", + "@types/jest": "^29.5.11", "@types/qs": "^6.9.7", "@types/socket.io": "^3.0.2", "@types/uri-templates": "^0.1.31", @@ -13109,13 +13109,13 @@ "emojis": "^1.0.10", "eslint": "^8.6.0", "eslint-plugin-github": "^4.3.5", - "eslint-plugin-jest": "^23.8.2", + "eslint-plugin-jest": "^27.6.0", "eslint-plugin-security": "^1.4.0", "eslint-plugin-sonarjs": "^0.19.0", "fs-extra": "^10.1.0", "got": "^12.5.3", - "jest": "^27.4.7", - "jest-extended": "^1.2.0", + "jest": "^29.7.0", + "jest-extended": "^4.0.2", "jsdoc-to-markdown": "^5.0.3", "kafkajs": "^2.2.3", "markdown-toc": "^1.2.0", @@ -13126,7 +13126,7 @@ "rimraf": "^3.0.2", "socket.io": "^4.1.2", "terminal-image": "^2.0.0", - "ts-jest": "^27.1.2", + "ts-jest": "^29.1.1", "tsc-watch": "^4.5.0", "typedoc": "^0.23.28", "typedoc-plugin-markdown": "^3.11.8", diff --git a/examples/slack-reaction-listener/asyncapi.yaml b/examples/slack-reaction-listener/asyncapi.yaml index 0ea95d372..377af8eb9 100644 --- a/examples/slack-reaction-listener/asyncapi.yaml +++ b/examples/slack-reaction-listener/asyncapi.yaml @@ -84,15 +84,7 @@ operations: $ref: "#/channels/OpenAICompletion" messages: - $ref: "#/channels/OpenAICompletion/messages/OpenAICompletionResponse" - reply: - channel: - $ref: "#/channels/SlackPostMessage" HandleSlackReaction: - reply: - channel: - $ref: "#/channels/SlackEventStream" - messages: - - $ref: "#/channels/SlackEventStream/messages/slackAckEvent" action: receive channel: $ref: "#/channels/SlackEventStream" diff --git a/examples/slack-reaction-listener/package-lock.json b/examples/slack-reaction-listener/package-lock.json index 21d8665f2..20656b4e6 100644 --- a/examples/slack-reaction-listener/package-lock.json +++ b/examples/slack-reaction-listener/package-lock.json @@ -14,10 +14,10 @@ }, "../..": { "name": "@asyncapi/glee", - "version": "0.32.18", + "version": "0.33.4", "license": "Apache-2.0", "dependencies": { - "@asyncapi/generator": "^1.15.9", + "@asyncapi/generator": "^1.16.0", "@asyncapi/html-template": "^2.0.0", "@asyncapi/markdown-template": "^1.5.0", "@asyncapi/parser": "^3.0.2", @@ -13082,7 +13082,7 @@ "@asyncapi/glee": { "version": "file:../..", "requires": { - "@asyncapi/generator": "^1.15.9", + "@asyncapi/generator": "^1.16.0", "@asyncapi/html-template": "^2.0.0", "@asyncapi/markdown-template": "^1.5.0", "@asyncapi/parser": "^3.0.2", diff --git a/src/adapters/mqtt/index.ts b/src/adapters/mqtt/index.ts index 2a3c0b8e1..6c0c95a48 100644 --- a/src/adapters/mqtt/index.ts +++ b/src/adapters/mqtt/index.ts @@ -66,8 +66,7 @@ class MqttAdapter extends Adapter { this.emit( 'error', new Error( - `Invalid security type '${securityType}' specified for server '${ - this.serverName + `Invalid security type '${securityType}' specified for server '${this.serverName }'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values( SecurityTypes )}` @@ -152,7 +151,12 @@ class MqttAdapter extends Adapter { private subscribe(channels: string[]) { channels.forEach((channel) => { const asyncAPIChannel = this.parsedAsyncAPI.channels().get(channel) - const binding = asyncAPIChannel.bindings().get('mqtt')?.value() + const receiveOperations = asyncAPIChannel.operations().filterByReceive() + if (receiveOperations.length > 1) { + this.emit('error', new Error(`Channel ${channel} has more than one receive operation. Please make sure you have only one.`)) + return + } + const binding = asyncAPIChannel.operations().filterByReceive()[0].bindings().get('mqtt')?.value() const topic = asyncAPIChannel.address() this.client.subscribe( topic, @@ -173,13 +177,15 @@ class MqttAdapter extends Adapter { console.log(err.message) return } - logLineWithIcon( - ':zap:', - `Subscribed to \`${topic}\` topic with QoS ${granted?.[0].qos}`, - { - highlightedWords: [topic], - } - ) + granted.forEach(({ topic, qos }) => { + logLineWithIcon( + ':zap:', + `Subscribed to \`${topic}\` topic with QoS ${qos}`, + { + highlightedWords: [topic, qos.toString()], + } + ) + }) } ) }) diff --git a/src/index.ts b/src/index.ts index 216b33407..57ac5b413 100755 --- a/src/index.ts +++ b/src/index.ts @@ -33,7 +33,6 @@ import { getSelectedServerNames } from './lib/servers.js' import { EnrichedEvent, AuthEvent } from './lib/adapter.js' import { ClusterEvent } from './lib/cluster.js' import { getMessagesSchema } from './lib/util.js' -import { ChannelInterface, OperationReplyInterface } from '@asyncapi/parser' dotenvExpand(dotenv.config()) @@ -101,9 +100,9 @@ export default async function GleeAppInitializer() { await generateDocs(config) parsedAsyncAPI.operations().filterByReceive().forEach(operation => { const channel = operation.channels()[0] // operation can have only one channel. - const reply = operation.reply() - setUpReplyMiddlewares(reply, app) - + if (operation.reply()) { + logWarningMessage(`Operation ${operation.id()} has a reply defined. Glee does not support replies yet.`) + } const schema = getMessagesSchema(operation) if (schema.oneOf.length > 0) app.use(channel.id(), validate(schema)) app.use(channel.id(), (event, next) => { @@ -117,6 +116,9 @@ export default async function GleeAppInitializer() { parsedAsyncAPI.operations().filterBySend().forEach(operation => { const channel = operation.channels()[0] // operation can have only one channel. + if (operation.reply()) { + logWarningMessage(`Operation ${operation.id()} has a reply defined. Glee does not support replies yet.`) + } const schema = getMessagesSchema(operation) if (schema.oneOf.length > 0) app.useOutbound(channel.id(), validate(schema)) app.useOutbound(channel.id(), json2string) @@ -228,20 +230,4 @@ export default async function GleeAppInitializer() { }) app.listen().catch(console.error) -} - - -export function setUpReplyMiddlewares(reply: OperationReplyInterface, app: Glee) { - const channel = reply?.channel() - if (!channel) return - const hasSendOperation = channel.operations().filterBySend().length > 0 - if (hasSendOperation) { - logWarningMessage(`Warning: Channel '${channel.id()}' is configured with both reply and send operations. The payload for the reply will be validated against the send operation's schema. and the binding of the send operation is going to be used for this reply. To avoid potential conflicts and streamline message processing, consider using only the send operation in your Glee function. Remove the reply operation if it's not required for your use case.`) - return - } - const replyMessagesSchemas = getMessagesSchema(reply) - if (replyMessagesSchemas.oneOf.length > 0) { - app.useOutbound(channel.id(), validate(replyMessagesSchemas)) - } - app.useOutbound(channel.id(), json2string) } \ No newline at end of file diff --git a/src/lib/adapter.ts b/src/lib/adapter.ts index 7c33a3443..3c0a90dfd 100644 --- a/src/lib/adapter.ts +++ b/src/lib/adapter.ts @@ -237,7 +237,7 @@ class GleeAdapter extends EventEmitter { getSubscribedChannels(): string[] { return this._channelNames.filter((channelName) => { const channel = this._parsedAsyncAPI.channels().get(channelName) - if (channel.operations().filterBySend().length == 0) return true + if (channel.operations().filterByReceive().length > 0) return true const channelServers = channel.servers() ? channel.servers() diff --git a/src/lib/functions.ts b/src/lib/functions.ts index 1c89776bc..6842fb88e 100644 --- a/src/lib/functions.ts +++ b/src/lib/functions.ts @@ -4,19 +4,18 @@ import walkdir from 'walkdir' import { getConfigs } from './configs.js' import { logWarningMessage, logError } from './logger.js' import GleeMessage from './message.js' -import { GleeFunction, GleeFunctionReturnReply } from './index.js' +import { GleeFunction } from './index.js' import Glee from './glee.js' import { gleeMessageToFunctionEvent, validateData, isRemoteServer, - extractExpressionValueFromMessage, } from './util.js' import { pathToFileURL } from 'url' import GleeError from '../errors/glee-error.js' import { getParsedAsyncAPI } from './asyncapiFile.js' import Debug from 'debug' -import { AsyncAPIDocumentInterface, OperationInterface } from '@asyncapi/parser' +import { OperationInterface } from '@asyncapi/parser' const debug = Debug('glee:functions') interface FunctionInfo { @@ -168,22 +167,6 @@ export async function trigger({ })) }) }) - - functionResult?.reply?.forEach((reply) => { - const replyMessage = createReplies(reply, message, parsedAsyncAPI) - if (!replyMessage) { - return - } - - const replyChannel = parsedAsyncAPI.channels().get(replyMessage.channel) - replyChannel.servers().forEach((server) => { - replyMessage.serverName = server.id() - app.send( - replyMessage - ) - }) - - }) } catch (err) { if (err.code === 'ERR_MODULE_NOT_FOUND') { const functionsPath = relative(GLEE_DIR, GLEE_FUNCTIONS_DIR) @@ -197,30 +180,4 @@ export async function trigger({ return } } -} - -function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMessage, parsedAsyncAPI: AsyncAPIDocumentInterface): GleeMessage { - const operation = message.operation - const reply = operation.reply() - if (!reply) { - const warningMsg = `Operation ${operation.id()} doesn't have a reply field. the return result from your function will be ignored.` - logWarningMessage(warningMsg) - return - } - - let replyChannel = parsedAsyncAPI.channels().all().filter((c) => c.address() === reply.channel().address())[0] - const replyAddress = reply.address() - if (replyAddress) { - const channelAddress = extractExpressionValueFromMessage(this, replyAddress.location()) - if (!channelAddress) { - throw Error(`cannot parse the ${replyAddress.location()} from your message.`) - } - const channel = parsedAsyncAPI.allChannels().filter((c) => c.address === channelAddress)[0] - if (!channel) { - throw Error(`cannot find a channel with the address of "${channelAddress}" in your AsyncAPI file.`) - } - replyChannel = channel - } - - return new GleeMessage({ ...functionReply, channel: replyChannel.id(), request: message, operation, connection: message.connection }) } \ No newline at end of file diff --git a/src/lib/wsHttpAuth.ts b/src/lib/wsHttpAuth.ts index 7544a3669..78891f47c 100644 --- a/src/lib/wsHttpAuth.ts +++ b/src/lib/wsHttpAuth.ts @@ -34,7 +34,7 @@ class GleeAuth extends EventEmitter { const authKeys = Object.keys(this.auth) authKeys.forEach(authKey => { const allowed = securitySchemeID.includes(authKey) - if(!allowed) { + if (!allowed) { const err = new Error(`${authKey} securityScheme is not defined is your asyncapi.yaml config`) this.emit('error', err) } @@ -80,6 +80,7 @@ class GleeAuth extends EventEmitter { if (query) { Object.keys(query).forEach(k => { + // eslint-disable-next-line security/detect-object-injection url.searchParams.set(k, query[k]) }) } @@ -105,7 +106,7 @@ class GleeAuth extends EventEmitter { } private httpApiKeyLogic(scheme, headers, query, authKey) { - + const loc = scheme.in() if (loc == 'header') { headers[scheme.name()] = this.auth[String(authKey)]