Skip to content

Commit

Permalink
Revert "Rewrite the internal Web Stream native bindings to use less m…
Browse files Browse the repository at this point in the history
…emory (#16349)"

This reverts commit 75a95aa.
  • Loading branch information
Jarred-Sumner committed Jan 27, 2025
1 parent 0d73927 commit 2c5acd8
Show file tree
Hide file tree
Showing 20 changed files with 285 additions and 479 deletions.
41 changes: 21 additions & 20 deletions bench/snippets/readdir.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,33 @@ const syncCount = readdirSync(dir, { recursive: true }).length;

const hash = createHash("sha256").update(result.sort().join("\n")).digest("hex");

bench(`await readdir("${dir}", {recursive: true})`, async () => {
await readdir(dir, { recursive: true });
});
// bench(`await readdir("${dir}", {recursive: true})`, async () => {
// await readdir(dir, { recursive: true });
// });

bench(`await readdir("${dir}", {recursive: true}) x 10`, async () => {
for (let i = 0; i < 64; i++) {
const promises = [
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
readdir(dir, { recursive: true, withFileTypes: true }),
];
await Promise.all(promises);
});
console.log("RSS", (process.memoryUsage.rss() / 1024 / 1024) | 0, "MB");
}

bench(`await readdir("${dir}", {recursive: false})`, async () => {
await readdir(dir, { recursive: false });
});
// bench(`await readdir("${dir}", {recursive: false})`, async () => {
// await readdir(dir, { recursive: false });
// });

await run();
// await run();

if (!process?.env?.BENCHMARK_RUNNER) {
console.log("\n", count, "files/dirs in", dir, "\n", "SHA256:", hash, "\n");
Expand Down
5 changes: 0 additions & 5 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3056,9 +3056,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
fn endRequestStreaming(this: *RequestContext) bool {
assert(this.server != null);

this.request_body_buf.clearAndFree(bun.default_allocator);

// if we cannot, we have to reject pending promises
// first, we reject the request body promise
if (this.request_body) |body| {
Expand All @@ -3072,8 +3069,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return false;
}
fn detachResponse(this: *RequestContext) void {
this.request_body_buf.clearAndFree(bun.default_allocator);

if (this.resp) |resp| {
this.resp = null;

Expand Down
1 change: 1 addition & 0 deletions src/bun.js/bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,7 @@ void JSC__JSValue__jsonStringify(JSC__JSValue JSValue0, JSC__JSGlobalObject* arg
ASSERT_NO_PENDING_EXCEPTION(arg1);
JSC::JSValue value = JSC::JSValue::decode(JSValue0);
WTF::String str = JSC::JSONStringify(arg1, value, (unsigned)arg2);

*arg3 = Bun::toStringRef(str);
}
unsigned char JSC__JSValue__jsType(JSC__JSValue JSValue0)
Expand Down
4 changes: 4 additions & 0 deletions src/bun.js/javascript.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,10 @@ pub const VirtualMachine = struct {
return VMHolder.vm.?;
}

pub fn has() bool {
return VMHolder.vm != null;
}

pub fn getMainThreadVM() ?*VirtualMachine {
return VMHolder.main_thread_vm;
}
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/webcore/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ pub const Response = struct {
}
const json_value = args.nextEat() orelse JSC.JSValue.zero;

if (@intFromEnum(json_value) != 0) {
if (json_value != .zero) {
var str = bun.String.empty;
// calling JSON.stringify on an empty string adds extra quotes
// so this is correct
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4233,7 +4233,7 @@ pub const FileReader = struct {

pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState) bool {
var buf = init_buf;
log("onReadChunk() = {d} ({s}) - read_inside_on_pull: {s}", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) });
log("onReadChunk() = {d} ({s})", .{ buf.len, @tagName(state) });

if (this.done) {
this.reader.close();
Expand Down
51 changes: 43 additions & 8 deletions src/js/builtins/ProcessObjectInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,28 @@ export function getStdioWriteStream(fd) {

export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType) {
const native = Bun.stdin.stream();
// @ts-expect-error
const source = native.$bunNativePtr;

var reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
var readerRef;

var shouldUnref = false;

function ref() {
$debug("ref();", reader ? "already has reader" : "getting reader");
reader ??= native.getReader();
source.updateRef(true);
// TODO: remove this. likely we are dereferencing the stream
// when there is still more data to be read.
readerRef ??= setInterval(() => {}, 1 << 30);
shouldUnref = false;
}

function unref() {
$debug("unref();");

if (readerRef) {
clearInterval(readerRef);
readerRef = undefined;
$debug("cleared timeout");
}
if (reader) {
try {
reader.releaseLock();
Expand All @@ -104,7 +109,22 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType

// Releasing the lock is not possible as there are active reads
// we will instead pretend we are unref'd, and release the lock once the reads are finished.
source?.updateRef?.(false);
shouldUnref = true;

// unref the native part of the stream
try {
$getByIdDirectPrivate(
$getByIdDirectPrivate(native, "readableStreamController"),
"underlyingByteSource",
).$resume(false);
} catch (e) {
if (IS_BUN_DEVELOPMENT) {
// we assume this isn't possible, but because we aren't sure
// we will ignore if error during release, but make a big deal in debug
console.error(e);
$assert(!"reachable");
}
}
}
}
}
Expand Down Expand Up @@ -166,11 +186,25 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType
async function internalRead(stream) {
$debug("internalRead();");
try {
var done: boolean, value: Uint8Array[];
$assert(reader);
const { done, value } = await reader.read();
const pendingRead = reader.readMany();

if ($isPromise(pendingRead)) {
({ done, value } = await pendingRead);
} else {
$debug("readMany() did not return a promise");
({ done, value } = pendingRead);
}

if (value) {
stream.push(value);
if (!done) {
stream.push(value[0]);

// shouldn't actually happen, but just in case
const length = value.length;
for (let i = 1; i < length; i++) {
stream.push(value[i]);
}

if (shouldUnref) unref();
} else {
Expand Down Expand Up @@ -231,6 +265,7 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType

return stream;
}

export function initializeNextTickQueue(process, nextTickQueue, drainMicrotasksFn, reportUncaughtExceptionFn) {
var queue;
var process;
Expand Down
46 changes: 17 additions & 29 deletions src/js/builtins/ReadableStreamDefaultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,19 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau

const state = $getByIdDirectPrivate(stream, "state");
stream.$disturbed = true;
if (state === $streamErrored) {
if (state === $streamClosed) return { value: [], size: 0, done: true };
else if (state === $streamErrored) {
throw $getByIdDirectPrivate(stream, "storedError");
}

var controller = $getByIdDirectPrivate(stream, "readableStreamController");
if (controller) {
var queue = $getByIdDirectPrivate(controller, "queue");
}

if (!queue && state !== $streamClosed) {
var queue = $getByIdDirectPrivate(controller, "queue");
if (!queue) {
// This is a ReadableStream direct controller implemented in JS
// It hasn't been started yet.
return controller.$pull(controller).$then(function ({ done, value }) {
return done ? { done: true, value: value ? [value] : [], size: 0 } : { value: [value], size: 1, done: false };
return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false };
});
} else if (!queue) {
return { done: true, value: [], size: 0 };
}

const content = queue.content;
Expand Down Expand Up @@ -102,31 +98,27 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$putByValDirect(outValues, i, values[i].value);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));

if (state !== $streamClosed) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
}
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));

return { value: outValues, size, done: false };
}

var onPullMany = result => {
const resultValue = result.value;

if (result.done) {
return { value: resultValue ? [resultValue] : [], size: 0, done: true };
return { value: [], size: 0, done: true };
}
var controller = $getByIdDirectPrivate(stream, "readableStreamController");

var queue = $getByIdDirectPrivate(controller, "queue");
var value = [resultValue].concat(queue.content.toArray(false));
var value = [result.value].concat(queue.content.toArray(false));
var length = value.length;

if ($isReadableByteStreamController(controller)) {
Expand All @@ -144,6 +136,8 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
}

var size = queue.size;
$resetQueue(queue);

if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
Expand All @@ -152,18 +146,12 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$readableByteStreamControllerCallPullIfNeeded(controller);
}

$resetQueue($getByIdDirectPrivate(controller, "queue"));

return { value: value, size: size, done: false };
};

if (state === $streamClosed) {
return { value: [], size: 0, done: true };
}

var pullResult = controller.$pull(controller);
if (pullResult && $isPromise(pullResult)) {
return pullResult.then(onPullMany) as any;
return pullResult.$then(onPullMany) as any;
}

return onPullMany(pullResult);
Expand Down
Loading

0 comments on commit 2c5acd8

Please sign in to comment.