diff --git a/packages/origin-247-certificate/src/certificateForUnitTests.service.ts b/packages/origin-247-certificate/src/certificateForUnitTests.service.ts index 36fdf1e1..00a7a2dd 100644 --- a/packages/origin-247-certificate/src/certificateForUnitTests.service.ts +++ b/packages/origin-247-certificate/src/certificateForUnitTests.service.ts @@ -30,10 +30,10 @@ export class CertificateForUnitTestsService implements PublicPart { const isDateOk = new Date(entry.generationStartTime * 1000) >= generationStartFrom && - new Date(entry.generationStartTime * 1000) <= generationStartTo; - new Date(entry.generationEndTime * 1000) >= generationEndFrom && - new Date(entry.generationEndTime * 1000) <= generationEndTo; - new Date(entry.creationTime * 1000) >= creationTimeFrom && + new Date(entry.generationStartTime * 1000) <= generationStartTo && + new Date(entry.generationEndTime * 1000) >= generationEndFrom && + new Date(entry.generationEndTime * 1000) <= generationEndTo && + new Date(entry.creationTime * 1000) >= creationTimeFrom && new Date(entry.creationTime * 1000) <= creationTimeTo; const isDeviceOk = deviceId ? entry.deviceId === entry.deviceId : true; diff --git a/packages/origin-247-transfer/src/batch/events.ts b/packages/origin-247-transfer/src/batch/events.ts new file mode 100644 index 00000000..c6f7fbea --- /dev/null +++ b/packages/origin-247-transfer/src/batch/events.ts @@ -0,0 +1,5 @@ +import { IEvent } from '@nestjs/cqrs'; + +export class AwaitingTransferEvent implements IEvent {} +export class AwaitingIssuanceEvent implements IEvent {} +export class AwaitingValidationEvent implements IEvent {} diff --git a/packages/origin-247-transfer/src/batch/issue.batch.ts b/packages/origin-247-transfer/src/batch/issue.batch.ts index b2f9b621..941c32dc 100644 --- a/packages/origin-247-transfer/src/batch/issue.batch.ts +++ b/packages/origin-247-transfer/src/batch/issue.batch.ts @@ -1,10 +1,9 @@ -import { EventsHandler, IEventHandler, IEvent } from '@nestjs/cqrs'; +import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { Inject } from '@nestjs/common'; import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './configuration'; import { IssueService } from '../issue.service'; import { queueThrottle } from './queueThrottle'; - -export class AwaitingIssuanceEvent implements IEvent {} +import { AwaitingIssuanceEvent } from './events'; @EventsHandler(AwaitingIssuanceEvent) export class AwaitingIssuanceEventHandler implements IEventHandler { diff --git a/packages/origin-247-transfer/src/batch/transfer.batch.ts b/packages/origin-247-transfer/src/batch/transfer.batch.ts index 4d04d3c8..e0d77705 100644 --- a/packages/origin-247-transfer/src/batch/transfer.batch.ts +++ b/packages/origin-247-transfer/src/batch/transfer.batch.ts @@ -1,10 +1,9 @@ -import { IEvent, EventsHandler, IEventHandler } from '@nestjs/cqrs'; +import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { Inject } from '@nestjs/common'; import { TransferService } from '../transfer.service'; import { queueThrottle } from './queueThrottle'; import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './configuration'; - -export class AwaitingTransferEvent implements IEvent {} +import { AwaitingTransferEvent } from './events'; @EventsHandler(AwaitingTransferEvent) export class AwaitingTransferEventHandler implements IEventHandler { diff --git a/packages/origin-247-transfer/src/batch/validate.batch.ts b/packages/origin-247-transfer/src/batch/validate.batch.ts index 50648a68..94138a1f 100644 --- a/packages/origin-247-transfer/src/batch/validate.batch.ts +++ b/packages/origin-247-transfer/src/batch/validate.batch.ts @@ -1,10 +1,9 @@ -import { EventsHandler, IEventHandler, IEvent } from '@nestjs/cqrs'; +import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { Inject } from '@nestjs/common'; import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './configuration'; import { ValidateService } from '../validate.service'; import { queueThrottle } from './queueThrottle'; - -export class AwaitingValidationEvent implements IEvent {} +import { AwaitingValidationEvent } from './events'; @EventsHandler(AwaitingValidationEvent) export class AwaitingValidationEventHandler implements IEventHandler { diff --git a/packages/origin-247-transfer/src/handlers/GenerationReadingStoredEvent.handler.ts b/packages/origin-247-transfer/src/handlers/GenerationReadingStoredEvent.handler.ts index 9d6bd6b1..2629f0dc 100644 --- a/packages/origin-247-transfer/src/handlers/GenerationReadingStoredEvent.handler.ts +++ b/packages/origin-247-transfer/src/handlers/GenerationReadingStoredEvent.handler.ts @@ -2,6 +2,7 @@ import { EventsHandler, IEventHandler, QueryBus, EventBus } from '@nestjs/cqrs'; import { Inject, Logger } from '@nestjs/common'; import { GenerationReadingStoredEvent } from '../events/GenerationReadingStored.event'; import { IssueService } from '../issue.service'; +import { AwaitingIssuanceEvent } from '../batch/events'; import { EnergyTransferRequestRepository, ENERGY_TRANSFER_REQUEST_REPOSITORY @@ -10,7 +11,6 @@ import { GetTransferSitesQuery, IGetTransferSitesQueryResponse } from '../queries/GetTransferSites.query'; -import { AwaitingIssuanceEvent } from '../batch/issue.batch'; @EventsHandler(GenerationReadingStoredEvent) export class GenerationReadingStoredEventHandler diff --git a/packages/origin-247-transfer/src/issue.service.ts b/packages/origin-247-transfer/src/issue.service.ts index 6df39840..dacaa094 100644 --- a/packages/origin-247-transfer/src/issue.service.ts +++ b/packages/origin-247-transfer/src/issue.service.ts @@ -6,9 +6,8 @@ import { } from './repositories/EnergyTransferRequest.repository'; import { EventBus } from '@nestjs/cqrs'; import { EnergyTransferRequest, State } from './EnergyTransferRequest'; -import { chunk } from 'lodash'; -import { AwaitingValidationEvent } from './batch/validate.batch'; import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './batch/configuration'; +import { AwaitingValidationEvent, AwaitingIssuanceEvent } from './batch/events'; @Injectable() export class IssueService { @@ -23,13 +22,14 @@ export class IssueService { ) {} public async issueTask() { - const etrs = await this.etrRepository.findByState(State.IssuanceAwaiting); + const etrs = await this.etrRepository.findByState(State.IssuanceAwaiting, { + limit: this.batchConfiguration.issueBatchSize + }); - const etrGroups = chunk(etrs, this.batchConfiguration.issueBatchSize); + await this.issueCertificates(etrs); - for (const group of etrGroups) { - await this.issueCertificates(group); - } + // Loop + this.eventBus.publish(new AwaitingIssuanceEvent()); } private async issueCertificates(etrs: EnergyTransferRequest[]) { diff --git a/packages/origin-247-transfer/src/repositories/EnergyTransferRequest.repository.ts b/packages/origin-247-transfer/src/repositories/EnergyTransferRequest.repository.ts index 960e7390..4b8c1374 100644 --- a/packages/origin-247-transfer/src/repositories/EnergyTransferRequest.repository.ts +++ b/packages/origin-247-transfer/src/repositories/EnergyTransferRequest.repository.ts @@ -2,12 +2,16 @@ import { EnergyTransferRequest, NewAttributesParams, State } from '../EnergyTran export const ENERGY_TRANSFER_REQUEST_REPOSITORY = Symbol.for('ENERGY_TRANSFER_REQUEST_REPOSITORY'); +export interface IFindByStateOptions { + limit: number; +} + export interface EnergyTransferRequestRepository { createNew(command: NewAttributesParams): Promise; findByCertificateId(certificateId: number): Promise; findById(id: number): Promise; findAll(): Promise; - findByState(status: State): Promise; + findByState(status: State, options: IFindByStateOptions): Promise; save(entity: EnergyTransferRequest): Promise; saveManyInTransaction(entity: EnergyTransferRequest[]): Promise; updateWithLock(id: number, cb: (entity: EnergyTransferRequest) => void): Promise; diff --git a/packages/origin-247-transfer/src/repositories/EnergyTransferRequestInMemory.repository.ts b/packages/origin-247-transfer/src/repositories/EnergyTransferRequestInMemory.repository.ts index 62150a18..3d0cfbdb 100644 --- a/packages/origin-247-transfer/src/repositories/EnergyTransferRequestInMemory.repository.ts +++ b/packages/origin-247-transfer/src/repositories/EnergyTransferRequestInMemory.repository.ts @@ -1,6 +1,9 @@ import { Injectable } from '@nestjs/common'; import { EnergyTransferRequest, NewAttributesParams, State } from '../EnergyTransferRequest'; -import { EnergyTransferRequestRepository } from './EnergyTransferRequest.repository'; +import { + EnergyTransferRequestRepository, + IFindByStateOptions +} from './EnergyTransferRequest.repository'; @Injectable() export class EnergyTransferRequestInMemoryRepository implements EnergyTransferRequestRepository { @@ -36,8 +39,11 @@ export class EnergyTransferRequestInMemoryRepository implements EnergyTransferRe return request ?? null; } - public async findByState(state: State): Promise { - return this.db.filter((e) => e.toAttrs().state === state); + public async findByState( + state: State, + options: IFindByStateOptions + ): Promise { + return this.db.filter((e) => e.toAttrs().state === state).slice(0, options.limit); } public async findById(id: number): Promise { diff --git a/packages/origin-247-transfer/src/repositories/EnergyTransferRequestPostgres.repository.ts b/packages/origin-247-transfer/src/repositories/EnergyTransferRequestPostgres.repository.ts index 0684b734..6de4d173 100644 --- a/packages/origin-247-transfer/src/repositories/EnergyTransferRequestPostgres.repository.ts +++ b/packages/origin-247-transfer/src/repositories/EnergyTransferRequestPostgres.repository.ts @@ -3,8 +3,10 @@ import { InjectRepository, InjectConnection } from '@nestjs/typeorm'; import { Repository, Connection } from 'typeorm'; import { EnergyTransferRequest, NewAttributesParams, State } from '../EnergyTransferRequest'; import { EnergyTransferRequestEntity, tableName } from './EnergyTransferRequest.entity'; -import { EnergyTransferRequestRepository } from './EnergyTransferRequest.repository'; -import { omit } from 'lodash'; +import { + EnergyTransferRequestRepository, + IFindByStateOptions +} from './EnergyTransferRequest.repository'; @Injectable() export class EnergyTransferRequestPostgresRepository implements EnergyTransferRequestRepository { @@ -55,9 +57,13 @@ export class EnergyTransferRequestPostgresRepository implements EnergyTransferRe return request ? EnergyTransferRequest.fromAttrs(request) : null; } - public async findByState(state: State): Promise { + public async findByState( + state: State, + options: IFindByStateOptions + ): Promise { const results = await this.repository.find({ - state + where: { state }, + take: options.limit }); return results.map(EnergyTransferRequest.fromAttrs); diff --git a/packages/origin-247-transfer/src/transfer.service.ts b/packages/origin-247-transfer/src/transfer.service.ts index 15403b3a..cf16bc53 100644 --- a/packages/origin-247-transfer/src/transfer.service.ts +++ b/packages/origin-247-transfer/src/transfer.service.ts @@ -5,8 +5,9 @@ import { ENERGY_TRANSFER_REQUEST_REPOSITORY } from './repositories/EnergyTransferRequest.repository'; import { EnergyTransferRequest, State } from './EnergyTransferRequest'; -import { chunk } from 'lodash'; import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './batch/configuration'; +import { EventBus } from '@nestjs/cqrs'; +import { AwaitingTransferEvent } from './batch/events'; @Injectable() export class TransferService { @@ -16,17 +17,19 @@ export class TransferService { @Inject(ENERGY_TRANSFER_REQUEST_REPOSITORY) private etrRepository: EnergyTransferRequestRepository, @Inject(BATCH_CONFIGURATION_TOKEN) - private batchConfiguration: BatchConfiguration + private batchConfiguration: BatchConfiguration, + private eventBus: EventBus ) {} public async transferTask() { - const etrs = await this.etrRepository.findByState(State.TransferAwaiting); + const etrs = await this.etrRepository.findByState(State.TransferAwaiting, { + limit: this.batchConfiguration.transferBatchSize + }); - const chunkedEtrs = chunk(etrs, this.batchConfiguration.transferBatchSize); + await this.transferCertificates(etrs); - for (const chunk of chunkedEtrs) { - await this.transferCertificates(chunk); - } + // Loop + this.eventBus.publish(new AwaitingTransferEvent()); } private async transferCertificates(etrs: EnergyTransferRequest[]): Promise { diff --git a/packages/origin-247-transfer/src/validate.service.ts b/packages/origin-247-transfer/src/validate.service.ts index 3fcbfa40..c513f7e5 100644 --- a/packages/origin-247-transfer/src/validate.service.ts +++ b/packages/origin-247-transfer/src/validate.service.ts @@ -15,9 +15,8 @@ import { TransferValidationStatus, UpdateStatusCode } from './EnergyTransferRequest'; -import { chunk } from 'lodash'; -import { AwaitingTransferEvent } from './batch/transfer.batch'; import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './batch/configuration'; +import { AwaitingValidationEvent, AwaitingTransferEvent } from './batch/events'; export interface IUpdateValidationStatusCommand { requestId: number; @@ -41,13 +40,14 @@ export class ValidateService { ) {} public async validateTask(): Promise { - const etrs = await this.etrRepository.findByState(State.ValidationAwaiting); + const etrs = await this.etrRepository.findByState(State.ValidationAwaiting, { + limit: this.batchConfiguration.validateBatchSize + }); - const chunkedEtrs = chunk(etrs, this.batchConfiguration.validateBatchSize); + await this.startValidation(etrs); - for (const chunk of chunkedEtrs) { - await this.startValidation(chunk); - } + // Loop + this.eventBus.publish(new AwaitingValidationEvent()); } private async startValidation(etrs: EnergyTransferRequest[]): Promise {