Skip to content

Commit

Permalink
feat(streams): add ByteSliceStream (#2795)
Browse files Browse the repository at this point in the history
  • Loading branch information
iuioiua authored Nov 3, 2022
1 parent 1470f8a commit b25e580
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 1 deletion.
41 changes: 41 additions & 0 deletions streams/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,44 @@ export class LimitedTransformStream<T> extends TransformStream<T, T> {
});
}
}

/**
* A transform stream that only transforms from the zero-indexed `start` and `end` bytes (both inclusive).
*
* @example
* ```ts
* import { ByteSliceStream } from "https://deno.land/std@$STD_VERSION/streams/buffer.ts";
* const response = await fetch("https://example.com");
* const rangedStream = response.body!
* .pipeThrough(new ByteSliceStream(3, 8));
* ```
*/
export class ByteSliceStream extends TransformStream<Uint8Array, Uint8Array> {
#offsetStart = 0;
#offsetEnd = 0;

constructor(start = 0, end = Infinity) {
super({
start: () => {
assert(start >= 0, "`start` must be greater than 0");
end += 1;
},
transform: (chunk, controller) => {
this.#offsetStart = this.#offsetEnd;
this.#offsetEnd += chunk.byteLength;
if (this.#offsetEnd > start) {
if (this.#offsetStart < start) {
chunk = chunk.slice(start - this.#offsetStart);
}
if (this.#offsetEnd >= end) {
chunk = chunk.slice(0, chunk.byteLength - this.#offsetEnd + end);
controller.enqueue(chunk);
controller.terminate();
} else {
controller.enqueue(chunk);
}
}
},
});
}
}
84 changes: 83 additions & 1 deletion streams/buffer_test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

import { assert, assertEquals, assertRejects } from "../testing/asserts.ts";
import {
assert,
assertEquals,
assertRejects,
assertThrows,
} from "../testing/asserts.ts";
import {
Buffer,
ByteSliceStream,
LimitedBytesTransformStream,
LimitedTransformStream,
} from "./buffer.ts";
Expand Down Expand Up @@ -128,3 +134,79 @@ Deno.test("[streams] LimitedTransformStream error", async function () {
}
}, RangeError);
});

Deno.test("[streams] ByteSliceStream", async function () {
function createStream(start = 0, end = Infinity) {
return new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([0, 1]));
controller.enqueue(new Uint8Array([2, 3]));
controller.close();
},
}).pipeThrough(new ByteSliceStream(start, end));
}

let chunks = [];

for await (const chunk of createStream(0, 3)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([0, 1]),
new Uint8Array([2, 3]),
]);

chunks = [];
for await (const chunk of createStream(0, 1)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([0, 1]),
]);

chunks = [];
for await (const chunk of createStream(0, 2)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([0, 1]),
new Uint8Array([2]),
]);

chunks = [];
for await (const chunk of createStream(0, 3)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([0, 1]),
new Uint8Array([2, 3]),
]);

chunks = [];
for await (const chunk of createStream(1, 3)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([1]),
new Uint8Array([2, 3]),
]);

chunks = [];
for await (const chunk of createStream(2, 3)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([2, 3]),
]);

chunks = [];
for await (const chunk of createStream(0, 10)) {
chunks.push(chunk);
}
assertEquals(chunks, [
new Uint8Array([0, 1]),
new Uint8Array([2, 3]),
]);

assertThrows(() => createStream(-1, Infinity));
});

0 comments on commit b25e580

Please sign in to comment.