diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 7e39ab42750d8e..aa087a29f0ab10 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -3056,6 +3056,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } fn endRequestStreaming(this: *RequestContext) bool { assert(this.server != null); + + this.request_body_buf.clearAndFree(bun.default_allocator); + // if we cannot, we have to reject pending promises // first, we reject the request body promise if (this.request_body) |body| { @@ -3069,6 +3072,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return false; } fn detachResponse(this: *RequestContext) void { + this.request_body_buf.clearAndFree(bun.default_allocator); + if (this.resp) |resp| { this.resp = null; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 933df5ecae3db9..60069e37bd2aae 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -4233,7 +4233,7 @@ pub const FileReader = struct { pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState) bool { var buf = init_buf; - log("onReadChunk() = {d} ({s})", .{ buf.len, @tagName(state) }); + log("onReadChunk() = {d} ({s}) - read_inside_on_pull: {s}", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) }); if (this.done) { this.reader.close(); @@ -4286,10 +4286,9 @@ pub const FileReader = struct { if (this.buffered.capacity > 0) { this.buffered.clearAndFree(bun.default_allocator); } + var taken = this.reader.takeBuffer(); - if (this.reader.buffer().items.len != 0) { - this.buffered = this.reader.buffer().moveToUnmanaged(); - } + this.buffered = taken.moveToUnmanaged(); } var buffer = &this.buffered; diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index 878917fc434431..6295e7e01eafe2 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -70,28 +70,23 @@ export function getStdioWriteStream(fd) { export function getStdinStream(fd) { const native = Bun.stdin.stream(); + // @ts-expect-error + const source = native.$bunNativePtr; var reader: ReadableStreamDefaultReader | undefined; - var readerRef; var shouldUnref = false; function ref() { $debug("ref();", reader ? "already has reader" : "getting reader"); reader ??= native.getReader(); - // TODO: remove this. likely we are dereferencing the stream - // when there is still more data to be read. - readerRef ??= setInterval(() => {}, 1 << 30); + source.updateRef(true); shouldUnref = false; } function unref() { $debug("unref();"); - if (readerRef) { - clearInterval(readerRef); - readerRef = undefined; - $debug("cleared timeout"); - } + if (reader) { try { reader.releaseLock(); @@ -103,22 +98,7 @@ export function getStdinStream(fd) { // Releasing the lock is not possible as there are active reads // we will instead pretend we are unref'd, and release the lock once the reads are finished. 
- shouldUnref = true; - - // unref the native part of the stream - try { - $getByIdDirectPrivate( - $getByIdDirectPrivate(native, "readableStreamController"), - "underlyingByteSource", - ).$resume(false); - } catch (e) { - if (IS_BUN_DEVELOPMENT) { - // we assume this isn't possible, but because we aren't sure - // we will ignore if error during release, but make a big deal in debug - console.error(e); - $assert(!"reachable"); - } - } + source?.updateRef?.(false); } } } @@ -167,25 +147,11 @@ export function getStdinStream(fd) { async function internalRead(stream) { $debug("internalRead();"); try { - var done: boolean, value: Uint8Array[]; $assert(reader); - const pendingRead = reader.readMany(); - - if ($isPromise(pendingRead)) { - ({ done, value } = await pendingRead); - } else { - $debug("readMany() did not return a promise"); - ({ done, value } = pendingRead); - } + const { done, value } = await reader.read(); - if (!done) { - stream.push(value[0]); - - // shouldn't actually happen, but just in case - const length = value.length; - for (let i = 1; i < length; i++) { - stream.push(value[i]); - } + if (value) { + stream.push(value); if (shouldUnref) unref(); } else { @@ -246,7 +212,6 @@ export function getStdinStream(fd) { return stream; } - export function initializeNextTickQueue(process, nextTickQueue, drainMicrotasksFn, reportUncaughtExceptionFn) { var queue; var process; diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts index c1004488a3ca7b..c79d3a78486907 100644 --- a/src/js/builtins/ReadableStreamDefaultReader.ts +++ b/src/js/builtins/ReadableStreamDefaultReader.ts @@ -51,19 +51,23 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau const state = $getByIdDirectPrivate(stream, "state"); stream.$disturbed = true; - if (state === $streamClosed) return { value: [], size: 0, done: true }; - else if (state === $streamErrored) { + if (state === $streamErrored) { throw $getByIdDirectPrivate(stream, "storedError"); } var controller = $getByIdDirectPrivate(stream, "readableStreamController"); - var queue = $getByIdDirectPrivate(controller, "queue"); - if (!queue) { + if (controller) { + var queue = $getByIdDirectPrivate(controller, "queue"); + } + + if (!queue && state !== $streamClosed) { // This is a ReadableStream direct controller implemented in JS // It hasn't been started yet. return controller.$pull(controller).$then(function ({ done, value }) { - return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false }; + return done ? { done: true, value: value ? 
[value] : [], size: 0 } : { value: [value], size: 1, done: false }; }); + } else if (!queue) { + return { done: true, value: [], size: 0 }; } const content = queue.content; @@ -98,27 +102,31 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau $putByValDirect(outValues, i, values[i].value); } } - $resetQueue($getByIdDirectPrivate(controller, "queue")); - if ($getByIdDirectPrivate(controller, "closeRequested")) { - $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); - } else if ($isReadableStreamDefaultController(controller)) { - $readableStreamDefaultControllerCallPullIfNeeded(controller); - } else if ($isReadableByteStreamController(controller)) { - $readableByteStreamControllerCallPullIfNeeded(controller); + if (state !== $streamClosed) { + if ($getByIdDirectPrivate(controller, "closeRequested")) { + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); + } else if ($isReadableStreamDefaultController(controller)) { + $readableStreamDefaultControllerCallPullIfNeeded(controller); + } else if ($isReadableByteStreamController(controller)) { + $readableByteStreamControllerCallPullIfNeeded(controller); + } } + $resetQueue($getByIdDirectPrivate(controller, "queue")); return { value: outValues, size, done: false }; } var onPullMany = result => { + const resultValue = result.value; + if (result.done) { - return { value: [], size: 0, done: true }; + return { value: resultValue ? [resultValue] : [], size: 0, done: true }; } var controller = $getByIdDirectPrivate(stream, "readableStreamController"); var queue = $getByIdDirectPrivate(controller, "queue"); - var value = [result.value].concat(queue.content.toArray(false)); + var value = [resultValue].concat(queue.content.toArray(false)); var length = value.length; if ($isReadableByteStreamController(controller)) { @@ -136,8 +144,6 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau } var size = queue.size; - $resetQueue(queue); - if ($getByIdDirectPrivate(controller, "closeRequested")) { $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); } else if ($isReadableStreamDefaultController(controller)) { @@ -146,12 +152,18 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau $readableByteStreamControllerCallPullIfNeeded(controller); } + $resetQueue($getByIdDirectPrivate(controller, "queue")); + return { value: value, size: size, done: false }; }; + if (state === $streamClosed) { + return { value: [], size: 0, done: true }; + } + var pullResult = controller.$pull(controller); if (pullResult && $isPromise(pullResult)) { - return pullResult.$then(onPullMany) as any; + return pullResult.then(onPullMany) as any; } return onPullMany(pullResult); diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index f81b71d6bdcd1b..256f91ee9d9749 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -1638,9 +1638,7 @@ export function readableStreamReaderGenericRelease(reader) { var stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); if (stream.$bunNativePtr) { - $getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingByteSource").$resume( - false, - ); + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingSource").$resume(false); } $putByIdDirectPrivate(stream, "reader", 
undefined);
   $putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
@@ -1768,162 +1766,243 @@ export function readableStreamFromAsyncIterator(target, fn) {
   });
 }
 
-export function lazyLoadStream(stream, autoAllocateChunkSize) {
-  $debug("lazyLoadStream", stream, autoAllocateChunkSize);
-  var handle = stream.$bunNativePtr;
-  if (handle === -1) return;
-  var Prototype = $lazyStreamPrototypeMap.$get($getPrototypeOf(handle));
-  if (Prototype === undefined) {
-    var closer = [false];
-    var handleResult;
-    function handleNativeReadableStreamPromiseResult(val) {
-      var { c, v } = this;
-      this.c = undefined;
-      this.v = undefined;
-      handleResult(val, c, v);
+export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultController {
+  const closer = [false];
+
+  function callClose(controller) {
+    try {
+      var source = controller.$underlyingSource;
+      const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+      if (!stream) {
+        return;
+      }
+
+      if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+      controller.close();
+    } catch (e) {
+      globalThis.reportError(e);
+    } finally {
+      if (source?.$stream) {
+        source.$stream = undefined;
+      }
+
+      if (source) {
+        source.$data = undefined;
+      }
     }
+  }
 
-    function callClose(controller) {
-      try {
-        var underlyingByteSource = controller.$underlyingByteSource;
-        const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
-        if (!stream) {
-          return;
-        }
+  // This was a type: "bytes" stream until Bun v1.1.44, but pendingPullIntos was
+  // not really compatible with how we send data to the stream, and since
+  // "mode: 'byob'" was never supported, changing it isn't an observable change.
+  //
+  // When we receive chunks of data from native code, we sometimes read more
+  // than the input buffer can hold. When that happens, we return a typed
+  // array instead of the number of bytes read.
+  //
+  // The ReadableByteStreamController would then create
+  // (byteLength / autoAllocateChunkSize) pending pull-into descriptors. So with
+  // autoAllocateChunkSize = 16 * 1024 and an actual read of 2 MB, you would
+  // create 128 pending pull-into descriptors.
+  //
+  // And those pendingPullIntos were often never actually drained.
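+  //
+  // As a rough worked example (editorial illustration using the numbers from
+  // the comment above; not code from this change):
+  //
+  //   const autoAllocateChunkSize = 16 * 1024;  // size of each pull buffer
+  //   const bytesRead = 2 * 1024 * 1024;        // what native code handed back
+  //   bytesRead / autoAllocateChunkSize;        // => 128 pending descriptors
+  //
+  // The default controller used below sidesteps this by enqueueing the typed
+  // array directly (see #handleArrayBufferViewResult).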
+  class NativeReadableStreamSource {
+    constructor(handle, autoAllocateChunkSize, drainValue) {
+      $putByIdDirectPrivate(this, "stream", handle);
+      this.pull = this.#pull.bind(this);
+      this.cancel = this.#cancel.bind(this);
+      this.autoAllocateChunkSize = autoAllocateChunkSize;
+
+      if (drainValue !== undefined) {
+        this.start = controller => {
+          this.start = undefined;
+          this.#controller = new WeakRef(controller);
+          controller.enqueue(drainValue);
+        };
+      }
 
-      if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
-      controller.close();
-    } catch (e) {
-      globalThis.reportError(e);
-    } finally {
-      if (underlyingByteSource?.$stream) {
-        underlyingByteSource.$stream = undefined;
-      }
+      handle.onClose = this.#onClose.bind(this);
+      handle.onDrain = this.#onDrain.bind(this);
+    }
+
+    #onDrain(chunk) {
+      var controller = this.#controller?.deref?.();
+      if (controller) {
+        controller.enqueue(chunk);
       }
     }
 
-    handleResult = function handleResult(result, controller, view) {
-      $assert(controller, "controller is missing");
+    #hasResized = false;
 
-      if (result && $isPromise(result)) {
-        return result.$then(
-          handleNativeReadableStreamPromiseResult.bind({
-            c: controller,
-            v: view,
-          }),
-          err => controller.error(err),
-        );
-      } else if (typeof result === "number") {
-        if (view && view.byteLength === result && view.buffer === controller?.byobRequest?.view?.buffer) {
-          controller.byobRequest.respondWithNewView(view);
-        } else {
-          controller.byobRequest.respond(result);
-        }
-      } else if ($isTypedArrayView(result)) {
-        controller.enqueue(result);
+    #adjustHighWaterMark(result) {
+      const autoAllocateChunkSize = this.autoAllocateChunkSize;
+      if (result >= autoAllocateChunkSize && !this.#hasResized) {
+        this.#hasResized = true;
+        this.autoAllocateChunkSize = Math.min(autoAllocateChunkSize * 2, 1024 * 1024 * 2);
       }
+    }
+
+    #controller: WeakRef;
+
+    pull;
+    cancel;
+    start;
 
-      if (closer[0] || result === false) {
+    autoAllocateChunkSize = 0;
+    #chunk;
+    #closed = false;
+
+    $data?: Uint8Array;
+
+    #onClose() {
+      this.#closed = true;
+      this.$data = undefined;
+
+      // deref the controller before dropping the WeakRef, otherwise callClose
+      // would never be scheduled
+      var controller = this.#controller?.deref?.();
+      this.#controller = undefined;
+
+      $putByIdDirectPrivate(this, "stream", undefined);
+      if (controller) {
         $enqueueJob(callClose, controller);
-        closer[0] = false;
       }
-    };
+    }
 
-    function createResult(handle, controller, view, closer) {
-      closer[0] = false;
+    #getInternalBuffer(chunkSize) {
+      return new Uint8Array(chunkSize);
+    }
 
-      var result;
-      try {
-        result = handle.pull(view, closer);
-      } catch (err) {
-        return controller.error(err);
+    #handleArrayBufferViewResult(result, view, isClosed, controller) {
+      if (result.byteLength > 0) {
+        controller.enqueue(result);
+      }
+
+      if (isClosed) {
+        $enqueueJob(callClose, controller);
+        return undefined;
      }
 
-      return handleResult(result, controller, view);
+      return view;
     }
 
-    Prototype = class NativeReadableStreamSource {
-      constructor(handle, autoAllocateChunkSize, drainValue) {
-        $putByIdDirectPrivate(this, "stream", handle);
-        this.pull = this.#pull.bind(this);
-        this.cancel = this.#cancel.bind(this);
-        this.autoAllocateChunkSize = autoAllocateChunkSize;
-
-        if (drainValue !== undefined) {
-          this.start = controller => {
-            this.#controller = new WeakRef(controller);
-            controller.enqueue(drainValue);
-          };
+    #handleNumberResult(result, view, isClosed, controller) {
+      if (result > 0) {
+        const remaining = view.length - result;
+        let toEnqueue = view;
+
+        if (remaining > 0) {
+          toEnqueue = view.subarray(0, result);
         }
 
-        handle.onClose = this.#onClose.bind(this);
-        
handle.onDrain = this.#onDrain.bind(this); + controller.enqueue(toEnqueue); } - #onDrain(chunk) { - var controller = this.#controller?.deref?.(); - if (controller) { - controller.enqueue(chunk); - } + if (isClosed) { + $enqueueJob(callClose, controller); + return undefined; } - #controller: WeakRef; - - pull; - cancel; - start; + return view; + } - type = "bytes"; - autoAllocateChunkSize = 0; - #closed = false; + #onNativeReadableStreamResult(result, view, isClosed, controller) { + if (typeof result === "number") { + if (!isClosed) this.#adjustHighWaterMark(result); + return this.#handleNumberResult(result, view, isClosed, controller); + } else if (typeof result === "boolean") { + $enqueueJob(callClose, controller); + return undefined; + } else if ($isTypedArrayView(result)) { + if (!isClosed) this.#adjustHighWaterMark(result.byteLength); + return this.#handleArrayBufferViewResult(result, view, isClosed, controller); + } - #onClose() { - this.#closed = true; - this.#controller = undefined; + $debug("Unknown result type", result); + throw $ERR_INVALID_STATE("Internal error: invalid result from pull. This is a bug in Bun. Please report it."); + } - var controller = this.#controller?.deref?.(); + #pull(controller) { + var handle = $getByIdDirectPrivate(this, "stream"); + if (!handle || this.#closed) { + this.#controller = undefined; + this.#closed = true; $putByIdDirectPrivate(this, "stream", undefined); - if (controller) { - $enqueueJob(callClose, controller); - } + $enqueueJob(callClose, controller); + this.$data = undefined; + return; } - #pull(controller) { - var handle = $getByIdDirectPrivate(this, "stream"); + if (!this.#controller) { + this.#controller = new WeakRef(controller); + } - if (!handle || this.#closed) { - this.#controller = undefined; - $putByIdDirectPrivate(this, "stream", undefined); - $enqueueJob(callClose, controller); - return; - } + closer[0] = false; - if (!this.#controller) { - this.#controller = new WeakRef(controller); - } + for (let drainResult = handle.drain(); drainResult; drainResult = handle.drain()) { + this.$data = this.#onNativeReadableStreamResult(drainResult, this.$data, (closer[0] = false), controller); + if ((this.#closed = closer[0])) return; + } + if (this.#closed) return; - createResult(handle, controller, controller.byobRequest.view, closer); + const view = this.#getInternalBuffer(this.autoAllocateChunkSize); + const result = handle.pull(view, closer); + if ($isPromise(result)) { + return result.$then( + result => { + this.$data = this.#onNativeReadableStreamResult(result, view, closer[0], controller); + if (this.#closed) { + this.$data = undefined; + } + }, + err => { + this.$data = undefined; + this.#closed = true; + this.#controller = undefined; + controller.error(err); + this.#onClose(); + }, + ); } - #cancel(reason) { - var handle = $getByIdDirectPrivate(this, "stream"); - if (handle) { - handle.updateRef(false); - handle.cancel(reason); - $putByIdDirectPrivate(this, "stream", undefined); - } + this.$data = this.#onNativeReadableStreamResult(result, view, closer[0], controller); + if (this.#closed) { + this.$data = undefined; } - }; - // this is reuse of an existing private symbol - Prototype.prototype.$resume = function (has_ref) { + } + + #cancel(reason) { var handle = $getByIdDirectPrivate(this, "stream"); - if (handle) handle.updateRef(has_ref); - }; - $lazyStreamPrototypeMap.$set($getPrototypeOf(handle), Prototype); + this.$data = undefined; + if (handle) { + handle.updateRef(false); + handle.cancel(reason); + $putByIdDirectPrivate(this, 
"stream", undefined); + } + } + } + // this is reuse of an existing private symbol + NativeReadableStreamSource.prototype.$resume = function (has_ref) { + var handle = $getByIdDirectPrivate(this, "stream"); + if (handle) handle.updateRef(has_ref); + }; + + return NativeReadableStreamSource; +} + +export function lazyLoadStream(stream, autoAllocateChunkSize) { + $debug("lazyLoadStream", stream, autoAllocateChunkSize); + var handle = stream.$bunNativePtr; + if (handle === -1) return; + var Prototype = $lazyStreamPrototypeMap.$get($getPrototypeOf(handle)); + if (Prototype === undefined) { + $lazyStreamPrototypeMap.$set($getPrototypeOf(handle), (Prototype = $createLazyLoadedStreamPrototype())); } stream.$disturbed = true; + + if (autoAllocateChunkSize === undefined) { + // This default is what Node.js uses as well. + autoAllocateChunkSize = 256 * 1024; + } + const chunkSizeOrCompleteBuffer = handle.start(autoAllocateChunkSize); let chunkSize, drainValue; if ($isTypedArrayView(chunkSizeOrCompleteBuffer)) { @@ -1945,7 +2024,6 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) { pull(controller) { controller.close(); }, - type: "bytes", }; } @@ -1956,11 +2034,10 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) { pull(controller) { controller.close(); }, - type: "bytes", }; } - return new Prototype(handle, chunkSize, drainValue); + return new Prototype(handle, Math.max(chunkSize, autoAllocateChunkSize), drainValue); } export function readableStreamIntoArray(stream) { diff --git a/src/js/node/child_process.ts b/src/js/node/child_process.ts index a1fe447c8c7f21..a94dfa43b18c0b 100644 --- a/src/js/node/child_process.ts +++ b/src/js/node/child_process.ts @@ -1120,12 +1120,14 @@ class ChildProcess extends EventEmitter { } } + const handle = this.#handle; const io = this.#stdioOptions[i]; switch (i) { case 0: { switch (io) { case "pipe": { - const stdin = this.#handle.stdin; + const stdin = handle?.stdin; + if (!stdin) // This can happen if the process was already killed. return new ShimmedStdin(); @@ -1143,7 +1145,7 @@ class ChildProcess extends EventEmitter { case 1: { switch (io) { case "pipe": { - const value = this.#handle[fdToStdioName(i) as any as number]; + const value = handle?.[fdToStdioName(i as 1 | 2)!]; // This can happen if the process was already killed. 
if (!value) return new ShimmedStdioOutStream(); @@ -1163,7 +1165,7 @@ class ChildProcess extends EventEmitter { switch (io) { case "pipe": if (!NetModule) NetModule = require("node:net"); - const fd = this.#handle.stdio[i]; + const fd = handle && handle.stdio[i]; if (!fd) return null; return new NetModule.connect({ fd }); } @@ -1480,7 +1482,7 @@ function isNodeStreamWritable(item) { return true; } -function fdToStdioName(fd) { +function fdToStdioName(fd: number) { switch (fd) { case 0: return "stdin"; diff --git a/test/js/bun/http/body-leak-test-fixture.ts b/test/js/bun/http/body-leak-test-fixture.ts index 5dbc7a1ea8a487..7c50ad88488cd4 100644 --- a/test/js/bun/http/body-leak-test-fixture.ts +++ b/test/js/bun/http/body-leak-test-fixture.ts @@ -11,6 +11,16 @@ const server = Bun.serve({ "Content-Type": "application/json", }, }); + } else if (url.endsWith("/heap-snapshot")) { + Bun.gc(true); + await Bun.sleep(10); + require("v8").writeHeapSnapshot("/tmp/heap.heapsnapshot"); + console.log("Wrote heap snapshot to /tmp/heap.heapsnapshot"); + return new Response(JSON.stringify(process.memoryUsage.rss()), { + headers: { + "Content-Type": "application/json", + }, + }); } if (url.endsWith("/json-buffering")) { await req.json(); @@ -44,3 +54,16 @@ const server = Bun.serve({ }); console.log(server.url.href); process?.send?.(server.url.href); + +if (!process.send) { + setInterval(() => { + Bun.gc(true); + const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; + console.log("RSS", rss, "MB"); + console.log("Active requests", server.pendingRequests); + + if (rss > 1024) { + require("v8").writeHeapSnapshot("/tmp/heap.heapsnapshot"); + } + }, 5000); +} diff --git a/test/js/bun/http/serve-body-leak.test.ts b/test/js/bun/http/serve-body-leak.test.ts index 510a00a07853f1..2f9f38c66ab7d5 100644 --- a/test/js/bun/http/serve-body-leak.test.ts +++ b/test/js/bun/http/serve-body-leak.test.ts @@ -1,6 +1,6 @@ import type { Subprocess } from "bun"; import { afterEach, beforeEach, expect, it } from "bun:test"; -import { bunEnv, bunExe, isDebug, isFlaky, isLinux, isWindows } from "harness"; +import { bunEnv, bunExe, isCI, isDebug, isFlaky, isLinux, isWindows } from "harness"; import { join } from "path"; const payload = Buffer.alloc(512 * 1024, "1").toString("utf-8"); // decent size payload to test memory leak @@ -9,7 +9,22 @@ const totalCount = 10_000; const zeroCopyPayload = new Blob([payload]); const zeroCopyJSONPayload = new Blob([JSON.stringify({ bun: payload })]); +// let HARDCODED_URL = "http://localhost:52666/"; +let HARDCODED_URL = null; + async function getURL() { + if (HARDCODED_URL) { + const url = new URL(HARDCODED_URL); + await warmup(url); + return { + url, + process: { + [Symbol.asyncDispose]() { + return Promise.resolve(); + }, + }, + }; + } let defer = Promise.withResolvers(); const process = Bun.spawn([bunExe(), "--smol", join(import.meta.dirname, "body-leak-test-fixture.ts")], { env: bunEnv, @@ -153,13 +168,30 @@ for (const test_info of [ testName, async () => { const { url, process } = await getURL(); - await using processHandle = process; - const report = await calculateMemoryLeak(fn, url); - // peak memory is too high - expect(report.peak_memory).not.toBeGreaterThan(report.start_memory * 2.5); - // acceptable memory leak - expect(report.leak).toBeLessThanOrEqual(maxMemoryGrowth); - expect(report.end_memory).toBeLessThanOrEqual(512 * 1024 * 1024); + try { + const report = await calculateMemoryLeak(fn, url); + console.log(report); + // peak memory is too high + 
expect(report.peak_memory).not.toBeGreaterThan(report.start_memory * 2.5);
+
+        // acceptable memory leak
+        expect(report.leak).toBeLessThanOrEqual(maxMemoryGrowth);
+
+        expect(report.end_memory).toBeLessThanOrEqual(512 * 1024 * 1024);
+      } catch (e) {
+        // `process` is the spawned fixture here, so check the host OS via harness
+        if (!isCI && !isWindows) {
+          try {
+            await fetch(`${url.origin}/heap-snapshot`);
+            await Bun.sleep(10);
+          } catch (e) {
+            console.error(e);
+          }
+        }
+
+        throw e;
+      } finally {
+        process.kill?.();
+      }
     },
     isDebug ? 60_000 : 40_000,
   );
diff --git a/test/js/bun/s3/s3.test.ts b/test/js/bun/s3/s3.test.ts
index a1c280b2666918..0476c672fb0481 100644
--- a/test/js/bun/s3/s3.test.ts
+++ b/test/js/bun/s3/s3.test.ts
@@ -5,7 +5,7 @@ import { S3Client, s3 as defaultS3, file, which } from "bun";
 const s3 = (...args) => defaultS3.file(...args);
 const S3 = (...args) => new S3Client(...args);
 import child_process from "child_process";
-import type { S3Options } from "bun";
+import type { S3File, S3Options } from "bun";
 import path from "path";
 
 const dockerCLI = which("docker") as string;
@@ -93,10 +93,7 @@ for (let credentials of allCredentials) {
   const S3Bucket = credentials.bucket;
 
   function makePayLoadFrom(text: string, size: number): string {
-    while (Buffer.byteLength(text) < size) {
-      text += text;
-    }
-    return text.slice(0, size);
+    return Buffer.alloc(size, text).toString();
   }
 
   // 10 MiB big enough to Multipart upload in more than one part
@@ -559,8 +556,17 @@
           bytes += value?.length ?? 0;
           if (value) chunks.push(value as Buffer);
         }
-        expect(bytes).toBe(Buffer.byteLength(bigishPayload));
-        expect(Buffer.concat(chunks).toString()).toBe(bigishPayload);
+
+        const bigishPayloadString = Buffer.concat(chunks).toString();
+        expect(bigishPayloadString.length).toBe(bigishPayload.length);
+
+        // if this test fails, we want to avoid printing megabytes to stderr.
+ + if (bigishPayloadString !== bigishPayload) { + const SHA1 = Bun.SHA1.hash(bigishPayloadString, "hex"); + const SHA1_2 = Bun.SHA1.hash(bigishPayload, "hex"); + expect(SHA1).toBe(SHA1_2); + } }, 30_000); }); }); @@ -591,33 +597,39 @@ for (let credentials of allCredentials) { await s3file.unlink(); expect().pass(); }); - it("should allow starting with slashs and backslashes", async () => { + it("should allow starting with forward slash", async () => { const options = { ...s3Options, bucket: S3Bucket }; - { - const s3file = s3(`/${randomUUID()}test.txt`, options); - await s3file.write("Hello Bun!"); - await s3file.unlink(); - } - { - const s3file = s3(`\\${randomUUID()}test.txt`, options); - await s3file.write("Hello Bun!"); - await s3file.unlink(); - } + const s3file = s3(`/${randomUUID()}test.txt`, options); + await s3file.write("Hello Bun!"); + await s3file.exists(); + await s3file.unlink(); expect().pass(); }); - it("should allow ending with slashs and backslashes", async () => { + it("should allow starting with backslash", async () => { const options = { ...s3Options, bucket: S3Bucket }; - { - const s3file = s3(`${randomUUID()}/`, options); - await s3file.write("Hello Bun!"); - await s3file.unlink(); - } - { - const s3file = s3(`${randomUUID()}\\`, options); - await s3file.write("Hello Bun!"); - await s3file.unlink(); - } + const s3file = s3(`\\${randomUUID()}test.txt`, options); + await s3file.write("Hello Bun!"); + await s3file.exists(); + await s3file.unlink(); + expect().pass(); + }); + + it("should allow ending with forward slash", async () => { + const options = { ...s3Options, bucket: S3Bucket }; + const s3file = s3(`${randomUUID()}/`, options); + await s3file.write("Hello Bun!"); + await s3file.exists(); + await s3file.unlink(); + expect().pass(); + }); + + it("should allow ending with backslash", async () => { + const options = { ...s3Options, bucket: S3Bucket }; + const s3file = s3(`${randomUUID()}\\`, options); + await s3file.write("Hello Bun!"); + await s3file.exists(); + await s3file.unlink(); expect().pass(); }); }); diff --git a/test/js/node/child_process/child-process-stdio.test.js b/test/js/node/child_process/child-process-stdio.test.js index 59865a533f40a5..8b133ec15081d9 100644 --- a/test/js/node/child_process/child-process-stdio.test.js +++ b/test/js/node/child_process/child-process-stdio.test.js @@ -25,7 +25,7 @@ describe("process.stdout", () => { describe("process.stdin", () => { it("should allow us to read from stdin in readable mode", done => { - const input = "hello\n"; + const input = "hello there\n"; // Child should read from stdin and write it back const child = spawn(bunExe(), [CHILD_PROCESS_FILE, "STDIN", "READABLE"], { env: bunEnv, diff --git a/test/js/node/http/fixtures/log-events.mjs b/test/js/node/http/fixtures/log-events.mjs index bb5dcdd820afcb..c3e6ec55f2a242 100644 --- a/test/js/node/http/fixtures/log-events.mjs +++ b/test/js/node/http/fixtures/log-events.mjs @@ -1,27 +1,36 @@ import * as http from "node:http"; -const options = { - hostname: "www.example.com", - port: 80, - path: "/", - method: "GET", - headers: {}, -}; - -const req = http.request(options, res => { - patchEmitter(res, "res"); - console.log(`STATUS: ${res.statusCode}`); - res.setEncoding("utf8"); +let server = http.createServer((req, res) => { + res.end("Hello, World!"); }); -patchEmitter(req, "req"); +server.listen(0, "localhost", 0, () => { + const options = { + hostname: "localhost", + port: server.address().port, + path: "/", + method: "GET", + headers: {}, + }; -req.end(); + 
const req = http.request(options, res => { + patchEmitter(res, "res"); + console.log(`STATUS: ${res.statusCode}`); + res.setEncoding("utf8"); + }); + patchEmitter(req, "req"); -function patchEmitter(emitter, prefix) { - var oldEmit = emitter.emit; + req.end().once("close", () => { + setTimeout(() => { + server.close(); + }, 1); + }); - emitter.emit = function () { - console.log([prefix, arguments[0]]); - oldEmit.apply(emitter, arguments); - }; -} + function patchEmitter(emitter, prefix) { + var oldEmit = emitter.emit; + + emitter.emit = function () { + console.log([prefix, arguments[0]]); + oldEmit.apply(emitter, arguments); + }; + } +}); diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index a65e3bbb4a4966..bbade224e625ee 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -1859,17 +1859,14 @@ it("#11425 http no payload limit", done => { }); it("should emit events in the right order", async () => { - const { stdout, stderr, exited } = Bun.spawn({ + const { stdout, exited } = Bun.spawn({ cmd: [bunExe(), "run", path.join(import.meta.dir, "fixtures/log-events.mjs")], stdout: "pipe", stdin: "ignore", - stderr: "pipe", + stderr: "inherit", env: bunEnv, }); - const err = await new Response(stderr).text(); - expect(err).toBeEmpty(); const out = await new Response(stdout).text(); - // TODO prefinish and socket are not emitted in the right order expect(out.split("\n")).toEqual([ `[ "req", "prefinish" ]`, `[ "req", "socket" ]`, @@ -1884,6 +1881,7 @@ it("should emit events in the right order", async () => { // `[ "res", "close" ]`, "", ]); + expect(await exited).toBe(0); }); it("destroy should end download", async () => { diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts index b3414f453bf96d..09d90dd9930bbf 100644 --- a/test/js/web/fetch/fetch.stream.test.ts +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -22,58 +22,56 @@ const fixtures = { const invalid = Buffer.from([0xc0]); -const bigText = Buffer.from("a".repeat(1 * 1024 * 1024)); -const smallText = Buffer.from("Hello".repeat(16)); +const bigText = Buffer.alloc(1 * 1024 * 1024, "a"); +const smallText = Buffer.alloc(16 * "Hello".length, "Hello"); const empty = Buffer.alloc(0); describe("fetch() with streaming", () => { [-1, 0, 20, 50, 100].forEach(timeout => { it(`should be able to fail properly when reading from readable stream with timeout ${timeout}`, async () => { - { - using server = Bun.serve({ - port: 0, - async fetch(req) { - return new Response( - new ReadableStream({ - async start(controller) { - controller.enqueue("Hello, World!"); - await Bun.sleep(1000); - controller.enqueue("Hello, World!"); - controller.close(); - }, - }), - { - status: 200, - headers: { - "Content-Type": "text/plain", - }, + using server = Bun.serve({ + port: 0, + async fetch(req) { + return new Response( + new ReadableStream({ + async start(controller) { + controller.enqueue("Hello, World!"); + await Bun.sleep(1000); + controller.enqueue("Hello, World!"); + controller.close(); }, - ); - }, - }); + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + }, + }, + ); + }, + }); - const server_url = `http://${server.hostname}:${server.port}`; - try { - const res = await fetch(server_url, { - signal: timeout < 0 ? AbortSignal.abort() : AbortSignal.timeout(timeout), - }); + const server_url = `http://${server.hostname}:${server.port}`; + try { + const res = await fetch(server_url, { + signal: timeout < 0 ? 
AbortSignal.abort() : AbortSignal.timeout(timeout), + }); - const reader = res.body?.getReader(); - let results = []; - while (true) { - const { done, data } = await reader?.read(); - if (data) results.push(data); - if (done) break; - } - expect.unreachable(); - } catch (err: any) { - if (timeout < 0) { - if (err.name !== "AbortError") throw err; - expect(err.message).toBe("The operation was aborted."); - } else { - if (err.name !== "TimeoutError") throw err; - expect(err.message).toBe("The operation timed out."); - } + const reader = res.body?.getReader(); + let results = []; + while (true) { + const { done, data } = await reader?.read(); + if (data) results.push(data); + if (done) break; + } + expect.unreachable(); + } catch (err: any) { + if (timeout < 0) { + if (err.name !== "AbortError") throw err; + expect(err.message).toBe("The operation was aborted."); + } else { + if (err.name !== "TimeoutError") throw err; + expect(err.message).toBe("The operation timed out."); } } }); @@ -682,354 +680,423 @@ describe("fetch() with streaming", () => { const test = skip ? it.skip : it; test(`with invalid utf8 with ${compression} compression`, async () => { - { - const content = Buffer.concat([invalid, Buffer.from("Hello, world!\n".repeat(5), "utf8"), invalid]); - using server = Bun.serve({ - port: 0, - fetch(req) { - return new Response( - new ReadableStream({ - type: "direct", - async pull(controller) { - const data = compress(compression, content); - const size = data.byteLength / 4; - controller.write(data.slice(0, size)); - await controller.flush(); - await Bun.sleep(100); - controller.write(data.slice(size, size * 2)); - await controller.flush(); - await Bun.sleep(100); - controller.write(data.slice(size * 2, size * 3)); - await controller.flush(); - await Bun.sleep(100); - controller.write(data.slice(size * 3, size * 4)); - await controller.flush(); - - controller.close(); - }, - }), - { - status: 200, - headers: { - "Content-Type": "text/plain", - ...headers, - }, + const content = Buffer.concat([invalid, Buffer.from("Hello, world!\n".repeat(5), "utf8"), invalid]); + using server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = compress(compression, content); + const size = data.byteLength / 4; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 4)); + await controller.flush(); + + controller.close(); }, - ); - }, - }); + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }, + ); + }, + }); - let res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const reader = res.body?.getReader(); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); - let buffer = Buffer.alloc(0); - while (true) { - gcTick(false); + let buffer = Buffer.alloc(0); + while (true) { + gcTick(false); - const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; - if (value) { - buffer = Buffer.concat([buffer, value]); - } - if (done) { - break; - } + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; + if (value) { + buffer = 
Buffer.concat([buffer, value]); + } + if (done) { + break; } - - gcTick(false); - expect(buffer).toEqual(content); } + + gcTick(false); + expect(buffer).toEqual(content); }); test(`chunked response works (single chunk) with ${compression} compression`, async () => { - { - const content = "Hello, world!\n".repeat(5); - using server = Bun.serve({ - port: 0, - fetch(req) { - return new Response( - new ReadableStream({ - type: "direct", - async pull(controller) { - const data = compress(compression, Buffer.from(content, "utf8")); - controller.write(data); - await controller.flush(); - controller.close(); - }, - }), - { - status: 200, - headers: { - "Content-Type": "text/plain", - ...headers, - }, + const content = "Hello, world!\n".repeat(5); + using server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = compress(compression, Buffer.from(content, "utf8")); + controller.write(data); + await controller.flush(); + controller.close(); }, - ); - }, - }); - let res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const result = await res.text(); - gcTick(false); - expect(result).toBe(content); + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }, + ); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); - res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const reader = res.body?.getReader(); + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); - let buffer = Buffer.alloc(0); - let parts = 0; - while (true) { - gcTick(false); + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); - const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; - if (value) { - buffer = Buffer.concat([buffer, value]); - parts++; - } - if (done) { - break; - } + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; } - - gcTick(false); - expect(buffer.toString("utf8")).toBe(content); - expect(parts).toBe(1); } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBe(1); }); test(`chunked response works (multiple chunks) with ${compression} compression`, async () => { - { - const content = "Hello, world!\n".repeat(5); - using server = Bun.serve({ - port: 0, - fetch(req) { - return new Response( - new ReadableStream({ - type: "direct", - async pull(controller) { - const data = compress(compression, Buffer.from(content, "utf8")); - const size = data.byteLength / 5; - controller.write(data.slice(0, size)); - await controller.flush(); - await Bun.sleep(100); - controller.write(data.slice(size, size * 2)); - await controller.flush(); - await Bun.sleep(100); - controller.write(data.slice(size * 2, size * 3)); - await controller.flush(); - await Bun.sleep(100); - controller.write(data.slice(size * 3, size * 5)); - await controller.flush(); - - controller.close(); - }, - }), - { - status: 200, - headers: { - "Content-Type": "text/plain", - ...headers, - }, + const content = "Hello, world!\n".repeat(5); + using server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + 
async pull(controller) { + const data = compress(compression, Buffer.from(content, "utf8")); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await controller.flush(); + + controller.close(); }, - ); - }, - }); - let res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const result = await res.text(); - gcTick(false); - expect(result).toBe(content); + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }, + ); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); - res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const reader = res.body?.getReader(); + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); - let buffer = Buffer.alloc(0); - let parts = 0; - while (true) { - gcTick(false); + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); - const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; - if (value) { - buffer = Buffer.concat([buffer, value]); - } - parts++; - if (done) { - break; - } + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; + if (value) { + buffer = Buffer.concat([buffer, value]); } + parts++; + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); + }); + test(`Content-Length response works (single part) with ${compression} compression`, async () => { + const content = "a".repeat(1024); + using server = Bun.serve({ + port: 0, + fetch(req) { + return new Response(compress(compression, Buffer.from(content)), { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, + }); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); + + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { gcTick(false); - expect(buffer.toString("utf8")).toBe(content); - expect(parts).toBeGreaterThan(1); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBe(1); }); - test(`Content-Length response works (single part) with ${compression} compression`, async () => { - { - const content = "a".repeat(1024); - using server = Bun.serve({ - port: 0, - fetch(req) { - return new Response(compress(compression, Buffer.from(content)), { + test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => { + const rawBytes = Buffer.allocUnsafe(128 * 1024); + // Random data doesn't compress well. 
We need enough random data that + // the compressed data is larger than 64 bytes. + require("crypto").randomFillSync(rawBytes); + const content = rawBytes.toString("hex"); + var onReceivedHeaders = Promise.withResolvers(); + using server = Bun.serve({ + port: 0, + async fetch(req) { + const data = compress(compression, Buffer.from(content)); + return new Response( + new ReadableStream({ + async pull(controller) { + const firstChunk = data.slice(0, 64); + const secondChunk = data.slice(firstChunk.length); + controller.enqueue(firstChunk); + await onReceivedHeaders.promise; + await Bun.sleep(64); + controller.enqueue(secondChunk); + controller.close(); + }, + }), + { status: 200, headers: { "Content-Type": "text/plain", ...headers, }, - }); - }, - }); - let res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const result = await res.text(); - gcTick(false); - expect(result).toBe(content); - - res = await fetch(`http://${server.hostname}:${server.port}`, {}); - gcTick(false); - const reader = res.body?.getReader(); + }, + ); + }, + }); + let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + let onReceiveHeadersResolve = onReceivedHeaders.resolve; + onReceivedHeaders = Promise.withResolvers(); + onReceiveHeadersResolve(); + gcTick(false); + const result = await res.text(); + gcTick(false); + expect(result).toBe(content); - let buffer = Buffer.alloc(0); - let parts = 0; - while (true) { - gcTick(false); + res = await fetch(`http://${server.hostname}:${server.port}`, {}); + onReceiveHeadersResolve = onReceivedHeaders.resolve; + onReceivedHeaders = Promise.withResolvers(); + onReceiveHeadersResolve(); - const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; - if (value) { - buffer = Buffer.concat([buffer, value]); - parts++; - } - if (done) { - break; - } - } + gcTick(false); + const reader = res.body?.getReader(); + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { gcTick(false); - expect(buffer.toString("utf8")).toBe(content); - expect(parts).toBe(1); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; + if (value) { + buffer = Buffer.concat([buffer, value]); + parts++; + } + if (done) { + break; + } } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); }); - test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => { - { - const content = "a".repeat(64 * 1024); - var onReceivedHeaders = Promise.withResolvers(); - using server = Bun.serve({ - port: 0, - async fetch(req) { - const data = compress(compression, Buffer.from(content)); - return new Response( - new ReadableStream({ - async pull(controller) { - const firstChunk = data.slice(0, 64); - const secondChunk = data.slice(firstChunk.length); - controller.enqueue(firstChunk); - await onReceivedHeaders.promise; - await Bun.sleep(1); - controller.enqueue(secondChunk); - controller.close(); - }, - }), - { - status: 200, - headers: { - "Content-Type": "text/plain", - ...headers, - }, - }, - ); - }, - }); - let res = await fetch(`http://${server.hostname}:${server.port}`, {}); - onReceivedHeaders.resolve(); - onReceivedHeaders = Promise.withResolvers(); - gcTick(false); - const result = await res.text(); - gcTick(false); - expect(result).toBe(content); + test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { + const parts = 5; + const content = 
"Hello".repeat(parts); + using server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function write(chunk: any) { + await new Promise((resolve, reject) => { + if (cork) { + corked.push(chunk); + } - res = await fetch(`http://${server.hostname}:${server.port}`, {}); - onReceivedHeaders.resolve(); - onReceivedHeaders = Promise.withResolvers(); - gcTick(false); - const reader = res.body?.getReader(); + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } - let buffer = Buffer.alloc(0); - let parts = 0; - while (true) { - gcTick(false); + if (!cork) { + socket.write(chunk); + socket.flush(); + } - const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; - if (value) { - buffer = Buffer.concat([buffer, value]); - parts++; - } - if (done) { - break; - } - } + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + await write("Content-Length: " + compressed.byteLength + "\r\n"); + await write("\r\n"); + const size = compressed.byteLength / 5; + for (var i = 0; i < 5; i++) { + cork = false; + await write(compressed.slice(size * i, size * (i + 1))); + } + await write("Extra Data!"); + await write("Extra Data!"); + socket.flush(); + }, + drain(socket) {}, + }, + }); + + const res = await fetch(`http://${server.hostname}:${server.port}`, {}); + gcTick(false); + const reader = res.body?.getReader(); + let buffer = Buffer.alloc(0); + while (true) { gcTick(false); - expect(buffer.toString("utf8")).toBe(content); - expect(parts).toBeGreaterThan(1); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + if (done) { + break; + } } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); }); - test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { - { - const parts = 5; - const content = "Hello".repeat(parts); - using server = Bun.listen({ - port: 0, - hostname: "0.0.0.0", - socket: { - async open(socket) { - var corked: any[] = []; - var cork = true; - async function write(chunk: any) { - await new Promise((resolve, reject) => { - if (cork) { - corked.push(chunk); - } - - if (!cork && corked.length) { - socket.write(corked.join("")); - corked.length = 0; - socket.flush(); - } - - if (!cork) { - socket.write(chunk); - socket.flush(); - } - - resolve(); - }); - } - const compressed = compress(compression, Buffer.from(content, "utf8")); - await write("HTTP/1.1 200 OK\r\n"); - await write("Content-Type: text/plain\r\n"); - for (const [key, value] of Object.entries(headers)) { - await write(key + ": " + value + "\r\n"); - } - await write("Content-Length: " + compressed.byteLength + "\r\n"); - await write("\r\n"); - const size = compressed.byteLength / 5; - for (var i = 0; i < 5; i++) { - cork = false; - await write(compressed.slice(size * i, size * (i + 1))); - } - await write("Extra Data!"); - await write("Extra Data!"); - socket.flush(); - }, - drain(socket) {}, + test(`Missing data should timeout on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { + const parts = 5; + const content = 
"Hello".repeat(parts); + using server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function write(chunk: any) { + await new Promise((resolve, reject) => { + if (cork) { + corked.push(chunk); + } + + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } + + if (!cork) { + socket.write(chunk); + socket.flush(); + } + + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + // 10 extra missing bytes that we will never sent + await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); + await write("\r\n"); + const size = compressed.byteLength / 5; + for (var i = 0; i < 5; i++) { + cork = false; + await write(compressed.slice(size * i, size * (i + 1))); + } + socket.flush(); }, + drain(socket) {}, + }, + }); + try { + const res = await fetch(`http://${server.hostname}:${server.port}`, { + signal: AbortSignal.timeout(1000), }); - - const res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); @@ -1047,85 +1114,9 @@ describe("fetch() with streaming", () => { } gcTick(false); - expect(buffer.toString("utf8")).toBe(content); - } - }); - - test(`Missing data should timeout on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { - { - const parts = 5; - const content = "Hello".repeat(parts); - using server = Bun.listen({ - port: 0, - hostname: "0.0.0.0", - socket: { - async open(socket) { - var corked: any[] = []; - var cork = true; - async function write(chunk: any) { - await new Promise((resolve, reject) => { - if (cork) { - corked.push(chunk); - } - - if (!cork && corked.length) { - socket.write(corked.join("")); - corked.length = 0; - socket.flush(); - } - - if (!cork) { - socket.write(chunk); - socket.flush(); - } - - resolve(); - }); - } - const compressed = compress(compression, Buffer.from(content, "utf8")); - await write("HTTP/1.1 200 OK\r\n"); - await write("Content-Type: text/plain\r\n"); - for (const [key, value] of Object.entries(headers)) { - await write(key + ": " + value + "\r\n"); - } - // 10 extra missing bytes that we will never sent - await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); - await write("\r\n"); - const size = compressed.byteLength / 5; - for (var i = 0; i < 5; i++) { - cork = false; - await write(compressed.slice(size * i, size * (i + 1))); - } - socket.flush(); - }, - drain(socket) {}, - }, - }); - try { - const res = await fetch(`http://${server.hostname}:${server.port}`, { - signal: AbortSignal.timeout(1000), - }); - gcTick(false); - const reader = res.body?.getReader(); - - let buffer = Buffer.alloc(0); - while (true) { - gcTick(false); - - const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; - if (value) { - buffer = Buffer.concat([buffer, value]); - } - if (done) { - break; - } - } - - gcTick(false); - expect(buffer.toString("utf8")).toBe("unreachable"); - } catch (err) { - expect((err as Error).name).toBe("TimeoutError"); - } + expect(buffer.toString("utf8")).toBe("unreachable"); + } catch (err) { + expect((err as Error).name).toBe("TimeoutError"); } }); @@ -1225,93 +1216,91 @@ describe("fetch() with streaming", () => { } test(`can 
handle socket close with ${compression} compression`, async () => { - { - const parts = 5; - const content = "Hello".repeat(parts); - const { promise, resolve: resolveSocket } = Promise.withResolvers(); - using server = Bun.listen({ - port: 0, - hostname: "0.0.0.0", - socket: { - async open(socket) { - var corked: any[] = []; - var cork = true; - async function write(chunk: any) { - await new Promise((resolve, reject) => { - if (cork) { - corked.push(chunk); - } - - if (!cork && corked.length) { - socket.write(corked.join("")); - corked.length = 0; - socket.flush(); - } - - if (!cork) { - socket.write(chunk); - socket.flush(); - } - - resolve(); - }); - } - const compressed = compress(compression, Buffer.from(content, "utf8")); - await write("HTTP/1.1 200 OK\r\n"); - await write("Content-Type: text/plain\r\n"); - for (const [key, value] of Object.entries(headers)) { - await write(key + ": " + value + "\r\n"); - } - // 10 extra missing bytes that we will never sent in this case we will wait to close - await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); - await write("\r\n"); + const parts = 5; + const content = "Hello".repeat(parts); + const { promise, resolve: resolveSocket } = Promise.withResolvers(); + using server = Bun.listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + async open(socket) { + var corked: any[] = []; + var cork = true; + async function write(chunk: any) { + await new Promise((resolve, reject) => { + if (cork) { + corked.push(chunk); + } - resolveSocket(socket); + if (!cork && corked.length) { + socket.write(corked.join("")); + corked.length = 0; + socket.flush(); + } - const size = compressed.byteLength / 5; - for (var i = 0; i < 5; i++) { - cork = false; - await write(compressed.slice(size * i, size * (i + 1))); - } - socket.flush(); - }, - drain(socket) {}, - }, - }); + if (!cork) { + socket.write(chunk); + socket.flush(); + } - let socket: Socket | null = null; + resolve(); + }); + } + const compressed = compress(compression, Buffer.from(content, "utf8")); + await write("HTTP/1.1 200 OK\r\n"); + await write("Content-Type: text/plain\r\n"); + for (const [key, value] of Object.entries(headers)) { + await write(key + ": " + value + "\r\n"); + } + // 10 extra missing bytes that we will never sent in this case we will wait to close + await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); + await write("\r\n"); - try { - const res = await fetch(`http://${server.hostname}:${server.port}`, {}); - socket = await promise; - gcTick(false); + resolveSocket(socket); - const reader = res.body?.getReader(); + const size = compressed.byteLength / 5; + for (var i = 0; i < 5; i++) { + cork = false; + await write(compressed.slice(size * i, size * (i + 1))); + } + socket.flush(); + }, + drain(socket) {}, + }, + }); - let buffer = Buffer.alloc(0); + let socket: Socket | null = null; - while (true) { - gcTick(false); - const read_promise = reader?.read(); - socket?.end(); - socket = null; - const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult; + try { + const res = await fetch(`http://${server.hostname}:${server.port}`, {}); + socket = await promise; + gcTick(false); - if (value) { - buffer = Buffer.concat([buffer, value]); - } + const reader = res.body?.getReader(); - if (done) { - break; - } - } + let buffer = Buffer.alloc(0); + while (true) { gcTick(false); - expect(buffer.toString("utf8")).toBe("unreachable"); - } catch (err) { - expect((err as Error).name).toBe("Error"); - expect((err as 
Error).code).toBe("ConnectionClosed"); + const read_promise = reader?.read(); + socket?.end(); + socket = null; + const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult; + + if (value) { + buffer = Buffer.concat([buffer, value]); + } + + if (done) { + break; + } } + + gcTick(false); + expect(buffer.toString("utf8")).toBe("unreachable"); + } catch (err) { + expect((err as Error).name).toBe("Error"); + expect((err as Error).code).toBe("ConnectionClosed"); } }); } diff --git a/test/js/web/streams/streams-leak.test.ts b/test/js/web/streams/streams-leak.test.ts new file mode 100644 index 00000000000000..29e9a61af78804 --- /dev/null +++ b/test/js/web/streams/streams-leak.test.ts @@ -0,0 +1,62 @@ +import { expect, test } from "bun:test"; +import { isWindows } from "harness"; + +const BYTES_TO_WRITE = 500_000; + +// https://github.com/oven-sh/bun/issues/12198 +test.skipIf(isWindows)( + "Absolute memory usage remains relatively constant when reading and writing to a pipe", + async () => { + async function write(bytes: number) { + const buf = Buffer.alloc(bytes, "foo"); + await cat.stdin.write(buf); + } + async function read(bytes: number) { + let i = 0; + while (true) { + const { value } = await r.read(); + i += value?.length ?? 0; + if (i >= bytes) { + return; + } + } + } + + async function readAndWrite(bytes = BYTES_TO_WRITE) { + await Promise.all([write(bytes), read(bytes)]); + } + + await using cat = Bun.spawn(["cat"], { + stdin: "pipe", + stdout: "pipe", + stderr: "inherit", + }); + const r = cat.stdout.getReader() as any; + + const rounds = 5000; + + for (let i = 0; i < rounds; i++) { + await readAndWrite(BYTES_TO_WRITE); + } + Bun.gc(true); + const before = process.memoryUsage.rss(); + + for (let i = 0; i < rounds; i++) { + await readAndWrite(); + } + Bun.gc(true); + const after = process.memoryUsage.rss(); + + for (let i = 0; i < rounds; i++) { + await readAndWrite(); + } + Bun.gc(true); + const after2 = process.memoryUsage.rss(); + console.log({ after, after2 }); + console.log(require("bun:jsc").heapStats()); + console.log("RSS delta", ((after - before) | 0) / 1024 / 1024); + console.log("RSS total", (after / 1024 / 1024) | 0, "MB"); + expect(after).toBeLessThan(250 * 1024 * 1024); + expect(after).toBeLessThan(before * 1.5); + }, +); diff --git a/test/regression/issue/11297/11297.fixture.ts b/test/regression/issue/11297/11297.fixture.ts index 97de15a5513fc8..bc6ad091283eb0 100644 --- a/test/regression/issue/11297/11297.fixture.ts +++ b/test/regression/issue/11297/11297.fixture.ts @@ -19,7 +19,7 @@ const writer = (async function () { // 1. Remove "await" from proc.stdin.write(string) (keep the .end() await) // 2. Run `hyperfine "bun test/regression/issue/011297.fixture.ts"` (or run this many times on macOS.) // - await proc.stdin.write(string); + proc.stdin.write(string); } await proc.stdin.end(); console.timeEnd("Sent " + string.length + " bytes x 10");