diff --git a/packages/integration-sdk-core/src/types/storage.ts b/packages/integration-sdk-core/src/types/storage.ts index 0f6ab8959..171902755 100644 --- a/packages/integration-sdk-core/src/types/storage.ts +++ b/packages/integration-sdk-core/src/types/storage.ts @@ -46,4 +46,6 @@ export interface GraphObjectStore { getIndexMetadataForGraphObjectType?: ( params: GetIndexMetadataForGraphObjectTypeParams, ) => GraphObjectIndexMetadata | undefined; + + getStepsStored?: () => string[]; } diff --git a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts index db2b38176..b14f53d87 100644 --- a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts +++ b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts @@ -452,6 +452,9 @@ export function executeStepDependencyGraph< Array.from(stepResultsMap.keys()).pop() as string, ); } + const stepsInvolvedInUpload = graphObjectStore.getStepsStored + ? graphObjectStore.getStepsStored() + : []; await graphObjectStore.flush( async (entities) => entities.length @@ -470,13 +473,18 @@ export function executeStepDependencyGraph< ); try { await uploader?.waitUntilUploadsComplete(); - } catch (error) { + } catch (err) { executionContext.logger.publishErrorEvent({ name: IntegrationErrorEventName.UnexpectedError, description: 'Upload to persister failed', }); - //How can we fail gracefully here? - throw error; + for (const stepId of stepsInvolvedInUpload) { + executionContext.logger.stepFailure( + workingGraph.getNodeData(stepId), + err, + ); + stepResultsMap[stepId] = StepResultStatus.FAILURE; + } } } diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 67044254d..caa67ff34 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -152,6 +152,9 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { integrationStepsToGraphObjectIndexMetadataMap(params.integrationSteps); } } + getStepsStored() { + return this.localGraphObjectStore.getStepsStored(); + } async addEntities( stepId: string, diff --git a/packages/integration-sdk-runtime/src/storage/memory.ts b/packages/integration-sdk-runtime/src/storage/memory.ts index e7179e676..e1b931633 100644 --- a/packages/integration-sdk-runtime/src/storage/memory.ts +++ b/packages/integration-sdk-runtime/src/storage/memory.ts @@ -323,4 +323,22 @@ export class InMemoryGraphObjectStore implements GraphObjectStore { 0, ); } + + getStepsStored(): string[] { + const stepIds: string[] = []; + + for (const [_, graphObjectData] of this.relationshipKeyToRelationshipMap) { + const { stepId } = graphObjectData; + if (!stepIds.includes(stepId)) { + stepIds.push(stepId); + } + } + for (const [_, graphObjectData] of this.entityKeyToEntityMap) { + const { stepId } = graphObjectData; + if (!stepIds.includes(stepId)) { + stepIds.push(stepId); + } + } + return stepIds; + } }