Skip to content

Commit

Permalink
stream: fix code style
Browse files Browse the repository at this point in the history
PR-URL: #51168
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
MattiasBuelens authored Dec 26, 2023
1 parent 89ddc98 commit 20c6313
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 282 deletions.
182 changes: 88 additions & 94 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const {
FunctionPrototypeCall,
MathMin,
NumberIsInteger,
ObjectCreate,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
Expand Down Expand Up @@ -95,9 +94,9 @@ const {
AsyncIterator,
cloneAsUint8Array,
copyArrayBuffer,
createPromiseCallback,
customInspect,
dequeueValue,
ensureIsPromise,
enqueueValueWithSize,
extractHighWaterMark,
extractSizeAlgorithm,
Expand Down Expand Up @@ -251,19 +250,7 @@ class ReadableStream {
markTransferMode(this, false, true);
if (source === null)
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
Expand Down Expand Up @@ -649,19 +636,7 @@ ObjectDefineProperties(ReadableStream, {
function InternalTransferredReadableStream() {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
}
Expand Down Expand Up @@ -1230,41 +1205,58 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
});

function TeeReadableStream(start, pull, cancel) {
function InternalReadableStream(start, pull, cancel, highWaterMark, size) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
const controller = new ReadableStreamDefaultController(kSkipThrow);
setupReadableStreamDefaultController(
this,
controller,
start,
pull,
cancel,
highWaterMark,
size);
}

ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(InternalReadableStream, ReadableStream);

function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) {
const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size);

// For spec compliance the InternalReadableStream must be a ReadableStream
stream.constructor = ReadableStream;
return stream;
}

function InternalReadableByteStream(start, pull, cancel) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
const controller = new ReadableByteStreamController(kSkipThrow);
setupReadableByteStreamController(
this,
ObjectCreate(null, {
start: { __proto__: null, value: start },
pull: { __proto__: null, value: pull },
cancel: { __proto__: null, value: cancel },
}),
1,
() => 1);
controller,
start,
pull,
cancel,
0,
undefined);
}

ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);
ObjectSetPrototypeOf(InternalReadableByteStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(InternalReadableByteStream, ReadableStream);

function createTeeReadableStream(start, pull, cancel) {
const tee = new TeeReadableStream(start, pull, cancel);
function createReadableByteStream(start, pull, cancel) {
const stream = new InternalReadableByteStream(start, pull, cancel);

// For spec compliance the Tee must be a ReadableStream
tee.constructor = ReadableStream;
return tee;
// For spec compliance the InternalReadableByteStream must be a ReadableStream
stream.constructor = ReadableStream;
return stream;
}

const isReadableStream =
Expand All @@ -1280,6 +1272,23 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function createReadableStreamState() {
return {
__proto__: null,
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
transfer: {
__proto__: null,
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
}

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');
Expand Down Expand Up @@ -1319,16 +1328,12 @@ function readableStreamFromIterable(iterable) {
});
}

stream = new ReadableStream({
start: startAlgorithm,
pull: pullAlgorithm,
cancel: cancelAlgorithm,
}, {
size() {
return 1;
},
highWaterMark: 0,
});
stream = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
0,
);

return stream;
}
Expand Down Expand Up @@ -1654,9 +1659,9 @@ function readableStreamDefaultTee(stream, cloneForBranch2) {
}

branch1 =
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
createReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
branch2 =
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);
createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);

PromisePrototypeThen(
reader[kState].close.promise,
Expand Down Expand Up @@ -1933,16 +1938,10 @@ function readableByteStreamTee(stream) {
return cancelDeferred.promise;
}

branch1 = new ReadableStream({
type: 'bytes',
pull: pull1Algorithm,
cancel: cancel1Algorithm,
});
branch2 = new ReadableStream({
type: 'bytes',
pull: pull2Algorithm,
cancel: cancel2Algorithm,
});
branch1 =
createReadableByteStream(nonOpStart, pull1Algorithm, cancel1Algorithm);
branch2 =
createReadableByteStream(nonOpStart, pull2Algorithm, cancel2Algorithm);

forwardReaderError(reader);

Expand Down Expand Up @@ -1993,10 +1992,7 @@ function readableStreamCancel(stream, reason) {
}

return PromisePrototypeThen(
ensureIsPromise(
stream[kState].controller[kCancel],
stream[kState].controller,
reason),
stream[kState].controller[kCancel](reason),
() => {});
}

Expand Down Expand Up @@ -2361,7 +2357,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
assert(!controller[kState].pullAgain);
controller[kState].pulling = true;
PromisePrototypeThen(
ensureIsPromise(controller[kState].pullAlgorithm, controller),
controller[kState].pullAlgorithm(controller),
() => {
controller[kState].pulling = false;
if (controller[kState].pullAgain) {
Expand Down Expand Up @@ -2391,12 +2387,9 @@ function readableStreamDefaultControllerError(controller, error) {

function readableStreamDefaultControllerCancelSteps(controller, reason) {
resetQueue(controller);
try {
const result = controller[kState].cancelAlgorithm(reason);
return result;
} finally {
readableStreamDefaultControllerClearAlgorithms(controller);
}
const result = controller[kState].cancelAlgorithm(reason);
readableStreamDefaultControllerClearAlgorithms(controller);
return result;
}

function readableStreamDefaultControllerPullSteps(controller, readRequest) {
Expand Down Expand Up @@ -2470,11 +2463,10 @@ function setupReadableStreamDefaultControllerFromSource(
FunctionPrototypeBind(start, source, controller) :
nonOpStart;
const pullAlgorithm = pull ?
FunctionPrototypeBind(pull, source, controller) :
createPromiseCallback('source.pull', pull, source) :
nonOpPull;

const cancelAlgorithm = cancel ?
FunctionPrototypeBind(cancel, source) :
createPromiseCallback('source.cancel', cancel, source) :
nonOpCancel;

setupReadableStreamDefaultController(
Expand Down Expand Up @@ -3102,7 +3094,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) {
assert(!controller[kState].pullAgain);
controller[kState].pulling = true;
PromisePrototypeThen(
ensureIsPromise(controller[kState].pullAlgorithm, controller),
controller[kState].pullAlgorithm(controller),
() => {
controller[kState].pulling = false;
if (controller[kState].pullAgain) {
Expand Down Expand Up @@ -3269,10 +3261,10 @@ function setupReadableByteStreamControllerFromSource(
FunctionPrototypeBind(start, source, controller) :
nonOpStart;
const pullAlgorithm = pull ?
FunctionPrototypeBind(pull, source, controller) :
createPromiseCallback('source.pull', pull, source, controller) :
nonOpPull;
const cancelAlgorithm = cancel ?
FunctionPrototypeBind(cancel, source) :
createPromiseCallback('source.cancel', cancel, source) :
nonOpCancel;

if (autoAllocateChunkSize === 0) {
Expand Down Expand Up @@ -3369,4 +3361,6 @@ module.exports = {
readableByteStreamControllerPullSteps,
setupReadableByteStreamController,
setupReadableByteStreamControllerFromSource,
createReadableStream,
createReadableByteStream,
};
Loading

0 comments on commit 20c6313

Please sign in to comment.