From f59f77d461465113809e9cbbe81c5b37d5336425 Mon Sep 17 00:00:00 2001 From: Forrest Date: Tue, 16 Jul 2024 13:51:23 -0400 Subject: [PATCH] feat(import): simpler import + state restore logic The pipeline code has been removed in favor of a simpler chain-of-responsibility approach, using `evaluateChain` and `asyncSelect`. `evaluateChain` is responsible for evaluating a data source against a chain of import handlers until one of them returns a new data source. To keep processing a data source the way the old pipeline's nested executions did, `evaluateChain` is invoked inside a loop for every data source. `asyncSelect` is used to drive the loop execution, selecting `evaluateChain` promises whenever they are done. The state schema is updated to generically operate on serialized data sources. Instead of special-casing for remote files, the serialized DataSource type encodes this state. --- src/actions/importDicomChunks.ts | 2 +- src/actions/loadUserFiles.ts | 65 ++-- src/components/SampleDataBrowser.vue | 4 +- src/core/__tests__/pipeline.spec.ts | 284 --------------- src/core/pipeline.ts | 309 ---------------- src/io/import/common.ts | 114 +++++- src/io/import/importDataSources.ts | 238 +++++++------ src/io/import/processors/downloadStream.ts | 27 +- src/io/import/processors/extractArchive.ts | 21 +- .../import/processors/extractArchiveTarget.ts | 65 +--- src/io/import/processors/handleAmazonS3.ts | 13 +- src/io/import/processors/handleConfig.ts | 11 +- src/io/import/processors/handleDicomFile.ts | 13 +- src/io/import/processors/handleDicomStream.ts | 17 +- .../processors/handleGoogleCloudStorage.ts | 16 +- src/io/import/processors/importSingleFile.ts | 27 +- src/io/import/processors/openUriStream.ts | 25 +- src/io/import/processors/remoteManifest.ts | 46 ++- .../processors/resolveIncompleteDataSource.ts | 108 ------ src/io/import/processors/restoreStateFile.ts | 332 ++++++++---------- .../import/processors/updateFileMimeType.ts | 33 +- src/io/import/processors/updateUriType.ts | 11 +- src/io/state-file/index.ts | 3 +- src/io/state-file/schema.ts | 44 ++- src/io/state-file/utils.ts | 44 --- src/io/zip.ts | 17 +- src/store/datasets-dicom.ts | 18 - src/store/datasets-images.ts | 12 - src/store/datasets.ts | 113 +++++- src/store/segmentGroups.ts | 8 +- src/utils/__tests__/asyncSelect.spec.ts | 41 +++ src/utils/__tests__/evaluateChain.spec.ts | 52 +++ src/utils/asyncSelect.ts | 23 ++ src/utils/evaluateChain.ts | 26 ++ 34 files changed, 863 insertions(+), 1319 deletions(-) delete mode 100644 src/core/__tests__/pipeline.spec.ts delete mode 100644 src/core/pipeline.ts delete mode 100644 src/io/import/processors/resolveIncompleteDataSource.ts delete mode 100644 src/io/state-file/utils.ts create mode 100644 src/utils/__tests__/asyncSelect.spec.ts create mode 100644 src/utils/__tests__/evaluateChain.spec.ts create mode 100644 src/utils/asyncSelect.ts create mode 100644 src/utils/evaluateChain.ts diff --git a/src/actions/importDicomChunks.ts b/src/actions/importDicomChunks.ts index 43d659f76..102c05c5c 100644 --- a/src/actions/importDicomChunks.ts +++ b/src/actions/importDicomChunks.ts @@ -23,5 +23,5 @@ export async function importDicomChunks(chunks: Chunk[]) { }) ); - return Object.keys(chunksByVolume); + return chunksByVolume; } diff --git a/src/actions/loadUserFiles.ts b/src/actions/loadUserFiles.ts index 351b14477..7c973588d 100644 --- a/src/actions/loadUserFiles.ts +++ b/src/actions/loadUserFiles.ts @@ -10,23 +10,24 @@ import { useDatasetStore } from '@/src/store/datasets'; import { 
useDICOMStore } from '@/src/store/datasets-dicom'; import { useLayersStore } from '@/src/store/datasets-layers'; import { useSegmentGroupStore } from '@/src/store/segmentGroups'; -import { wrapInArray, nonNullable } from '@/src/utils'; +import { wrapInArray, nonNullable, partition } from '@/src/utils'; import { basename } from '@/src/utils/path'; import { parseUrl } from '@/src/utils/url'; import { logError } from '@/src/utils/loggers'; -import { PipelineResultSuccess, partitionResults } from '@/src/core/pipeline'; import { - ImportDataSourcesResult, importDataSources, toDataSelection, } from '@/src/io/import/importDataSources'; import { + ErrorResult, ImportResult, LoadableResult, - VolumeResult, + LoadableVolumeResult, isLoadableResult, isVolumeResult, + ImportDataSourcesResult, } from '@/src/io/import/common'; +import { isDicomImage } from '@/src/utils/dataSelection'; // higher value priority is preferred for picking a primary selection const BASE_MODALITY_TYPES = { @@ -38,8 +39,8 @@ const BASE_MODALITY_TYPES = { function findBaseDicom(loadableDataSources: Array) { // find dicom dataset for primary selection if available - const dicoms = loadableDataSources.filter( - ({ dataType }) => dataType === 'dicom' + const dicoms = loadableDataSources.filter(({ dataID }) => + isDicomImage(dataID) ); // prefer some modalities as base const dicomStore = useDICOMStore(); @@ -97,19 +98,15 @@ function findBaseImage( } // returns image and dicom sources, no config files -function filterLoadableDataSources( - succeeded: Array> -) { - return succeeded.flatMap((result) => { - return result.data.filter(isLoadableResult); - }); +function filterLoadableDataSources(succeeded: Array) { + return succeeded.filter(isLoadableResult); } // Returns list of dataSources with file names where the name has the extension argument // and the start of the file name matches the primary file name. function filterMatchingNames( - primaryDataSource: VolumeResult, - succeeded: Array>, + primaryDataSource: LoadableVolumeResult, + succeeded: Array, extension: string ) { const primaryName = getDataSourceName(primaryDataSource.dataSource); @@ -137,7 +134,7 @@ function getStudyUID(volumeID: string) { } function findBaseDataSource( - succeeded: Array>, + succeeded: Array, segmentGroupExtension: string ) { const loadableDataSources = filterLoadableDataSources(succeeded); @@ -151,24 +148,24 @@ function findBaseDataSource( function filterOtherVolumesInStudy( volumeID: string, - succeeded: Array> + succeeded: Array ) { const targetStudyUID = getStudyUID(volumeID); const dicomDataSources = filterLoadableDataSources(succeeded).filter( - ({ dataType }) => dataType === 'dicom' + ({ dataID }) => isDicomImage(dataID) ); return dicomDataSources.filter((ds) => { const sourceStudyUID = getStudyUID(ds.dataID); return sourceStudyUID === targetStudyUID && ds.dataID !== volumeID; - }) as Array; + }) as Array; } // Layers a DICOM PET on a CT if found function loadLayers( - primaryDataSource: VolumeResult, - succeeded: Array> + primaryDataSource: LoadableVolumeResult, + succeeded: Array ) { - if (primaryDataSource.dataType !== 'dicom') return; + if (!isDicomImage(primaryDataSource.dataID)) return; const otherVolumesInStudy = filterOtherVolumesInStudy( primaryDataSource.dataID, succeeded @@ -194,8 +191,8 @@ function loadLayers( // - DICOM SEG modalities with matching StudyUIDs. 
// - DataSources that have a name like foo.segmentation.bar and the primary DataSource is named foo.baz function loadSegmentations( - primaryDataSource: VolumeResult, - succeeded: Array>, + primaryDataSource: LoadableVolumeResult, + succeeded: Array, segmentGroupExtension: string ) { const matchingNames = filterMatchingNames( @@ -233,13 +230,19 @@ function loadDataSources(sources: DataSource[]) { let results: ImportDataSourcesResult[]; try { - results = await importDataSources(sources); + results = (await importDataSources(sources)).filter((result) => + // only look at data and error results + ['data', 'error'].includes(result.type) + ); } catch (error) { loadDataStore.setError(error as Error); return; } - const [succeeded, errored] = partitionResults(results); + const [succeeded, errored] = partition( + (result) => result.type !== 'error', + results + ); if (!dataStore.primarySelection && succeeded.length) { const primaryDataSource = findBaseDataSource( @@ -260,14 +263,12 @@ function loadDataSources(sources: DataSource[]) { } if (errored.length) { - const errorMessages = errored.map((errResult) => { - // pick first error - const [firstError] = errResult.errors; - // pick innermost dataset that errored - const name = getDataSourceName(firstError.inputDataStackTrace[0]); + const errorMessages = (errored as ErrorResult[]).map((errResult) => { + const { dataSource, error } = errResult; + const name = getDataSourceName(dataSource); // log error for debugging - logError(firstError.cause); - return `- ${name}: ${firstError.message}`; + logError(error); + return `- ${name}: ${error.message}`; }); const failedError = new Error( `These files failed to load:\n${errorMessages.join('\n')}` diff --git a/src/components/SampleDataBrowser.vue b/src/components/SampleDataBrowser.vue index 3579799a5..d8fbbf7ea 100644 --- a/src/components/SampleDataBrowser.vue +++ b/src/components/SampleDataBrowser.vue @@ -96,8 +96,8 @@ export default defineComponent({ if (!loadResult) { throw new Error('Did not receive a load result'); } - if (!loadResult.ok) { - throw loadResult.errors[0].cause; + if (loadResult.type === 'error') { + throw loadResult.error; } const selection = convertSuccessResultToDataSelection(loadResult); diff --git a/src/core/__tests__/pipeline.spec.ts b/src/core/__tests__/pipeline.spec.ts deleted file mode 100644 index 7b4aaeaae..000000000 --- a/src/core/__tests__/pipeline.spec.ts +++ /dev/null @@ -1,284 +0,0 @@ -import { describe, it } from 'vitest'; -import sinonChai from 'sinon-chai'; -import Chai, { expect } from 'chai'; -import Pipeline, { Handler } from '../pipeline'; - -Chai.use(sinonChai); - -function asyncSleep(msec: number) { - return new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, msec); - }); -} - -describe('Pipeline', () => { - it('should execute a pipeline in order with results', async () => { - const callOrder: number[] = []; - - const handlers: Array> = [ - () => { - callOrder.push(1); - }, - () => { - callOrder.push(2); - }, - (input, { done }) => { - callOrder.push(3); - return done(42); - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.true; - if (result.ok) { - expect(result.data).to.deep.equal([42]); - } - expect(callOrder).to.deep.equal([1, 2, 3]); - }); - - it('should terminate a pipeline at the end without done', async () => { - const callOrder: number[] = []; - - const handlers: Array> = [ - () => { - callOrder.push(1); - }, - () => { - callOrder.push(2); - }, - () => { - 
callOrder.push(3); - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.true; - if (result.ok) { - expect(result.data).to.have.length(0); - } - expect(callOrder).to.deep.equal([1, 2, 3]); - }); - - it('should execute an async pipeline with transforms', async () => { - let calc = 0; - - const handlers: Array> = [ - async (input) => { - await asyncSleep(1); - return input + 1; - }, - (input) => { - return input + 2; - }, - async (input, { done }) => { - await asyncSleep(1); - calc = input; - return done(); - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(5); - - expect(result.ok).to.be.true; - expect(calc).to.equal(8); - }); - - it('should execute an asynchronous (promise) pipeline with done', async () => { - const callOrder: number[] = []; - - const handlers: Array> = [ - () => { - return asyncSleep(1).then(() => { - callOrder.push(1); - }); - }, - () => { - return asyncSleep(1).then(() => { - callOrder.push(2); - }); - }, - (input, { done }) => { - return asyncSleep(1).then(() => { - callOrder.push(3); - done(); - }); - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.true; - expect(callOrder).to.deep.equal([1, 2, 3]); - }); - - it('should support a null result to done()', async () => { - const handlers: Array> = [ - (input, { done }) => { - return asyncSleep(1).then(() => { - done(null); - }); - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.true; - if (result.ok) { - expect(result.data).to.deep.equal([]); - } - }); - - it('should detect double done()', async () => { - const handlers: Array> = [ - (input, { done }) => { - done(); - done(); - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.false; - if (!result.ok) { - expect(result.errors).to.have.length(1); - } - }); - - it('should handle top-level errors', async () => { - const error = new Error('Some failure'); - const handlers: Array> = [ - () => { - throw error; - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.false; - if (!result.ok) { - expect(result.errors).to.have.length(1); - expect(result.errors[0].message).to.equal(error.message); - } - }); - - it('should handle top-level async errors', async () => { - const error = new Error('Some failure'); - const handlers: Array> = [ - async () => { - asyncSleep(5); - throw error; - }, - ]; - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(); - - expect(result.ok).to.be.false; - if (!result.ok) { - expect(result.errors).to.have.length(1); - expect(result.errors[0].message).to.equal(error.message); - } - }); - - it('should handle nested executions', async () => { - // handlers encode fibonacci - const handlers: Array> = [ - async (idx, { done }) => { - if (idx === 0 || idx === 1) { - return done(1); - } - return idx; - }, - async (idx, { execute, done }) => { - const result = await execute(idx - 1); - if (result.ok) { - let fnum = result.data[0]; - if (idx > 1) { - const r = await execute(idx - 2); - if (r.ok) fnum += r.data[0]; - else throw new Error('error'); - } - return done(fnum); - } - throw new Error('error'); - }, - ]; - - const pipeline = new Pipeline(handlers); - const N = 5; - const result = await pipeline.execute(N); - - 
expect(result.ok).to.be.true; - if (result.ok) { - // pick first result data, which is the top-level pipeline result - expect(result.data[0]).to.equal(8); - } - }); - - it('should handle allow extra context overriding', async () => { - type Extra = number; - const handlers: Array> = [ - (val, { done, execute, extra }) => { - if (extra === 42) { - return done(extra); - } - execute(val, 42); - return val; - }, - ]; - - const pipeline = new Pipeline(handlers); - const result = await pipeline.execute(0, 21); - - expect(result.ok).to.be.true; - if (result.ok) { - expect(result.data).to.deep.equal([42]); - } - }); - - it('should handle nested async errors', async () => { - const error = new Error('Some failure'); - const handlers: Array> = [ - async (counter) => { - if (counter === 0) { - throw error; - } - await asyncSleep(1); - return counter; - }, - async (counter, { execute, done }) => { - await asyncSleep(1); - execute(counter - 1); - if (counter > 1) { - execute(counter - 2); - } - return done(); - }, - ]; - - // handlers encode fibonacci - const pipeline = new Pipeline(handlers); - const N = 5; - const result = await pipeline.execute(N); - - expect(result.ok).to.be.false; - if (!result.ok) { - // we expect there to be fib(N+1) errors - expect(result.errors).to.have.length(8); - - result.errors.forEach((err) => { - const { message, inputDataStackTrace } = err; - expect(message).to.equal(error.message); - // first object should be the input passed to the erroring handler - expect(inputDataStackTrace[0]).to.equal(0); - // last object should be the input passed to the pipeline. - expect(inputDataStackTrace.at(-1)).to.equal(N); - }); - } else { - expect.fail('Expected not ok result'); - } - }); -}); diff --git a/src/core/pipeline.ts b/src/core/pipeline.ts deleted file mode 100644 index a6cc2e0b1..000000000 --- a/src/core/pipeline.ts +++ /dev/null @@ -1,309 +0,0 @@ -import { Awaitable } from '@vueuse/core'; -import { Maybe } from '@/src/types'; -import { defer, partitionByType } from '../utils'; - -/** - * Represents a pipeline error. - * - * The inputDataStackTrace property provides the inputs that caused the error. - * It is ordered by nested level, starting with the inner most execution context - * input. - * - * The cause property refers to the original thrown object that resulted in the - * error. - */ -export interface PipelineError { - message: string; - inputDataStackTrace: DataType[]; - cause: unknown; -} - -export interface PipelineResultSuccess { - ok: true; - data: ResultType[]; -} - -export interface PipelineResultError { - ok: false; - errors: PipelineError[]; -} - -/** - * Represents a pipeline's execution result. - * - * The data property holds any return values from handlers. - * - * The errors property holds any errors reported from (potentially nested) - * executions. - */ -export type PipelineResult = - | PipelineResultSuccess - | PipelineResultError; - -export const partitionResults = (arr: Array>) => - partitionByType( - (r: PipelineResult): r is PipelineResultSuccess => r.ok, - arr - ); - -function createPipelineError( - message: string, - input: DataType, - cause: unknown -) { - return { - message, - inputDataStackTrace: [input], - cause, - }; -} - -export interface IPipeline { - /** - * Runs a given input through a middleware pipeline. 
- * @param input - */ - execute(input: DataType): Promise>; -} - -const DoneSentinel: symbol = Symbol('DoneSentinel'); -type DoneSentinelType = symbol; -export type Done = (out?: Out) => DoneSentinelType; - -export interface PipelineContext { - /** - * Terminate the pipeline with an optional pipeline return value. - * @param pipelineReturn - */ - done: Done; - /** - * Execute the pipeline with the given input. - * @param input - */ - execute( - input: DataType, - extra?: ExtraContext - ): Promise>; - /** - * Register cleanup code - * @param callback - */ - onCleanup(callback: Function): void; - /** - * Any extra user-supplied data. - */ - extra?: ExtraContext; -} - -/** - * Represents an element/step of a pipeline. - * - * Handlers have three pipeline operations availble to them: - * - process input and produce output for the rest of the pipeline - * - terminate the pipeline and optionally produce a result - * - start a nested execution of the pipeline with new data - * - * Handlers receive input data via the `input` parameter and pass data down the - * pipeline by returning. Pipeline execution will await asynchronous handlers if - * they return a Promise that resolves to the output data. - * - * The second argument to a handler is a context object containing an - * `execute()` method and a `done()` method. - * - * A handler is free to start new pipeline executions by calling - * `execute(input)`. The handler does not need to await the `execute` call, as - * the top-level pipeline will track all nested executions. - * - * If a handler wishes to terminate the pipeline, it must call `done()`. This - * will signal the pipeline to terminate after the handler returns. An optional - * pipeline result value can be passed as the single argument to `done(output)`. - * If `done()` is signalled, then the handler's return value is ignored. - * - * To facilitate typing and to avoid accidentally forgetting to return a value - * in a handler, handlers are typed to return either the DataType or the return - * value of done(). - */ -export type Handler< - DataType, - ResultType = undefined, - ExtraContext = undefined -> = ( - input: DataType, - context: PipelineContext -) => Awaitable; - -/** - * Represents an executable pipeline. - * - * Features supported: - * - Execution of a pipeline in the given order of the provided handlers - * - Handlers can run nested executions of the same pipeline - * - Handlers can optionally transform data for downstream use - * - Early termination - * - Reporting errors. This includes un-nesting errors from nested executions. - * - Reporting data returned from terminating handlers, if any. - */ -export default class Pipeline< - DataType, - ResultType = undefined, - ExtraContext = undefined -> implements IPipeline -{ - private handlers: Handler[]; - - constructor(handlers?: Handler[]) { - this.handlers = Array.from(handlers ?? []); - } - - /** - * Executes the pipeline with a given input. - * - * This method will resolve once this execution context and all - * nested execution contexts have finished, allowing for aggregate - * error reporting. - * - * Extra context data can be passed to all handlers via the `.extra` property. - * In nested execution scenarios, handlers may choose to pass their own extra - * context data into `execute(arg, extra)`. If none is supplied, the extra - * context data from the outermost `execute()` call is used. 
- * - * @param input - * @param extraContext - * @returns {PipelineResult} - */ - async execute(input: DataType, extraContext?: ExtraContext) { - return this.startExecutionContext(input, extraContext); - } - - private async startExecutionContext( - input: DataType, - extraContext?: ExtraContext - ) { - const handlers = [...this.handlers]; - const nestedExecutions: Array< - Promise> - > = []; - const execution = defer>(); - const cleanupCallbacks: Function[] = []; - - const terminate = (result: Maybe, error?: Error) => { - cleanupCallbacks.forEach((callback) => { - try { - callback(); - } catch (e) { - console.error(e); - } - }); - - if (error) { - execution.reject(error); - } else { - execution.resolve(result); - } - }; - - const invokeHandler = async (data: DataType, index: number) => { - let doneInvoked = false; - // eslint-disable-next-line no-undef-init - let pipelineResult: ResultType | undefined = undefined; - const endOfPipeline = index >= handlers.length; - - const context: PipelineContext = { - done: (out?: ResultType): DoneSentinelType => { - if (doneInvoked) { - throw new Error('done() called twice!'); - } - - doneInvoked = true; - pipelineResult = out; - return DoneSentinel; - }, - execute: async (arg: DataType, innerExtra?: ExtraContext) => { - const promise = this.execute(arg, innerExtra ?? extraContext); - nestedExecutions.push(promise); - return promise; - }, - onCleanup: (callback: Function) => { - cleanupCallbacks.push(callback); - }, - extra: extraContext, - }; - - let output: DataType | DoneSentinelType; - - if (endOfPipeline) { - output = DoneSentinel; - } - - try { - if (endOfPipeline) { - output = DoneSentinel; - } else { - const handler = handlers[index]; - output = await handler(data, context); - } - } catch (thrown) { - const error = - thrown instanceof Error - ? thrown - : new Error(thrown ? String(thrown) : 'Unknown error occurred'); - terminate(undefined, error); - return; - } - - if (doneInvoked || endOfPipeline) { - terminate(pipelineResult); - return; - } - - invokeHandler(output as DataType, index + 1); - }; - - const result: PipelineResult = await (async () => { - try { - await invokeHandler(input, 0); - const ret = await execution.promise; - if (ret != null) { - return { - ok: true as const, - data: [ret] as Array, - }; - } - return { ok: true as const, data: [] }; - } catch (err) { - const message = err instanceof Error ? 
err.message : String(err); - return { - ok: false as const, - errors: [createPipelineError(message, input, err)], - }; - } - })(); - - const innerResults = await Promise.all(nestedExecutions); - const [succeededInner, failedInner] = partitionResults(innerResults); - - if (failedInner.length > 0) { - const errors = failedInner.flatMap((failedResult) => { - const { errors: innerErrors } = failedResult; - // add current input to the input stack trace - innerErrors.forEach((err) => { - err.inputDataStackTrace.push(input); - }); - return innerErrors; - }); - - return { - ok: false as const, - errors, - }; - } - - if (result.ok) { - succeededInner.forEach((okResult) => { - result.data.push(...okResult.data); - }); - } - - return result; - } -} diff --git a/src/io/import/common.ts b/src/io/import/common.ts index 716117709..de8b75884 100644 --- a/src/io/import/common.ts +++ b/src/io/import/common.ts @@ -1,28 +1,100 @@ import { FetchCache } from '@/src/utils/fetch'; import { DataSource, FileSource } from '@/src/io/import/dataSource'; -import { Handler } from '@/src/core/pipeline'; import { ARCHIVE_FILE_TYPES } from '@/src/io/mimeTypes'; import { Awaitable } from '@vueuse/core'; import { Config } from '@/src/io/import/configJson'; +import { ChainHandler } from '@/src/utils/evaluateChain'; -interface DataResult { +export interface LoadableResult { + type: 'data'; + dataID: string; dataSource: DataSource; + dataType: 'image' | 'model'; } -export interface LoadableResult extends DataResult { - dataID: string; - dataType: 'image' | 'dicom' | 'model'; +export interface LoadableVolumeResult extends LoadableResult { + dataType: 'image'; } -export interface VolumeResult extends LoadableResult { - dataType: 'image' | 'dicom'; +export interface LoadableModelResult extends LoadableResult { + dataType: 'model'; } -export interface ConfigResult extends DataResult { +export interface ConfigResult { + type: 'config'; config: Config; + dataSource: DataSource; } -export type ImportResult = LoadableResult | ConfigResult | DataResult; +export interface OkayResult { + type: 'ok'; + dataSource: DataSource; +} + +export interface IntermediateResult { + type: 'intermediate'; + dataSources: DataSource[]; +} + +export interface ErrorResult { + type: 'error'; + error: Error; + dataSource: DataSource; +} + +export type ImportResult = + | LoadableResult + | ConfigResult + | IntermediateResult + | OkayResult + | ErrorResult; + +export type ImportDataSourcesResult = + | ConfigResult + | LoadableResult + | OkayResult + | ErrorResult; + +export const asLoadableResult = ( + dataID: string, + dataSource: DataSource, + dataType: 'image' | 'model' +): LoadableResult => ({ + type: 'data', + dataID, + dataSource, + dataType, +}); + +export const asIntermediateResult = ( + dataSources: DataSource[] +): IntermediateResult => ({ + type: 'intermediate', + dataSources, +}); + +export const asConfigResult = ( + dataSource: DataSource, + config: Config +): ConfigResult => ({ + type: 'config', + dataSource, + config, +}); + +export const asErrorResult = ( + error: Error, + dataSource: DataSource +): ErrorResult => ({ + type: 'error', + error, + dataSource, +}); + +export const asOkayResult = (dataSource: DataSource): OkayResult => ({ + type: 'ok', + dataSource, +}); export type ArchiveContents = Record; export type ArchiveCache = Map>; @@ -34,9 +106,20 @@ export interface ImportContext { archiveCache?: ArchiveCache; // Records dicom files dicomDataSources?: DataSource[]; + onCleanup?: (fn: () => void) => void; + /** + * A reference to 
importDataSources for nested imports. + */ + importDataSources?: ( + dataSources: DataSource[] + ) => Promise; } -export type ImportHandler = Handler; +export type ImportHandler = ChainHandler< + DataSource, + ImportResult, + ImportContext +>; export function isArchive( ds: DataSource @@ -47,20 +130,17 @@ export function isArchive( export function isLoadableResult( importResult: ImportResult ): importResult is LoadableResult { - return 'dataID' in importResult && 'dataType' in importResult; + return importResult.type === 'data'; } export function isVolumeResult( importResult: ImportResult -): importResult is VolumeResult { - return ( - isLoadableResult(importResult) && - (importResult.dataType === 'image' || importResult.dataType === 'dicom') - ); +): importResult is LoadableVolumeResult { + return isLoadableResult(importResult) && importResult.dataType === 'image'; } export function isConfigResult( importResult: ImportResult ): importResult is ConfigResult { - return 'config' in importResult; + return importResult.type === 'config'; } diff --git a/src/io/import/importDataSources.ts b/src/io/import/importDataSources.ts index 2357dbe13..608d0c75c 100644 --- a/src/io/import/importDataSources.ts +++ b/src/io/import/importDataSources.ts @@ -1,18 +1,19 @@ -import Pipeline, { - PipelineResult, - PipelineResultSuccess, -} from '@/src/core/pipeline'; import { - isConfigResult, ImportHandler, ImportResult, - isLoadableResult, - VolumeResult, + asErrorResult, + asLoadableResult, + ConfigResult, + LoadableVolumeResult, + LoadableResult, + ErrorResult, + ImportDataSourcesResult, + asIntermediateResult, } from '@/src/io/import/common'; import { DataSource, ChunkDataSource } from '@/src/io/import/dataSource'; import handleDicomFile from '@/src/io/import/processors/handleDicomFile'; import extractArchive from '@/src/io/import/processors/extractArchive'; -import extractArchiveTargetFromCache from '@/src/io/import/processors/extractArchiveTarget'; +import extractArchiveTarget from '@/src/io/import/processors/extractArchiveTarget'; import handleAmazonS3 from '@/src/io/import/processors/handleAmazonS3'; import handleGoogleCloudStorage from '@/src/io/import/processors/handleGoogleCloudStorage'; import importSingleFile from '@/src/io/import/processors/importSingleFile'; @@ -27,101 +28,84 @@ import downloadStream from '@/src/io/import/processors/downloadStream'; import handleDicomStream from '@/src/io/import/processors/handleDicomStream'; import { FILE_EXT_TO_MIME } from '@/src/io/mimeTypes'; import { importDicomChunks } from '@/src/actions/importDicomChunks'; - -/** - * Tries to turn a thrown object into a meaningful error string. - * @param error - * @returns - */ -function toMeaningfulErrorString(thrown: unknown) { - const strThrown = String(thrown); - if (!strThrown || strThrown === '[object Object]') { - return 'Unknown error. 
More details in the dev console.'; - } - return strThrown; -} +import { asyncSelect } from '@/src/utils/asyncSelect'; +import { evaluateChain, Skip } from '@/src/utils/evaluateChain'; +import { ensureError, partition } from '@/src/utils'; +import { Chunk } from '@/src/core/streaming/chunk'; +import { useDatasetStore } from '@/src/store/datasets'; const unhandledResource: ImportHandler = () => { throw new Error('Failed to handle resource'); }; -function isSelectable( - result: PipelineResult -): result is PipelineResultSuccess { - if (!result.ok) return false; - if (result.data.length === 0) { - return false; - } - const importResult = result.data[0]; - if (!isLoadableResult(importResult)) { - return false; - } - if (importResult.dataType === 'model') { - return false; - } +const handleCollections: ImportHandler = (dataSource) => { + if (!dataSource.collectionSrc) return Skip; + return asIntermediateResult(dataSource.collectionSrc.sources); +}; - return true; +function isSelectable(result: ImportResult): result is LoadableVolumeResult { + return result.type === 'data' && result.dataType === 'image'; } -const importConfigs = async ( - results: Array> -) => { - try { - results - .flatMap((pipelineResult) => - pipelineResult.ok ? pipelineResult.data : [] - ) - .filter(isConfigResult) - .map(({ config }) => config) - .forEach(applyConfig); - return { - ok: true as const, - data: [], - }; - } catch (err) { - return { - ok: false as const, - errors: [ - { - message: toMeaningfulErrorString(err), - cause: err, - inputDataStackTrace: [], - }, - ], - }; - } +const importConfigs = ( + results: Array +): (ConfigResult | ErrorResult)[] => { + return results.map((result) => { + try { + applyConfig(result.config); + return result; + } catch (err) { + return asErrorResult(ensureError(err), result.dataSource); + } + }); }; async function importDicomChunkSources(sources: ChunkDataSource[]) { if (sources.length === 0) return []; - const dataIds = await importDicomChunks( + const volumeChunks = await importDicomChunks( sources.map((src) => src.chunkSrc.chunk) ); - return [ - { - ok: true as const, - data: dataIds.map((id) => ({ - dataID: id, - dataType: 'dicom' as const, - dataSource: { - collectionSrc: { - sources, - }, + + // this is used to reconstruct the ChunkDataSource list + const chunkToDataSource = new Map(); + sources.forEach((src) => { + chunkToDataSource.set(src.chunkSrc.chunk, src); + }); + + return Object.entries(volumeChunks).map(([id, chunks]) => + asLoadableResult( + id, + { + collectionSrc: { + sources: chunks.map((chunk) => chunkToDataSource.get(chunk)!), }, - })), - }, - ]; + }, + 'image' + ) + ); } export async function importDataSources( dataSources: DataSource[] -): Promise[]> { +): Promise { + const cleanupHandlers: Array<() => void> = []; + const onCleanup = (fn: () => void) => { + cleanupHandlers.push(fn); + }; + const cleanup = () => { + while (cleanupHandlers.length) cleanupHandlers.pop()!(); + }; + const importContext = { fetchFileCache: new Map(), + onCleanup, + importDataSources, }; - const middleware = [ + const handlers = [ + handleCollections, + openUriStream, // updating the file/uri type should be first step in the pipeline @@ -138,8 +122,8 @@ export async function importDataSources( handleDicomStream, downloadStream, - extractArchiveTargetFromCache, extractArchive, + extractArchiveTarget, handleConfig, // collect config files to apply later // should be before importSingleFile, since DICOM is more specific handleDicomFile, // collect DICOM files to import later @@ -147,50 
+131,80 @@ export async function importDataSources( // catch any unhandled resource unhandledResource, ]; - const loader = new Pipeline(middleware); - const results = await Promise.all( - dataSources.map((r) => loader.execute(r, importContext)) - ); + const chunkSources: DataSource[] = []; + const configResults: ConfigResult[] = []; + const results: ImportDataSourcesResult[] = []; - const successfulResults = results.filter( - (result): result is PipelineResultSuccess => result.ok - ); + let queue = [ + ...dataSources.map((src) => evaluateChain(src, handlers, importContext)), + ]; - const chunks = successfulResults - .flatMap((result) => result.data) - .map((data) => data.dataSource) - .filter((src): src is ChunkDataSource => !!src.chunkSrc); + /* eslint-disable no-await-in-loop */ + while (queue.length) { + const { promise, index, rest } = await asyncSelect(queue); + const result = await promise.catch((err) => + asErrorResult(err, dataSources[index]) + ); + queue = rest; + + switch (result.type) { + case 'intermediate': { + const [chunks, otherSources] = partition( + (ds) => !!ds.chunkSrc, + result.dataSources + ); + chunkSources.push(...chunks); + + // try loading intermediate results + queue.push( + ...otherSources.map((src) => + evaluateChain(src, handlers, importContext) + ) + ); + break; + } + case 'config': + configResults.push(result); + break; + case 'ok': + case 'data': + case 'error': + results.push(result); + break; + default: + throw new Error(`Invalid result: ${result}`); + } + } + /* eslint-enable no-await-in-loop */ + + cleanup(); + + results.push(...importConfigs(configResults)); - const dicomChunks = chunks.filter( - (ch) => ch.chunkSrc.mime === FILE_EXT_TO_MIME.dcm + results.push( + ...(await importDicomChunkSources( + chunkSources.filter( + (src): src is ChunkDataSource => + src.chunkSrc?.mime === FILE_EXT_TO_MIME.dcm + ) + )) ); - const configResult = await importConfigs(results); - const dicomChunkResult = await importDicomChunkSources(dicomChunks); + // save data sources + useDatasetStore().addDataSources( + results.filter((result): result is LoadableResult => result.type === 'data') + ); - return [ - ...results, - ...dicomChunkResult, - configResult, - // Consuming code expects only errors and image import results. 
- // Remove ok results that don't result in something to load (like config.JSON files) - ].filter((result) => !result.ok || isSelectable(result)); + return results; } -export type ImportDataSourcesResult = Awaited< - ReturnType ->[number]; - -export function toDataSelection(loadable: VolumeResult) { +export function toDataSelection(loadable: LoadableVolumeResult) { const { dataID } = loadable; return dataID; } -export function convertSuccessResultToDataSelection( - result: ImportDataSourcesResult -) { +export function convertSuccessResultToDataSelection(result: ImportResult) { if (!isSelectable(result)) return null; - const importResult = result.data[0]; - return toDataSelection(importResult); + return toDataSelection(result); } diff --git a/src/io/import/processors/downloadStream.ts b/src/io/import/processors/downloadStream.ts index 5e7333ad3..6bfeeca36 100644 --- a/src/io/import/processors/downloadStream.ts +++ b/src/io/import/processors/downloadStream.ts @@ -1,4 +1,5 @@ -import { ImportHandler } from '@/src/io/import/common'; +import { Skip } from '@/src/utils/evaluateChain'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; import { ensureError } from '@/src/utils'; /** @@ -11,10 +12,11 @@ import { ensureError } from '@/src/utils'; * @param dataSource * @returns */ -const downloadStream: ImportHandler = async (dataSource, { execute, done }) => { - const { fileSrc, uriSrc } = dataSource; - if (fileSrc || !uriSrc?.fetcher) { - return dataSource; +const downloadStream: ImportHandler = async (dataSource) => { + const { fileSrc, uriSrc, chunkSrc } = dataSource; + // existence of a chunkSrc means that the stream doesn't need to be downloaded. + if (fileSrc || chunkSrc || !uriSrc?.fetcher) { + return Skip; } const { fetcher } = uriSrc; @@ -26,14 +28,15 @@ const downloadStream: ImportHandler = async (dataSource, { execute, done }) => { type: uriSrc.mime, }); - execute({ - ...dataSource, - fileSrc: { - file, - fileType: file.type, + return asIntermediateResult([ + { + ...dataSource, + fileSrc: { + file, + fileType: file.type, + }, }, - }); - return done(); + ]); } catch (err) { throw new Error( `Could not download stream associated with URL ${uriSrc.uri}`, diff --git a/src/io/import/processors/extractArchive.ts b/src/io/import/processors/extractArchive.ts index f8191a355..88329b7fe 100644 --- a/src/io/import/processors/extractArchive.ts +++ b/src/io/import/processors/extractArchive.ts @@ -1,28 +1,33 @@ import { extractFilesFromZip } from '@/src/io/zip'; -import { ImportHandler, isArchive } from '@/src/io/import/common'; +import { + ImportHandler, + asIntermediateResult, + isArchive, +} from '@/src/io/import/common'; +import { Skip } from '@/src/utils/evaluateChain'; /** * Extracts all files from an archive. 
* @param dataSource */ -const extractArchive: ImportHandler = async (dataSource, { execute, done }) => { +const extractArchive: ImportHandler = async (dataSource) => { if (isArchive(dataSource)) { const files = await extractFilesFromZip(dataSource.fileSrc.file); - files.forEach((entry) => { - execute({ + const newSources = files.map((entry) => { + return { fileSrc: { file: entry.file, fileType: '', }, archiveSrc: { - path: `${entry.archivePath}/${entry.file.name}`, + path: entry.archivePath, }, parent: dataSource, - }); + }; }); - return done(); + return asIntermediateResult(newSources); } - return dataSource; + return Skip; }; export default extractArchive; diff --git a/src/io/import/processors/extractArchiveTarget.ts b/src/io/import/processors/extractArchiveTarget.ts index b13ec420e..fb30f6326 100644 --- a/src/io/import/processors/extractArchiveTarget.ts +++ b/src/io/import/processors/extractArchiveTarget.ts @@ -1,36 +1,10 @@ import { - ArchiveCache, - ArchiveContents, + asIntermediateResult, ImportHandler, isArchive, } from '@/src/io/import/common'; -import { extractFilesFromZip } from '@/src/io/zip'; -import { Maybe } from '@/src/types'; -import * as path from '@/src/utils/path'; -import { Awaitable } from '@vueuse/core'; - -async function extractArchiveContents(archiveFile: File, cache?: ArchiveCache) { - let contentsPromise: Maybe> = - cache?.get(archiveFile); - if (contentsPromise) { - return contentsPromise; - } - - contentsPromise = extractFilesFromZip(archiveFile).then((files) => { - return files.reduce((mapping, fileEntry) => { - const fullPath = path.join(fileEntry.archivePath, fileEntry.file.name); - return Object.assign(mapping, { - [fullPath]: fileEntry.file, - }); - }, {} as ArchiveContents); - }); - - if (cache) { - cache.set(archiveFile, contentsPromise); - } - - return contentsPromise; -} +import { extractFileFromZip } from '@/src/io/zip'; +import { Skip } from '@/src/utils/evaluateChain'; /** * Extracts a single target file from an archive. @@ -43,12 +17,8 @@ async function extractArchiveContents(archiveFile: File, cache?: ArchiveCache) { * @param dataSource * @returns */ -const extractArchiveTarget: ImportHandler = async ( - dataSource, - { extra, execute, done } -) => { +const extractArchiveTarget: ImportHandler = async (dataSource) => { const { fileSrc, archiveSrc, parent } = dataSource; - const { archiveCache } = extra ?? 
{}; if (!fileSrc && archiveSrc && parent) { if (!parent?.fileSrc) { @@ -61,28 +31,23 @@ const extractArchiveTarget: ImportHandler = async ( throw new Error('Parent is not a supported archive file'); } - const archiveContents = await extractArchiveContents( + const targetFile = await extractFileFromZip( parent.fileSrc.file, - archiveCache + archiveSrc.path ); - const targetName = path.normalize(archiveSrc.path); - const targetFile = archiveContents[targetName]; - if (!targetFile) { - throw new Error(`Failed to find archive member ${targetName}`); - } - - execute({ - ...dataSource, - fileSrc: { - file: targetFile, - fileType: '', + return asIntermediateResult([ + { + ...dataSource, + fileSrc: { + file: targetFile, + fileType: '', + }, }, - }); - return done(); + ]); } - return dataSource; + return Skip; }; export default extractArchiveTarget; diff --git a/src/io/import/processors/handleAmazonS3.ts b/src/io/import/processors/handleAmazonS3.ts index a519736a8..569d391d9 100644 --- a/src/io/import/processors/handleAmazonS3.ts +++ b/src/io/import/processors/handleAmazonS3.ts @@ -1,12 +1,15 @@ +import { Skip } from '@/src/utils/evaluateChain'; import { getObjectsFromS3, isAmazonS3Uri } from '@/src/io/amazonS3'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; +import { DataSource } from '@/src/io/import/dataSource'; -const handleAmazonS3: ImportHandler = async (dataSource, { execute, done }) => { +const handleAmazonS3: ImportHandler = async (dataSource) => { const { uriSrc } = dataSource; if (uriSrc && isAmazonS3Uri(uriSrc.uri)) { try { + const newSources: DataSource[] = []; await getObjectsFromS3(uriSrc.uri, (name, url) => { - execute({ + newSources.push({ uriSrc: { uri: url, name, @@ -14,14 +17,14 @@ const handleAmazonS3: ImportHandler = async (dataSource, { execute, done }) => { parent: dataSource, }); }); - return done(); + return asIntermediateResult(newSources); } catch (err) { throw new Error(`Could not download S3 URI ${uriSrc.uri}`, { cause: err instanceof Error ? err : undefined, }); } } - return dataSource; + return Skip; }; export default handleAmazonS3; diff --git a/src/io/import/processors/handleConfig.ts b/src/io/import/processors/handleConfig.ts index b093ba236..cf909226b 100644 --- a/src/io/import/processors/handleConfig.ts +++ b/src/io/import/processors/handleConfig.ts @@ -1,29 +1,30 @@ -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asConfigResult } from '@/src/io/import/common'; import { ensureError } from '@/src/utils'; import { readConfigFile } from '@/src/io/import/configJson'; +import { Skip } from '@/src/utils/evaluateChain'; /** * Reads a JSON file with label config and updates stores. 
* @param dataSource * @returns */ -const handleConfig: ImportHandler = async (dataSource, { done }) => { +const handleConfig: ImportHandler = async (dataSource) => { const { fileSrc } = dataSource; if (fileSrc?.fileType === 'application/json') { try { const manifest = await readConfigFile(fileSrc.file); // Don't consume JSON if it has no known key if (Object.keys(manifest).length === 0) { - return dataSource; + return Skip; } - return done({ dataSource, config: manifest }); + return asConfigResult(dataSource, manifest); } catch (err) { throw new Error('Failed to parse config file', { cause: ensureError(err), }); } } - return dataSource; + return Skip; }; export default handleConfig; diff --git a/src/io/import/processors/handleDicomFile.ts b/src/io/import/processors/handleDicomFile.ts index e77b0234b..1a6de38e5 100644 --- a/src/io/import/processors/handleDicomFile.ts +++ b/src/io/import/processors/handleDicomFile.ts @@ -1,8 +1,9 @@ +import { Skip } from '@/src/utils/evaluateChain'; import { Chunk } from '@/src/core/streaming/chunk'; import { DicomFileDataLoader } from '@/src/core/streaming/dicom/dicomFileDataLoader'; import { DicomFileMetaLoader } from '@/src/core/streaming/dicom/dicomFileMetaLoader'; import { ReadDicomTagsFunction } from '@/src/core/streaming/dicom/dicomMetaLoader'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; import { getWorker } from '@/src/io/itk/worker'; import { FILE_EXT_TO_MIME } from '@/src/io/mimeTypes'; import { readDicomTags } from '@itk-wasm/dicom'; @@ -12,10 +13,10 @@ import { readDicomTags } from '@itk-wasm/dicom'; * @param dataSource * @returns */ -const handleDicomFile: ImportHandler = async (dataSource, { done }) => { +const handleDicomFile: ImportHandler = async (dataSource) => { const { fileSrc } = dataSource; if (fileSrc?.fileType !== FILE_EXT_TO_MIME.dcm) { - return dataSource; + return Skip; } const readTags: ReadDicomTagsFunction = async (file) => { @@ -32,15 +33,15 @@ const handleDicomFile: ImportHandler = async (dataSource, { done }) => { await chunk.loadMeta(); - return done({ - dataSource: { + return asIntermediateResult([ + { ...dataSource, chunkSrc: { chunk, mime: FILE_EXT_TO_MIME.dcm, }, }, - }); + ]); }; export default handleDicomFile; diff --git a/src/io/import/processors/handleDicomStream.ts b/src/io/import/processors/handleDicomStream.ts index b96a111f7..0aad299e0 100644 --- a/src/io/import/processors/handleDicomStream.ts +++ b/src/io/import/processors/handleDicomStream.ts @@ -1,3 +1,4 @@ +import { Skip } from '@/src/utils/evaluateChain'; import { CachedStreamFetcher } from '@/src/core/streaming/cachedStreamFetcher'; import { Chunk } from '@/src/core/streaming/chunk'; import { DicomDataLoader } from '@/src/core/streaming/dicom/dicomDataLoader'; @@ -6,17 +7,19 @@ import { ReadDicomTagsFunction, } from '@/src/core/streaming/dicom/dicomMetaLoader'; import { getRequestPool } from '@/src/core/streaming/requestPool'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; import { getWorker } from '@/src/io/itk/worker'; import { FILE_EXT_TO_MIME } from '@/src/io/mimeTypes'; import { readDicomTags } from '@itk-wasm/dicom'; -const handleDicomStream: ImportHandler = async (dataSource, { done }) => { - const { fileSrc, uriSrc } = dataSource; +const handleDicomStream: ImportHandler = async (dataSource) => { + const { fileSrc, uriSrc, chunkSrc } = dataSource; if (fileSrc || 
uriSrc?.mime !== FILE_EXT_TO_MIME.dcm) { - return dataSource; + return Skip; } + if (chunkSrc?.chunk && chunkSrc?.mime === FILE_EXT_TO_MIME.dcm) return Skip; + const fetcher = uriSrc.fetcher ?? new CachedStreamFetcher(uriSrc.uri, { @@ -37,15 +40,15 @@ const handleDicomStream: ImportHandler = async (dataSource, { done }) => { await chunk.loadMeta(); - return done({ - dataSource: { + return asIntermediateResult([ + { ...dataSource, chunkSrc: { chunk, mime: FILE_EXT_TO_MIME.dcm, }, }, - }); + ]); }; export default handleDicomStream; diff --git a/src/io/import/processors/handleGoogleCloudStorage.ts b/src/io/import/processors/handleGoogleCloudStorage.ts index b0f3813d8..0e7051a8e 100644 --- a/src/io/import/processors/handleGoogleCloudStorage.ts +++ b/src/io/import/processors/handleGoogleCloudStorage.ts @@ -1,18 +1,18 @@ +import { Skip } from '@/src/utils/evaluateChain'; import { getObjectsFromGsUri, isGoogleCloudStorageUri, } from '@/src/io/googleCloudStorage'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; +import { DataSource } from '@/src/io/import/dataSource'; -const handleGoogleCloudStorage: ImportHandler = async ( - dataSource, - { execute, done } -) => { +const handleGoogleCloudStorage: ImportHandler = async (dataSource) => { const { uriSrc } = dataSource; if (uriSrc && isGoogleCloudStorageUri(uriSrc.uri)) { try { + const newSources: DataSource[] = []; await getObjectsFromGsUri(uriSrc.uri, (object) => { - execute({ + newSources.push({ uriSrc: { uri: object.mediaLink, name: object.name, @@ -20,14 +20,14 @@ const handleGoogleCloudStorage: ImportHandler = async ( parent: dataSource, }); }); - return done(); + return asIntermediateResult(newSources); } catch (err) { throw new Error(`Could not download GCS URI ${uriSrc.uri}`, { cause: err instanceof Error ? err : undefined, }); } } - return dataSource; + return Skip; }; export default handleGoogleCloudStorage; diff --git a/src/io/import/processors/importSingleFile.ts b/src/io/import/processors/importSingleFile.ts index 84d408714..ac5c0c1d0 100644 --- a/src/io/import/processors/importSingleFile.ts +++ b/src/io/import/processors/importSingleFile.ts @@ -1,46 +1,38 @@ import vtkImageData from '@kitware/vtk.js/Common/DataModel/ImageData'; import vtkPolyData from '@kitware/vtk.js/Common/DataModel/PolyData'; -import { useFileStore } from '@/src/store/datasets-files'; import { useImageStore } from '@/src/store/datasets-images'; import { useModelStore } from '@/src/store/datasets-models'; import { FILE_READERS } from '@/src/io'; -import { ImportHandler } from '@/src/io/import/common'; -import { FileDataSource } from '@/src/io/import/dataSource'; +import { ImportHandler, asLoadableResult } from '@/src/io/import/common'; import { useDatasetStore } from '@/src/store/datasets'; import { useMessageStore } from '@/src/store/messages'; +import { Skip } from '@/src/utils/evaluateChain'; /** * Reads and imports a file DataSource. 
* @param dataSource * @returns */ -const importSingleFile: ImportHandler = async (dataSource, { done }) => { +const importSingleFile: ImportHandler = async (dataSource) => { if (!dataSource.fileSrc) { - return dataSource; + return Skip; } const { fileSrc } = dataSource; if (!FILE_READERS.has(fileSrc.fileType)) { - return dataSource; + return Skip; } const reader = FILE_READERS.get(fileSrc.fileType)!; const dataObject = await reader(fileSrc.file); - const fileStore = useFileStore(); - if (dataObject.isA('vtkImageData')) { const dataID = useImageStore().addVTKImageData( fileSrc.file.name, dataObject as vtkImageData ); - fileStore.add(dataID, [dataSource as FileDataSource]); - return done({ - dataID, - dataSource, - dataType: 'image', - }); + return asLoadableResult(dataID, dataSource, 'image'); } if (dataObject.isA('vtkPolyData')) { @@ -53,13 +45,8 @@ const importSingleFile: ImportHandler = async (dataSource, { done }) => { fileSrc.file.name, dataObject as vtkPolyData ); - fileStore.add(dataID, [dataSource as FileDataSource]); - return done({ - dataID, - dataSource, - dataType: 'model', - }); + return asLoadableResult(dataID, dataSource, 'model'); } throw new Error('Data reader did not produce a valid dataset'); diff --git a/src/io/import/processors/openUriStream.ts b/src/io/import/processors/openUriStream.ts index 9a9fdb626..4426cd723 100644 --- a/src/io/import/processors/openUriStream.ts +++ b/src/io/import/processors/openUriStream.ts @@ -1,16 +1,17 @@ +import { Skip } from '@/src/utils/evaluateChain'; import { CachedStreamFetcher } from '@/src/core/streaming/cachedStreamFetcher'; import { getRequestPool } from '@/src/core/streaming/requestPool'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; import { canFetchUrl } from '@/src/utils/fetch'; -const openUriStream: ImportHandler = async (dataSource, { onCleanup }) => { +const openUriStream: ImportHandler = async (dataSource, context) => { const { uriSrc } = dataSource; if (!uriSrc || !canFetchUrl(uriSrc.uri)) { - return dataSource; + return Skip; } if (uriSrc.fetcher?.connected) { - return dataSource; + return Skip; } const fetcher = new CachedStreamFetcher(uriSrc.uri, { @@ -20,17 +21,19 @@ const openUriStream: ImportHandler = async (dataSource, { onCleanup }) => { await fetcher.connect(); // ensure we close the connection on completion - onCleanup(() => { + context?.onCleanup?.(() => { fetcher.close(); }); - return { - ...dataSource, - uriSrc: { - ...uriSrc, - fetcher, + return asIntermediateResult([ + { + ...dataSource, + uriSrc: { + ...uriSrc, + fetcher, + }, }, - }; + ]); }; export default openUriStream; diff --git a/src/io/import/processors/remoteManifest.ts b/src/io/import/processors/remoteManifest.ts index 9e85f3a1b..b3c09abea 100644 --- a/src/io/import/processors/remoteManifest.ts +++ b/src/io/import/processors/remoteManifest.ts @@ -1,40 +1,38 @@ import { DataSource } from '@/src/io/import/dataSource'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; import { readRemoteManifestFile } from '@/src/io/manifest'; +import { Skip } from '@/src/utils/evaluateChain'; +import { ZodError } from 'zod'; /** * Reads a JSON file that conforms to the remote manifest spec. 
* @param dataSource * @returns */ -const handleRemoteManifest: ImportHandler = async ( - dataSource, - { done, execute } -) => { +const handleRemoteManifest: ImportHandler = async (dataSource) => { const { fileSrc } = dataSource; - if (fileSrc?.fileType === 'application/json') { + if (fileSrc?.fileType !== 'application/json') { + return Skip; + } + + try { const remotes: DataSource[] = []; - try { - const manifest = await readRemoteManifestFile(fileSrc.file); - manifest.resources.forEach((res) => { - remotes.push({ - uriSrc: { - uri: res.url, - name: res.name ?? new URL(res.url, window.location.origin).pathname, - }, - parent: dataSource, - }); + const manifest = await readRemoteManifestFile(fileSrc.file); + manifest.resources.forEach((res) => { + remotes.push({ + uriSrc: { + uri: res.url, + name: res.name ?? new URL(res.url, window.location.origin).pathname, + }, + parent: dataSource, }); - } catch (err) { - return dataSource; - } - - remotes.forEach((remote) => { - execute(remote); }); - return done(); + + return asIntermediateResult(remotes); + } catch (err) { + if (err instanceof ZodError) return Skip; + throw err; } - return dataSource; }; export default handleRemoteManifest; diff --git a/src/io/import/processors/resolveIncompleteDataSource.ts b/src/io/import/processors/resolveIncompleteDataSource.ts deleted file mode 100644 index 00520e906..000000000 --- a/src/io/import/processors/resolveIncompleteDataSource.ts +++ /dev/null @@ -1,108 +0,0 @@ -import Pipeline from '@/src/core/pipeline'; -import { ImportHandler } from '@/src/io/import/common'; -import doneWithDataSource from '@/src/io/import/processors/doneWithDataSource'; -import downloadUrl from '@/src/io/import/processors/downloadUrl'; -import extractArchiveTarget from '@/src/io/import/processors/extractArchiveTarget'; -import updateFileMimeType from '@/src/io/import/processors/updateFileMimeType'; -import { ensureError } from '@/src/utils'; - -/** - * Resolves a parent that is a UriSource. - * - * The input data source's parent into { fileSrc, parent: { uriSrc }} - * @param dataSource - * @param param1 - * @returns - */ -const resolveParentUri: ImportHandler = async (dataSource, { extra }) => { - const { parent } = dataSource; - if (parent?.uriSrc) { - const pipeline = new Pipeline([ - updateFileMimeType, - downloadUrl, - doneWithDataSource, - ]); - const result = await pipeline.execute(parent, extra); - if (!result.ok) { - throw new Error('Failed to resolve data source with URI', { - cause: ensureError(result.errors[0].cause), - }); - } - - // replace the parent with the result data source. - return { - ...dataSource, - parent: result.data[0].dataSource, - }; - } - return dataSource; -}; - -/** - * Resolves an incomplete archive member. - * - * Transforms the input data source by adding a FileSource. - * @param dataSource - * @param param1 - */ -const resolveArchiveMember: ImportHandler = async (dataSource, { extra }) => { - if (dataSource.archiveSrc) { - const pipeline = new Pipeline([ - updateFileMimeType, - extractArchiveTarget, - doneWithDataSource, - ]); - const result = await pipeline.execute(dataSource, extra); - if (!result.ok) { - throw new Error('Failed to resolve archive member', { - cause: ensureError(result.errors[0].cause), - }); - } - - // extractArchiveTarget returns the fully resolved data source. - return result.data[0].dataSource; - } - return dataSource; -}; - -/** - * Resolves an incomplete data source. - * - * Should be used after resolveParent in the same pipeline. 
- * - * There are two general kinds of unresolved data sources: - * 1. URI src not downloaded - * 2. archive member not extracted - * @param dataSource - * @returns - */ -const resolveIncompleteDataSource: ImportHandler = async ( - dataSource, - { extra } -) => { - // if fileSrc already exists, continue. - if (dataSource.fileSrc) { - return dataSource; - } - - const { parent } = dataSource; - if (!parent) { - return dataSource; - } - - const pipeline = new Pipeline([ - resolveParentUri, - resolveArchiveMember, - doneWithDataSource, - ]); - const result = await pipeline.execute(dataSource, extra); - if (!result.ok) { - throw new Error('Failed to resolve data source', { - cause: ensureError(result.errors[0].cause), - }); - } - - return result.data[0].dataSource; -}; - -export default resolveIncompleteDataSource; diff --git a/src/io/import/processors/restoreStateFile.ts b/src/io/import/processors/restoreStateFile.ts index ec446bde6..088b4dcc1 100644 --- a/src/io/import/processors/restoreStateFile.ts +++ b/src/io/import/processors/restoreStateFile.ts @@ -1,25 +1,18 @@ -import { Dataset, Manifest, ManifestSchema } from '@/src/io/state-file/schema'; -import { FileEntry } from '@/src/io/types'; -import * as path from '@/src/utils/path'; import { - ArchiveContents, + DataSourceType, + Manifest, + ManifestSchema, +} from '@/src/io/state-file/schema'; +import { + asOkayResult, ImportContext, ImportHandler, ImportResult, - isLoadableResult, } from '@/src/io/import/common'; -import { - DataSource, - FileDataSource, - fileToDataSource, -} from '@/src/io/import/dataSource'; +import { DataSource } from '@/src/io/import/dataSource'; import { MANIFEST, isStateFile } from '@/src/io/state-file'; -import { ensureError, partition } from '@/src/utils'; +import { partition } from '@/src/utils'; import { pipe } from '@/src/utils/functional'; -import Pipeline, { PipelineContext } from '@/src/core/pipeline'; -import { Awaitable } from '@vueuse/core'; -import doneWithDataSource from '@/src/io/import/processors/doneWithDataSource'; -import { useDICOMStore } from '@/src/store/datasets-dicom'; import { useViewStore } from '@/src/store/views'; import { useDatasetStore } from '@/src/store/datasets'; import { @@ -29,9 +22,14 @@ import { import { useToolStore } from '@/src/store/tools'; import { useLayersStore } from '@/src/store/datasets-layers'; import { extractFilesFromZip } from '@/src/io/zip'; -import downloadUrl from '@/src/io/import/processors/downloadUrl'; import updateFileMimeType from '@/src/io/import/processors/updateFileMimeType'; import extractArchiveTarget from '@/src/io/import/processors/extractArchiveTarget'; +import { ChainHandler, evaluateChain, Skip } from '@/src/utils/evaluateChain'; +import openUriStream from '@/src/io/import/processors/openUriStream'; +import updateUriType from '@/src/io/import/processors/updateUriType'; +import handleDicomStream from '@/src/io/import/processors/handleDicomStream'; +import downloadStream from '@/src/io/import/processors/downloadStream'; +import { FileEntry } from '@/src/io/types'; const LABELMAP_PALETTE_2_1_0 = { '1': { @@ -156,190 +154,168 @@ const migrateManifest = (manifestString: string) => { ); }; -const resolveUriSource: ImportHandler = async (dataSource, { extra, done }) => { - const { uriSrc } = dataSource; - - if (uriSrc) { - const result = await new Pipeline([ - downloadUrl, - updateFileMimeType, - doneWithDataSource, - ]).execute(dataSource, extra); - if (!result.ok) { - throw result.errors[0].cause; +type ResolvedResult = { + type: 'resolved'; + 
dataSource: DataSource; +}; + +type ResolvingImportHandler = ChainHandler< + DataSource, + ImportResult | ResolvedResult, + ImportContext +>; + +const resolvingHandlers: ResolvingImportHandler[] = [ + openUriStream, + + // updating the file/uri type should be the first step in the pipeline + updateFileMimeType, + updateUriType, + + // stream handling + handleDicomStream, + downloadStream, + + extractArchiveTarget, + + (dataSource) => { + return { type: 'resolved', dataSource }; + }, +]; + +async function rebuildDataSources( + serializedDataSources: DataSourceType[], + fileIDToFile: Record<number, File> +) { + const dataSourceCache: Record<number, DataSource> = {}; + const byId: Record<number, DataSourceType> = {}; + const leaves = new Set<number>(); + + serializedDataSources.forEach((serializedSrc) => { + byId[serializedSrc.id] = serializedSrc; + leaves.add(serializedSrc.id); + }); + + // serializedDataSources should be topologically ordered by + // ancestors first and descendants last + for (let i = 0; i < serializedDataSources.length; i++) { + const serializedSrc = serializedDataSources[i]; + + if (serializedSrc.id in dataSourceCache) { + // eslint-disable-next-line no-continue + continue; } - // downloadUrl returns the fully resolved data source. - // We call done here since we've resolved the UriSource - // and no more processing is needed. - return done({ - dataSource: result.data[0].dataSource, - }); - } - return dataSource; -}; + let dataSource: DataSource = {}; + + if (serializedSrc.fileSrc) { + dataSource.fileSrc = { + file: fileIDToFile[serializedSrc.fileSrc.fileId], + fileType: serializedSrc.fileSrc.fileType, + }; + } -const processParentIfNoFile: ImportHandler = async ( - dataSource, - { execute } -) => { - const { fileSrc, parent } = dataSource; - if (!fileSrc && parent) { - const result = await execute(parent); - if (!result.ok) { - throw new Error('Could not process parent', { - cause: ensureError(result.errors[0].cause), + if (serializedSrc.archiveSrc) { + dataSource.archiveSrc = serializedSrc.archiveSrc; + } + + if (serializedSrc.uriSrc) { + dataSource.uriSrc = serializedSrc.uriSrc; + } + + if (serializedSrc.collectionSrc) { + serializedSrc.collectionSrc.sources.forEach((id) => { + leaves.delete(id); }); + dataSource.collectionSrc = { + sources: serializedSrc.collectionSrc.sources.map( + (id) => dataSourceCache[id] + ), + }; } - // update the parent - return { - ...dataSource, - parent: result.data[0].dataSource, - }; - } - return dataSource; -}; -const resolveArchiveMember: ImportHandler = async ( - dataSource, - { extra, done } -) => { - if (dataSource.archiveSrc) { - const pipeline = new Pipeline([ - extractArchiveTarget, - updateFileMimeType, - doneWithDataSource, - ]); - const result = await pipeline.execute(dataSource, extra); - if (!result.ok) { - throw result.errors[0].cause; + if (serializedSrc.parent) { + dataSource.parent = dataSourceCache[serializedSrc.parent]; + leaves.delete(serializedSrc.parent); + } + + let stillResolving = true; + while (stillResolving) { + // eslint-disable-next-line no-await-in-loop + const result = await evaluateChain(dataSource, resolvingHandlers); + + stillResolving = result.type !== 'resolved'; + if (!stillResolving) break; + + if (result.type !== 'intermediate') { + throw new Error( + 'Resolving pipeline does not produce intermediate results!' + ); + } + + dataSource = result.dataSources[0]; } - // extractArchiveTarget returns the fully resolved data source.
- return done({ - dataSource: result.data[0].dataSource, - }); + + dataSourceCache[serializedSrc.id] = dataSource; } - return dataSource; -}; -function getDataSourcesForDataset( - dataset: Dataset, - manifest: Manifest, - stateFileContents: FileEntry[] -) { - const inStateFile = stateFileContents - .filter( - (entry) => - path.normalize(entry.archivePath) === path.normalize(dataset.path) - ) - .map((entry) => fileToDataSource(entry.file)); - const remotes = manifest.remoteFiles[dataset.id] ?? []; - return [...inStateFile, ...remotes]; + return { dataSourceCache, leaves }; } async function restoreDatasets( manifest: Manifest, datasetFiles: FileEntry[], - { extra, execute }: PipelineContext + context?: ImportContext ) { - const archiveCache = new Map<string, Promise<ArchiveContents>>(); - - // normalize archive paths for comparison - const stateDatasetFiles = datasetFiles.map((datasetFile) => { - return { - ...datasetFile, - archivePath: path.normalize(datasetFile.archivePath), - }; - }); + const { datasets, dataSources, datasetFilePath } = manifest; + const dataSourceIDToStateID = datasets.reduce<Record<number, string>>( + (acc, ds) => + Object.assign(acc, { + [ds.dataSourceId]: ds.id, + }), + {} + ); + const pathToFile = datasetFiles.reduce<Record<string, File>>( + (acc, datasetFile) => + Object.assign(acc, { + [datasetFile.archivePath]: datasetFile.file, + }), + {} + ); + const fileIDToFile = Object.entries(datasetFilePath).reduce< + Record<number, File> + >( + (acc, [fileId, filePath]) => + Object.assign(acc, { + [fileId]: pathToFile[filePath], + }), + {} + ); - const { datasets } = manifest; - // Mapping of the state file ID => new store ID - const stateIDToStoreID: Record<string, string> = {}; + const { dataSourceCache, leaves } = await rebuildDataSources( + dataSources, + fileIDToFile + ); - // This pipeline resolves data sources that have remote provenance. - const resolvePipeline = new Pipeline([ - updateFileMimeType, - resolveUriSource, - // process parent after resolving the uri source, so we don't - // unnecessarily download ancestor UriSources.
- processParentIfNoFile, - resolveArchiveMember, - doneWithDataSource, - ]); + const stateIDToStoreID: Record<string, string> = {}; await Promise.all( - datasets.map(async (dataset) => { - let datasetDataSources = getDataSourcesForDataset( - dataset, - manifest, - stateDatasetFiles - ); - - // resolve any remote data sources or archive members - datasetDataSources = await Promise.all( - datasetDataSources.map(async (source) => { - const result = await resolvePipeline.execute(source, { - ...extra, - archiveCache, - }); - if (!result.ok) { - throw result.errors[0].cause; - } - return result.data[0].dataSource; - }) - ); - - // do the import - const dicomSources: FileDataSource[] = []; - const importResults = await Promise.all( - datasetDataSources.map((source) => - execute(source, { - ...extra, - archiveCache, - dicomDataSources: dicomSources, - }) - ) - ); - - if (dicomSources.length) { - const dicomStore = useDICOMStore(); - const volumeKeys = await dicomStore.importFiles(dicomSources); - if (volumeKeys.length !== 1) { - throw new Error('Obtained more than one volume from DICOM import'); - } - - const [key] = volumeKeys; - // generate imageID so rulers and labelmaps can use stateIDToStoreID to setup there internal imageStore imageID references - await dicomStore.buildVolume(key); - stateIDToStoreID[dataset.id] = key; - } else if (importResults.length === 1) { - if (!importResults[0].ok) { - throw importResults[0].errors[0].cause; - } - - const [result] = importResults; - if (result.data.length !== 1) { - throw new Error( - 'Import encountered multiple volumes for a single dataset' - ); - } - - const importResult = result.data[0]; - if (!isLoadableResult(importResult)) { - throw new Error('Failed to import dataset'); - } - - stateIDToStoreID[dataset.id] = importResult.dataID; - } else { - throw new Error('Could not load any data from the session'); - } + [...leaves].map(async (leafId) => { + const dataSource = dataSourceCache[leafId]; + const importResult = + (await context?.importDataSources?.([dataSource])) ??
[]; + const [result] = importResult; + if (result?.type !== 'data' || importResult.length !== 1) + throw new Error('Expected a single dataset'); + + stateIDToStoreID[dataSourceIDToStateID[leafId]] = result.dataID; }) ); return stateIDToStoreID; } -const restoreStateFile: ImportHandler = async (dataSource, pipelineContext) => { +const restoreStateFile: ImportHandler = async (dataSource, context) => { const { fileSrc } = dataSource; if (fileSrc && (await isStateFile(fileSrc.file))) { const stateFileContents = await extractFilesFromZip(fileSrc.file); @@ -364,7 +340,7 @@ const restoreStateFile: ImportHandler = async (dataSource, pipelineContext) => { const stateIDToStoreID = await restoreDatasets( manifest, restOfStateFile, - pipelineContext + context ); // Restore the primary selection @@ -389,9 +365,9 @@ const restoreStateFile: ImportHandler = async (dataSource, pipelineContext) => { useLayersStore().deserialize(manifest, stateIDToStoreID); - return pipelineContext.done(); + return asOkayResult(dataSource); } - return dataSource; + return Skip; }; export default restoreStateFile; diff --git a/src/io/import/processors/updateFileMimeType.ts b/src/io/import/processors/updateFileMimeType.ts index b805df8c4..06238781c 100644 --- a/src/io/import/processors/updateFileMimeType.ts +++ b/src/io/import/processors/updateFileMimeType.ts @@ -1,26 +1,29 @@ +import { Skip } from '@/src/utils/evaluateChain'; import { getFileMimeType } from '@/src/io'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; /** * Transforms a file data source to have a mime type * @param dataSource */ const updateFileMimeType: ImportHandler = async (dataSource) => { - let src = dataSource; - const { fileSrc } = src; - if (fileSrc && fileSrc.fileType === '') { - const mime = await getFileMimeType(fileSrc.file); - if (mime) { - src = { - ...src, - fileSrc: { - ...fileSrc, - fileType: mime, - }, - }; - } + const { fileSrc } = dataSource; + if (!fileSrc || fileSrc.fileType !== '') return Skip; + + const mime = await getFileMimeType(fileSrc.file); + if (!mime) { + throw new Error('File is unsupported'); } - return src; + + return asIntermediateResult([ + { + ...dataSource, + fileSrc: { + ...fileSrc, + fileType: mime, + }, + }, + ]); }; export default updateFileMimeType; diff --git a/src/io/import/processors/updateUriType.ts b/src/io/import/processors/updateUriType.ts index b6aa5169f..e79b94acb 100644 --- a/src/io/import/processors/updateUriType.ts +++ b/src/io/import/processors/updateUriType.ts @@ -1,5 +1,6 @@ +import { Skip } from '@/src/utils/evaluateChain'; import StreamingByteReader from '@/src/core/streaming/streamingByteReader'; -import { ImportHandler } from '@/src/io/import/common'; +import { ImportHandler, asIntermediateResult } from '@/src/io/import/common'; import { getFileMimeFromMagicStream } from '@/src/io/magic'; import { asCoroutine } from '@/src/utils'; @@ -37,7 +38,11 @@ function detectStreamType(stream: ReadableStream) { const updateUriType: ImportHandler = async (dataSource) => { const { fileSrc, uriSrc } = dataSource; if (fileSrc || !uriSrc?.fetcher) { - return dataSource; + return Skip; + } + + if (uriSrc.mime !== undefined) { + return Skip; } const { fetcher } = uriSrc; @@ -54,7 +59,7 @@ const updateUriType: ImportHandler = async (dataSource) => { }, }; - return streamDataSource; + return asIntermediateResult([streamDataSource]); }; export default updateUriType; diff --git a/src/io/state-file/index.ts b/src/io/state-file/index.ts 
index 8a7bcd05d..c91225a1f 100644 --- a/src/io/state-file/index.ts +++ b/src/io/state-file/index.ts @@ -25,7 +25,8 @@ export async function serialize() { const manifest: Manifest = { version: MANIFEST_VERSION, datasets: [], - remoteFiles: {}, + dataSources: [], + datasetFilePath: {}, labelMaps: [], tools: { crosshairs: { diff --git a/src/io/state-file/schema.ts b/src/io/state-file/schema.ts index 65a8cbc90..4d8e15af3 100644 --- a/src/io/state-file/schema.ts +++ b/src/io/state-file/schema.ts @@ -38,13 +38,6 @@ import type { } from '../../types/views'; import { WLAutoRanges } from '../../constants'; -export enum DatasetType { - DICOM = 'dicom', - IMAGE = 'image', -} - -const DatasetTypeNative = z.nativeEnum(DatasetType); - const LPSAxisDir = z.union([ z.literal('Left'), z.literal('Right'), @@ -54,12 +47,40 @@ const LPSAxisDir = z.union([ z.literal('Inferior'), ]); +const FileSource = z.object({ + fileId: z.number(), + fileType: z.string(), +}); + +const UriSource = z.object({ + uri: z.string(), + name: z.string(), + mime: z.string().optional(), +}); + +const ArchiveSource = z.object({ + path: z.string(), +}); + +const CollectionSource = z.object({ + sources: z.number().array(), +}); + +const DataSource = z.object({ + id: z.number(), + parent: z.number().optional(), + fileSrc: FileSource.optional(), + uriSrc: UriSource.optional(), + archiveSrc: ArchiveSource.optional(), + collectionSrc: CollectionSource.optional(), +}); + +export type DataSourceType = z.infer<typeof DataSource>; + const Dataset = z.object({ id: z.string(), - path: z.string(), - type: DatasetTypeNative, + dataSourceId: z.number(), }); -export type Dataset = z.infer<typeof Dataset>; const baseRemoteFileSchema = z.object({ archiveSrc: z.object({ path: z.string() }).optional(), @@ -349,7 +370,8 @@ export type ParentToLayers = z.infer<typeof ParentToLayers>; export const ManifestSchema = z.object({ version: z.string(), datasets: Dataset.array(), - remoteFiles: z.record(RemoteFile.array()), + dataSources: DataSource.array(), + datasetFilePath: z.record(z.string()), labelMaps: LabelMap.array(), tools: Tools, views: View.array(), diff --git a/src/io/state-file/utils.ts b/src/io/state-file/utils.ts deleted file mode 100644 index a1edb69b2..000000000 --- a/src/io/state-file/utils.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { partition } from '@/src/utils'; -import { - isRemoteDataSource, - serializeDataSource, -} from '@/src/io/import/dataSource'; -import { StateFile, DatasetType } from './schema'; -import { useFileStore } from '../../store/datasets-files'; - -export async function serializeData( - stateFile: StateFile, - dataIDs: string[], - dataType: DatasetType -) { - const fileStore = useFileStore(); - const { zip } = stateFile; - const { - manifest: { datasets, remoteFiles }, - } = stateFile; - - dataIDs.forEach((id) => { - const sources = fileStore.getDataSources(id); - if (!sources.length) { - throw new Error(`No files for dataID: ${id}`); - } - - const [remotes, toZip] = partition(isRemoteDataSource, sources); - - remoteFiles[id] = remotes.map(serializeDataSource); - - const dataPath = `data/${id}/`; - - toZip.forEach((ds) => { - const { file } = ds.fileSrc; - const filePath = `${dataPath}/${file.name}`; - zip.file(filePath, file); - }); - - datasets.push({ - id, - path: dataPath, - type: dataType, - }); - }); -} diff --git a/src/io/zip.ts b/src/io/zip.ts index 73f988eed..c25e5c975 100644 --- a/src/io/zip.ts +++ b/src/io/zip.ts @@ -24,8 +24,23 @@ export async function extractFilesFromZip(zipFile: File): Promise<FileEntry[]> { return files.map((file, index) => { return { file, - archivePath:
paths[index], + archivePath: `${paths[index]}/${file.name}`, }; }); }); } + +export async function extractFileFromZip( + zipFile: File, + filePath: string +): Promise<File> { + const zip = await JSZip.loadAsync(zipFile); + const zippedFile = zip.file(filePath); + + if (!zippedFile) + throw new Error(`File ${filePath} does not exist in the zip file`); + if (zippedFile.dir) throw new Error(`Given file path is a directory`); + + const blob = await zippedFile.async('blob'); + return new File([blob], basename(zippedFile.name)); +} diff --git a/src/store/datasets-dicom.ts b/src/store/datasets-dicom.ts index 7d3fcbb5b..3dc1de626 100644 --- a/src/store/datasets-dicom.ts +++ b/src/store/datasets-dicom.ts @@ -7,8 +7,6 @@ import * as DICOM from '@/src/io/dicom'; import { identity, pick, removeFromArray } from '../utils'; import { useImageStore } from './datasets-images'; import { useFileStore } from './datasets-files'; -import { StateFile, DatasetType } from '../io/state-file/schema'; -import { serializeData } from '../io/state-file/utils'; export const ANONYMOUS_PATIENT = 'Anonymous'; export const ANONYMOUS_PATIENT_ID = 'ANONYMOUS'; @@ -321,22 +319,6 @@ export const useDICOMStore = defineStore('dicom', { } }, - async serialize(stateFile: StateFile) { - const dataIDs = Object.keys(this.volumeInfo); - await serializeData(stateFile, dataIDs, DatasetType.DICOM); - }, - - async deserialize(files: FileDataSource[]) { - return this.importFiles(files).then((volumeKeys) => { - if (volumeKeys.length !== 1) { - // Volumes are store individually so we should get one back. - throw new Error('Invalid state file.'); - } - - return volumeKeys[0]; - }); - }, - // returns an ITK image object async getVolumeSlice( volumeKey: string, diff --git a/src/store/datasets-images.ts b/src/store/datasets-images.ts index 817a7c067..034fd8487 100644 --- a/src/store/datasets-images.ts +++ b/src/store/datasets-images.ts @@ -6,9 +6,6 @@ import type { Bounds } from '@kitware/vtk.js/types'; import { useIdStore } from '@/src/store/id'; import { defaultLPSDirections, getLPSDirections } from '../utils/lps'; import { removeFromArray } from '../utils'; -import { StateFile, DatasetType } from '../io/state-file/schema'; -import { serializeData } from '../io/state-file/utils'; -import { useFileStore } from './datasets-files'; import { ImageMetadata } from '../types/image'; export const defaultImageMetadata = () => ({ @@ -78,14 +75,5 @@ export const useImageStore = defineStore('images', { removeFromArray(this.idList, id); } }, - - async serialize(stateFile: StateFile) { - const fileStore = useFileStore(); - // We want to filter out volume images (which are generated and don't have - // input files in fileStore with matching imageID.)
- const dataIDs = this.idList.filter((id) => id in fileStore.byDataID); - - await serializeData(stateFile, dataIDs, DatasetType.IMAGE); - }, }, }); diff --git a/src/store/datasets.ts b/src/store/datasets.ts index 0c7ef8fa3..d9c6e8591 100644 --- a/src/store/datasets.ts +++ b/src/store/datasets.ts @@ -1,15 +1,15 @@ import vtkImageData from '@kitware/vtk.js/Common/DataModel/ImageData'; import { defineStore } from 'pinia'; -import { computed, ref } from 'vue'; +import { computed, ref, shallowRef } from 'vue'; import { isDicomImage, isRegularImage, type DataSelection, } from '@/src/utils/dataSelection'; +import { DataSource } from '@/src/io/import/dataSource'; import { useDICOMStore } from './datasets-dicom'; import { useImageStore } from './datasets-images'; -import { useFileStore } from './datasets-files'; -import { StateFile } from '../io/state-file/schema'; +import * as Schema from '../io/state-file/schema'; import { useLayersStore } from './datasets-layers'; export const DataType = { @@ -17,15 +17,90 @@ export const DataType = { Model: 'Model', }; +interface LoadedData { + dataID: string; + dataSource: DataSource; +} + +function createIdGenerator() { + let nextId = 1; + return () => nextId++; +} + +function serializeLoadedData(loadedDataSources: Array<LoadedData>) { + const nextId = createIdGenerator(); + const dataSourceToId = new Map<DataSource, number>(); + // topologically ordered ancestor -> descendant + const serializedDependencies: Array<Schema.DataSourceType> = []; + const dataIDToDataSourceID: Record<string, number> = {}; + const files: Record<number, File> = {}; + + function serializeDataSource(ds: DataSource): number { + if (dataSourceToId.has(ds)) { + return dataSourceToId.get(ds)!; + } + + const serialized: Schema.DataSourceType = { id: nextId() }; + dataSourceToId.set(ds, serialized.id); + + if (ds.fileSrc) { + if (ds.archiveSrc || ds.uriSrc) { + // fileSrc is constructed from either an archive or uri + delete serialized.fileSrc; + } else { + const fileId = nextId(); + serialized.fileSrc = { fileId, fileType: ds.fileSrc.fileType }; + files[fileId] = ds.fileSrc.file; + } + } + + if (ds.archiveSrc) { + serialized.archiveSrc = ds.archiveSrc; + } + + if (ds.uriSrc) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { fetcher, ...rest } = ds.uriSrc; + serialized.uriSrc = rest; + } + + if (ds.collectionSrc) { + serialized.collectionSrc = { + sources: ds.collectionSrc.sources.map((s) => serializeDataSource(s)), + }; + } + + const shouldSerializeParent = !!ds.archiveSrc; + + if (shouldSerializeParent && ds.parent) { + serialized.parent = serializeDataSource(ds.parent); + } + + serializedDependencies.push(serialized); + return serialized.id; + } + + loadedDataSources.forEach(({ dataID, dataSource }) => { + const id = serializeDataSource(dataSource); + dataIDToDataSourceID[dataID] = id; + }); + + return { + serializedDependencies, + dataIDToDataSourceID, + files, + }; +} + export const useDatasetStore = defineStore('dataset', () => { const imageStore = useImageStore(); const dicomStore = useDICOMStore(); - const fileStore = useFileStore(); const layersStore = useLayersStore(); // --- state --- // const primarySelection = ref<DataSelection | null>(null); + const loadedData = shallowRef<Array<LoadedData>>([]); // --- getters --- // @@ -48,12 +123,28 @@ export const useDatasetStore = defineStore('dataset', () => { primarySelection.value = sel; } - async function serialize(stateFile: StateFile) { - await dicomStore.serialize(stateFile); - await imageStore.serialize(stateFile); + async function serialize(stateFile: Schema.StateFile) { + const { manifest, zip } = stateFile; + + const {
serializedDependencies, dataIDToDataSourceID, files } = + serializeLoadedData(loadedData.value); + + // save datasets and data sources + manifest.datasets = loadedData.value.map(({ dataID }) => ({ + id: dataID, + dataSourceId: dataIDToDataSourceID[dataID], + })); + manifest.dataSources = serializedDependencies; + + // add any locally loaded files + manifest.datasetFilePath = {}; + Object.entries(files).forEach(([fileId, file]) => { + const filePath = `data/${fileId}/${file.name}`; + zip.file(filePath, file); + manifest.datasetFilePath[fileId] = filePath; + }); if (primarySelection.value) { - const { manifest } = stateFile; manifest.primarySelection = primarySelection.value; } } @@ -68,15 +159,19 @@ export const useDatasetStore = defineStore('dataset', () => { } imageStore.deleteData(id); - fileStore.remove(id); layersStore.remove(id); }; + function addDataSources(sources: Array) { + loadedData.value.push(...sources); + } + return { primaryImageID, primarySelection, primaryDataset, idsAsSelections, + addDataSources, setPrimarySelection, serialize, remove, diff --git a/src/store/segmentGroups.ts b/src/store/segmentGroups.ts index 53e928bdd..599cdea4d 100644 --- a/src/store/segmentGroups.ts +++ b/src/store/segmentGroups.ts @@ -5,7 +5,7 @@ import vtkBoundingBox from '@kitware/vtk.js/Common/DataModel/BoundingBox'; import type { RGBAColor } from '@kitware/vtk.js/types'; import { defineStore } from 'pinia'; import { useImageStore } from '@/src/store/datasets-images'; -import { join, normalize } from '@/src/utils/path'; +import { normalize } from '@/src/utils/path'; import { useIdStore } from '@/src/store/id'; import { onImageDeleted } from '@/src/composables/onImageDeleted'; import { normalizeForStore, removeFromArray } from '@/src/utils'; @@ -440,11 +440,7 @@ export const useSegmentGroupStore = defineStore('segmentGroup', () => { const newLabelmapIDs = await Promise.all( labelMaps.map(async (labelMap) => { const [file] = stateFiles - .filter( - (entry) => - join(entry.archivePath, entry.file.name) === - normalize(labelMap.path) - ) + .filter((entry) => entry.archivePath === normalize(labelMap.path)) .map((entry) => entry.file); const vtkImage = await readImage(file); diff --git a/src/utils/__tests__/asyncSelect.spec.ts b/src/utils/__tests__/asyncSelect.spec.ts new file mode 100644 index 000000000..5d25df96b --- /dev/null +++ b/src/utils/__tests__/asyncSelect.spec.ts @@ -0,0 +1,41 @@ +import { asyncSelect } from '@/src/utils/asyncSelect'; +import { it, describe } from 'vitest'; +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +chai.use(chaiAsPromised); + +function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +describe('asyncSelect', () => { + it('should act similar to Promise.race()', async () => { + const promises = [sleep(11), sleep(1), sleep(111)]; + const { promise, index } = await asyncSelect(promises); + expect(promise).to.equal(promises[1]); + expect(index).to.equal(1); + }); + + it('should return the rest of the unselected promises', async () => { + const promises = [sleep(1), sleep(11), sleep(111)]; + const { rest } = await asyncSelect(promises); + expect(rest).to.deep.equal(promises.slice(1)); + }); + + it('should handle rejected promises', async () => { + const promises = [ + sleep(11), + sleep(1), + sleep(111), + new Promise((resolve, reject) => { + reject(new Error('Error')); + }), + ]; + const { promise, index } = await asyncSelect(promises); + expect(promise).to.be.rejected; + 
expect(index).to.equal(3); + }); +}); diff --git a/src/utils/__tests__/evaluateChain.spec.ts b/src/utils/__tests__/evaluateChain.spec.ts new file mode 100644 index 000000000..27c5493ae --- /dev/null +++ b/src/utils/__tests__/evaluateChain.spec.ts @@ -0,0 +1,52 @@ +import { describe, it } from 'vitest'; +import sinonChai from 'sinon-chai'; +import chaiAsPromised from 'chai-as-promised'; +import Chai, { expect } from 'chai'; +import { ChainHandler, Skip, evaluateChain } from '@/src/utils/evaluateChain'; + +Chai.use(chaiAsPromised); +Chai.use(sinonChai); + +function delayedMul(a: number, b: number) { + return new Promise<number>((resolve) => { + setTimeout(() => { + resolve(a * b); + }, 10); + }); +} + +describe('evaluateChain', () => { + it('should evaluate a chain of sync handlers', () => { + const chain: Array<ChainHandler<number, number>> = [ + (n) => (n < 5 ? n * 2 : Skip), + (n) => (n < 10 ? n * 4 : Skip), + (n) => (n < 15 ? n * 8 : Skip), + ]; + + expect(evaluateChain(3, chain)).to.eventually.equal(6); + expect(evaluateChain(8, chain)).to.eventually.equal(32); + expect(evaluateChain(11, chain)).to.eventually.equal(88); + }); + + it('should evaluate a chain of async handlers', () => { + const chain: Array<ChainHandler<number, number>> = [ + (n) => (n < 5 ? delayedMul(n, 2) : Skip), + (n) => (n < 10 ? delayedMul(n, 4) : Skip), + (n) => (n < 15 ? delayedMul(n, 8) : Skip), + ]; + + expect(evaluateChain(3, chain)).to.eventually.equal(6); + expect(evaluateChain(8, chain)).to.eventually.equal(32); + expect(evaluateChain(11, chain)).to.eventually.equal(88); + }); + + it('should throw if all handlers skip', () => { + const chain: Array<ChainHandler<number, number>> = [ + (n) => (n < 5 ? delayedMul(n, 2) : Skip), + (n) => (n < 10 ? delayedMul(n, 4) : Skip), + (n) => (n < 15 ? delayedMul(n, 8) : Skip), + ]; + + expect(evaluateChain(20, chain)).to.eventually.be.rejected; + }); +}); diff --git a/src/utils/asyncSelect.ts b/src/utils/asyncSelect.ts new file mode 100644 index 000000000..e044e8555 --- /dev/null +++ b/src/utils/asyncSelect.ts @@ -0,0 +1,23 @@ +/** + * The same as Promise.race(), but returns richer promise information. + * + * Return object structure: + * - promise: the settled promise + * - index: the index of the settled promise + * - rest: the rest of the unselected promises + * @param promises + * @returns + */ +export function asyncSelect<T>(promises: Promise<T>[]) { + return Promise.race( + promises.map((p, i) => { + const info = { promise: p, index: i }; + return p.catch(() => {}).then(() => info); + }) + ).then(({ promise, index }) => { + const rest = [...promises]; + rest.splice(index, 1); + + return { promise, index, rest }; + }); +} diff --git a/src/utils/evaluateChain.ts b/src/utils/evaluateChain.ts new file mode 100644 index 000000000..e195d8fb3 --- /dev/null +++ b/src/utils/evaluateChain.ts @@ -0,0 +1,26 @@ +import { Awaitable } from '@vueuse/core'; + +export const Skip = Symbol('Chain:Skip'); + +export type ChainHandler<Input, Output, Context = any> = ( + input: Input, + context?: Context +) => Awaitable<Output | typeof Skip>; + +export async function evaluateChain<Input, Output, Context = any>( + data: Input, + handlers: Array<ChainHandler<Input, Output, Context>>, + context?: Context +) { + /* eslint-disable no-await-in-loop */ + for (let i = 0; i < handlers.length; i++) { + const handler = handlers[i]; + const response = await handler(data, context); + if (response !== Skip) { + return response; + } + } + /* eslint-enable no-await-in-loop */ + + throw new Error('Unhandled request'); +}
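For illustration only, a minimal sketch (not part of this patch) of how the two utilities added above, evaluateChain and asyncSelect, can be composed: several inputs are evaluated against a handler chain, and the results are consumed as each evaluation settles. The Labeled type, the example handlers, and the processAll driver are hypothetical names invented for this sketch.

import { ChainHandler, Skip, evaluateChain } from '@/src/utils/evaluateChain';
import { asyncSelect } from '@/src/utils/asyncSelect';

type Labeled = { label: string; value: number };

// Hypothetical handlers: each one either claims the input or returns Skip
// so the next handler in the chain gets a chance.
const handlers: Array<ChainHandler<number, Labeled>> = [
  (n) => (n % 2 === 0 ? { label: 'even', value: n } : Skip),
  (n) => (n > 0 ? { label: 'positive odd', value: n } : Skip),
  (n) => ({ label: 'other', value: n }),
];

// Evaluate every input against the chain and collect results in completion
// order, dropping each promise from the pending set as it settles.
async function processAll(inputs: number[]) {
  const results: Labeled[] = [];
  let pending = inputs.map((n) => evaluateChain(n, handlers));
  while (pending.length > 0) {
    // eslint-disable-next-line no-await-in-loop
    const { promise, rest } = await asyncSelect(pending);
    pending = rest;
    // eslint-disable-next-line no-await-in-loop
    results.push(await promise);
  }
  return results;
}

// processAll([4, 7, -1]) resolves to one Labeled result per input.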