Skip to content

Commit

Permalink
processDocument is now an awaited process when running inside a
Browse files Browse the repository at this point in the history
transaction
  • Loading branch information
daneryl committed Feb 5, 2025
1 parent 03f6dc0 commit 7a05798
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
13 changes: 7 additions & 6 deletions app/api/entities/managerFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { MetadataObjectSchema } from 'shared/types/commonTypes';
import { EntityWithFilesSchema } from 'shared/types/entityType';
import { TypeOfFile } from 'shared/types/fileSchema';
import { FileAttachment } from './entitySavingManager';
import { tenants } from 'api/tenants';

const prepareNewFiles = async (
entity: EntityWithFilesSchema,
Expand Down Expand Up @@ -197,21 +198,21 @@ const saveFiles = async (
);

if (documentsToProcess.length) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Promise.allSettled(
const documentsBeingProcessed = Promise.allSettled(
documentsToProcess.map(async document => processDocument(entity.sharedId!, document))
).then(results => {
results
.filter(result => result.status === 'rejected')
.map(rejected => {
const { reason } = rejected as PromiseRejectedResult;
handleError(reason);
});
.map(rejected => handleError(rejected.reason));

if (socketEmiter) {
socketEmiter('documentProcessed', entity.sharedId!);
}
});

if (tenants.current().featureFlags?.v1_transactions) {
await documentsBeingProcessed;
}
}

if (attachments.length || documents.length) {
Expand Down
7 changes: 7 additions & 0 deletions app/api/entities/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { thesauri } from '../thesauri/thesauri';
import { parseQuery, validation } from '../utils';
import date from '../utils/date';
import entities from './entities';
import { tenants } from 'api/tenants';

async function updateThesauriWithEntity(entity, req) {
const template = await templates.getById(entity.template);
Expand Down Expand Up @@ -97,6 +98,12 @@ export default app => {
return req.body.entity ? saveResult : entity;
});
res.json(result);
if (tenants.current().featureFlags.v1_transactions) {
req.emitToSessionSocket(
'documentProcessed',
req.body.entity ? result.entity.sharedId : result.sharedId
);
}
} catch (e) {
next(e);
}
Expand Down
28 changes: 27 additions & 1 deletion app/api/utils/specs/withTransaction.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ describe('withTransaction utility', () => {
});
});

it('should clear the context after a transaction', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
await model.save({ title: 'test-clear-session' });
dbSessionContext.registerFileOperation({
filename: 'test',
file: Readable.from(['content']),
type: 'document' as const,
});
dbSessionContext.registerESIndexOperation([{}, 'select', 10]);
});

expect(dbSessionContext.getSession()).toBeUndefined();
expect(dbSessionContext.getFileOperations()).toEqual([]);
expect(dbSessionContext.getReindexOperations()).toEqual([]);
});
});

describe('manual abort', () => {
it('should allow manual abort without throwing error', async () => {
await appContext.run(async () => {
Expand Down Expand Up @@ -252,6 +270,7 @@ describe('withTransaction utility', () => {
expect(sessionToTest?.hasEnded).toBe(true);
});
});

it('should do nothing when the feature flag is off', async () => {
testingTenants.changeCurrentTenant({ featureFlags: { v1_transactions: false } });

Expand All @@ -266,7 +285,8 @@ describe('withTransaction utility', () => {
});
});
});
describe('Entities elasticsearch index', () => {

describe('entities elasticsearch index', () => {
beforeEach(async () => {
await testingEnvironment.setUp(
{
Expand Down Expand Up @@ -349,6 +369,12 @@ describe('withTransaction utility', () => {
});

describe('storeFile', () => {
afterAll(async () => {
await storage.removeFile('file_to_commit.txt', 'document');
await storage.removeFile('file_to_fail.txt', 'document');
await storage.removeFile('file_to_abort.txt', 'document');
});

it('should store file after transaction is committed', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
Expand Down
7 changes: 3 additions & 4 deletions app/api/utils/withTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ search.indexEntities = async (query, select, limit) => {

const originalStoreFile = storage.storeFile.bind(storage);
storage.storeFile = async (filename, file, type) => {
if (dbSessionContext.getSession() && !appContext.get('fileOperationsNow')) {
if (dbSessionContext.getSession()) {
return dbSessionContext.registerFileOperation({ filename, file, type });
}
return originalStoreFile(filename, file, type);
};

const performDelayedFileStores = async () => {
appContext.set('fileOperationsNow', true);
await storage.storeMultipleFiles(dbSessionContext.getFileOperations());
};

Expand Down Expand Up @@ -59,9 +58,9 @@ const withTransaction = async <T>(
try {
const result = await operation(context);
if (!wasManuallyAborted) {
dbSessionContext.clearSession();
await performDelayedFileStores();
await session.commitTransaction();
dbSessionContext.clearSession();
await performDelayedReindexes();
}
return result;
Expand All @@ -71,8 +70,8 @@ const withTransaction = async <T>(
}
throw e;
} finally {
appContext.set('fileOperationsNow', false);
dbSessionContext.clearSession();
dbSessionContext.clearContext();
await session.endSession();
}
};
Expand Down

0 comments on commit 7a05798

Please sign in to comment.