Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streams): ByteSliceStream #2795

Merged
merged 10 commits into from
Nov 3, 2022
40 changes: 40 additions & 0 deletions streams/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,43 @@ export class LimitedTransformStream<T> extends TransformStream<T, T> {
});
}
}

/**
* A transform stream that only transforms from the zero-indexed `start` and `end` bytes (both inclusive).
*
* @example
* ```ts
* import { RangedByteTransformStream } from "https://deno.land/std@$STD_VERSION/streams/buffer.ts";
* const response = await fetch("https://example.com");
* const rangedStream = response.body!
* .pipeThrough(new RangedByteTransformStream(3, 8));
* ```
*/
export class RangedByteTransformStream
extends TransformStream<Uint8Array, Uint8Array> {
#offset = 0;

constructor(start: number, end = Infinity) {
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
super({
start: () => {
end += 1;
},
transform: (chunk, controller) => {
this.#offset += chunk.byteLength;
if (this.#offset >= start + 1) {
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
if (start - (this.#offset - chunk.byteLength) >= 0) {
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
chunk = chunk.slice(start - (this.#offset - chunk.byteLength));
}

if (end <= this.#offset) {
chunk = chunk.slice(0, chunk.byteLength - (this.#offset - end));
controller.enqueue(chunk);
controller.terminate();
}

controller.enqueue(chunk);
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
}
},
});
}
}
26 changes: 26 additions & 0 deletions streams/buffer_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
Buffer,
LimitedBytesTransformStream,
LimitedTransformStream,
RangedByteTransformStream,
} from "./buffer.ts";

Deno.test("[streams] Buffer Write & Read", async function () {
Expand Down Expand Up @@ -128,3 +129,28 @@ Deno.test("[streams] LimitedTransformStream error", async function () {
}
}, RangeError);
});

Deno.test("[streams] RangedByteTransformStream", async function () {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.enqueue(new Uint8Array([7, 8, 9]));
controller.enqueue(new Uint8Array([10, 11, 12]));
controller.enqueue(new Uint8Array([13, 14, 15]));
controller.enqueue(new Uint8Array([16, 17, 18]));
controller.close();
},
}).pipeThrough(new RangedByteTransformStream(2, 10));

const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11]),
]);
});