Skip to content

Commit

Permalink
Add ReadableStream (#3)
Browse files Browse the repository at this point in the history
Co-authored-by: Kasper Isager Dalsgarð <[email protected]>
  • Loading branch information
yassernasc and kasperisager authored Nov 21, 2024
1 parent 4d3bcbf commit 8181a02
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 4 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
"exports": {
".": "./index.js",
"./package": "./package.json",
"./promises": "./promises.js"
"./promises": "./promises.js",
"./web": "./web.js"
},
"files": [
"index.js",
"promises.js"
],
"scripts": {
"test": "prettier . --check && bare test.js"
"test": "prettier . --check && bare test/all.js"
},
"repository": {
"type": "git",
Expand Down
2 changes: 2 additions & 0 deletions test/all.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
require('./basic')
require('./web')
4 changes: 2 additions & 2 deletions test.js → test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ const {
Transform,
PassThrough,
finished
} = require('.')
} = require('..')

test('default export', (t) => {
t.is(require('.'), Stream)
t.is(require('..'), Stream)
})

test('readable', (t) => {
Expand Down
86 changes: 86 additions & 0 deletions test/web.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const test = require('brittle')
const { ReadableStream } = require('../web')

test('basic', async (t) => {
t.plan(1)

const read = []

const stream = new ReadableStream({
start(controller) {
controller.enqueue(1)
controller.enqueue(2)
controller.enqueue(3)
controller.close()
}
})

for await (const value of stream) {
read.push(value)
}

t.alike(read, [1, 2, 3])
})

test('error', async (t) => {
t.plan(1)

const stream = new ReadableStream({
start(controller) {
controller.error('boom!')
}
})

t.exception(async () => {
for await (const value of stream) {
}
}, 'boom!')
})

test('cancel', async (t) => {
t.plan(1)

const stream = new ReadableStream()

await stream.cancel()

t.pass()
})

test('from', async (t) => {
async function* asyncIterator() {
yield 1
yield 2
yield 3
}

const stream = ReadableStream.from(asyncIterator())

t.ok(stream instanceof ReadableStream)

const read = []

for await (const value of stream) {
read.push(value)
}

t.alike(read, [1, 2, 3])
})

test('reader', async (t) => {
t.plan(3)

const stream = new ReadableStream({
start(controller) {
controller.enqueue(1)
controller.enqueue(2)
controller.close()
}
})

const reader = stream.getReader()

t.alike(await reader.read(), { value: 1, done: false })
t.alike(await reader.read(), { value: 2, done: false })
t.alike(await reader.read(), { value: undefined, done: true })
})
134 changes: 134 additions & 0 deletions web.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
const { Readable, getStreamError } = require('streamx')

class ReadableStreamReader {
constructor(stream) {
this._stream = stream
}

read() {
const stream = this._stream

return new Promise((resolve, reject) => {
const err = getStreamError(stream)

if (err) return reject(err)

if (stream.destroyed) {
return resolve({ value: undefined, done: true })
}

const value = stream.read()

if (value !== null) {
return resolve({ value, done: false })
}

stream
.once('readable', onreadable)
.once('close', onclose)
.once('error', onerror)

function onreadable() {
const value = stream.read()

ondone(
null,
value === null
? { value: undefined, done: true }
: { value, done: false }
)
}

function onclose() {
ondone(null, { value: undefined, done: true })
}

function onerror(err) {
ondone(err, null)
}

function ondone(err, value) {
stream
.off('readable', onreadable)
.off('close', onclose)
.off('error', onerror)

if (err) reject(err)
else resolve(value)
}
})
}

cancel(reason) {
if (this._stream.destroyed) return Promise.resolve()

return new Promise((resolve) =>
this._stream.once('close', resolve).destroy(reason)
)
}
}

class ReadableStreamController {
constructor(stream) {
this._stream = stream
}

enqueue(data) {
this._stream.push(data)
}

close() {
this._stream.push(null)
}

error(err) {
this._stream.destroy(err)
}
}

exports.ReadableStream = class ReadableStream {
constructor(
underlyingSource = {},
queuingStrategy = {},
stream = new Readable()
) {
const { start } = underlyingSource

this._stream = stream
this._controller = new ReadableStreamController(this._stream)

if (start) this._start = start.bind(this)

this._start(this._controller)
}

_start(controller) {}

getReader() {
return new ReadableStreamReader(this._stream)
}

cancel(reason) {
if (this._stream.destroyed) return Promise.resolve()

return new Promise((resolve) =>
this._stream.once('close', resolve).destroy(reason)
)
}

pipeTo(destination) {
return new Promise((resolve, reject) =>
this._stream.pipe(destination, (err) => {
err ? reject(err) : resolve()
})
)
}

[Symbol.asyncIterator]() {
return this._stream[Symbol.asyncIterator]()
}

static from(iterable) {
return new ReadableStream(undefined, undefined, Readable.from(iterable))
}
}

0 comments on commit 8181a02

Please sign in to comment.