diff --git a/package.json b/package.json index 70be526..c2cfbfc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@nevermined-io/payments", - "version": "0.7.2", + "version": "0.7.3", "description": "Typescript SDK to interact with the Nevermined Payments Protocol", "main": "./dist/index.js", "types": "./dist/index.d.ts", @@ -39,7 +39,7 @@ "eslint-config-prettier": "^9.1.0", "eslint-plugin-tsdoc": "^0.2.17", "jest": "^29.7.0", - "ts-jest": "^29.2.5", + "ts-jest": "^29.2.5", "@types/jest": "^29.5.13", "@types/ws": "^8.0.3", "prettier": "^3.2.5", diff --git a/src/api/nvm-backend.ts b/src/api/nvm-backend.ts index 4fa271e..48b3d0f 100644 --- a/src/api/nvm-backend.ts +++ b/src/api/nvm-backend.ts @@ -2,7 +2,7 @@ import axios from 'axios' import { decodeJwt } from 'jose' import { io } from 'socket.io-client' import { sleep } from '../common/helper' -import { AgentExecutionStatus, TaskLogMessage } from '../common/types' +import { AgentExecutionStatus, TaskLogMessage, TaskCallback } from '../common/types' import { isEthereumAddress } from '../utils' import { PaymentsError } from '../common/payments.error' @@ -91,6 +91,7 @@ export class NVMBackendApi { private opts: BackendApiOptions private socketClient: any private userRoomId: string | undefined = undefined + private taskCallbacks: Map = new Map() private hasKey = false private _defaultSocketOptions: BackendWebSocketOptions = { // path: '', @@ -150,6 +151,8 @@ export class NVMBackendApi { } catch (error) { throw new Error(`Invalid URL: ${this.opts.backendHost} - ${(error as Error).message}`) } + + this.taskCallbacks = new Map() } private async _connectInternalSocketClient() { @@ -204,13 +207,14 @@ export class NVMBackendApi { // `connectTasksSocket:: Is connected? ${this.isWebSocketConnected()}` - await this.socketClient.on('_connected', async () => { - // `connectTasksSocket:: Joining tasks: ${JSON.stringify(tasks)}` - await this.socketClient.emit('_join-tasks', JSON.stringify({ tasks, history })) - await this.socketClient.on('task-log', (data: any) => { - _callback(data) - }) + // `connectTasksSocket:: Joining tasks: ${JSON.stringify(tasks)}` + + tasks.forEach((task) => { + this.taskCallbacks.set(task, _callback) }) + + await this.socketClient.emit('_join-tasks', JSON.stringify({ tasks, history })) + await this.socketClient.on('task-log', this.handleTaskLog.bind(this)) } catch (error) { throw new PaymentsError( `Unable to initialize websocket client: ${this.opts.webSocketHost} - ${(error as Error).message}`, @@ -218,6 +222,39 @@ export class NVMBackendApi { } } + /** + * Handles the 'task-log' event from the websocket. + * Parses the incoming data, retrieves the corresponding callback, + * executes it, and removes the callback if the task is completed or failed. + * + * @param data - The data received from the websocket event. + */ + private handleTaskLog(data: any): void { + const parsedData = JSON.parse(data) + const { task_id: taskId } = parsedData + const callback = this.taskCallbacks.get(taskId) + if (callback) { + // Execute the stored callback + callback(data) + if (['Completed', 'Failed'].includes(parsedData.task_status)) { + // Remove the callback from the map once the task is completed + this.removeTaskCallback(taskId) + } + } + } + + /** + * Removes the callback associated with the given task ID. + * Logs the removal of the callback. + * + * @param taskId - The ID of the task whose callback is to be removed. + */ + private removeTaskCallback(taskId: string) { + if (this.taskCallbacks.has(taskId)) { + this.taskCallbacks.delete(taskId) + } + } + private disconnectSocket() { if (this.isWebSocketConnected()) { this.socketClient.disconnect() diff --git a/src/common/types.ts b/src/common/types.ts index 0a04cdf..e17c521 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -192,6 +192,8 @@ export interface TaskLogMessage { step_id?: string } +export type TaskCallback = (data: any) => void + export interface CreateTaskDto { /** * The query parameter for the task