Skip to content

Commit

Permalink
feat: logging task messages
Browse files Browse the repository at this point in the history
  • Loading branch information
aaitor committed Nov 12, 2024
1 parent 6256409 commit 0515b31
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 20 deletions.
72 changes: 58 additions & 14 deletions src/api/nvm-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { io } from 'socket.io-client'
import { sleep } from '../common/helper'
import { AgentExecutionStatus } from '../common/types'
import { isEthereumAddress } from '../utils'
import { PaymentsError } from '../common/payments.error'
import { TaskLogMessage } from './query-api'

export interface BackendApiOptions {
/**
Expand Down Expand Up @@ -156,35 +158,73 @@ export class NVMBackendApi {
}
}

public async connectSocket(_callback: (err?: any) => any, opts: SubscriptionOptions) {
private async _connectInternalSocketClient() {
if (!this.hasKey)
throw new Error('Unable to subscribe to the server becase a key was not provided')

if (this.socketClient && this.socketClient.connected) {
// nvm-backend:: Already connected to the websocket server
if (this.isWebSocketConnected()) {
console.log(
`_connectInternalSocketClient:: Already connected to the websocket server with id ${this.socketClient.id}`,
)
return
}

this.socketClient = io(this.opts.webSocketHost!, this.opts.webSocketOptions)
await this.socketClient.connect()
for (let i = 0; i < 10; i++) {
if (this.isWebSocketConnected()) return
await sleep(500)
}
if (!this.isWebSocketConnected()) {
throw new Error('Unable to connect to the websocket server')
}
}

protected async connectSocketSubscriber(
_callback: (err?: any) => any,
opts: SubscriptionOptions,
) {
try {
// nvm-backend:: Connecting to websocket server: ${this.opts.webSocketHost}
console.log(
`nvm-backend:: Connecting to websocket server: ${JSON.stringify(this.opts.webSocketOptions)}`,
)
this.socketClient = io(this.opts.webSocketHost!, this.opts.webSocketOptions)
await this.socketClient.connect()
this._connectInternalSocketClient()

await this.socketClient.on('_connected', async () => {
this._subscribe(_callback, opts)
})
for (let i = 0; i < 5; i++) {
await sleep(1_000)
if (this.socketClient.connected) {
break
}
}
if (!this.socketClient.connected) {
throw new Error('Unable to connect to the websocket server')
} catch (error) {
throw new PaymentsError(
`Unable to initialize websocket client: ${this.opts.webSocketHost} - ${(error as Error).message}`,
)
}
}

protected async connectTasksSocket(_callback: (err?: any) => any, tasks: string[]) {
try {
if (tasks.length === 0) {
throw new Error('No task rooms to join in configuration')
}

console.log(
`connectTasksSocket:: Connecting to websocket server: ${JSON.stringify(this.opts.webSocketOptions)}`,
)
this._connectInternalSocketClient()

console.log(`connectTasksSocket:: Is connected? ${this.isWebSocketConnected()}`)

await this.socketClient.on('_connected', async () => {
console.log(`connectTasksSocket:: Joining tasks: ${JSON.stringify(tasks)}`)
await this.socketClient.emit('_join-tasks', JSON.stringify({ tasks }))
await this.socketClient.on('task-log', (data: any) => {
_callback(data)
})
})

console.log(`connectTasksSocket:: ending`)
} catch (error) {
throw new Error(
throw new PaymentsError(
`Unable to initialize websocket client: ${this.opts.webSocketHost} - ${(error as Error).message}`,
)
}
Expand Down Expand Up @@ -233,6 +273,10 @@ export class NVMBackendApi {
this.socketClient.emit('_emit-steps', JSON.stringify(message))
}

protected async _emitTaskLog(logMessage: TaskLogMessage) {
this.socketClient.emit('task-log', JSON.stringify(logMessage))
}

disconnect() {
this.disconnectSocket()
// nvm-backend:: Disconnected from the server
Expand Down
34 changes: 33 additions & 1 deletion src/api/query-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export interface SearchSteps {
offset?: number
}

export interface TaskLogMessage {
level: 'info' | 'error' | 'warning' | 'debug'
task_id: string
task_status: AgentExecutionStatus
message: string
step_id?: string
}

/**
* Options required for interacting with an external AI Agent/Service.
*/
Expand Down Expand Up @@ -78,7 +86,20 @@ export class AIQueryApi extends NVMBackendApi {
_callback: (err?: any) => any,
opts: SubscriptionOptions = DefaultSubscriptionOptions,
) {
await super.connectSocket(_callback, opts)
await super.connectSocketSubscriber(_callback, opts)
}

/**
* It subscribes to receive the logs generated during the execution of a task/s
*
* @remarks
* This method is used by users/subscribers of AI agents after they create a task on them
*
* @param _callback - The callback to execute when a new task log event is received
* @param tasks - The list of tasks to subscribe to
*/
async subscribeTasksLogs(_callback: (err?: any) => any, tasks: string[]) {
await super.connectTasksSocket(_callback, tasks)
}

/**
Expand Down Expand Up @@ -334,4 +355,15 @@ export class AIQueryApi extends NVMBackendApi {
async getTasksFromAgents() {
return this.get(GET_AGENTS_ENDPOINT, { sendThroughProxy: false })
}

/**
* It emits a log message related to a task
*
* @remarks
* This method is used by the AI Agent to emit log messages
*
*/
async logTask(logMessage: TaskLogMessage) {
super._emitTaskLog(logMessage)
}
}
47 changes: 42 additions & 5 deletions tests/e2e/payments.e2e.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { TaskLogMessage } from '../../src/api/query-api'
import { sleep } from '../../src/common/helper'
import { AgentExecutionStatus, Step } from '../../src/common/types'
import { EnvironmentName } from '../../src/environments'
Expand Down Expand Up @@ -33,6 +34,7 @@ describe('Payments API (e2e)', () => {
// let proxyHost: string
let subscriberQueryOpts = {}
let balanceBefore: bigint
let createdTaskId: string
let completedTaskId: string
let completedTaskDID: string
let failedTaskId: string
Expand Down Expand Up @@ -244,6 +246,9 @@ describe('Payments API (e2e)', () => {
expect(taskResult).toBeDefined()
expect(taskResult.status).toBe(201)
console.log('Task Result', taskResult.data)
createdTaskId = taskResult.data.task.task_id
expect(createdTaskId).toBeDefined()

},
TEST_TIMEOUT,
)
Expand All @@ -258,8 +263,19 @@ describe('Payments API (e2e)', () => {
expect(steps.data.steps.length).toBeGreaterThan(0)
})

it.skip('Builder should be able to send logs', async () => {
const logMessage: TaskLogMessage = {
level: 'info',
task_status: AgentExecutionStatus.Pending,
task_id: createdTaskId,
message: 'This is a log message',
}
await paymentsBuilder.query.logTask(logMessage)
expect(true).toBeTruthy() // If the emitTaskLog does not throw an error, it is working
})

it(
'I should be able to subscribe and process pending tasks',
'Builder should be able to subscribe and process pending tasks',
async () => {
let stepsReceived = 0
const opts = {
Expand Down Expand Up @@ -337,7 +353,28 @@ describe('Payments API (e2e)', () => {
TEST_TIMEOUT,
)

it('I should be able to validate an AI task is completed', async () => {
it('Subscriber should be able to receive logs', async () => {
let logsReceived = 0
await paymentsSubscriber.query.subscribeTasksLogs(async (data) => {
console.log('Task Log received', data)
logsReceived++
}, [completedTaskId])
await sleep(5_000)

const logMessage: TaskLogMessage = {
level: 'info',
task_status: AgentExecutionStatus.Completed,
task_id: completedTaskId,
message: 'This is a log message',
}
await paymentsBuilder.query.logTask(logMessage)

await sleep(2_000)
expect(logsReceived).toBeGreaterThan(0)

}, TEST_TIMEOUT)

it.skip('Subscriber should be able to validate an AI task is completed', async () => {
console.log(`@@@@@ getting task with steps: ${completedTaskDID} - ${completedTaskId}`)
console.log(JSON.stringify(subscriberQueryOpts))
const result = await paymentsSubscriber.query.getTaskWithSteps(
Expand All @@ -353,7 +390,7 @@ describe('Payments API (e2e)', () => {
expect(taskCost).toBeGreaterThan(0)
})

it('I should be able to check that I consumed some credits', async () => {
it.skip('Subscriber should be able to check that I consumed some credits', async () => {
const balanceResult = await paymentsSubscriber.getPlanBalance(planDID)
expect(balanceResult).toBeDefined()
const balanceAfter = BigInt(balanceResult.balance)
Expand All @@ -367,10 +404,10 @@ describe('Payments API (e2e)', () => {
expect(balanceAfter).toBeLessThan(balanceBefore)
})

it.skip('I should be able to end a task with a failure', () => {})
// it.skip('I should be able to end a task with a failure', () => {})
})

describe('Failed tasks are free of charge', () => {
describe.skip('Failed tasks are free of charge', () => {
it(
'I should be able to create a wrong AI Task',
async () => {
Expand Down

0 comments on commit 0515b31

Please sign in to comment.