Skip to content

Commit

Permalink
feat(fetch): accept async iterables for body (#24623)
Browse files Browse the repository at this point in the history
Implements whatwg/webidl#1397
Fixes #21454 
Closes #24849
  • Loading branch information
crowlKats authored Aug 6, 2024
1 parent f0cd2c4 commit ba40347
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 51 deletions.
13 changes: 13 additions & 0 deletions ext/fetch/22_body.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ function extractBody(object) {
if (object.locked || isReadableStreamDisturbed(object)) {
throw new TypeError("ReadableStream is locked or disturbed");
}
} else if (object[webidl.AsyncIterable] === webidl.AsyncIterable) {
stream = ReadableStream.from(object.open());
}
if (typeof source === "string") {
// WARNING: this deviates from spec (expects length to be set)
Expand All @@ -475,6 +477,9 @@ function extractBody(object) {
return { body, contentType };
}

webidl.converters["async iterable<Uint8Array>"] = webidl
.createAsyncIterableConverter(webidl.converters.Uint8Array);

webidl.converters["BodyInit_DOMString"] = (V, prefix, context, opts) => {
// Union for (ReadableStream or Blob or ArrayBufferView or ArrayBuffer or FormData or URLSearchParams or USVString)
if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, V)) {
Expand All @@ -493,6 +498,14 @@ webidl.converters["BodyInit_DOMString"] = (V, prefix, context, opts) => {
if (ArrayBufferIsView(V)) {
return webidl.converters["ArrayBufferView"](V, prefix, context, opts);
}
if (webidl.isAsyncIterator(V)) {
return webidl.converters["async iterable<Uint8Array>"](
V,
prefix,
context,
opts,
);
}
}
// BodyInit conversion is passed to extractBody(), which calls core.encode().
// core.encode() will UTF-8 encode strings with replacement, being equivalent to the USV normalization.
Expand Down
1 change: 1 addition & 0 deletions ext/fetch/lib.deno_fetch.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ declare type BodyInit =
| FormData
| URLSearchParams
| ReadableStream<Uint8Array>
| AsyncIterable<Uint8Array>
| string;
/** @category Fetch */
declare type RequestDestination =
Expand Down
63 changes: 15 additions & 48 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const {
String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
TypeError,
TypedArrayPrototypeGetBuffer,
Expand Down Expand Up @@ -5083,34 +5082,6 @@ function initializeCountSizeFunction(globalObject) {
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}

// Ref: https://tc39.es/ecma262/#sec-getiterator
function getAsyncOrSyncIterator(obj) {
let iterator;
if (obj[SymbolAsyncIterator] != null) {
iterator = obj[SymbolAsyncIterator]();
if (!isObject(iterator)) {
throw new TypeError(
"[Symbol.asyncIterator] returned a non-object value",
);
}
} else if (obj[SymbolIterator] != null) {
iterator = obj[SymbolIterator]();
if (!isObject(iterator)) {
throw new TypeError("[Symbol.iterator] returned a non-object value");
}
} else {
throw new TypeError("No iterator found");
}
if (typeof iterator.next !== "function") {
throw new TypeError("iterator.next is not a function");
}
return iterator;
}

function isObject(x) {
return (typeof x === "object" && x != null) || typeof x === "function";
}

const _resourceBacking = Symbol("[[resourceBacking]]");
// This distinction exists to prevent unrefable streams being used in
// regular fast streams that are unaware of refability
Expand Down Expand Up @@ -5196,21 +5167,22 @@ class ReadableStream {
}

static from(asyncIterable) {
const prefix = "Failed to execute 'ReadableStream.from'";
webidl.requiredArguments(
arguments.length,
1,
"Failed to execute 'ReadableStream.from'",
prefix,
);
asyncIterable = webidl.converters.any(asyncIterable);

const iterator = getAsyncOrSyncIterator(asyncIterable);
asyncIterable = webidl.converters["async iterable<any>"](
asyncIterable,
prefix,
"Argument 1",
);
const iter = asyncIterable.open();

const stream = createReadableStream(noop, async () => {
// deno-lint-ignore prefer-primordials
const res = await iterator.next();
if (!isObject(res)) {
throw new TypeError("iterator.next value is not an object");
}
const res = await iter.next();
if (res.done) {
readableStreamDefaultControllerClose(stream[_controller]);
} else {
Expand All @@ -5220,17 +5192,8 @@ class ReadableStream {
);
}
}, async (reason) => {
if (iterator.return == null) {
return undefined;
} else {
// deno-lint-ignore prefer-primordials
const res = await iterator.return(reason);
if (!isObject(res)) {
throw new TypeError("iterator.return value is not an object");
} else {
return undefined;
}
}
// deno-lint-ignore prefer-primordials
await iter.return(reason);
}, 0);
return stream;
}
Expand Down Expand Up @@ -6890,6 +6853,10 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);

webidl.converters["async iterable<any>"] = webidl.createAsyncIterableConverter(
webidl.converters.any,
);

internals.resourceForReadableStream = resourceForReadableStream;

export {
Expand Down
126 changes: 126 additions & 0 deletions ext/webidl/00_webidl.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
Float32Array,
Float64Array,
FunctionPrototypeBind,
FunctionPrototypeCall,
Int16Array,
Int32Array,
Int8Array,
Expand Down Expand Up @@ -77,6 +78,7 @@ const {
StringPrototypeToWellFormed,
Symbol,
SymbolIterator,
SymbolAsyncIterator,
SymbolToStringTag,
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetSymbolToStringTag,
Expand Down Expand Up @@ -919,6 +921,127 @@ function createSequenceConverter(converter) {
};
}

function isAsyncIterator(obj) {
if (obj[SymbolAsyncIterator] === undefined) {
if (obj[SymbolIterator] === undefined) {
return false;
}
}

return true;
}

const AsyncIterable = Symbol("[[asyncIterable]]");

function createAsyncIterableConverter(converter) {
return function (
V,
prefix = undefined,
context = undefined,
opts = { __proto__: null },
) {
if (type(V) !== "Object") {
throw makeException(
TypeError,
"can not be converted to async iterable.",
prefix,
context,
);
}

let isAsync = true;
let method = V[SymbolAsyncIterator];
if (method === undefined) {
method = V[SymbolIterator];

if (method === undefined) {
throw makeException(
TypeError,
"is not iterable.",
prefix,
context,
);
}

isAsync = false;
}

return {
value: V,
[AsyncIterable]: AsyncIterable,
open(context) {
const iter = FunctionPrototypeCall(method, V);
if (type(iter) !== "Object") {
throw new TypeError(
`${context} could not be iterated because iterator method did not return object, but ${
type(iter)
}.`,
);
}

let asyncIterator = iter;

if (!isAsync) {
asyncIterator = {
// deno-lint-ignore require-await
async next() {
// deno-lint-ignore prefer-primordials
return iter.next();
},
};
}

return {
async next() {
// deno-lint-ignore prefer-primordials
const iterResult = await asyncIterator.next();
if (type(iterResult) !== "Object") {
throw TypeError(
`${context} failed to iterate next value because the next() method did not return an object, but ${
type(iterResult)
}.`,
);
}

if (iterResult.done) {
return { done: true };
}

const iterValue = converter(
iterResult.value,
`${context} failed to iterate next value`,
`The value returned from the next() method`,
opts,
);

return { done: false, value: iterValue };
},
async return(reason) {
if (asyncIterator.return === undefined) {
return undefined;
}

// deno-lint-ignore prefer-primordials
const returnPromiseResult = await asyncIterator.return(reason);
if (type(returnPromiseResult) !== "Object") {
throw TypeError(
`${context} failed to close iterator because the return() method did not return an object, but ${
type(returnPromiseResult)
}.`,
);
}

return undefined;
},
[SymbolAsyncIterator]() {
return this;
},
};
},
};
};
}

function createRecordConverter(keyConverter, valueConverter) {
return (V, prefix, context, opts) => {
if (type(V) !== "Object") {
Expand Down Expand Up @@ -1287,9 +1410,11 @@ function setlike(obj, objPrototype, readonly) {

export {
assertBranded,
AsyncIterable,
brand,
configureInterface,
converters,
createAsyncIterableConverter,
createBranded,
createDictionaryConverter,
createEnumConverter,
Expand All @@ -1300,6 +1425,7 @@ export {
createSequenceConverter,
illegalConstructor,
invokeCallbackFunction,
isAsyncIterator,
makeException,
mixinPairIterable,
requiredArguments,
Expand Down
21 changes: 21 additions & 0 deletions ext/webidl/internal.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,27 @@ declare module "ext:deno_webidl/00_webidl.js" {
opts?: any,
) => T[];

/**
* Create a converter that converts an async iterable of the inner type.
*/
function createAsyncIterableConverter<V, T>(
converter: (
v: V,
prefix?: string,
context?: string,
opts?: any,
) => T,
): (
v: any,
prefix?: string,
context?: string,
opts?: any,
) => ConvertedAsyncIterable<V, T>;

interface ConvertedAsyncIterable<V, T> extends AsyncIterableIterator<T> {
value: V;
}

/**
* Create a converter that converts a Promise of the inner type.
*/
Expand Down
1 change: 1 addition & 0 deletions tests/integration/node_unit_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ util::unit_test_factory!(
dgram_test,
domain_test,
fs_test,
fetch_test,
http_test,
http2_test,
_randomBytes_test = internal / _randomBytes_test,
Expand Down
15 changes: 14 additions & 1 deletion tests/unit/streams_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { assertEquals, assertRejects, fail } from "./test_util.ts";
import {
assertEquals,
assertRejects,
assertThrows,
fail,
} from "./test_util.ts";

const {
core,
Expand Down Expand Up @@ -533,3 +538,11 @@ Deno.test(async function decompressionStreamInvalidGzipStillReported() {
"corrupt gzip stream does not have a matching checksum",
);
});

Deno.test(function readableStreamFromWithStringThrows() {
assertThrows(
() => ReadableStream.from("string"),
TypeError,
"Failed to execute 'ReadableStream.from': Argument 1 can not be converted to async iterable.",
);
});
18 changes: 18 additions & 0 deletions tests/unit_node/fetch_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { assertEquals } from "@std/assert";
import { createReadStream } from "node:fs";

Deno.test("fetch node stream", async () => {
const file = createReadStream("tests/testdata/assets/fixture.json");

const response = await fetch("http://localhost:4545/echo_server", {
method: "POST",
body: file,
});

assertEquals(
await response.text(),
await Deno.readTextFile("tests/testdata/assets/fixture.json"),
);
});
Loading

0 comments on commit ba40347

Please sign in to comment.