Skip to content

Commit

Permalink
Revert "Rewrite the internal Web Stream native bindings to use less m…
Browse files Browse the repository at this point in the history
…emory (#16349)"

This reverts commit 75a95aa.

Update ProcessObjectInternals.ts

Update ProcessObjectInternals.ts
  • Loading branch information
Jarred-Sumner committed Jan 27, 2025
1 parent 0d73927 commit 4366af2
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 337 deletions.
39 changes: 35 additions & 4 deletions src/js/builtins/ProcessObjectInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,25 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType

// Releasing the lock is not possible as there are active reads
// we will instead pretend we are unref'd, and release the lock once the reads are finished.
source?.updateRef?.(false);
shouldUnref = true;

// unref the native part of the stream
try {
$getByIdDirectPrivate(
$getByIdDirectPrivate(native, "readableStreamController"),
"underlyingByteSource",
).$resume(false);
} catch (e) {
if (IS_BUN_DEVELOPMENT) {
// we assume this isn't possible, but because we aren't sure
// we will ignore if error during release, but make a big deal in debug
console.error(e);
$assert(!"reachable");
}
}
}
} else if (source) {
source.updateRef(false);
}
}

Expand Down Expand Up @@ -166,11 +183,25 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType
async function internalRead(stream) {
$debug("internalRead();");
try {
var done: boolean, value: Uint8Array[];
$assert(reader);
const { done, value } = await reader.read();
const pendingRead = reader.readMany();

if (value) {
stream.push(value);
if ($isPromise(pendingRead)) {
({ done, value } = await pendingRead);
} else {
$debug("readMany() did not return a promise");
({ done, value } = pendingRead);
}

if (!done) {
stream.push(value[0]);

// shouldn't actually happen, but just in case
const length = value.length;
for (let i = 1; i < length; i++) {
stream.push(value[i]);
}

if (shouldUnref) unref();
} else {
Expand Down
46 changes: 17 additions & 29 deletions src/js/builtins/ReadableStreamDefaultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,19 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau

const state = $getByIdDirectPrivate(stream, "state");
stream.$disturbed = true;
if (state === $streamErrored) {
if (state === $streamClosed) return { value: [], size: 0, done: true };
else if (state === $streamErrored) {
throw $getByIdDirectPrivate(stream, "storedError");
}

var controller = $getByIdDirectPrivate(stream, "readableStreamController");
if (controller) {
var queue = $getByIdDirectPrivate(controller, "queue");
}

if (!queue && state !== $streamClosed) {
var queue = $getByIdDirectPrivate(controller, "queue");
if (!queue) {
// This is a ReadableStream direct controller implemented in JS
// It hasn't been started yet.
return controller.$pull(controller).$then(function ({ done, value }) {
return done ? { done: true, value: value ? [value] : [], size: 0 } : { value: [value], size: 1, done: false };
return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false };
});
} else if (!queue) {
return { done: true, value: [], size: 0 };
}

const content = queue.content;
Expand Down Expand Up @@ -102,31 +98,27 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$putByValDirect(outValues, i, values[i].value);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));

if (state !== $streamClosed) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
}
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));

return { value: outValues, size, done: false };
}

var onPullMany = result => {
const resultValue = result.value;

if (result.done) {
return { value: resultValue ? [resultValue] : [], size: 0, done: true };
return { value: [], size: 0, done: true };
}
var controller = $getByIdDirectPrivate(stream, "readableStreamController");

var queue = $getByIdDirectPrivate(controller, "queue");
var value = [resultValue].concat(queue.content.toArray(false));
var value = [result.value].concat(queue.content.toArray(false));
var length = value.length;

if ($isReadableByteStreamController(controller)) {
Expand All @@ -144,6 +136,8 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
}

var size = queue.size;
$resetQueue(queue);

if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
Expand All @@ -152,18 +146,12 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$readableByteStreamControllerCallPullIfNeeded(controller);
}

$resetQueue($getByIdDirectPrivate(controller, "queue"));

return { value: value, size: size, done: false };
};

if (state === $streamClosed) {
return { value: [], size: 0, done: true };
}

var pullResult = controller.$pull(controller);
if (pullResult && $isPromise(pullResult)) {
return pullResult.then(onPullMany) as any;
return pullResult.$then(onPullMany) as any;
}

return onPullMany(pullResult);
Expand Down
Loading

0 comments on commit 4366af2

Please sign in to comment.