Skip to content

Commit

Permalink
feat: support join rooms on reconnect (#79)
Browse files Browse the repository at this point in the history
* feat: support join rooms on reconnect

* fix: format:

* fix: send step as step type

* fix: start listen event before

* fix: format

* feat: bumpversion
  • Loading branch information
eruizgar91 authored Oct 29, 2024
1 parent 515d634 commit e770c53
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 239 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nevermined-io/payments",
"version": "0.6.0",
"version": "0.6.1",
"description": "Typescript SDK to interact with the Nevermined Payments Protocol",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
23 changes: 10 additions & 13 deletions src/api/nvm-backend.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import axios from 'axios'
import { io } from 'socket.io-client'
import { decodeJwt } from 'jose'
import { io } from 'socket.io-client'
import { sleep } from '../common/helper'
import { AgentExecutionStatus } from '../common/types'
import { isEthereumAddress } from '../utils'
import { sleep } from '../common/helper'

export interface BackendApiOptions {
/**
Expand Down Expand Up @@ -157,7 +157,7 @@ export class NVMBackendApi {
}
}

private async connectSocket() {
public async connectSocket(_callback: (err?: any) => any, opts: SubscriptionOptions) {
if (!this.hasKey)
throw new Error('Unable to subscribe to the server becase a key was not provided')

Expand All @@ -169,6 +169,9 @@ export class NVMBackendApi {
// nvm-backend:: Connecting to websocket server: ${this.opts.webSocketHost}
this.socketClient = io(this.opts.webSocketHost!, this.opts.webSocketOptions)
await this.socketClient.connect()
await this.socketClient.on('_connected', async () => {
this._subscribe(_callback, opts)
})
for (let i = 0; i < 5; i++) {
await sleep(1_000)
if (this.socketClient.connected) {
Expand Down Expand Up @@ -200,22 +203,17 @@ export class NVMBackendApi {
if (!opts.joinAccountRoom && opts.joinAgentRooms.length === 0) {
throw new Error('No rooms to join in configuration')
}
await this.connectSocket()
// await this.socketClient.emit('subscribe-agent', '')
await this.socketClient.on('connect', async () => {
// nvm-backend:: On:: ${this.socketClient.id} Connected to the server
})

await this.socketClient.emit('_join-rooms', JSON.stringify(opts))

// await this.socketClient.on('task-updated', (data: any) => {
// console.log(`RECEIVED TASK data: ${JSON.stringify(data)}`)
// _callback(data)
// })
opts.subscribeEventTypes.forEach(async (eventType) => {
await this.socketClient.on(eventType, (data: any) => {
_callback(data)
})
})
if (opts.getPendingEventsOnSubscribe) {
await this._emitStepEvents(AgentExecutionStatus.Pending, opts.joinAgentRooms)
}
}

private async eventHandler(data: any, _callback: (err?: any) => any, _opts: SubscriptionOptions) {
Expand All @@ -226,7 +224,6 @@ export class NVMBackendApi {
status: AgentExecutionStatus = AgentExecutionStatus.Pending,
dids: string[] = [],
) {
await this.connectSocket()
const message = {
status,
dids,
Expand Down
12 changes: 1 addition & 11 deletions src/api/query-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,7 @@ export class AIQueryApi extends NVMBackendApi {
_callback: (err?: any) => any,
opts: SubscriptionOptions = DefaultSubscriptionOptions,
) {
await super._subscribe(_callback, opts).then(() => {
// query-api:: Subscribed to server
})
try {
if (opts.getPendingEventsOnSubscribe) {
// query-api:: Emitting pending events
await super._emitStepEvents(AgentExecutionStatus.Pending, opts.joinAgentRooms)
}
} catch {
// query-api:: Unable to get pending events
}
await super.connectSocket(_callback, opts)
}

/**
Expand Down
Loading

0 comments on commit e770c53

Please sign in to comment.