Skip to content

Commit

Permalink
Reduce RSS of reading files
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner committed Jan 26, 2025
1 parent 5519877 commit 186d5a6
Show file tree
Hide file tree
Showing 15 changed files with 936 additions and 751 deletions.
5 changes: 5 additions & 0 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3056,6 +3056,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
fn endRequestStreaming(this: *RequestContext) bool {
assert(this.server != null);

this.request_body_buf.clearAndFree(bun.default_allocator);

// if we cannot, we have to reject pending promises
// first, we reject the request body promise
if (this.request_body) |body| {
Expand All @@ -3069,6 +3072,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return false;
}
fn detachResponse(this: *RequestContext) void {
this.request_body_buf.clearAndFree(bun.default_allocator);

if (this.resp) |resp| {
this.resp = null;

Expand Down
7 changes: 3 additions & 4 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4233,7 +4233,7 @@ pub const FileReader = struct {

pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState) bool {
var buf = init_buf;
log("onReadChunk() = {d} ({s})", .{ buf.len, @tagName(state) });
log("onReadChunk() = {d} ({s}) - read_inside_on_pull: {s}", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) });

if (this.done) {
this.reader.close();
Expand Down Expand Up @@ -4286,10 +4286,9 @@ pub const FileReader = struct {
if (this.buffered.capacity > 0) {
this.buffered.clearAndFree(bun.default_allocator);
}
var taken = this.reader.takeBuffer();

if (this.reader.buffer().items.len != 0) {
this.buffered = this.reader.buffer().moveToUnmanaged();
}
this.buffered = taken.moveToUnmanaged();
}

var buffer = &this.buffered;
Expand Down
51 changes: 8 additions & 43 deletions src/js/builtins/ProcessObjectInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,23 @@ export function getStdioWriteStream(fd) {

export function getStdinStream(fd) {
const native = Bun.stdin.stream();
// @ts-expect-error
const source = native.$bunNativePtr;

var reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
var readerRef;

var shouldUnref = false;

function ref() {
$debug("ref();", reader ? "already has reader" : "getting reader");
reader ??= native.getReader();
// TODO: remove this. likely we are dereferencing the stream
// when there is still more data to be read.
readerRef ??= setInterval(() => {}, 1 << 30);
source.updateRef(true);
shouldUnref = false;
}

function unref() {
$debug("unref();");
if (readerRef) {
clearInterval(readerRef);
readerRef = undefined;
$debug("cleared timeout");
}

if (reader) {
try {
reader.releaseLock();
Expand All @@ -103,22 +98,7 @@ export function getStdinStream(fd) {

// Releasing the lock is not possible as there are active reads
// we will instead pretend we are unref'd, and release the lock once the reads are finished.
shouldUnref = true;

// unref the native part of the stream
try {
$getByIdDirectPrivate(
$getByIdDirectPrivate(native, "readableStreamController"),
"underlyingByteSource",
).$resume(false);
} catch (e) {
if (IS_BUN_DEVELOPMENT) {
// we assume this isn't possible, but because we aren't sure
// we will ignore if error during release, but make a big deal in debug
console.error(e);
$assert(!"reachable");
}
}
source?.updateRef?.(false);
}
}
}
Expand Down Expand Up @@ -167,25 +147,11 @@ export function getStdinStream(fd) {
async function internalRead(stream) {
$debug("internalRead();");
try {
var done: boolean, value: Uint8Array[];
$assert(reader);
const pendingRead = reader.readMany();

if ($isPromise(pendingRead)) {
({ done, value } = await pendingRead);
} else {
$debug("readMany() did not return a promise");
({ done, value } = pendingRead);
}
const { done, value } = await reader.read();

if (!done) {
stream.push(value[0]);

// shouldn't actually happen, but just in case
const length = value.length;
for (let i = 1; i < length; i++) {
stream.push(value[i]);
}
if (value) {
stream.push(value);

if (shouldUnref) unref();
} else {
Expand Down Expand Up @@ -246,7 +212,6 @@ export function getStdinStream(fd) {

return stream;
}

export function initializeNextTickQueue(process, nextTickQueue, drainMicrotasksFn, reportUncaughtExceptionFn) {
var queue;
var process;
Expand Down
46 changes: 29 additions & 17 deletions src/js/builtins/ReadableStreamDefaultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,23 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau

const state = $getByIdDirectPrivate(stream, "state");
stream.$disturbed = true;
if (state === $streamClosed) return { value: [], size: 0, done: true };
else if (state === $streamErrored) {
if (state === $streamErrored) {
throw $getByIdDirectPrivate(stream, "storedError");
}

var controller = $getByIdDirectPrivate(stream, "readableStreamController");
var queue = $getByIdDirectPrivate(controller, "queue");
if (!queue) {
if (controller) {
var queue = $getByIdDirectPrivate(controller, "queue");
}

if (!queue && state !== $streamClosed) {
// This is a ReadableStream direct controller implemented in JS
// It hasn't been started yet.
return controller.$pull(controller).$then(function ({ done, value }) {
return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false };
return done ? { done: true, value: value ? [value] : [], size: 0 } : { value: [value], size: 1, done: false };
});
} else if (!queue) {
return { done: true, value: [], size: 0 };
}

const content = queue.content;
Expand Down Expand Up @@ -98,27 +102,31 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$putByValDirect(outValues, i, values[i].value);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));

if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
if (state !== $streamClosed) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));

return { value: outValues, size, done: false };
}

var onPullMany = result => {
const resultValue = result.value;

if (result.done) {
return { value: [], size: 0, done: true };
return { value: resultValue ? [resultValue] : [], size: 0, done: true };
}
var controller = $getByIdDirectPrivate(stream, "readableStreamController");

var queue = $getByIdDirectPrivate(controller, "queue");
var value = [result.value].concat(queue.content.toArray(false));
var value = [resultValue].concat(queue.content.toArray(false));
var length = value.length;

if ($isReadableByteStreamController(controller)) {
Expand All @@ -136,8 +144,6 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
}

var size = queue.size;
$resetQueue(queue);

if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
Expand All @@ -146,12 +152,18 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$readableByteStreamControllerCallPullIfNeeded(controller);
}

$resetQueue($getByIdDirectPrivate(controller, "queue"));

return { value: value, size: size, done: false };
};

if (state === $streamClosed) {
return { value: [], size: 0, done: true };
}

var pullResult = controller.$pull(controller);
if (pullResult && $isPromise(pullResult)) {
return pullResult.$then(onPullMany) as any;
return pullResult.then(onPullMany) as any;
}

return onPullMany(pullResult);
Expand Down
Loading

0 comments on commit 186d5a6

Please sign in to comment.