From 29ff0744e23e3a7610e90623f5eba0003b7c9815 Mon Sep 17 00:00:00 2001 From: Forrest Date: Fri, 5 Jan 2024 13:16:46 -0500 Subject: [PATCH 01/20] refactor(dataSource): rename to FileDataSource Fix an incorrect impl of the PartialWithRequired type --- src/io/import/dataSource.ts | 7 +++++-- src/io/import/processors/importSingleFile.ts | 6 +++--- src/io/import/processors/restoreStateFile.ts | 4 ++-- src/store/datasets-dicom.ts | 6 +++--- src/store/datasets-files.ts | 6 +++--- src/types/index.ts | 8 +++++++- 6 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/io/import/dataSource.ts b/src/io/import/dataSource.ts index 973e845f4..df07bb8b4 100644 --- a/src/io/import/dataSource.ts +++ b/src/io/import/dataSource.ts @@ -1,4 +1,4 @@ -import { Maybe } from '@/src/types'; +import { Maybe, PartialWithRequired } from '@/src/types'; /** * Represents a URI source with a file name for the downloaded resource. @@ -62,7 +62,10 @@ export interface DataSource { parent?: DataSource; } -export type DataSourceWithFile = DataSource & { fileSrc: FileSource }; +/** + * A data source that has a File. + */ +export type FileDataSource = PartialWithRequired; /** * Creates a DataSource from a single file. diff --git a/src/io/import/processors/importSingleFile.ts b/src/io/import/processors/importSingleFile.ts index f64de81c9..84d408714 100644 --- a/src/io/import/processors/importSingleFile.ts +++ b/src/io/import/processors/importSingleFile.ts @@ -5,7 +5,7 @@ import { useImageStore } from '@/src/store/datasets-images'; import { useModelStore } from '@/src/store/datasets-models'; import { FILE_READERS } from '@/src/io'; import { ImportHandler } from '@/src/io/import/common'; -import { DataSourceWithFile } from '@/src/io/import/dataSource'; +import { FileDataSource } from '@/src/io/import/dataSource'; import { useDatasetStore } from '@/src/store/datasets'; import { useMessageStore } from '@/src/store/messages'; @@ -34,7 +34,7 @@ const importSingleFile: ImportHandler = async (dataSource, { done }) => { fileSrc.file.name, dataObject as vtkImageData ); - fileStore.add(dataID, [dataSource as DataSourceWithFile]); + fileStore.add(dataID, [dataSource as FileDataSource]); return done({ dataID, @@ -53,7 +53,7 @@ const importSingleFile: ImportHandler = async (dataSource, { done }) => { fileSrc.file.name, dataObject as vtkPolyData ); - fileStore.add(dataID, [dataSource as DataSourceWithFile]); + fileStore.add(dataID, [dataSource as FileDataSource]); return done({ dataID, diff --git a/src/io/import/processors/restoreStateFile.ts b/src/io/import/processors/restoreStateFile.ts index 2704086e1..ec446bde6 100644 --- a/src/io/import/processors/restoreStateFile.ts +++ b/src/io/import/processors/restoreStateFile.ts @@ -10,7 +10,7 @@ import { } from '@/src/io/import/common'; import { DataSource, - DataSourceWithFile, + FileDataSource, fileToDataSource, } from '@/src/io/import/dataSource'; import { MANIFEST, isStateFile } from '@/src/io/state-file'; @@ -290,7 +290,7 @@ async function restoreDatasets( ); // do the import - const dicomSources: DataSourceWithFile[] = []; + const dicomSources: FileDataSource[] = []; const importResults = await Promise.all( datasetDataSources.map((source) => execute(source, { diff --git a/src/store/datasets-dicom.ts b/src/store/datasets-dicom.ts index 41da34784..7d3fcbb5b 100644 --- a/src/store/datasets-dicom.ts +++ b/src/store/datasets-dicom.ts @@ -2,7 +2,7 @@ import vtkITKHelper from '@kitware/vtk.js/Common/DataModel/ITKHelper'; import vtkImageData from 
'@kitware/vtk.js/Common/DataModel/ImageData'; import { defineStore } from 'pinia'; import { Image } from 'itk-wasm'; -import { DataSourceWithFile } from '@/src/io/import/dataSource'; +import { FileDataSource } from '@/src/io/import/dataSource'; import * as DICOM from '@/src/io/dicom'; import { identity, pick, removeFromArray } from '../utils'; import { useImageStore } from './datasets-images'; @@ -167,7 +167,7 @@ export const useDICOMStore = defineStore('dicom', { needsRebuild: {}, }), actions: { - async importFiles(datasets: DataSourceWithFile[]) { + async importFiles(datasets: FileDataSource[]) { if (!datasets.length) return []; const fileToDataSource = new Map( @@ -326,7 +326,7 @@ export const useDICOMStore = defineStore('dicom', { await serializeData(stateFile, dataIDs, DatasetType.DICOM); }, - async deserialize(files: DataSourceWithFile[]) { + async deserialize(files: FileDataSource[]) { return this.importFiles(files).then((volumeKeys) => { if (volumeKeys.length !== 1) { // Volumes are store individually so we should get one back. diff --git a/src/store/datasets-files.ts b/src/store/datasets-files.ts index fb68fa9c0..ea400351a 100644 --- a/src/store/datasets-files.ts +++ b/src/store/datasets-files.ts @@ -1,8 +1,8 @@ import { defineStore } from 'pinia'; -import { DataSourceWithFile } from '@/src/io/import/dataSource'; +import { FileDataSource } from '@/src/io/import/dataSource'; interface State { - byDataID: Record; + byDataID: Record; } /** @@ -29,7 +29,7 @@ export const useFileStore = defineStore('files', { } }, - add(dataID: string, files: DataSourceWithFile[]) { + add(dataID: string, files: FileDataSource[]) { this.byDataID[dataID] = files; }, }, diff --git a/src/types/index.ts b/src/types/index.ts index 9cf7311eb..22d83978c 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -20,10 +20,16 @@ export type SampleDataset = { }; }; +/** + * Sets certain keys to be required and the rest to optional. + */ export type RequiredWithPartial = Required> & Partial>; -export type PartialWithRequired = Pick & +/** + * Sets certain keys to be optional and the rest to required. 
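+ *
+ * e.g. PartialWithRequired<{ a: A; b: B }, 'a'> is equivalent to
+ * { a: A; b?: B }.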
+ */
+export type PartialWithRequired<T, K extends keyof T> = Required<Pick<T, K>> &
   Partial<Omit<T, K>>;
 
 export type Optional<T, K extends keyof T> = Pick<Partial<T>, K> & Omit<T, K>;

From 163290a435f7c8ed6fcc922542e99d54c0bfeed8 Mon Sep 17 00:00:00 2001
From: Forrest
Date: Fri, 5 Jan 2024 14:19:40 -0500
Subject: [PATCH 02/20] feat(streaming): open URIs as streams

- Detect mimetype of URI streams
- Download any unhandled streams as files
- Add onCleanup() hook to pipelines
---
 src/core/pipeline.ts                          |  17 ++
 .../streaming/__tests__/byteDeque.spec.ts     |  79 ++++++++
 .../streaming/__tests__/requestPool.spec.ts   |  52 ++++++
 .../__tests__/resumableFetcher.spec.ts        |  53 ++++++
 .../__tests__/streamingByteReader.spec.ts     |  55 ++++++
 src/core/streaming/byteDeque.ts               | 163 +++++++++++++++++
 src/core/streaming/concatStreams.ts           |  37 ++++
 src/core/streaming/httpCodes.ts               |   5 +
 src/core/streaming/requestPool.ts             | 122 +++++++++++++
 src/core/streaming/resumableFetcher.ts        | 172 ++++++++++++++++++
 src/core/streaming/streamingByteReader.ts     | 121 ++++++++++++
 src/core/streaming/types.ts                   |  20 ++
 src/io/import/dataSource.ts                   |  31 +++-
 src/io/import/importDataSources.ts            |  29 ++-
 src/io/import/processors/downloadStream.ts    |  47 +++++
 src/io/import/processors/openUriStream.ts     |  36 ++++
 src/io/import/processors/updateUriType.ts     |  54 ++++++
 src/io/magic.ts                               |  41 ++++-
 src/utils/index.ts                            |  52 +++++-
 19 files changed, 1162 insertions(+), 24 deletions(-)
 create mode 100644 src/core/streaming/__tests__/byteDeque.spec.ts
 create mode 100644 src/core/streaming/__tests__/requestPool.spec.ts
 create mode 100644 src/core/streaming/__tests__/resumableFetcher.spec.ts
 create mode 100644 src/core/streaming/__tests__/streamingByteReader.spec.ts
 create mode 100644 src/core/streaming/byteDeque.ts
 create mode 100644 src/core/streaming/concatStreams.ts
 create mode 100644 src/core/streaming/httpCodes.ts
 create mode 100644 src/core/streaming/requestPool.ts
 create mode 100644 src/core/streaming/resumableFetcher.ts
 create mode 100644 src/core/streaming/streamingByteReader.ts
 create mode 100644 src/core/streaming/types.ts
 create mode 100644 src/io/import/processors/downloadStream.ts
 create mode 100644 src/io/import/processors/openUriStream.ts
 create mode 100644 src/io/import/processors/updateUriType.ts

diff --git a/src/core/pipeline.ts b/src/core/pipeline.ts
index 9771ffb6e..a6cc2e0b1 100644
--- a/src/core/pipeline.ts
+++ b/src/core/pipeline.ts
@@ -84,6 +84,11 @@ export interface PipelineContext<DataType, ResultType, ExtraContext> {
     input: DataType,
     extra?: ExtraContext
   ): Promise<PipelineResult<DataType, ResultType>>;
+  /**
+   * Register cleanup code.
+   * @param callback
+   */
+  onCleanup(callback: Function): void;
   /**
    * Any extra user-supplied data.
    
*/ @@ -179,8 +184,17 @@ export default class Pipeline< Promise> > = []; const execution = defer>(); + const cleanupCallbacks: Function[] = []; const terminate = (result: Maybe, error?: Error) => { + cleanupCallbacks.forEach((callback) => { + try { + callback(); + } catch (e) { + console.error(e); + } + }); + if (error) { execution.reject(error); } else { @@ -209,6 +223,9 @@ export default class Pipeline< nestedExecutions.push(promise); return promise; }, + onCleanup: (callback: Function) => { + cleanupCallbacks.push(callback); + }, extra: extraContext, }; diff --git a/src/core/streaming/__tests__/byteDeque.spec.ts b/src/core/streaming/__tests__/byteDeque.spec.ts new file mode 100644 index 000000000..70d8777a1 --- /dev/null +++ b/src/core/streaming/__tests__/byteDeque.spec.ts @@ -0,0 +1,79 @@ +import ByteDeque from '@/src/core/streaming/byteDeque'; +import { describe, it, expect } from 'vitest'; + +describe('ByteDeque', () => { + it('pushStart/pushEnd() should work', () => { + const dq = new ByteDeque(); + + dq.pushEnd(new Uint8Array([4, 5, 6])); + expect(dq.size).to.equal(3); + + dq.pushEnd(new Uint8Array([7, 8, 9])); + expect(dq.size).to.equal(6); + + dq.pushStart(new Uint8Array([1, 2, 3])); + expect(dq.size).to.equal(9); + + const data = dq.popAll(); + expect(data).to.eql(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9])); + }); + + it('popStart() should work', () => { + const dq = new ByteDeque(); + + dq.pushEnd(new Uint8Array([1, 2, 3, 4, 5])); + dq.pushEnd(new Uint8Array([6, 7, 8, 9])); + + let data = dq.popStart(7); + expect(data).to.eql(new Uint8Array([1, 2, 3, 4, 5, 6, 7])); + + data = dq.popStart(2); + expect(data).to.eql(new Uint8Array([8, 9])); + }); + + it('popEnd() should work', () => { + const dq = new ByteDeque(); + + dq.pushEnd(new Uint8Array([1, 2, 3, 4, 5])); + expect(dq.size).to.equal(5); + + dq.pushEnd(new Uint8Array([6, 7, 8, 9])); + expect(dq.size).to.equal(9); + + let data = dq.popEnd(7); + expect(data).to.eql(new Uint8Array([3, 4, 5, 6, 7, 8, 9])); + + data = dq.popEnd(2); + expect(data).to.eql(new Uint8Array([1, 2])); + }); + + it('popStart() consumes the remainder', () => { + const dq = new ByteDeque(); + + dq.pushEnd(new Uint8Array([1, 2, 3])); + dq.pushEnd(new Uint8Array([4, 5, 6])); + + const data = dq.popStart(10); + expect(data).to.eql(new Uint8Array([1, 2, 3, 4, 5, 6])); + }); + + it('popEnd() consumes the remainder', () => { + const dq = new ByteDeque(); + + dq.pushEnd(new Uint8Array([1, 2, 3])); + dq.pushEnd(new Uint8Array([4, 5, 6])); + + const data = dq.popEnd(10); + expect(data).to.eql(new Uint8Array([1, 2, 3, 4, 5, 6])); + }); + + it('clear() clears the deque', () => { + const dq = new ByteDeque(); + dq.pushStart(new Uint8Array([1, 2, 3, 4, 5])); + expect(dq.isEmpty()).to.be.false; + + dq.clear(); + expect(dq.size).to.equal(0); + expect(dq.isEmpty()).to.be.true; + }); +}); diff --git a/src/core/streaming/__tests__/requestPool.spec.ts b/src/core/streaming/__tests__/requestPool.spec.ts new file mode 100644 index 000000000..ea8418539 --- /dev/null +++ b/src/core/streaming/__tests__/requestPool.spec.ts @@ -0,0 +1,52 @@ +import { RequestPool } from '@/src/core/streaming/requestPool'; +import { addEventListenerOnce } from '@/src/utils'; +import { describe, it } from 'vitest'; +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +chai.use(chaiAsPromised); + +// @ts-ignore +global.fetch = async (request: RequestInfo | URL, init?: RequestInit) => { + return new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, 
100); + if (init?.signal) { + addEventListenerOnce(init.signal, 'abort', (reason) => { + clearTimeout(timeout); + reject(reason ?? new Error('cancelled timeout')); + }); + } + }); +}; + +describe('requestPool', () => { + it('should not have more active requests than the pool size', () => { + const N = 4; + const pool = new RequestPool(N); + for (let i = 0; i < 10; i++) { + pool.fetch('url'); + } + expect(pool.activeConnections).to.equal(N); + }); + + it('should support removal of requests via an AbortController', () => { + const N = 4; + const pool = new RequestPool(N); + const controllers: AbortController[] = []; + const promises: Promise[] = []; + + for (let i = 0; i < 10; i++) { + const controller = new AbortController(); + controllers.push(controller); + promises.push(pool.fetch('url', { signal: controller.signal })); + } + + controllers.forEach((controller) => { + controller.abort(); + }); + + promises.forEach((promise) => { + expect(promise).to.be.rejected; + }); + }); +}); diff --git a/src/core/streaming/__tests__/resumableFetcher.spec.ts b/src/core/streaming/__tests__/resumableFetcher.spec.ts new file mode 100644 index 000000000..89bf113e0 --- /dev/null +++ b/src/core/streaming/__tests__/resumableFetcher.spec.ts @@ -0,0 +1,53 @@ +/* eslint-disable no-restricted-syntax */ +import { RequestPool } from '@/src/core/streaming/requestPool'; +import { + ResumableFetcher, + StopSignal, +} from '@/src/core/streaming/resumableFetcher'; +import { describe, expect, it } from 'vitest'; + +describe('ResumableFetcher', () => { + it('should support stopping and resuming', async () => { + const pool = new RequestPool(); + const fetcher = new ResumableFetcher( + 'https://data.kitware.com/api/v1/file/57b5d4648d777f10f2693e7e/download', + { + fetch: pool.fetch, + } + ); + + await fetcher.connect(); + let stream = fetcher.getStream(); + let size = 0; + try { + // @ts-ignore + for await (const chunk of stream) { + size += chunk.length; + if (size > 4096 * 3) { + break; + } + } + } catch (err) { + if (err !== StopSignal) throw err; + } finally { + fetcher.close(); + } + + await fetcher.connect(); + + // ensure we can read the stream multiple times + for (let i = 0; i < 2; i++) { + stream = fetcher.getStream(); + size = 0; + // @ts-ignore + // eslint-disable-next-line no-await-in-loop + for await (const chunk of stream) { + size += chunk.length; + } + + expect(size).to.equal(fetcher.size); + } + + fetcher.close(); + }); +}); diff --git a/src/core/streaming/__tests__/streamingByteReader.spec.ts b/src/core/streaming/__tests__/streamingByteReader.spec.ts new file mode 100644 index 000000000..cbd4e5dd5 --- /dev/null +++ b/src/core/streaming/__tests__/streamingByteReader.spec.ts @@ -0,0 +1,55 @@ +import StreamingByteReader from '@/src/core/streaming/streamingByteReader'; +import { asCoroutine } from '@/src/utils'; +import { describe, it, expect } from 'vitest'; + +describe('StreamingByteReader', () => { + it('read() should handle small chunks', () => { + const reader = new StreamingByteReader(); + const coro = asCoroutine(reader.read(10)); + + let result = coro(new Uint8Array([1, 2, 3, 4, 5])); + expect(result.done).to.be.false; + + result = coro(new Uint8Array([1, 2, 3, 4, 5])); + expect(result.done).to.be.true; + expect(result.value).to.eql(new Uint8Array([1, 2, 3, 4, 5, 1, 2, 3, 4, 5])); + }); + + it('read() should buffer large chunks', () => { + const reader = new StreamingByteReader(); + const gen = reader.read(5); + gen.next(); + + let result = gen.next(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + 
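// the 10-byte chunk satisfies the 5-byte read; the rest stays buffered
+    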
expect(result.done).to.be.true;
+    expect(result.value).to.eql(new Uint8Array([1, 2, 3, 4, 5]));
+
+    result = reader.read(5).next();
+    expect(result.done).to.be.true;
+    expect(result.value).to.eql(new Uint8Array([6, 7, 8, 9, 10]));
+  });
+
+  it('readAscii() should work', () => {
+    const reader = new StreamingByteReader();
+    const gen = reader.readAscii(4);
+    gen.next();
+
+    const result = gen.next(new Uint8Array([68, 73, 67, 77]));
+    expect(result.done).to.be.true;
+    expect(result.value).to.equal('DICM');
+  });
+
+  it('seek() should work', () => {
+    const reader = new StreamingByteReader();
+    const seekg = reader.seek(4);
+    seekg.next();
+
+    const seekres = seekg.next(new Uint8Array([1, 2, 3, 4, 5]));
+    expect(seekres.done).to.be.true;
+    expect(seekres.value).to.be.undefined;
+
+    const readres = reader.read(1).next();
+    expect(readres.done).to.be.true;
+    expect(readres.value).to.eql(new Uint8Array([5]));
+  });
+});
diff --git a/src/core/streaming/byteDeque.ts b/src/core/streaming/byteDeque.ts
new file mode 100644
index 000000000..2515d3deb
--- /dev/null
+++ b/src/core/streaming/byteDeque.ts
@@ -0,0 +1,163 @@
+/**
+ * A byte array deque.
+ */
+export default class ByteDeque {
+  private arrays: Uint8Array[];
+  private _size = 0;
+
+  constructor() {
+    this.arrays = [];
+  }
+
+  get size() {
+    return this._size;
+  }
+
+  isEmpty() {
+    return this.size === 0;
+  }
+
+  clear() {
+    this.arrays.length = 0;
+    this._size = 0;
+  }
+
+  /**
+   * Push a new Uint8Array to the end.
+   * @param {Uint8Array} bytes
+   */
+  pushEnd(bytes: Uint8Array) {
+    this.arrays.push(bytes);
+    this._size += bytes.length;
+  }
+
+  /**
+   * Push the contents of a ByteDeque to the start.
+   * @param {ByteDeque} bytes
+   */
+  pushStart(bytes: ByteDeque): void;
+
+  /**
+   * Push a new Uint8Array to the start.
+   * @param {Uint8Array} bytes
+   */
+  pushStart(bytes: Uint8Array): void;
+
+  pushStart(bytes: Uint8Array | ByteDeque) {
+    if (bytes instanceof ByteDeque) {
+      this.arrays.unshift(...bytes.arrays);
+      this._size += bytes.size;
+      bytes.clear();
+    } else {
+      this.arrays.unshift(bytes);
+      this._size += bytes.length;
+    }
+  }
+
+  /**
+   * Pop bytes off the end.
+   * @param {number} count
+   */
+  popEnd(count: number): Uint8Array {
+    return this.consumeEnd(count, false) as Uint8Array;
+  }
+
+  /**
+   * Erase bytes off the end.
+   *
+   * This is more efficient than ignoring the return value of popEnd
+   * due to not allocating memory for the return result.
+   * @param {number} count
+   */
+  eraseEnd(count: number): void {
+    this.consumeEnd(count, true);
+  }
+
+  private consumeEnd(count: number, discard = false) {
+    let processed = 0;
+    const bufferSize = Math.min(count, this.size);
+    const popped = discard ? null : new Uint8Array(bufferSize);
+    let writeOffset = bufferSize;
+
+    while (processed < count && this.arrays.length) {
+      const bytes = this.arrays.pop()!;
+      const remaining = count - processed;
+      // chomp bytes[offset:]
+      const offset = remaining >= bytes.length ? 0 : bytes.length - remaining;
+      const takeEntireArray = offset === 0;
+
+      if (popped) {
+        const toSet = takeEntireArray ? bytes : bytes.subarray(offset);
+        writeOffset -= toSet.length;
+        popped.set(toSet, writeOffset);
+      }
+
+      if (!takeEntireArray) {
+        // put back remainder
+        this.arrays.push(bytes.subarray(0, offset));
+      }
+
+      processed += bytes.length - offset;
+    }
+
+    // keep size in sync with the bytes consumed (mirrors consumeStart)
+    this._size = Math.max(0, this._size - count);
+    return popped ?? undefined;
+  }
+
+  /**
+   * Pop bytes off the start.
+   
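* Returns fewer than `count` bytes if the deque holds fewer.
+   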
+ * @param {number} count + */ + popStart(count: number): Uint8Array { + return this.consumeStart(count, false) as Uint8Array; + } + + /** + * Erase bytes off the start. + * + * This is more efficient than ignoring the return value of popStart + * due to not allocating memory for the return result. + * @param {number} count + */ + eraseStart(count: number): void { + this.consumeStart(count, true); + } + + private consumeStart(count: number, discard = false) { + let processed = 0; + const bufferSize = Math.min(count, this.size); + const popped = discard ? null : new Uint8Array(bufferSize); + let writeOffset = 0; + + while (processed < count && this.arrays.length) { + const bytes = this.arrays.shift()!; + const remaining = count - processed; + // chomp bytes[:offset] + const offset = Math.min(remaining, bytes.length); + const takeEntireArray = offset === bytes.length; + + if (popped) { + const toSet = takeEntireArray ? bytes : bytes.subarray(0, offset); + popped.set(toSet, writeOffset); + writeOffset += toSet.length; + } + + if (!takeEntireArray) { + // put back remainder + this.arrays.unshift(bytes.subarray(offset)); + } + + processed += offset; + } + + this._size = Math.max(0, this._size - count); + return popped ?? undefined; + } + + popAll(discard = false) { + if (discard) { + this.arrays.length = 0; + return undefined; + } + return this.popStart(this.size); + } +} diff --git a/src/core/streaming/concatStreams.ts b/src/core/streaming/concatStreams.ts new file mode 100644 index 000000000..0406dde2b --- /dev/null +++ b/src/core/streaming/concatStreams.ts @@ -0,0 +1,37 @@ +/** + * Concatenates multiple streams together in order. + * @param streams + * @returns + */ +export function concatStreams( + ...streams: ReadableStream[] +): ReadableStream { + let reader: ReadableStreamDefaultReader | null = null; + return new ReadableStream({ + async pull(controller) { + let enqueued = false; + while (!enqueued && streams.length) { + if (!reader) { + reader = streams[0].getReader(); + } + + // eslint-disable-next-line no-await-in-loop + const result = await reader.read(); + + if (result.value) { + controller.enqueue(result.value); + enqueued = true; + } + + if (result.done) { + streams.shift(); + reader = null; + } + } + + if (streams.length === 0) { + controller.close(); + } + }, + }); +} diff --git a/src/core/streaming/httpCodes.ts b/src/core/streaming/httpCodes.ts new file mode 100644 index 000000000..a65871421 --- /dev/null +++ b/src/core/streaming/httpCodes.ts @@ -0,0 +1,5 @@ +export const HTTP_STATUS_OK = 200; +export const HTTP_STATUS_PARTIAL_CONTENT = 206; +export const HTTP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE = 416; + +export class HttpNotFound extends Error {} diff --git a/src/core/streaming/requestPool.ts b/src/core/streaming/requestPool.ts new file mode 100644 index 000000000..3534da7f4 --- /dev/null +++ b/src/core/streaming/requestPool.ts @@ -0,0 +1,122 @@ +import { Deferred, addEventListenerOnce, defer } from '@/src/utils'; + +const DEFAULT_POOL_SIZE = 6; + +let nextId = 0; +function getNextId() { + return ++nextId; +} + +interface FetchRequest { + id: number; + request: RequestInfo | URL; + init: RequestInit | undefined; + deferred: Deferred; +} + +/** + * Fixed-size pool for managing requests. + * + * Requests are processed in the order that they are received. 
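*
+ * A usage sketch, where `urls` is a hypothetical array of request URLs:
+ *
+ *   const pool = new RequestPool(4);
+ *   const responses = await Promise.all(urls.map((url) => pool.fetch(url)));
+ 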
+ */ +export class RequestPool { + public readonly poolSize: number; + + private queue: FetchRequest[]; + private inflight: Set; + + constructor(poolSize = DEFAULT_POOL_SIZE) { + this.poolSize = poolSize; + this.queue = []; + this.inflight = new Set(); + } + + get activeConnections() { + return this.inflight.size; + } + + /** + * Queues up a fetch request. + * + * If an AbortSignal is provided and is triggered prior to + * the request actually starting, it will be removed from the queue. + * @param requestUrl + * @param options + * @returns + */ + fetch = (request: RequestInfo | URL, init?: RequestInit) => { + const id = this.pushToQueue({ + request, + init, + }); + const { deferred } = this.queue.at(-1)!; + + if (init?.signal) { + addEventListenerOnce(init.signal, 'abort', () => { + const idx = this.queue.findIndex((req) => req.id === id); + if (idx > -1) this.queue.splice(idx, 1); + }); + } + + this.processQueue(); + return deferred.promise; + }; + + /** + * Adds a fetch request to the internal queue. + * @param req + * @returns + */ + private pushToQueue(req: Omit) { + const id = getNextId(); + const deferred = defer(); + this.queue.push({ ...req, id, deferred }); + return id; + } + + /** + * Begins processing the queue. + * @returns + */ + private processQueue() { + if (this.inflight.size === this.poolSize) return; + if (this.queue.length === 0) return; + this.startRequest(this.queue.shift()!); + } + + /** + * Acts on a fetch request. + * @param req + */ + private async startRequest(req: FetchRequest) { + const { id, deferred, request, init } = req; + this.inflight.add(id); + + try { + const resp = await fetch(request, init); + deferred.resolve(resp); + } catch (err) { + deferred.reject(err); + } finally { + this.inflight.delete(id); + // continue processing the queue + this.processQueue(); + } + } +} + +let currentRequestPool: RequestPool = new RequestPool(); + +/** + * Gets the current request pool. + * @returns + */ +export const getRequestPool = () => currentRequestPool; + +/** + * Sets the current request pool. + * @param pool + */ +export function setCurrentRequestPool(pool: RequestPool) { + currentRequestPool = pool; +} diff --git a/src/core/streaming/resumableFetcher.ts b/src/core/streaming/resumableFetcher.ts new file mode 100644 index 000000000..2d4ed9586 --- /dev/null +++ b/src/core/streaming/resumableFetcher.ts @@ -0,0 +1,172 @@ +import { concatStreams } from '@/src/core/streaming/concatStreams'; +import { + HTTP_STATUS_OK, + HTTP_STATUS_PARTIAL_CONTENT, + HTTP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE, + HttpNotFound, +} from '@/src/core/streaming/httpCodes'; +import { Fetcher, FetcherInit } from '@/src/core/streaming/types'; +import { Maybe } from '@/src/types'; + +type FetchFunction = typeof fetch; + +export interface ResumableRequestInit extends RequestInit { + prefixChunks?: Uint8Array[]; + contentLength?: number; + fetch?: FetchFunction; +} + +export const StopSignal = Symbol('StopSignal'); + +/** + * A resumable fetcher that caches previously downloaded partial streams. + * + * This fetcher falls back to downloading the entire stream if the server does + * not support the Range header with bytes. + * + * A new call to start() will stream the cached stream until empty, after which + * the partial response is streamed. 
+ */ +export class ResumableFetcher implements Fetcher { + private abortController: Maybe; + private fetch: typeof fetch; + private chunks: Uint8Array[]; + private contentLength: number | null = null; + private activeNetworkStream: ReadableStream | null = null; + public contentType: string = ''; + + constructor( + private request: RequestInfo | URL, + private init?: ResumableRequestInit + ) { + this.chunks = [...(init?.prefixChunks ?? [])]; + this.contentLength = init?.contentLength ?? null; + this.fetch = init?.fetch ?? globalThis.fetch; + } + + get connected() { + return !!this.abortController; + } + + get size() { + return this.chunks.reduce((sum, chunk) => sum + chunk.length, 0); + } + + get cachedChunks() { + return this.chunks; + } + + get abortSignal() { + return this.abortController?.signal; + } + + async connect(init?: FetcherInit) { + if (this.connected) return; + + this.abortController = init?.abortController ?? new AbortController(); + this.abortController.signal.addEventListener('abort', () => this.cleanup()); + + // do not actually send the request, since we've cached the entire response + if (this.size === this.contentLength) return; + + // Use fromEntries as a workaround to handle + // jsdom not setting Range properly. + const headers = Object.fromEntries(new Headers(this.init?.headers ?? {})); + if (this.size > 0) { + headers.Range = `bytes=${this.size}-`; + } + + const response = await this.fetch(new Request(this.request), { + ...this.init, + headers, + signal: this.abortController.signal, + }); + + this.contentType = response.headers.get('content-type') ?? ''; + + if (this.size === 0 && response.headers.has('content-length')) { + this.contentLength = Number(response.headers.get('content-length')!); + } + + if (!response.body) throw new Error('Did not receive a response body'); + + const noMoreContent = response.headers.get('content-length') === '0'; + const rangeNotSatisfiable = + response.status === HTTP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE; + + if (rangeNotSatisfiable && !noMoreContent) { + throw new Error('Range could not be satisfied'); + } + + if ( + !noMoreContent && + response.status !== HTTP_STATUS_OK && + response.status !== HTTP_STATUS_PARTIAL_CONTENT + ) { + throw new HttpNotFound(); + } + + if (!noMoreContent && response.status !== HTTP_STATUS_PARTIAL_CONTENT) { + this.chunks = []; + } + + this.activeNetworkStream = this.wrapNetworkStream(response.body); + } + + close() { + if (!this.abortController) return; + this.abortController.abort(StopSignal); + this.cleanup(); + } + + getStream() { + return concatStreams(this.getDataChunksAsStream(), this.getNetworkStream()); + } + + async blob() { + const stream = this.getStream(); + const sink = new WritableStream(); + await stream.pipeTo(sink); + return new Blob(this.chunks, { type: this.contentType }); + } + + private cleanup() { + this.activeNetworkStream = null; + this.abortController = null; + } + + private getDataChunksAsStream() { + let i = 0; + const self = this; + return new ReadableStream({ + pull(controller) { + if (i < self.chunks.length) { + controller.enqueue(self.chunks[i]); + i += 1; + } else { + controller.close(); + } + }, + }); + } + + private wrapNetworkStream(stream: ReadableStream) { + const cacheChunkStream = new TransformStream({ + transform: (chunk, controller) => { + this.chunks.push(chunk); + controller.enqueue(chunk); + }, + }); + + return stream.pipeThrough(cacheChunkStream); + } + + private getNetworkStream() { + if (!this.activeNetworkStream) { + return new ReadableStream(); + } + 
const [s1, s2] = this.activeNetworkStream.tee(); + this.activeNetworkStream = s2; + return s1; + } +} diff --git a/src/core/streaming/streamingByteReader.ts b/src/core/streaming/streamingByteReader.ts new file mode 100644 index 000000000..df8adf8f9 --- /dev/null +++ b/src/core/streaming/streamingByteReader.ts @@ -0,0 +1,121 @@ +import ByteDeque from '@/src/core/streaming/byteDeque.js'; +import { toAscii } from '@/src/utils'; + +/** + * StreamingByteReader + * + * Expects Uint8Array inputs + */ +export default class StreamingByteReader { + protected leftover: ByteDeque; + protected pos = 0; + + constructor() { + this.leftover = new ByteDeque(); + } + + get position() { + return this.pos; + } + + /** + * Seeks along the byte stream. + * + * No negative values. + * @param offset + */ + *seek(offset: number) { + if (offset < 0) { + throw new Error('Offset must not be negative'); + } + + while (this.leftover.size < offset) { + this.leftover.pushEnd(yield); + } + + this.leftover.eraseStart(offset); + this.pos += offset; + } + + /** + * Reads a number of bytes. + * @param length + * @param param1 + * @returns + */ + *read( + length: number, + { peek = false } = {} + ): Generator { + if (length <= 0) { + throw new Error('Length must be a positive number'); + } + + while (this.leftover.size < length) { + this.leftover.pushEnd(yield); + } + + const data = this.leftover.popStart(length); + + if (peek) { + this.leftover.pushStart(data); + } else { + this.pos += length; + } + + return data; + } + + /** + * Reads an ASCII string. + * @param length + * @param param1 + * @returns + */ + *readAscii(length: number, { ignoreNulls = false, peek = false } = {}) { + const bytes = yield* this.read(length, { peek }); + return toAscii(bytes, { ignoreNulls }); + } + + /** + * + * @param {'getUint8' | 'getInt8' | ...} method + * @param length must be the length associated with the method + */ + private *readDataView( + method: T extends `get${infer R}` ? `get${R}` : never, + length: number, + { littleEndian = false, peek = false } = {} + ) { + const bytes = yield* this.read(length, { peek }); + const dv = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength); + if (method === 'getUint8' || method === 'getInt8') { + return dv[method](0) as number; + } + return dv[method](0, littleEndian) as number; + } + + *readUint8() { + return yield* this.readDataView('getUint8', 1); + } + + *readInt8() { + return yield* this.readDataView('getInt8', 1); + } + + *readUint16(opts = {}) { + return yield* this.readDataView('getUint16', 2, opts); + } + + *readInt16(opts = {}) { + return yield* this.readDataView('getInt16', 2, opts); + } + + *readUint32(opts = {}) { + return yield* this.readDataView('getUint32', 4, opts); + } + + *readInt32(opts = {}) { + return yield* this.readDataView('getInt32', 4, opts); + } +} diff --git a/src/core/streaming/types.ts b/src/core/streaming/types.ts new file mode 100644 index 000000000..afd67495c --- /dev/null +++ b/src/core/streaming/types.ts @@ -0,0 +1,20 @@ +/** + * Init options for a Fetcher. + */ +export interface FetcherInit { + abortController?: AbortController; +} + +/** + * A fetcher that caches an incoming stream. 
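*
+ * cachedChunks exposes the bytes received so far, so consumers can re-read
+ * previously streamed data (e.g. for mime sniffing) without re-downloading.
+ 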
+ */ +export interface Fetcher { + connect(): Promise; + getStream(): ReadableStream; + blob(): Promise; + close(): void; + cachedChunks: Uint8Array[]; + connected: boolean; + size: number; + abortSignal?: AbortSignal; +} diff --git a/src/io/import/dataSource.ts b/src/io/import/dataSource.ts index df07bb8b4..a6cc38a48 100644 --- a/src/io/import/dataSource.ts +++ b/src/io/import/dataSource.ts @@ -1,3 +1,4 @@ +import { Fetcher } from '@/src/core/streaming/types'; import { Maybe, PartialWithRequired } from '@/src/types'; /** @@ -9,6 +10,8 @@ import { Maybe, PartialWithRequired } from '@/src/types'; export interface UriSource { uri: string; name: string; + mime?: string; + fetcher?: Fetcher; } /** @@ -32,12 +35,12 @@ export interface ArchiveSource { } /** - * Used to collect DICOM file data sources. + * Represents a collection of data sources. * - * This is currently used for consolidating multiple DICOM files into one - * DataSource for error stack trace purposes. + * This is used for data that is derived from a colleciton of data sources, + * e.g. reconstructed DICOM. */ -export interface DicomSource { +export interface CollectionSource { // eslint-disable-next-line no-use-before-define sources: DataSource[]; } @@ -52,13 +55,12 @@ export interface DicomSource { * - { uriSrc }: a file that has yet to be downloaded. * - { fileSrc, parent: { uriSrc } }: a file with URI provenance info. * - { fileSrc, archiveSrc, parent }: a file originating from an archive. - * - { dicomSrc }: a list of dicom data sources. */ export interface DataSource { fileSrc?: FileSource; uriSrc?: UriSource; archiveSrc?: ArchiveSource; - dicomSrc?: DicomSource; + collectionSrc?: CollectionSource; parent?: DataSource; } @@ -84,10 +86,15 @@ export const fileToDataSource = (file: File): DataSource => ({ * @param uri * @returns */ -export const uriToDataSource = (uri: string, name: string): DataSource => ({ +export const uriToDataSource = ( + uri: string, + name: string, + mime?: string +): DataSource => ({ uriSrc: { uri, name, + mime, }, }); @@ -145,8 +152,8 @@ export function getDataSourceName(ds: Maybe): Maybe { return ds.uriSrc.name; } - if (ds?.dicomSrc?.sources.length) { - const { sources } = ds.dicomSrc; + if (ds?.collectionSrc?.sources.length) { + const { sources } = ds.collectionSrc; const [first] = sources; const more = sources.length > 1 ? 
` (+${sources.length - 1} more)` : ''; return `${getDataSourceName(first)}${more}`; @@ -165,7 +172,13 @@ export function getDataSourceName(ds: Maybe): Maybe { */ export function serializeDataSource(ds: DataSource) { const output = { ...ds }; + + if (output.uriSrc) { + delete output.uriSrc.fetcher; + } + delete output.fileSrc; + if (output.parent) { output.parent = serializeDataSource(output.parent); } diff --git a/src/io/import/importDataSources.ts b/src/io/import/importDataSources.ts index a92598259..6b8b41130 100644 --- a/src/io/import/importDataSources.ts +++ b/src/io/import/importDataSources.ts @@ -9,9 +9,8 @@ import { isLoadableResult, VolumeResult, } from '@/src/io/import/common'; -import { DataSource, DataSourceWithFile } from '@/src/io/import/dataSource'; +import { FileDataSource, DataSource } from '@/src/io/import/dataSource'; import handleDicomFile from '@/src/io/import/processors/handleDicomFile'; -import downloadUrl from '@/src/io/import/processors/downloadUrl'; import extractArchive from '@/src/io/import/processors/extractArchive'; import extractArchiveTargetFromCache from '@/src/io/import/processors/extractArchiveTarget'; import handleAmazonS3 from '@/src/io/import/processors/handleAmazonS3'; @@ -23,6 +22,9 @@ import updateFileMimeType from '@/src/io/import/processors/updateFileMimeType'; import handleConfig from '@/src/io/import/processors/handleConfig'; import { useDICOMStore } from '@/src/store/datasets-dicom'; import { applyConfig } from '@/src/io/import/configJson'; +import updateUriType from '@/src/io/import/processors/updateUriType'; +import openUriStream from '@/src/io/import/processors/openUriStream'; +import downloadStream from '@/src/io/import/processors/downloadStream'; /** * Tries to turn a thrown object into a meaningful error string. @@ -88,11 +90,9 @@ const importConfigs = async ( } }; -const importDicomFiles = async ( - dicomDataSources: Array -) => { +const importDicomFiles = async (dicomDataSources: Array) => { const resultSources: DataSource = { - dicomSrc: { + collectionSrc: { sources: dicomDataSources, }, }; @@ -126,21 +126,30 @@ const importDicomFiles = async ( } }; -export async function importDataSources(dataSources: DataSource[]) { +export async function importDataSources( + dataSources: DataSource[] +): Promise[]> { const importContext = { fetchFileCache: new Map(), - dicomDataSources: [] as DataSourceWithFile[], + dicomDataSources: [], }; const middleware = [ - // updating the file type should be first in the pipeline + openUriStream, + + // updating the file/uri type should be first step in the pipeline updateFileMimeType, + updateUriType, + // before extractArchive as .zip extension is part of state file check restoreStateFile, handleRemoteManifest, handleGoogleCloudStorage, handleAmazonS3, - downloadUrl, + + // stream handling + downloadStream, + extractArchiveTargetFromCache, extractArchive, handleConfig, // collect config files to apply later diff --git a/src/io/import/processors/downloadStream.ts b/src/io/import/processors/downloadStream.ts new file mode 100644 index 000000000..5e7333ad3 --- /dev/null +++ b/src/io/import/processors/downloadStream.ts @@ -0,0 +1,47 @@ +import { ImportHandler } from '@/src/io/import/common'; +import { ensureError } from '@/src/utils'; + +/** + * Downloads a URL to a file DataSource. + * + * Input: { uriSrc } + * Output: { fileSrc, uriSrc } + * + * Provides optional caching if the execution context provides a cache. 
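* The fetcher's cached chunks back the returned file, so bytes that were
+ * already streamed (e.g. during mime detection) are not fetched twice.
+ 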
+ * @param dataSource + * @returns + */ +const downloadStream: ImportHandler = async (dataSource, { execute, done }) => { + const { fileSrc, uriSrc } = dataSource; + if (fileSrc || !uriSrc?.fetcher) { + return dataSource; + } + + const { fetcher } = uriSrc; + await fetcher.connect(); + + try { + const blob = await fetcher.blob(); + const file = new File([blob], uriSrc.name, { + type: uriSrc.mime, + }); + + execute({ + ...dataSource, + fileSrc: { + file, + fileType: file.type, + }, + }); + return done(); + } catch (err) { + throw new Error( + `Could not download stream associated with URL ${uriSrc.uri}`, + { + cause: ensureError(err), + } + ); + } +}; + +export default downloadStream; diff --git a/src/io/import/processors/openUriStream.ts b/src/io/import/processors/openUriStream.ts new file mode 100644 index 000000000..f17fc31b5 --- /dev/null +++ b/src/io/import/processors/openUriStream.ts @@ -0,0 +1,36 @@ +import { getRequestPool } from '@/src/core/streaming/requestPool'; +import { ResumableFetcher } from '@/src/core/streaming/resumableFetcher'; +import { ImportHandler } from '@/src/io/import/common'; +import { canFetchUrl } from '@/src/utils/fetch'; + +const openUriStream: ImportHandler = async (dataSource, { onCleanup }) => { + const { uriSrc } = dataSource; + if (!uriSrc || !canFetchUrl(uriSrc.uri)) { + return dataSource; + } + + if (uriSrc.fetcher?.connected) { + return dataSource; + } + + const fetcher = new ResumableFetcher(uriSrc.uri, { + fetch: (...args) => getRequestPool().fetch(...args), + }); + + await fetcher.connect(); + + // ensure we close the connection on completion + onCleanup(() => { + fetcher.close(); + }); + + return { + ...dataSource, + uriSrc: { + ...uriSrc, + fetcher, + }, + }; +}; + +export default openUriStream; diff --git a/src/io/import/processors/updateUriType.ts b/src/io/import/processors/updateUriType.ts new file mode 100644 index 000000000..d0c5434db --- /dev/null +++ b/src/io/import/processors/updateUriType.ts @@ -0,0 +1,54 @@ +import StreamingByteReader from '@/src/core/streaming/streamingByteReader'; +import { ImportHandler } from '@/src/io/import/common'; +import { getFileMimeFromMagicStream } from '@/src/io/magic'; +import { asCoroutine } from '@/src/utils'; + +const DoneSignal = Symbol('DoneSignal'); + +function detectStreamType(stream: ReadableStream) { + return new Promise((resolve, reject) => { + const reader = new StreamingByteReader(); + const consume = asCoroutine(getFileMimeFromMagicStream(reader)); + + const writableStream = new WritableStream({ + write(chunk) { + const result = consume(chunk); + if (result.done) { + const mime = result.value; + resolve(mime ?? 
''); + throw DoneSignal; + } + }, + }); + + stream.pipeTo(writableStream).catch((err) => { + if (err !== DoneSignal) { + reject(err); + } + }); + }); +} + +const updateUriType: ImportHandler = async (dataSource) => { + const { fileSrc, uriSrc } = dataSource; + if (fileSrc || !uriSrc?.fetcher) { + return dataSource; + } + + const { fetcher } = uriSrc; + + const stream = fetcher.getStream(); + const mime = await detectStreamType(stream); + + const streamDataSource = { + ...dataSource, + uriSrc: { + ...uriSrc, + mime, + }, + }; + + return streamDataSource; +}; + +export default updateUriType; diff --git a/src/io/magic.ts b/src/io/magic.ts index ab62b83d2..980e7f06b 100644 --- a/src/io/magic.ts +++ b/src/io/magic.ts @@ -1,9 +1,11 @@ +import StreamingByteReader from '@/src/core/streaming/streamingByteReader'; import { Maybe } from '../types'; interface MagicDatabase { mime: string; header: number[]; skip?: number; + readTotal: number; } /** @@ -16,13 +18,18 @@ const FILE_MAGIC_DB: MagicDatabase[] = [ skip: 128, header: Array.from('DICM').map((c) => c.charCodeAt(0)), }, -]; +].map((magic) => ({ + ...magic, + readTotal: (magic.skip ?? 0) + magic.header.length, +})); // How much data to read when extracting file magic. -// This should be generous enough for most files. -const HEAD_CHUNK = 512; +const HEAD_CHUNK = Math.max(...FILE_MAGIC_DB.map((magic) => magic.readTotal)); -function prefixEquals(target: Uint8Array, prefix: number[] | Uint8Array) { +function prefixEquals( + target: number[] | Uint8Array, + prefix: number[] | Uint8Array +) { if (prefix.length > target.length) { return false; } @@ -52,3 +59,29 @@ export async function getFileMimeFromMagic(file: File): Promise> { reader.readAsArrayBuffer(head); }); } + +const STREAMING_MAGIC_ORDER = FILE_MAGIC_DB.sort( + (a, b) => a.readTotal - b.readTotal +); + +/** + * Gets the mime type from a streaming byte reader. + * + * Also returns the file stream prefix. + * @param stream + * @returns + */ +export function* getFileMimeFromMagicStream(reader: StreamingByteReader) { + const head: number[] = []; + for (let i = 0; i < STREAMING_MAGIC_ORDER.length; i++) { + const { mime, header, readTotal, skip = 0 } = STREAMING_MAGIC_ORDER[i]; + if (head.length < readTotal) { + const bytes = yield* reader.read(readTotal - head.length); + head.push(...bytes); + } + if (prefixEquals(head.slice(skip), header)) { + return mime; + } + } + return null; +} diff --git a/src/utils/index.ts b/src/utils/index.ts index 28a83ac51..e04429489 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -56,7 +56,7 @@ export const isFulfilled = ( ): input is PromiseFulfilledResult => input.status === 'fulfilled'; type PromiseResolveFunction = (value: T) => void; -type PromiseRejectFunction = (reason?: Error) => void; +type PromiseRejectFunction = (reason?: any) => void; export interface Deferred { promise: Promise; resolve: PromiseResolveFunction; @@ -326,3 +326,53 @@ export function normalizeForStore(objects: T[], key: K) { return { order, byKey }; } + +/** + * Listens for an event once. + * @param target + * @param event + * @param callback + */ +export function addEventListenerOnce( + target: T, + event: string, + callback: (...args: any[]) => any +) { + const handler = () => { + target.removeEventListener(event, handler); + return callback(); + }; + target.addEventListener(event, handler); +} + +/** + * Converts a byte sequence to ASCII. 
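* e.g. toAscii(new Uint8Array([68, 73, 67, 77])) === 'DICM'
+ 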
+ * @param bytes + * @param param1 + * @returns + */ +export function toAscii( + bytes: Uint8Array | Uint8ClampedArray, + { ignoreNulls = false } = {} +) { + const chars = []; + for (let i = 0; i < bytes.length; i++) { + if (!(ignoreNulls && bytes[i] === 0)) { + chars.push(String.fromCharCode(bytes[i])); + } + } + return chars.join(''); +} + +/** + * Wraps a generator as a coroutine. + * @param generator + * @param args + * @returns + */ +export function asCoroutine(gen: Generator) { + // run initial code + const result = gen.next(); + if (result.done) return () => result; + return (value: N): IteratorResult => gen.next(value); +} From ccc0625a3744dd627ef7d7fa2937dd1c6c301dc8 Mon Sep 17 00:00:00 2001 From: Forrest Date: Tue, 9 Jan 2024 16:16:57 -0500 Subject: [PATCH 03/20] feat(streaming): add DICOM stream handler --- src/core/dicomTags.ts | 34 ++ src/core/stateMachine.ts | 113 ++++++ .../__tests__/chunkStateMachine.spec.ts | 38 ++ src/core/streaming/chunk.ts | 138 +++++++ src/core/streaming/chunkStateMachine.ts | 41 ++ .../dicom/__tests__/dicomMetaLoader.spec.ts | 27 ++ src/core/streaming/dicom/dicomDataLoader.ts | 27 ++ .../streaming/dicom/dicomFileDataLoader.ts | 15 + .../streaming/dicom/dicomFileMetaLoader.ts | 22 ++ src/core/streaming/dicom/dicomMetaLoader.ts | 66 ++++ src/core/streaming/dicom/dicomParser.ts | 372 ++++++++++++++++++ src/core/streaming/types.ts | 28 ++ src/io/import/processors/handleDicomStream.ts | 51 +++ 13 files changed, 972 insertions(+) create mode 100644 src/core/dicomTags.ts create mode 100644 src/core/stateMachine.ts create mode 100644 src/core/streaming/__tests__/chunkStateMachine.spec.ts create mode 100644 src/core/streaming/chunk.ts create mode 100644 src/core/streaming/chunkStateMachine.ts create mode 100644 src/core/streaming/dicom/__tests__/dicomMetaLoader.spec.ts create mode 100644 src/core/streaming/dicom/dicomDataLoader.ts create mode 100644 src/core/streaming/dicom/dicomFileDataLoader.ts create mode 100644 src/core/streaming/dicom/dicomFileMetaLoader.ts create mode 100644 src/core/streaming/dicom/dicomMetaLoader.ts create mode 100644 src/core/streaming/dicom/dicomParser.ts create mode 100644 src/io/import/processors/handleDicomStream.ts diff --git a/src/core/dicomTags.ts b/src/core/dicomTags.ts new file mode 100644 index 000000000..d1632d95b --- /dev/null +++ b/src/core/dicomTags.ts @@ -0,0 +1,34 @@ +interface Tag { + name: string; + tag: string; +} + +const tags: Tag[] = [ + { name: 'PatientName', tag: '0010|0010' }, + { name: 'PatientID', tag: '0010|0020' }, + { name: 'PatientBirthDate', tag: '0010|0030' }, + { name: 'PatientSex', tag: '0010|0040' }, + { name: 'StudyInstanceUID', tag: '0020|000d' }, + { name: 'StudyDate', tag: '0008|0020' }, + { name: 'StudyTime', tag: '0008|0030' }, + { name: 'StudyID', tag: '0020|0010' }, + { name: 'AccessionNumber', tag: '0008|0050' }, + { name: 'StudyDescription', tag: '0008|1030' }, + { name: 'Modality', tag: '0008|0060' }, + { name: 'SeriesInstanceUID', tag: '0020|000e' }, + { name: 'SeriesNumber', tag: '0020|0011' }, + { name: 'SeriesDescription', tag: '0008|103e' }, + { name: 'WindowLevel', tag: '0028|1050' }, + { name: 'WindowWidth', tag: '0028|1051' }, + { name: 'Rows', tag: '0028|0010' }, + { name: 'Columns', tag: '0028|0011' }, + { name: 'BitsAllocated', tag: '0028|0100' }, + { name: 'PixelRepresentation', tag: '0028|0103' }, + { name: 'ImagePositionPatient', tag: '0020|0032' }, + { name: 'ImageOrientationPatient', tag: '0020|0037' }, + { name: 'PixelSpacing', tag: '0028|0030' }, + { name: 
'SamplesPerPixel', tag: '0028|0002' }, +]; + +export const TAG_TO_NAME = new Map(tags.map((t) => [t.tag, t.name])); +export const NAME_TO_TAG = new Map(tags.map((t) => [t.name, t.tag])); diff --git a/src/core/stateMachine.ts b/src/core/stateMachine.ts new file mode 100644 index 000000000..63c0488f4 --- /dev/null +++ b/src/core/stateMachine.ts @@ -0,0 +1,113 @@ +import mitt, { Emitter } from 'mitt'; + +/** + * The payload of an update event. + */ +export type UpdateEvent = { + state: S; + prevState: S; + event: TE; + data?: any; +}; + +/** + * The available events emitted internally by a state machine. + */ +export type StateMachineEvents = { + update: UpdateEvent; +}; + +/** + * The available state transitions. + */ +export type StateTransitions = Partial< + Record>> +>; + +/** + * Tests to see if a state event is entering a given state. + * @param event + * @param state + * @returns + */ +export function enters(event: UpdateEvent, state: S) { + return event.state === state; +} + +/** + * Tests to see if a state event is leaving a given state. + * @param event + * @param state + * @returns + */ +export function leaves(event: UpdateEvent, state: S) { + return event.prevState === state; +} + +/** + * Represents a state machine. + */ +export default class StateMachine< + S extends string, + TE extends string, + E extends StateMachineEvents = StateMachineEvents +> { + private _events: Emitter; + private _state: S; + private _transitions: StateTransitions; + + constructor(initState: S, transitions: StateTransitions) { + this._state = initState; + this._transitions = transitions; + this._events = mitt(); + } + + get state() { + return this._state; + } + + /** + * Send an event to the state machine to trigger a transition. + * + * Event data will be broadcasted to update subscribers. + * @param event + * @param eventData + * @returns + */ + send(event: TE, eventData?: any) { + if (!(this._state in this._transitions)) return; + const stateTransitions = this._transitions[this._state]!; + + if (!(event in stateTransitions)) return; + const prevState = this._state; + this._state = stateTransitions[event]!; + this._events.emit('update', { + state: this._state, + prevState, + event, + data: eventData, + }); + } + + /** + * Subscribe to the update event. + * + * Returns a function to unsubscribe. + * @param callback + * @returns + */ + subscribe(callback: (event: UpdateEvent) => void) { + this._events.on('update', callback); + return () => { + this.unsubscribe(callback); + }; + } + + /** + * Unsubscribe from the update event. 
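* Note: subscribe() also returns a function that does this for you.
+   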
+ * @param callback + */ + unsubscribe(callback: (event: UpdateEvent) => void) { + this._events.off('update', callback); + } +} diff --git a/src/core/streaming/__tests__/chunkStateMachine.spec.ts b/src/core/streaming/__tests__/chunkStateMachine.spec.ts new file mode 100644 index 000000000..4933fd158 --- /dev/null +++ b/src/core/streaming/__tests__/chunkStateMachine.spec.ts @@ -0,0 +1,38 @@ +import { + ChunkState, + ChunkStateMachine, + TransitionEvent, +} from '@/src/core/streaming/chunkStateMachine'; +import { describe, expect, it } from 'vitest'; + +describe('chunk', () => { + describe('state machine', () => { + it('should transition properly', () => { + const machine = new ChunkStateMachine(); + + expect(machine.state).to.equal(ChunkState.Init); + + [ + TransitionEvent.LoadData, + TransitionEvent.MetaLoaded, + TransitionEvent.DataLoaded, + TransitionEvent.Cancel, + ].forEach((event) => { + machine.send(event); + expect(machine.state).to.equal(ChunkState.Init); + }); + + machine.send(TransitionEvent.LoadMeta); + expect(machine.state).to.equal(ChunkState.MetaLoading); + + machine.send(TransitionEvent.MetaLoaded); + expect(machine.state).to.equal(ChunkState.MetaOnly); + + machine.send(TransitionEvent.LoadData); + expect(machine.state).to.equal(ChunkState.DataLoading); + + machine.send(TransitionEvent.DataLoaded); + expect(machine.state).to.equal(ChunkState.Loaded); + }); + }); +}); diff --git a/src/core/streaming/chunk.ts b/src/core/streaming/chunk.ts new file mode 100644 index 000000000..dc46af051 --- /dev/null +++ b/src/core/streaming/chunk.ts @@ -0,0 +1,138 @@ +import { UpdateEvent, enters, leaves } from '@/src/core/stateMachine'; +import { + ChunkState, + ChunkStateMachine, + TransitionEvent, +} from '@/src/core/streaming/chunkStateMachine'; +import { DataLoader, MetaLoader } from '@/src/core/streaming/types'; +import mitt, { Emitter } from 'mitt'; + +type ChunkEvents = { + doneMeta: void; + doneData: void; + error: any; +}; + +interface ChunkEventData { + error?: Error; +} + +interface ChunkInit { + metaLoader: MetaLoader; + dataLoader: DataLoader; +} + +/** + * Represents a data chunk. 
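*
+ * Loading is two-phase: loadMeta() drives the MetaLoader to pull just
+ * enough bytes to parse metadata; loadData() drives the DataLoader to
+ * fetch the rest on demand.
+ 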
+ */ +export class Chunk { + private machine: ChunkStateMachine; + private metaLoader: MetaLoader; + private dataLoader: DataLoader; + private events: Emitter; + + constructor(init: ChunkInit) { + this.metaLoader = init.metaLoader; + this.dataLoader = init.dataLoader; + + this.machine = new ChunkStateMachine(); + this.machine.subscribe(this.onStateUpdated); + + this.events = mitt(); + } + + dispose() { + this.machine.unsubscribe(this.onStateUpdated); + this.events.all.clear(); + } + + get state() { + return this.machine.state; + } + + get metadata() { + return this.metaLoader.meta; + } + + get metaBlob() { + return this.metaLoader.metaBlob; + } + + get dataBlob() { + return this.dataLoader.data; + } + + loadMeta() { + if (this.machine.state !== ChunkState.Init) { + return Promise.resolve(); + } + return new Promise((resolve, reject) => { + this.machine.send(TransitionEvent.LoadMeta); + this.events.on('doneMeta', () => { + this.cleanupEventListeners(); + resolve(); + }); + this.events.on('error', (error) => { + this.cleanupEventListeners(); + reject(error); + }); + }); + } + + loadData() { + if (this.machine.state !== ChunkState.MetaOnly) { + return Promise.resolve(); + } + return new Promise((resolve, reject) => { + this.machine.send(TransitionEvent.LoadData); + this.events.on('doneData', () => { + this.cleanupEventListeners(); + resolve(); + }); + this.events.on('error', (error) => { + this.cleanupEventListeners(); + reject(error); + }); + }); + } + + private cleanupEventListeners() { + this.events.off('doneMeta'); + this.events.off('doneData'); + this.events.off('error'); + } + + private onStateUpdated = async ( + event: UpdateEvent + ) => { + const data = event.data as ChunkEventData; + + if (data?.error) { + this.events.emit('error', data.error); + } + + if (leaves(event, ChunkState.MetaLoading)) { + this.metaLoader.stop(); + } else if (leaves(event, ChunkState.DataLoading)) { + this.dataLoader.stop(); + } + + if (event.event === TransitionEvent.MetaLoaded) { + this.events.emit('doneMeta'); + } else if (event.event === TransitionEvent.DataLoaded) { + this.events.emit('doneData'); + } + + try { + if (enters(event, ChunkState.MetaLoading)) { + await this.metaLoader.load(); + this.machine.send(TransitionEvent.MetaLoaded); + } else if (enters(event, ChunkState.DataLoading)) { + await this.dataLoader.load(); + this.machine.send(TransitionEvent.DataLoaded); + } + } catch (error) { + this.machine.send(TransitionEvent.Cancel, { error }); + } + }; +} diff --git a/src/core/streaming/chunkStateMachine.ts b/src/core/streaming/chunkStateMachine.ts new file mode 100644 index 000000000..638f0292f --- /dev/null +++ b/src/core/streaming/chunkStateMachine.ts @@ -0,0 +1,41 @@ +import StateMachine from '@/src/core/stateMachine'; + +export enum ChunkState { + Init = 'Init', + MetaLoading = 'MetaLoading', + MetaOnly = 'MetaOnly', + DataLoading = 'DataLoading', + Loaded = 'Loaded', +} + +export enum TransitionEvent { + LoadMeta = 'LoadMeta', + MetaLoaded = 'MetaLoaded', + LoadData = 'LoadData', + DataLoaded = 'DataLoaded', + Cancel = 'Cancel', +} + +export class ChunkStateMachine extends StateMachine< + ChunkState, + TransitionEvent +> { + constructor() { + super(ChunkState.Init, { + [ChunkState.Init]: { + [TransitionEvent.LoadMeta]: ChunkState.MetaLoading, + }, + [ChunkState.MetaLoading]: { + [TransitionEvent.Cancel]: ChunkState.Init, + [TransitionEvent.MetaLoaded]: ChunkState.MetaOnly, + }, + [ChunkState.MetaOnly]: { + [TransitionEvent.LoadData]: ChunkState.DataLoading, + }, + [ChunkState.DataLoading]: { + 
[TransitionEvent.DataLoaded]: ChunkState.Loaded,
+        [TransitionEvent.Cancel]: ChunkState.MetaOnly,
+      },
+    });
+  }
+}
diff --git a/src/core/streaming/dicom/__tests__/dicomMetaLoader.spec.ts b/src/core/streaming/dicom/__tests__/dicomMetaLoader.spec.ts
new file mode 100644
index 000000000..3c63f1396
--- /dev/null
+++ b/src/core/streaming/dicom/__tests__/dicomMetaLoader.spec.ts
@@ -0,0 +1,27 @@
+import { DicomMetaLoader } from '@/src/core/streaming/dicom/dicomMetaLoader';
+import { RequestPool } from '@/src/core/streaming/requestPool';
+import { ResumableFetcher } from '@/src/core/streaming/resumableFetcher';
+import { describe, it, expect } from 'vitest';
+
+describe('dicomMetaLoader', () => {
+  it('should load only metadata', async () => {
+    const pool = new RequestPool();
+    const fetcher = new ResumableFetcher(
+      'https://data.kitware.com/api/v1/file/57b5d4648d777f10f2693e7e/download',
+      {
+        fetch: pool.fetch,
+      }
+    );
+    const loader = new DicomMetaLoader(fetcher, () => {
+      return [];
+    });
+    await loader.load();
+
+    const downloaded = fetcher.cachedChunks.reduce(
+      (sum, chunk) => sum + chunk.length,
+      0
+    );
+    // the metadata header fits within the first 4096 bytes
+    expect(downloaded).to.be.lessThanOrEqual(4096);
+  });
+});
diff --git a/src/core/streaming/dicom/dicomDataLoader.ts b/src/core/streaming/dicom/dicomDataLoader.ts
new file mode 100644
index 000000000..4dc781173
--- /dev/null
+++ b/src/core/streaming/dicom/dicomDataLoader.ts
@@ -0,0 +1,27 @@
+import { DataLoader, Fetcher } from '@/src/core/streaming/types';
+import { FILE_EXT_TO_MIME } from '@/src/io/mimeTypes';
+import { Maybe } from '@/src/types';
+
+export class DicomDataLoader implements DataLoader {
+  public data: Maybe<Blob>;
+  private fetcher: Fetcher;
+
+  constructor(fetcher: Fetcher) {
+    this.fetcher = fetcher;
+  }
+
+  async load() {
+    await this.fetcher.connect();
+    const stream = this.fetcher.getStream();
+
+    // consume the rest of the stream in order to cache the chunks
+    await stream.pipeTo(new WritableStream());
+    this.data = new Blob(this.fetcher.cachedChunks, {
+      type: FILE_EXT_TO_MIME.dcm,
+    });
+  }
+
+  stop() {
+    this.fetcher.close();
+  }
+}
diff --git a/src/core/streaming/dicom/dicomFileDataLoader.ts b/src/core/streaming/dicom/dicomFileDataLoader.ts
new file mode 100644
index 000000000..f601c4e57
--- /dev/null
+++ b/src/core/streaming/dicom/dicomFileDataLoader.ts
@@ -0,0 +1,15 @@
+import { DataLoader } from '@/src/core/streaming/types';
+
+export class DicomFileDataLoader implements DataLoader {
+  public data: Blob;
+
+  constructor(data: Blob) {
+    this.data = data;
+  }
+
+  // Data is provided, so load/stop does nothing.
+  
diff --git a/src/core/streaming/dicom/dicomFileDataLoader.ts b/src/core/streaming/dicom/dicomFileDataLoader.ts
new file mode 100644
index 000000000..f601c4e57
--- /dev/null
+++ b/src/core/streaming/dicom/dicomFileDataLoader.ts
@@ -0,0 +1,15 @@
+import { DataLoader } from '@/src/core/streaming/types';
+
+export class DicomFileDataLoader implements DataLoader {
+  public data: Blob;
+
+  constructor(data: Blob) {
+    this.data = data;
+  }
+
+  // Data is provided up front, so load/stop do nothing.
+  // eslint-disable-next-line class-methods-use-this
+  load() {}
+  // eslint-disable-next-line class-methods-use-this
+  stop() {}
+}
diff --git a/src/core/streaming/dicom/dicomFileMetaLoader.ts b/src/core/streaming/dicom/dicomFileMetaLoader.ts
new file mode 100644
index 000000000..692b7aa67
--- /dev/null
+++ b/src/core/streaming/dicom/dicomFileMetaLoader.ts
@@ -0,0 +1,22 @@
+import { ReadDicomTagsFunction } from '@/src/core/streaming/dicom/dicomMetaLoader';
+import { MetaLoader } from '@/src/core/streaming/types';
+import { Maybe } from '@/src/types';
+
+export class DicomFileMetaLoader implements MetaLoader {
+  public tags: Maybe<Array<[string, string]>>;
+  private file: File;
+
+  constructor(file: File, private readDicomTags: ReadDicomTagsFunction) {
+    this.file = file;
+  }
+
+  async load() {
+    if (this.tags) return;
+    this.tags = await this.readDicomTags(this.file);
+  }
+
+  // eslint-disable-next-line class-methods-use-this
+  stop() {
+    // do nothing
+  }
+}
diff --git a/src/core/streaming/dicom/dicomMetaLoader.ts b/src/core/streaming/dicom/dicomMetaLoader.ts
new file mode 100644
index 000000000..524e815bb
--- /dev/null
+++ b/src/core/streaming/dicom/dicomMetaLoader.ts
@@ -0,0 +1,66 @@
+import { createDicomParser } from '@/src/core/streaming/dicom/dicomParser';
+import { StopSignal } from '@/src/core/streaming/resumableFetcher';
+import { Fetcher, MetaLoader } from '@/src/core/streaming/types';
+import { FILE_EXT_TO_MIME } from '@/src/io/mimeTypes';
+import { Maybe } from '@/src/types';
+import { Awaitable } from '@vueuse/core';
+
+export type ReadDicomTagsFunction = (
+  file: File
+) => Awaitable<Array<[string, string]>>;
+
+export class DicomMetaLoader implements MetaLoader {
+  private tags: Maybe<Array<[string, string]>>;
+  private fetcher: Fetcher;
+  private readDicomTags: ReadDicomTagsFunction;
+
+  constructor(fetcher: Fetcher, readDicomTags: ReadDicomTagsFunction) {
+    this.fetcher = fetcher;
+    this.readDicomTags = readDicomTags;
+  }
+
+  get meta() {
+    return this.tags;
+  }
+
+  get metaBlob() {
+    return new Blob(this.fetcher.cachedChunks, { type: FILE_EXT_TO_MIME.dcm });
+  }
+
+  async load() {
+    if (this.tags) return;
+
+    await this.fetcher.connect();
+    const stream = this.fetcher.getStream();
+
+    const parse = createDicomParser(12);
+
+    const sinkStream = new WritableStream({
+      write: (chunk) => {
+        const result = parse(chunk);
+        if (result.done) {
+          this.fetcher.close();
+        }
+      },
+    });
+
+    try {
+      await stream.pipeTo(sinkStream, {
+        // ensure we use the fetcher's abort signal,
+        // otherwise a DOMException will be propagated
+        signal: this.fetcher.abortSignal,
+      });
+    } catch (err) {
+      if (err !== StopSignal) {
+        throw err;
+      }
+    }
+
+    const metadataFile = new File(this.fetcher.cachedChunks, 'file.dcm');
+    this.tags = await this.readDicomTags(metadataFile);
+  }
+
+  stop() {
+    this.fetcher.close();
+  }
+}
diff --git a/src/core/streaming/dicom/dicomParser.ts b/src/core/streaming/dicom/dicomParser.ts
new file mode 100644
index 000000000..cac63eaf2
--- /dev/null
+++ b/src/core/streaming/dicom/dicomParser.ts
@@ -0,0 +1,372 @@
+/* eslint-disable no-use-before-define */
+/* eslint-disable no-continue */
+import StreamingByteReader from '@/src/core/streaming/streamingByteReader.js';
+import { toAscii, asCoroutine } from '@/src/utils';
+
+// [Group, Element]
+type Tag = [number, number];
+
+interface ParseOptions {
+  peek?: boolean;
+  skipValue?: boolean;
+  littleEndian?: boolean;
+  explicitVr?: boolean;
+}
+
+// This is based on the DICOM standard as of 2023. Links may point to whichever version is current at the time of visiting.
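+//
+// Data element wire layouts handled below, for reference:
+//   Explicit VR, "legacy" VRs:  group (2) | element (2) | VR (2 chars) | length (2) | value
+//   Explicit VR, other VRs:     group (2) | element (2) | VR (2 chars) | reserved (2) | length (4) | value
+//   Implicit VR:                group (2) | element (2) | length (4) | value
+// All field widths are in bytes; a length of 0xffffffff means "undefined length".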
+ +const UndefinedLength = 0xffffffff; +const ImplicitTransferSyntaxUID = '1.2.840.10008.1.2'; +const ExplicitVRBigEndianUID = '1.2.840.10008.1.2.2'; + +const Tags = { + TransferSyntaxUID: [0x0002, 0x0010] as Tag, + ItemDelimiter: [0xfffe, 0xe00d] as Tag, + Item: [0xfffe, 0xe000] as Tag, +}; + +// VR reference: https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html +// See: https://dicom.nema.org/medical/dicom/current/output/chtml/part05/chapter_7.html#sect_7.1.2 +// prettier-ignore +const ExplicitVrLegacyDataElementFormat = new Set(["AE", "AS", "AT", "CS", "DA", "DS", "DT", "FL", "FD", "IS", "LO", "LT", "PN", "SH", "SL", "SS", "ST", "TM", "UI", "UL", "US"]); +// prettier-ignore +const ExplicitVrCannotHaveUndefinedLength = new Set(["SV", "UC", "UR", "UV", "UT"]); + +const DEBUG = false; + +function debug(...args: any[]) { + if (DEBUG) console.log(...args); +} + +function equalsToTag(tag: Tag) { + return (group: number, element: number) => + group === tag[0] && element === tag[1]; +} + +function* readElementTag( + reader: StreamingByteReader, + { littleEndian = false } = {} +) { + const group = yield* reader.readUint16({ littleEndian }); + const element = yield* reader.readUint16({ littleEndian }); + return [group, element] as Tag; +} + +/** + * Peeks the data element group num and element num + * + * Assumes the reader is pointing to the start of a Data Element. + * @param {*} reader + * @param {*} opts + * @returns + */ +function* peekElementTag( + reader: StreamingByteReader, + { littleEndian = false } = {} +) { + const bytes = yield* reader.read(4, { peek: true }); + let group = 0; + let element = 0; + /* eslint-disable no-bitwise */ + if (littleEndian) { + group = bytes[0] | (bytes[1] << 8); + element = bytes[2] | (bytes[3] << 8); + } else { + group = bytes[1] | (bytes[0] << 8); + element = bytes[3] | (bytes[2] << 8); + } + /* eslint-enable no-bitwise */ + return [group, element] as Tag; +} + +/** + * Reads an element value. + * + * Assumes the reader is pointing to the first byte of an element's value. + * + * Does not return data for undefined lengths, but the reader pointer is advanced. + * @returns + */ +function* readElementValue( + reader: StreamingByteReader, + vr: string, + length: number, + { explicitVr = false, littleEndian = false, skip = false } = {} +): Generator { + if (length !== UndefinedLength) { + if (skip) { + yield* reader.seek(length); + return undefined; + } + return yield* reader.read(length); + } + + if (vr === 'SQ') { + yield* skipSequenceValue(reader, { explicitVr, littleEndian }); + } else { + // "Otherwise, the Value Field has an Undefined Length and a Sequence Delimitation Item marks the end of the Value Field." + // https://dicom.nema.org/medical/dicom/current/output/chtml/part05/chapter_7.html#sect_7.1.2 + yield* seekUntilSequenceDelimitationItem(reader, { littleEndian }); + } + return undefined; +} + +/** + * Reads the data element at point. + * + * Does not parse the value according to VR or transfer syntax. + * + * Does not return the element value if the length is undefined. + * + * Assumes the reader is pointing to the start of a Data Element. 
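+ *
+ * Worked example (explicit VR, little endian): the bytes
+ *   02 00 10 00 55 49 14 00
+ * decode as group 0x0002, element 0x0010, VR "UI", length 20 (0x0014):
+ * the Transfer Syntax UID element, whose 20-byte value follows.
+ *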
+ * https://dicom.nema.org/medical/dicom/current/output/chtml/part05/chapter_7.html#sect_7.1.1 + * @param {*} reader + */ +function* readDataElement( + reader: StreamingByteReader, + { littleEndian = false, explicitVr = false, skipValue = false } = {} +) { + // read tag + const [group, element] = yield* readElementTag(reader, { littleEndian }); + let length = 0; + let vr: string = ''; + let data: Uint8Array | undefined; + + if (explicitVr) { + // read VR + vr = yield* reader.readAscii(2); + if (ExplicitVrLegacyDataElementFormat.has(vr)) { + length = yield* reader.readUint16({ littleEndian }); + } else { + // skip reserved bytes + yield* reader.seek(2); + length = yield* reader.readUint32({ littleEndian }); + } + + if ( + length === UndefinedLength && + ExplicitVrCannotHaveUndefinedLength.has(vr) + ) { + console.warn(`Invalid DICOM. VR ${vr} may not have undefined length`); + } + + debug( + 'readDataElement explicitVr', + group.toString(16), + element.toString(16), + 'length', + length + ); + data = yield* readElementValue(reader, vr, length, { + littleEndian, + explicitVr, + skip: skipValue, + }); + } else { + length = yield* reader.readUint32({ littleEndian }); + debug( + 'readDataElement implicitVr', + group.toString(16), + element.toString(16), + 'length', + length + ); + data = yield* readElementValue(reader, vr, length, { + littleEndian, + explicitVr, + skip: skipValue, + }); + } + + return { group, element, vr, length, data }; +} + +/** + * Stops the reader at the beginning of the data element that satisfies untilFn. + * + * Assumes the reader is pointing to the start of a Data Element. + * + * untilFn is passed (group, element) + */ +function* skipDataElementsUntil( + reader: StreamingByteReader, + untilFn: (group: number, element: number) => boolean, + opts: ParseOptions +) { + while (true) { + const [group, element] = yield* peekElementTag(reader, opts); + if (untilFn(group, element)) { + return; + } + debug( + '-- found', + group.toString(16), + element.toString(16), + 'at', + `${reader.position} (0x${reader.position.toString(16)})` + ); + const el = yield* readDataElement(reader, { ...opts, skipValue: true }); + debug( + '-- skipped', + group.toString(16), + element.toString(16), + el, + 'at', + `${reader.position} (0x${reader.position.toString(16)})` + ); + } +} + +/** + * The reader must be pointing to the start of the sequence's data, namely the first item or the sequence delimitation item. 
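+ *
+ * Sequence value layout handled below:
+ *   (fffe,e000) Item with a defined length: seek past that many bytes.
+ *   (fffe,e000) Item with undefined length: skip elements until the
+ *               (fffe,e00d) Item Delimitation tag, then skip its zero length.
+ *   (fffe,e0dd) Sequence Delimitation Item: the end of the sequence value.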
+ * + * See: https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_7.5.html + * and: https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_7.5.2.html + */ +function* skipSequenceValue( + reader: StreamingByteReader, + { explicitVr = false, littleEndian = false } = {} +) { + while (true) { + const [group, element] = yield* readElementTag(reader, { littleEndian }); + // Item tag + // https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_7.5.2.html#table_7.5-3 + if (equalsToTag(Tags.Item)(group, element)) { + const itemLength = yield* reader.readUint32({ littleEndian }); + if (itemLength !== UndefinedLength) { + yield* reader.seek(itemLength); + } else { + // read until item delimitation tag + yield* skipDataElementsUntil(reader, equalsToTag(Tags.ItemDelimiter), { + explicitVr, + littleEndian, + skipValue: true, + }); + // skip the item delimitation tag and the 4-byte zero length + yield* reader.seek(4 + 4); + } + continue; + } + + // Sequence Delimitation Item tag + if (group === 0xfffe && element === 0xe0dd) { + // skip the 4-byte zero length + yield* reader.seek(4); + return; + } + + throw new Error( + `skipSequenceValue: encountered unknown element: (${group.toString( + 16 + )},${element.toString(16)})` + ); + } +} + +function* seekUntilSequenceDelimitationItem( + reader: StreamingByteReader, + { littleEndian = false } = {} +) { + // The DICOM standard states that all data element values must have even length, + // so we can just read every uint16 until we hit a sequence delimitation item. + // Sequence Delimitation item VR is encoded as implicit VR and ignores the transfer syntax. + // Presumably this also means little endian. + // https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_7.5.html + while (true) { + // Sequence Delimitation Item is (fffe,e0dd) + const group = yield* reader.readUint16({ littleEndian }); + if (group !== 0xfffe) continue; + const element = yield* reader.readUint16({ littleEndian }); + if (element !== 0xe0dd) continue; + // ignore the 4-byte zero length bit. + yield* reader.seek(4); + break; + } +} + +// Per Part 10: "File Meta Information shall be encoded using the Explicit VR Little Endian Transfer Syntax" +// Transfer syntaxes are negotiated between DICOM nodes. Additionally, the File Meta Info block contains the transfer syntax used to encode the Data Set. +// Assumption: we are using Explicit VR Little Endian. The default (Implicit VR Little Endian) is not widely used. 
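+// Note: these assumptions apply only to the File Meta group itself; once the
+// Transfer Syntax UID is read, parseDicomUpToPixelData derives explicitVr and
+// littleEndian for the remainder of the Data Set from that UID.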
+// Assumption: File Meta Elements follow the VRs listed here: https://dicom.nema.org/medical/dicom/current/output/chtml/part06/chapter_7.html
+// Resources:
+// data element layout: https://www.leadtools.com/help/sdk/v20/dicom/api/overview-data-element-structure.html
+// transfer syntax info: https://pacsbootcamp.com/transfer-syntax
+function* readFieldMetaInfo(reader: StreamingByteReader) {
+  // skip until we find Transfer Syntax UID
+  yield* skipDataElementsUntil(reader, equalsToTag(Tags.TransferSyntaxUID), {
+    explicitVr: true,
+    littleEndian: true,
+  });
+
+  const transferSyntaxUidElement = yield* readDataElement(reader, {
+    explicitVr: true,
+    littleEndian: true,
+    skipValue: false,
+  });
+
+  if (transferSyntaxUidElement.vr !== 'UI')
+    throw new Error('Transfer syntax UID element does not have a VR of UI');
+  if (!transferSyntaxUidElement.data)
+    throw new Error('Did not get data for the transfer syntax UID element');
+
+  const transferSyntaxUid = toAscii(transferSyntaxUidElement.data, {
+    ignoreNulls: true,
+  });
+
+  // skip until end of file meta info group block, which is the first non 0x0002 grouped element.
+  // "Values of all Tags (0002,xxxx) are reserved for use by this Standard and later versions of DICOM."
+  // https://dicom.nema.org/medical/dicom/current/output/html/part10.html#table_7.1-1
+  yield* skipDataElementsUntil(reader, (group) => group !== 0x0002, {
+    explicitVr: true,
+    littleEndian: true,
+  });
+
+  return {
+    transferSyntaxUid,
+  };
+}
+
+function* parseDicomUpToPixelData(readExtraSuffix = 0) {
+  const reader = new StreamingByteReader();
+
+  // preamble
+  yield* reader.seek(128);
+
+  // prefix
+  const prefix = yield* reader.readAscii(4);
+  if (prefix !== 'DICM') {
+    throw new Error('Not DICOM');
+  }
+
+  const info = yield* readFieldMetaInfo(reader);
+  const explicitVr = info.transferSyntaxUid !== ImplicitTransferSyntaxUID;
+  const littleEndian = info.transferSyntaxUid !== ExplicitVRBigEndianUID;
+
+  debug(
+    '-- parsed field meta info',
+    info,
+    'at',
+    reader.position,
+    'explicitVr',
+    explicitVr,
+    'littleEndian',
+    littleEndian
+  );
+
+  yield* skipDataElementsUntil(
+    reader,
+    // (7fe0,0010): Pixel Data
+    (group, element) => group === 0x7fe0 && element === 0x0010,
+    { explicitVr, littleEndian }
+  );
+
+  yield* reader.seek(readExtraSuffix);
+
+  return {
+    position: reader.position,
+  };
+}
+
+export const createDicomParser = (readExtraSuffix = 0) => {
+  return asCoroutine(parseDicomUpToPixelData(readExtraSuffix));
+};
diff --git a/src/core/streaming/types.ts b/src/core/streaming/types.ts
index afd67495c..9fe61c5b8 100644
--- a/src/core/streaming/types.ts
+++ b/src/core/streaming/types.ts
@@ -1,3 +1,31 @@
+import { Maybe } from '@/src/types';
+import { Awaitable } from '@vueuse/core';
+
+export type LoaderEvents = {
+  error: any;
+  done: any;
+};
+
+interface Loader {
+  load(): Awaitable<void>;
+  stop(): Awaitable<void>;
+}
+
+/**
+ * A metadata loader.
+ */
+export interface MetaLoader extends Loader {
+  meta: Maybe<Array<[string, string]>>;
+  metaBlob: Maybe<Blob>;
+}
+
+/**
+ * A data loader.
+ */
+export interface DataLoader extends Loader {
+  data: Maybe<Blob>;
+}
+
 /**
  * Init options for a Fetcher.
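 * A Fetcher is the transport consumed by the loaders above; they call its
 * connect(), getStream(), and close() methods and read its cachedChunks.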
*/ diff --git a/src/io/import/processors/handleDicomStream.ts b/src/io/import/processors/handleDicomStream.ts new file mode 100644 index 000000000..444996e89 --- /dev/null +++ b/src/io/import/processors/handleDicomStream.ts @@ -0,0 +1,51 @@ +import { Chunk } from '@/src/core/streaming/chunk'; +import { DicomDataLoader } from '@/src/core/streaming/dicom/dicomDataLoader'; +import { + DicomMetaLoader, + ReadDicomTagsFunction, +} from '@/src/core/streaming/dicom/dicomMetaLoader'; +import { getRequestPool } from '@/src/core/streaming/requestPool'; +import { ResumableFetcher } from '@/src/core/streaming/resumableFetcher'; +import { ImportHandler } from '@/src/io/import/common'; +import { getWorker } from '@/src/io/itk/worker'; +import { FILE_EXT_TO_MIME } from '@/src/io/mimeTypes'; +import { readDicomTags } from '@itk-wasm/dicom'; + +const handleDicomStream: ImportHandler = async (dataSource, { done }) => { + const { fileSrc, uriSrc } = dataSource; + if (fileSrc || uriSrc?.mime !== FILE_EXT_TO_MIME.dcm) { + return dataSource; + } + + const fetcher = + uriSrc.fetcher ?? + new ResumableFetcher(uriSrc.uri, { + fetch: (...args) => getRequestPool().fetch(...args), + }); + + const readTags: ReadDicomTagsFunction = async (file) => { + const result = await readDicomTags(file, { webWorker: getWorker() }); + return result.tags; + }; + + const metaLoader = new DicomMetaLoader(fetcher, readTags); + const dataLoader = new DicomDataLoader(fetcher); + const chunk = new Chunk({ + metaLoader, + dataLoader, + }); + + await chunk.loadMeta(); + + return done({ + dataSource: { + ...dataSource, + chunkSrc: { + chunk, + mime: FILE_EXT_TO_MIME.dcm, + }, + }, + }); +}; + +export default handleDicomStream; From e54adc688f058e7bbc9348c71d31daa451c80878 Mon Sep 17 00:00:00 2001 From: Forrest Date: Thu, 23 May 2024 15:17:47 -0400 Subject: [PATCH 04/20] feat(streaming): add streaming integration --- src/actions/importDicomChunks.ts | 27 ++ src/components/PatientStudyVolumeBrowser.vue | 54 +--- src/components/SliceViewer.vue | 30 +- src/components/VolumeViewer.vue | 25 +- .../vtk/VtkBaseVolumeRepresentation.vue | 8 +- src/composables/useCurrentImage.ts | 6 +- src/composables/useSliceConfigInitializer.ts | 13 +- .../useVolumeColoringInitializer.ts | 6 +- .../useWindowingConfigInitializer.ts | 57 ++-- src/core/dicomTags.ts | 5 + ...er.spec.ts => cachedStreamFetcher.spec.ts} | 6 +- ...mableFetcher.ts => cachedStreamFetcher.ts} | 114 ++++--- src/core/streaming/chunk.ts | 69 ++-- src/core/streaming/chunkImage.ts | 15 + .../dicom/__tests__/dicomMetaLoader.spec.ts | 6 +- src/core/streaming/dicom/dicomDataLoader.ts | 10 +- .../streaming/dicom/dicomFileMetaLoader.ts | 8 + src/core/streaming/dicom/dicomMetaLoader.ts | 2 +- src/core/streaming/dicomChunkImage.ts | 302 ++++++++++++++++++ src/io/import/dataSource.ts | 22 ++ src/io/import/importDataSources.ts | 75 ++--- src/io/import/processors/handleDicomFile.ts | 39 ++- src/io/import/processors/handleDicomStream.ts | 4 +- src/io/import/processors/openUriStream.ts | 4 +- src/io/import/processors/updateUriType.ts | 1 + src/io/itk/worker.ts | 5 +- src/store/chunks.ts | 13 + src/store/datasets.ts | 9 - .../__tests__/allocateImageFromChunks.spec.ts | 13 + src/utils/allocateImageFromChunks.ts | 146 +++++++++ src/utils/dataSelection.ts | 13 +- 31 files changed, 844 insertions(+), 263 deletions(-) create mode 100644 src/actions/importDicomChunks.ts rename src/core/streaming/__tests__/{resumableFetcher.spec.ts => cachedStreamFetcher.spec.ts} (90%) rename src/core/streaming/{resumableFetcher.ts => 
cachedStreamFetcher.ts} (65%) create mode 100644 src/core/streaming/chunkImage.ts create mode 100644 src/core/streaming/dicomChunkImage.ts create mode 100644 src/store/chunks.ts create mode 100644 src/utils/__tests__/allocateImageFromChunks.spec.ts create mode 100644 src/utils/allocateImageFromChunks.ts diff --git a/src/actions/importDicomChunks.ts b/src/actions/importDicomChunks.ts new file mode 100644 index 000000000..43d659f76 --- /dev/null +++ b/src/actions/importDicomChunks.ts @@ -0,0 +1,27 @@ +import { Chunk } from '@/src/core/streaming/chunk'; +import DicomChunkImage from '@/src/core/streaming/dicomChunkImage'; +import { splitAndSort } from '@/src/io/dicom'; +import useChunkStore from '@/src/store/chunks'; + +export async function importDicomChunks(chunks: Chunk[]) { + // split into groups + const chunksByVolume = await splitAndSort(chunks, (chunk) => chunk.metaBlob!); + + // add to matching DICOM images + const chunkStore = useChunkStore(); + await Promise.all( + Object.entries(chunksByVolume).map(async ([id, groupedChunks]) => { + const image = + (chunkStore.chunkImageById[id] as DicomChunkImage) ?? + new DicomChunkImage(); + chunkStore.chunkImageById[id] = image; + + await image.addChunks(groupedChunks); + + // TODO(fli) REMOVE to be on-demand when the dataset is being viewed + image.startLoad(); + }) + ); + + return Object.keys(chunksByVolume); +} diff --git a/src/components/PatientStudyVolumeBrowser.vue b/src/components/PatientStudyVolumeBrowser.vue index ffed6b9bc..027f25f38 100644 --- a/src/components/PatientStudyVolumeBrowser.vue +++ b/src/components/PatientStudyVolumeBrowser.vue @@ -1,9 +1,10 @@