Skip to content

Commit

Permalink
Merge pull request #48 from wri/main
Browse files Browse the repository at this point in the history
[MERGE] main -> staging post WW release.
  • Loading branch information
roguenet authored Jan 27, 2025
2 parents 4d168a5 + e00d902 commit 45f57c1
Show file tree
Hide file tree
Showing 23 changed files with 847 additions and 302 deletions.
45 changes: 30 additions & 15 deletions apps/unified-database-service/src/airtable/airtable.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@ import { ConfigService } from "@nestjs/config";
import Airtable from "airtable";
import {
ApplicationEntity,
DemographicEntity,
NurseryEntity,
NurseryReportEntity,
OrganisationEntity,
ProjectEntity,
ProjectReportEntity,
RestorationPartnerEntity,
SiteEntity,
SiteReportEntity,
TreeSpeciesEntity
TreeSpeciesEntity,
WorkdayEntity
} from "./entities";
import * as Sentry from "@sentry/node";
import { SlackService } from "nestjs-slack";

export const AIRTABLE_ENTITIES = {
application: ApplicationEntity,
demographic: DemographicEntity,
nursery: NurseryEntity,
"nursery-report": NurseryReportEntity,
organisation: OrganisationEntity,
project: ProjectEntity,
"project-report": ProjectReportEntity,
"restoration-partner": RestorationPartnerEntity,
site: SiteEntity,
"site-report": SiteReportEntity,
"tree-species": TreeSpeciesEntity
"tree-species": TreeSpeciesEntity,
workday: WorkdayEntity
};

export type EntityType = keyof typeof AIRTABLE_ENTITIES;
Expand Down Expand Up @@ -64,26 +70,28 @@ export class AirtableProcessor extends WorkerHost {
}

async process(job: Job) {
switch (job.name) {
const { name, data } = job;
await this.sendSlackUpdate(`:construction_worker: Beginning job: ${JSON.stringify({ name, data })}`);
switch (name) {
case "updateEntities":
return await this.updateEntities(job.data as UpdateEntitiesData);
return await this.updateEntities(data as UpdateEntitiesData);

case "deleteEntities":
return await this.deleteEntities(job.data as DeleteEntitiesData);
return await this.deleteEntities(data as DeleteEntitiesData);

case "updateAll":
return await this.updateAll(job.data as UpdateAllData);
return await this.updateAll(data as UpdateAllData);

default:
throw new NotImplementedException(`Unknown job type: ${job.name}`);
throw new NotImplementedException(`Unknown job type: ${name}`);
}
}

@OnWorkerEvent("failed")
async onFailed(job: Job, error: Error) {
Sentry.captureException(error);
this.logger.error(`Worker event failed: ${JSON.stringify(job)}`, error.stack);
this.sendSlackUpdate(`:warning: ERROR: Job processing failed: ${JSON.stringify(job)}`);
await this.sendSlackUpdate(`:warning: ERROR: Job processing failed: ${JSON.stringify(job)}`);
}

private async updateEntities({ entityType, startPage, updatedSince }: UpdateEntitiesData) {
Expand All @@ -98,7 +106,7 @@ export class AirtableProcessor extends WorkerHost {
await entity.updateBase(this.base, { startPage, updatedSince });

this.logger.log(`Completed entity update: ${JSON.stringify({ entityType, updatedSince })}`);
this.sendSlackUpdate(`Completed updating table "${entity.TABLE_NAME}" [updatedSince: ${updatedSince}]`);
await this.sendSlackUpdate(`Completed updating table "${entity.TABLE_NAME}" [updatedSince: ${updatedSince}]`);
}

private async deleteEntities({ entityType, deletedSince }: DeleteEntitiesData) {
Expand All @@ -113,23 +121,30 @@ export class AirtableProcessor extends WorkerHost {
await entity.deleteStaleRecords(this.base, deletedSince);

this.logger.log(`Completed entity delete: ${JSON.stringify({ entityType, deletedSince })}`);
this.sendSlackUpdate(`Completed deleting rows from table "${entity.TABLE_NAME}" [deletedSince: ${deletedSince}]`);
await this.sendSlackUpdate(
`Completed deleting rows from table "${entity.TABLE_NAME}" [deletedSince: ${deletedSince}]`
);
}

private async updateAll({ updatedSince }: UpdateAllData) {
this.sendSlackUpdate(`:white_check_mark: Beginning sync of all data [changedSince: ${updatedSince}]`);
await this.sendSlackUpdate(`:white_check_mark: Beginning sync of all data [changedSince: ${updatedSince}]`);
for (const entityType of ENTITY_TYPES) {
await this.updateEntities({ entityType, updatedSince });
await this.deleteEntities({ entityType, deletedSince: updatedSince });
}
this.sendSlackUpdate(`:100: Completed sync of all data [changedSince: ${updatedSince}]`);
await this.sendSlackUpdate(`:100: Completed sync of all data [changedSince: ${updatedSince}]`);
}

private sendSlackUpdate(message: string) {
private async sendSlackUpdate(message: string) {
const channel = this.config.get("UDB_SLACK_CHANNEL");
if (channel == null) return;

// Ignore promise; we don't want the process to fail if comms with Slack break down.
this.slack.sendText(`[${process.env.DEPLOY_ENV}]: ${message}`, { channel });
await this.slack
.sendText(`[${process.env.DEPLOY_ENV}]: ${message}`, { channel })
// Don't allow a failure in slack sending to hose our process, but do log it and send it to Sentry
.catch(error => {
Sentry.captureException(error);
this.logger.error("Send to slack failed", error.stack);
});
}
}
Loading

0 comments on commit 45f57c1

Please sign in to comment.