Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix empty stream responses #15322

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2893,6 +2893,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
signal.clear();
assert(signal.isDead());

// we need to render metadata before assignToStream because the stream can call res.end
// and this would auto write an 200 status
if (!this.flags.has_written_status) {
this.renderMetadata();
}

// We are already corked!
const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
globalThis,
Expand Down
23 changes: 19 additions & 4 deletions src/bun.js/webcore/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,18 @@ pub const Fetch = struct {
}
}

pub fn derefFromThread(this: *FetchTasklet) void {
const count = this.ref_count.fetchSub(1, .monotonic);
bun.debugAssert(count > 0);

if (count == 1) {
// this is really unlikely to happen, but can happen
// lets make sure that we always call deinit from main thread

this.javascript_vm.eventLoop().enqueueTaskConcurrent(JSC.ConcurrentTask.fromCallback(this, FetchTasklet.deinit));
}
}

pub const HTTPRequestBody = union(enum) {
AnyBlob: AnyBlob,
Sendfile: http.Sendfile,
Expand Down Expand Up @@ -918,7 +930,7 @@ pub const Fetch = struct {
this.clearAbortSignal();
}

fn deinit(this: *FetchTasklet) void {
pub fn deinit(this: *FetchTasklet) void {
log("deinit", .{});

bun.assert(this.ref_count.load(.monotonic) == 0);
Expand Down Expand Up @@ -1785,12 +1797,15 @@ pub const Fetch = struct {
}

pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
task.mutex.lock();
defer task.mutex.unlock();
// at this point only this thread is accessing result to is no race condition
const is_done = !result.has_more;
// we are done with the http client so we can deref our side
defer if (is_done) task.deref();
// this is a atomic operation and will enqueue a task to deinit on the main thread
defer if (is_done) task.derefFromThread();

task.mutex.lock();
// we need to unlock before task.deref();
defer task.mutex.unlock();
task.http.?.* = async_http.*;
task.http.?.response_buffer = async_http.response_buffer;

Expand Down
44 changes: 37 additions & 7 deletions test/js/bun/http/bun-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,7 @@ describe("Server", () => {
}
});


test('server should return a body for a OPTIONS Request', async () => {
test("server should return a body for a OPTIONS Request", async () => {
using server = Bun.serve({
port: 0,
fetch(req) {
Expand All @@ -327,16 +326,17 @@ describe("Server", () => {
});
{
const url = `http://${server.hostname}:${server.port}/`;
const response = await fetch(new Request(url, {
method: 'OPTIONS',
}));
const response = await fetch(
new Request(url, {
method: "OPTIONS",
}),
);
expect(await response.text()).toBe("Hello World!");
expect(response.status).toBe(200);
expect(response.url).toBe(url);
}
});


test("abort signal on server with stream", async () => {
{
let signalOnServer = false;
Expand Down Expand Up @@ -456,7 +456,7 @@ describe("Server", () => {
env: bunEnv,
stderr: "pipe",
});
expect(stderr.toString('utf-8')).toBeEmpty();
expect(stderr.toString("utf-8")).toBeEmpty();
expect(exitCode).toBe(0);
});
});
Expand Down Expand Up @@ -768,3 +768,33 @@ test.skip("should be able to stream huge amounts of data", async () => {
expect(written).toBe(CONTENT_LENGTH);
expect(received).toBe(CONTENT_LENGTH);
}, 30_000);

test("should be able to redirect when using empty streams #15320", async () => {
using server = Bun.serve({
port: 0,
websocket: void 0,
async fetch(req, server2) {
const url = new URL(req.url);
if (url.pathname === "/redirect") {
const emptyStream = new ReadableStream({
start(controller) {
// Immediately close the stream to make it empty
controller.close();
},
});

return new Response(emptyStream, {
status: 307,
headers: {
location: "/",
},
});
}

return new Response("Hello, World");
},
});

const response = await fetch(`http://localhost:${server.port}/redirect`);
expect(await response.text()).toBe("Hello, World");
});
5 changes: 2 additions & 3 deletions test/js/web/fetch/body-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ for (let doClone of [true, false]) {
},
async url => {
called = true;
expect(await fetch(url).then(res => res.text())).toContain(
"Welcome to Bun! To get started, return a Response object.",
);
// if we can flush it will be "hey" otherwise will be empty
expect(await fetch(url).then(res => res.text())).toBeOneOf(["hey", ""]);
},
);

Expand Down