Skip to content

Commit

Permalink
Corrected tests - Added comments - format
Browse files Browse the repository at this point in the history
  • Loading branch information
Gonzalo Avalos Ribas authored and Gonzalo Avalos Ribas committed Oct 6, 2023
1 parent bdd38f7 commit 0267c15
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ describe('executeStepDependencyGraph', () => {
expect(spyB).toHaveBeenCalledBefore(spyC);
});

test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
test('should throw if upload fails', async () => {
const spyA = jest.fn();
const spyB = jest.fn();
const spyC = jest.fn();
Expand Down Expand Up @@ -697,6 +697,21 @@ describe('executeStepDependencyGraph', () => {
};
}

function createFailingUploader(
stepId: string,
collector: FlushedGraphObjectData[],
): StepGraphObjectDataUploader {
return {
stepId,
async enqueue(graphObjectData) {
collector.push(graphObjectData);
return Promise.resolve();
},
waitUntilUploadsComplete() {
return Promise.reject(new Error('Expected error'));
},
};
}
const passingUploaderCollector: FlushedGraphObjectData[] = [];

/**
Expand All @@ -707,14 +722,14 @@ describe('executeStepDependencyGraph', () => {
* 'b' depends on 'a',
* 'c' depends on 'b'
*/
const result = await executeSteps(
steps,
stepStartStates,
graphObjectStore,
(stepId) => {
await expect(
executeSteps(steps, stepStartStates, graphObjectStore, (stepId) => {
if (stepId == 'c') {
return createFailingUploader(stepId, passingUploaderCollector);
}
return createPassingUploader(stepId, passingUploaderCollector);
},
);
}),
).rejects.toThrow();

const expectedCollected: FlushedGraphObjectData[] = [
{
Expand All @@ -724,35 +739,6 @@ describe('executeStepDependencyGraph', () => {
];
expect(passingUploaderCollector).toEqual(expectedCollected);

expect(result).toEqual([
{
id: 'a',
name: 'a',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eA._type],
status: StepResultStatus.SUCCESS,
},
{
id: 'b',
name: 'b',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eB._type],
dependsOn: ['a'],
status: StepResultStatus.SUCCESS,
},
{
id: 'c',
name: 'c',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eC._type],
dependsOn: ['b'],
status: StepResultStatus.SUCCESS,
},
]);

expect(spyA).toHaveBeenCalledTimes(1);
expect(spyB).toHaveBeenCalledTimes(1);
expect(spyC).toHaveBeenCalledTimes(1);
Expand Down
76 changes: 39 additions & 37 deletions packages/integration-sdk-runtime/src/execution/dependencyGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,48 +442,50 @@ export function executeStepDependencyGraph<

return status;
}
async function forceFlushEverything() {
/** Instead of flushing after each step, flush only when we finish all steps OR when we reach the threshold limit
* Because the 'createStepGraphObjectDataUploader' needs a step I'm using the last step as it
*/
let uploader: StepGraphObjectDataUploader | undefined;
if (createStepGraphObjectDataUploader) {
uploader = createStepGraphObjectDataUploader(
Array.from(stepResultsMap.keys()).pop() as string,
);
}
await graphObjectStore.flush(
async (entities) =>
entities.length
? uploader?.enqueue({
entities,
relationships: [],
})
: undefined,
async (relationships) =>
relationships.length
? uploader?.enqueue({
entities: [],
relationships,
})
: undefined,
);
try {
await uploader?.waitUntilUploadsComplete();
} catch (error) {
executionContext.logger.publishErrorEvent({
name: IntegrationErrorEventName.UnexpectedError,
description: 'Upload to persister failed',
});
//How can we fail gracefully here?
throw error;
}
}

// kick off work for all leaf nodes
enqueueLeafSteps();

void promiseQueue
.onIdle()
.then(async () => {
/** Instead of flushing after each step, flush only when we finish all steps OR when we reach the threshold limit
* Because the 'createStepGraphObjectDataUploader' needs a step I'm using the last step as it
* I think we should decouple as much as possible upload from step success.
*/
let uploader: StepGraphObjectDataUploader | undefined;
if (createStepGraphObjectDataUploader) {
uploader = createStepGraphObjectDataUploader(
Array.from(stepResultsMap.keys()).pop() as string,
);
}
await graphObjectStore.flush(
async (entities) =>
entities.length
? uploader?.enqueue({
entities,
relationships: [],
})
: undefined,
async (relationships) =>
relationships.length
? uploader?.enqueue({
entities: [],
relationships,
})
: undefined,
);
try {
await uploader?.waitUntilUploadsComplete();
} catch (error) {
executionContext.logger.publishErrorEvent({
name: IntegrationErrorEventName.UnexpectedError,
description: 'Upload to persister failed',
});
throw error;
}
})
.then(forceFlushEverything)
.then(() => resolve([...stepResultsMap.values()]))
.catch(reject);
});
Expand Down

0 comments on commit 0267c15

Please sign in to comment.