diff --git a/package.json b/package.json index 10c862e..bb31d75 100644 --- a/package.json +++ b/package.json @@ -5,14 +5,15 @@ "exports": { ".": "./index.js", "./package": "./package.json", - "./promises": "./promises.js" + "./promises": "./promises.js", + "./web": "./web.js" }, "files": [ "index.js", "promises.js" ], "scripts": { - "test": "prettier . --check && bare test.js" + "test": "prettier . --check && bare test/all.js" }, "repository": { "type": "git", diff --git a/test/all.js b/test/all.js new file mode 100644 index 0000000..1075a78 --- /dev/null +++ b/test/all.js @@ -0,0 +1,2 @@ +require('./basic') +require('./web') diff --git a/test.js b/test/basic.js similarity index 99% rename from test.js rename to test/basic.js index 0504459..7b23152 100644 --- a/test.js +++ b/test/basic.js @@ -7,10 +7,10 @@ const { Transform, PassThrough, finished -} = require('.') +} = require('..') test('default export', (t) => { - t.is(require('.'), Stream) + t.is(require('..'), Stream) }) test('readable', (t) => { diff --git a/test/web.js b/test/web.js new file mode 100644 index 0000000..29f8ef5 --- /dev/null +++ b/test/web.js @@ -0,0 +1,86 @@ +const test = require('brittle') +const { ReadableStream } = require('../web') + +test('basic', async (t) => { + t.plan(1) + + const read = [] + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(1) + controller.enqueue(2) + controller.enqueue(3) + controller.close() + } + }) + + for await (const value of stream) { + read.push(value) + } + + t.alike(read, [1, 2, 3]) +}) + +test('error', async (t) => { + t.plan(1) + + const stream = new ReadableStream({ + start(controller) { + controller.error('boom!') + } + }) + + t.exception(async () => { + for await (const value of stream) { + } + }, 'boom!') +}) + +test('cancel', async (t) => { + t.plan(1) + + const stream = new ReadableStream() + + await stream.cancel() + + t.pass() +}) + +test('from', async (t) => { + async function* asyncIterator() { + yield 1 + yield 2 + yield 3 + } + + const stream = ReadableStream.from(asyncIterator()) + + t.ok(stream instanceof ReadableStream) + + const read = [] + + for await (const value of stream) { + read.push(value) + } + + t.alike(read, [1, 2, 3]) +}) + +test('reader', async (t) => { + t.plan(3) + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(1) + controller.enqueue(2) + controller.close() + } + }) + + const reader = stream.getReader() + + t.alike(await reader.read(), { value: 1, done: false }) + t.alike(await reader.read(), { value: 2, done: false }) + t.alike(await reader.read(), { value: undefined, done: true }) +}) diff --git a/web.js b/web.js new file mode 100644 index 0000000..a747e4a --- /dev/null +++ b/web.js @@ -0,0 +1,134 @@ +const { Readable, getStreamError } = require('streamx') + +class ReadableStreamReader { + constructor(stream) { + this._stream = stream + } + + read() { + const stream = this._stream + + return new Promise((resolve, reject) => { + const err = getStreamError(stream) + + if (err) return reject(err) + + if (stream.destroyed) { + return resolve({ value: undefined, done: true }) + } + + const value = stream.read() + + if (value !== null) { + return resolve({ value, done: false }) + } + + stream + .once('readable', onreadable) + .once('close', onclose) + .once('error', onerror) + + function onreadable() { + const value = stream.read() + + ondone( + null, + value === null + ? { value: undefined, done: true } + : { value, done: false } + ) + } + + function onclose() { + ondone(null, { value: undefined, done: true }) + } + + function onerror(err) { + ondone(err, null) + } + + function ondone(err, value) { + stream + .off('readable', onreadable) + .off('close', onclose) + .off('error', onerror) + + if (err) reject(err) + else resolve(value) + } + }) + } + + cancel(reason) { + if (this._stream.destroyed) return Promise.resolve() + + return new Promise((resolve) => + this._stream.once('close', resolve).destroy(reason) + ) + } +} + +class ReadableStreamController { + constructor(stream) { + this._stream = stream + } + + enqueue(data) { + this._stream.push(data) + } + + close() { + this._stream.push(null) + } + + error(err) { + this._stream.destroy(err) + } +} + +exports.ReadableStream = class ReadableStream { + constructor( + underlyingSource = {}, + queuingStrategy = {}, + stream = new Readable() + ) { + const { start } = underlyingSource + + this._stream = stream + this._controller = new ReadableStreamController(this._stream) + + if (start) this._start = start.bind(this) + + this._start(this._controller) + } + + _start(controller) {} + + getReader() { + return new ReadableStreamReader(this._stream) + } + + cancel(reason) { + if (this._stream.destroyed) return Promise.resolve() + + return new Promise((resolve) => + this._stream.once('close', resolve).destroy(reason) + ) + } + + pipeTo(destination) { + return new Promise((resolve, reject) => + this._stream.pipe(destination, (err) => { + err ? reject(err) : resolve() + }) + ) + } + + [Symbol.asyncIterator]() { + return this._stream[Symbol.asyncIterator]() + } + + static from(iterable) { + return new ReadableStream(undefined, undefined, Readable.from(iterable)) + } +}