Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
feat: txs processor
Browse files Browse the repository at this point in the history
  • Loading branch information
aaitor committed Jun 26, 2024
1 parent 5e39b5d commit e2eb9e0
Show file tree
Hide file tree
Showing 33 changed files with 418 additions and 111 deletions.
15 changes: 9 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
"prebuild": "rimraf dist",
"build": "nest build",
"format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"",
"start": "nest start",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch",
"start:prod": "node dist/main",
"start:api": "nest start",
"start:api:dev": "nest start --watch",
"start:api:debug": "nest start --debug --watch",
"start:api:prod": "node dist/main",
"start:proc": "nest start --entryFile processor",
"start:proc:dev": "nest start --entryFile processor --watch",
"start:proc:debug": "nest start --entryFile processor --debug --watch",
"start:proc:prod": "node dist/processor",
"lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
"test": "jest",
"test:watch": "jest --watch",
Expand Down Expand Up @@ -49,7 +53,6 @@
"@types/express": "^4.17.21",
"@types/jest": "^29.5.12",
"@types/node": "^16.0.0",
"@types/supertest": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^7.13.1",
"@typescript-eslint/parser": "^7.13.1",
"eslint": "^8.56.0",
Expand All @@ -58,7 +61,7 @@
"jest": "^29.7.0",
"prettier": "^3.3.2",
"source-map-support": "^0.5.20",
"supertest": "^7.0.0",
"node-fetch": "^2.6.7",
"ts-jest": "^29.1.5",
"ts-node": "^10.9.2",
"tsconfig-paths": "^3.10.1",
Expand Down
88 changes: 88 additions & 0 deletions src/api/agent/agent.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { DataSource } from 'typeorm'
import { testConfig } from '../../configuration.tests'
import { ConfigService } from '@nestjs/config'
import { CreateTaskDto } from './dto/create-task-dto'
import fetch from 'node-fetch'

describe('AppController', () => {
// let appController: AgentController
// beforeEach(async () => {
// const appModule: TestingModule = await Test.createTestingModule({
// // imports: [TypeOrmModule.forFeature([TaskEntity, StepEntity])],
// imports: [],
// controllers: [AgentController],
// providers: [
// { provide: AgentService, useValue: undefined },
// { provide: getRepositoryToken(TaskEntity), useClass: TaskEntity },
// { provide: getRepositoryToken(StepEntity), useClass: StepEntity }
// ]
// }).compile()
// await appModule.init()
// // appController = app.get<AgentController>(AgentController)
// })
let dataSource: DataSource
let configService: ConfigService
const TASKS_ENDPOINT = '/api/v1/agent/tasks'

beforeAll(async () => {
configService = new ConfigService(testConfig)

console.log('Connecting to database')
//@ts-expect-error-next-line
dataSource = new DataSource({
type: configService.get('database.type'),
host: configService.get('database.host'),
port: configService.get('database.port'),
username: configService.get('database.username'),
password: configService.get<string>('database.password'),
database: configService.get('database.name'),
logging: ['query', 'log', 'error', 'warn', 'info'],
entities: [],
synchronize: true
// logging: true
})
await dataSource.initialize()
console.log('Conection to database established')
})

afterAll(async () => {
try {
if (dataSource.isInitialized) {
await dataSource.query('DELETE FROM steps ')
console.log('Closing database connection ..')
dataSource.destroy()
}
} catch (error) {
console.warn('Error deleting content from db')
}
})

describe('root', () => {
it('Can create a task', async () => {
const task = new CreateTaskDto()
task.query = 'My test query'
task.additional_params = [{ param1: 'value1' }, { param2: 'value2' }]
task.artifacts = [{ artifact_id: 'art-0123', url: 'https://url.test' }]
createTask(task).then((response) => {
expect(response.status).toBe(201)
})
})
})

async function createTask(createTaskDto: CreateTaskDto) {
console.log(`Creating task: ${configService.get('api.url')}`)
return fetch(`${configService.get('api.url')}${TASKS_ENDPOINT}`, {
method: 'post',
body: JSON.stringify(createTaskDto),
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${configService.get('api.authToken')}`
}
})
// return request(configService.get('api.url'))
// .post(TASKS_ENDPOINT)
// .set('Accept', 'application/json')
// .set('Authorization', `Bearer ${configService.get('api.authToken')}`)
// .send(createTaskDto)
}
})
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { Module } from '@nestjs/common'
import { AgentController } from './agent.controller'
import { AgentService } from './agent.service'
import { TypeOrmModule } from '@nestjs/typeorm'
import { TaskEntity } from 'src/database/entities/task.entity'
import { StepEntity } from 'src/database/entities/step.entity'
import { TaskEntity } from '../../database/entities/task.entity'
import { StepEntity } from '../../database/entities/step.entity'

@Module({
imports: [TypeOrmModule.forFeature([TaskEntity, StepEntity])],
Expand Down
127 changes: 127 additions & 0 deletions src/api/agent/agent.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { Injectable, Logger } from '@nestjs/common'
import { CreateTaskDto } from './dto/create-task-dto'
import { AgentsFactory } from '../../common/utils/agents-factory'
import { TaskEntity } from '../../database/entities/task.entity'
import { InjectRepository } from '@nestjs/typeorm'
import { StepEntity } from '../../database/entities/step.entity'
import { MoreThan, Repository } from 'typeorm'
import { ExecutionStatus } from '../../common/models/agent-models'
import { AppDataSource } from '../../database/typeorm.config'

@Injectable()
export class AgentService {
// The maximum number of retries for a step, after which it is marked as failed
readonly MAX_STEP_RETRIES = 3
constructor(
@InjectRepository(TaskEntity) private taskEntity: Repository<TaskEntity>,
@InjectRepository(StepEntity) private stepEntity: Repository<StepEntity>
) {}

async createAgentTask(agentTaskDto: CreateTaskDto) {
const task = AgentsFactory.getTaskFromTemplate(agentTaskDto)
const dbTaskEntity: TaskEntity = {
...new TaskEntity(),
name: task.name,
input_query: task.input.query,
input_params: JSON.stringify(task.input.additional_params),
input_artifacts: JSON.stringify(task.input.artifacts)
}
const insertedTask = await this.taskEntity.save(dbTaskEntity)

const step = AgentsFactory.getStepFromTemplate(
agentTaskDto,
insertedTask.task_id
)
const dbStepEntity: StepEntity = {
...new StepEntity(),
task_id: step.task_id,
name: task.name,
input_query: step.input.query,
is_last: step.is_last,
input_params: JSON.stringify(step.input.additional_params),
input_artifacts: JSON.stringify(step.input.artifacts)
}
await this.stepEntity.save(dbStepEntity)

return insertedTask
}

async getPendingTasksFromDb() {
return this.taskEntity.find({
where: { task_status: ExecutionStatus.PENDING }
})
}

async getPendingStepsForTask(task: TaskEntity) {
return this.stepEntity.find({
where: { step_status: ExecutionStatus.PENDING, task_id: task.task_id }
})
}

updateStep(stepId: string, stepUpdated: Partial<StepEntity>) {
Logger.log(`Updating step ${stepId}, with ${JSON.stringify(stepUpdated)}`)
return this.stepEntity.update(
{ step_id: stepId },
{ ...stepUpdated, updated_at: new Date() }
)
}

async completeTasksWhenStepsAreDone() {
const tasks = await AppDataSource.query(
`SELECT tasks.task_id as task_id, steps.output as output, steps.output_artifacts as output_artifacts, steps.output_additional as output_additional FROM tasks, steps WHERE tasks.task_id = steps.task_id AND steps.is_last = true AND steps.step_status = \'COMPLETED\' AND tasks.task_status = \'PENDING\' `
)
for await (const task of tasks) {
Logger.log(`Marking task ${task.task_id} as completed`)
this.taskEntity.update(
{ task_id: task.task_id },
{
task_status: ExecutionStatus.COMPLETED,
output: task.output,
output_artifacts: task.output_artifacts,
output_additional: task.output_additional,
updated_at: new Date()
}
)
}
return tasks
}

async markTasksAsFailedAfterRetries() {
this.stepEntity.update(
{ retries: MoreThan(this.MAX_STEP_RETRIES) },
{ step_status: ExecutionStatus.FAILED, updated_at: new Date() }
)
const tasks = await AppDataSource.query(
`SELECT tasks.task_id as task_id, steps.output as output, steps.output_artifacts as output_artifacts, steps.output_additional as output_additional FROM tasks, steps WHERE tasks.task_id = steps.task_id AND steps.step_status = \'FAILED\' AND tasks.task_status != \'FAILED\' `
)
for await (const task of tasks) {
Logger.log(`Marking task ${task.task_id} as failed`)
this.taskEntity.update(
{ task_id: task.task_id },
{
task_status: ExecutionStatus.FAILED,
output: task.output,
output_artifacts: task.output_artifacts,
output_additional: task.output_additional,
updated_at: new Date()
}
)
}
return tasks
}
// async addFirstStepToTask(taskId: string) {
// }

// async addLastStepToTask(taskId: string) {
// }

async getTaskById(taskId: string) {
const steps = await this.stepEntity.find({ where: { task_id: taskId } })
const task = await this.taskEntity.findOne({ where: { task_id: taskId } })
return { task, steps }
}

listTasks() {
return []
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
5 changes: 5 additions & 0 deletions src/common/models/agent-models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export interface ExecutionOptions {
* When the execution was last updated
*/
updated_at?: Date

/**
* The number of retries for the task or step
*/
retries?: number
}

/**
Expand Down
9 changes: 7 additions & 2 deletions src/common/utils/agents-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ export abstract class AgentsFactory {
return task
}

static getStepFromTemplate(input: ExecutionInput | string, task_id: string) {
static getStepFromTemplate(
input: ExecutionInput | string,
task_id: string,
isLast = false
) {
if (typeof input === 'string') {
input = {
query: input
Expand All @@ -32,7 +36,8 @@ export abstract class AgentsFactory {
task_id,
input,
status: ExecutionStatus.PENDING,
is_last: false,
is_last: isLast,
retries: 0,
created_at: new Date(),
updated_at: new Date()
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ export function extractAuthTokenFromHeader(
return undefined
}
}

export const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms))
23 changes: 23 additions & 0 deletions src/configuration.tests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export const testConfig = {
api: {
host: process.env.API_HOST || 'localhost',
port: parseInt(process.env.API_PORT, 10) || 4100,
url:
process.env.API_URL ||
`http://${process.env.API_HOST || 'localhost'}:${
process.env.API_PORT || '4100'
}`,
authToken: process.env.API_AUTH_TOKEN,
enableHttpsRedirect: process.env.API_ENABLE_HTTPS_REDIRECT === 'true'
},
database: {
type: process.env.DATABASE_TYPE || 'sqlite', // sqlite, mysql, postgres, etc
name: process.env.DATABASE_NAME || '/tmp/test/agent-template.db', // path to the database file if sqlite if is a different database type put the database name instead
host: process.env.DATABASE_HOST,
port: parseInt(process.env.DATABASE_PORT, 10),
username: process.env.DATABASE_USERNAME,
password: process.env.DATABASE_PASSWORD,
schena: process.env.DATABASE_SCHEMA
}
}
export default () => testConfig
3 changes: 3 additions & 0 deletions src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ export const appConfig = {
username: process.env.DATABASE_USERNAME,
password: process.env.DATABASE_PASSWORD,
schena: process.env.DATABASE_SCHEMA
},
processor: {
sleepDuration: Number(process.env.AGENT_SLEEP_DURATION) || 3000 // In milliseconds
}
}
export default () => appConfig
5 changes: 5 additions & 0 deletions src/database/1718898482722-AgentBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export class AgentBase1718898482722 implements MigrationInterface {
default: false,
isNullable: false
},
{
name: 'retries',
type: 'integer',
default: 0
},
{
name: 'step_status',
type: 'enum',
Expand Down
7 changes: 5 additions & 2 deletions src/database/entities/step.entity.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Column, Entity, PrimaryColumn } from 'typeorm'
import { BaseEntity } from './base.entity'
import { ExecutionStatus } from 'src/common/models/agent-models'
import { ExecutionStatus } from '../../common/models/agent-models'
import { v4 as uuidv4 } from 'uuid'

@Entity('steps')
Expand All @@ -14,11 +14,13 @@ export class StepEntity extends BaseEntity {
@Column('varchar')
name: string

@Column('integer')
retries: number

@Column('boolean')
is_last: boolean

@Column({
// enum: Object.values(ExecutionStatus),
enum: ExecutionStatus,
type: 'simple-enum'
})
Expand Down Expand Up @@ -46,5 +48,6 @@ export class StepEntity extends BaseEntity {
super()
this.step_id = `step-${uuidv4()}`
this.step_status = ExecutionStatus.PENDING
this.retries = 0
}
}
2 changes: 1 addition & 1 deletion src/database/entities/task.entity.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Column, Entity, PrimaryColumn } from 'typeorm'
import { BaseEntity } from './base.entity'
import { ExecutionStatus } from 'src/common/models/agent-models'
import { ExecutionStatus } from '../../common/models/agent-models'
import { v4 as uuidv4 } from 'uuid'

@Entity('tasks')
Expand Down
2 changes: 1 addition & 1 deletion src/database/typeorm.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const AppDataSource = new DataSource({
username: configService.get('database.username'),
password: configService.get<string>('database.password'),
database: configService.get('database.name'),
logging: ['query', 'log', 'error', 'warn', 'info'],
logging: ['log', 'error', 'warn', 'info'],
entities: [],
migrations: [AgentBase1718898482722]
})
Expand Down
Loading

0 comments on commit e2eb9e0

Please sign in to comment.