Remove support for Batch Compression without Batch Grouping (#23017)
## Description
We can and should get rid of the mode where we send ops with empty
content for compression.

Configuration-wise, at this point both compression and op grouping are ON
in main, and OFF in LTS. So we can group them together into one
modality and stop mentioning (and dealing with) a modality where
compression is on but op grouping is off.
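For clarity, the surviving modalities can be sketched as a predicate. The flag names below are illustrative only, not the actual Fluid config keys:

```typescript
// Illustrative sketch: after this change, "compression on, grouping off"
// is no longer a valid modality. Compression implies grouping.
interface BatchingConfig {
	compressionEnabled: boolean;
	groupedBatchingEnabled: boolean;
}

function isSupportedModality(config: BatchingConfig): boolean {
	// main: both ON. LTS: both OFF. Compression without grouping: removed.
	return !config.compressionEnabled || config.groupedBatchingEnabled;
}
```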

## Acceptance Criteria

The code that compresses batches of 2 or more messages by creating empty
message placeholders should be removed. The code to reconstitute such
batches must remain, of course.

## Execution Plan

There are two exceptions we need to deal with first as part of this:
* Stop compressing the blobAttach batch, since Grouping is disabled (See
discussion in ADO for justification of this change)
* Always group a batch of 2 or more messages - Remove the ability to
configure this

Then it's just a matter of updating OpCompressor to require
single-message batches and removing the empty placeholder logic.
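The end state described above can be sketched as follows. Types are simplified and the lz4/base64 step is elided, so this is not the actual OpCompressor code, just the shape of the new contract:

```typescript
interface SketchMessage {
	contents?: string;
	referenceSequenceNumber: number;
}
interface SketchBatch {
	messages: SketchMessage[];
	contentSizeInBytes: number;
}

// After this change the compressor accepts exactly one message and never
// emits empty placeholders. (The real code lz4-compresses and
// base64-encodes the contents; that step is elided here.)
function compressSingleMessageBatch(batch: SketchBatch): SketchBatch {
	if (batch.contentSizeInBytes <= 0 || batch.messages.length !== 1) {
		throw new Error("Batch should not be empty and should contain a single message");
	}
	const contents = JSON.stringify({ packedContents: batch.messages[0].contents });
	return {
		messages: [{ ...batch.messages[0], contents }],
		contentSizeInBytes: contents.length,
	};
}
```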

## Reviewer Guidance

Let me know if there's a better way to accomplish this. Also review
the tests to confirm that the flows they recreate are still being
exercised.

Fixes:
[AB#28649](https://dev.azure.com/fluidframework/235294da-091d-4c29-84fc-cdfc3d90890b/_workitems/edit/28649)
MarioJGMsoft authored Jan 29, 2025
1 parent a4fc086 commit f55ab97
Showing 12 changed files with 185 additions and 151 deletions.
2 changes: 0 additions & 2 deletions packages/runtime/container-runtime/src/containerRuntime.ts
@@ -1703,8 +1703,6 @@ export class ContainerRuntime
const opGroupingManager = new OpGroupingManager(
{
groupedBatchingEnabled: this.groupedBatchingEnabled,
opCountThreshold:
this.mc.config.getNumber("Fluid.ContainerRuntime.GroupedBatchingOpCount") ?? 2,
},
this.mc.logger,
);
33 changes: 13 additions & 20 deletions packages/runtime/container-runtime/src/opLifecycle/opCompressor.ts
@@ -34,13 +34,14 @@ export class OpCompressor
* Combines the contents of the batch into a single JSON string and compresses it, putting
* the resulting string as the first message of the batch. The rest of the messages are
* empty placeholders to reserve sequence numbers.
* This should only take a single message batch and compress it.
* @param batch - The batch to compress
* @returns A batch of the same length as the input batch, containing a single compressed message followed by empty placeholders
*/
public compressBatch(batch: IBatch): IBatch {
public compressBatch(batch: IBatch): IBatch<[BatchMessage]> {
assert(
batch.contentSizeInBytes > 0 && batch.messages.length > 0,
0x5a4 /* Batch should not be empty */,
batch.contentSizeInBytes > 0 && batch.messages.length === 1,
0x5a4 /* Batch should not be empty and should contain a single message */,
);

const compressionStart = Date.now();
@@ -49,24 +50,16 @@
const compressedContent = IsoBuffer.from(compressedContents).toString("base64");
const duration = Date.now() - compressionStart;

const messages: BatchMessage[] = [];
messages.push({
...batch.messages[0],
contents: JSON.stringify({ packedContents: compressedContent }),
metadata: batch.messages[0].metadata,
compression: CompressionAlgorithms.lz4,
});
const messages: [BatchMessage] = [
{
...batch.messages[0],
contents: JSON.stringify({ packedContents: compressedContent }),
metadata: batch.messages[0].metadata,
compression: CompressionAlgorithms.lz4,
},
];

// Add empty placeholder messages to reserve the sequence numbers
for (const message of batch.messages.slice(1)) {
messages.push({
localOpMetadata: message.localOpMetadata,
metadata: message.metadata,
referenceSequenceNumber: message.referenceSequenceNumber,
});
}

const compressedBatch: IBatch = {
const compressedBatch = {
contentSizeInBytes: compressedContent.length,
messages,
referenceSequenceNumber: batch.referenceSequenceNumber,
@@ -30,6 +30,9 @@ interface IPackedContentsContents {
* 2. Messages in the middle of a compressed batch will have neither batch metadata nor the compression property set
* 3. The final message of a batch will have batch metadata set to false
* 4. An individually compressed op will have undefined batch metadata and compression set to true
*
* Compressed batches from current code always contain a single message, but this class must still handle
* legacy compressed batches with multiple messages, for backward compatibility.
*/
export class OpDecompressor {
private activeBatch = false;
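The metadata rules above can be restated as a classifier. Rule 1 is cut off by the collapsed diff; it is assumed here that the first message of a compressed batch carries batch metadata `true` plus the compression property. This is a simplified sketch, not the real OpDecompressor state machine:

```typescript
type BatchPosition = "start" | "middle" | "end" | "single";

// Classify a message by its batch-metadata flag and compression property,
// following the four rules in the doc comment. Returns undefined for
// combinations the rules do not describe.
function classifyMessage(
	batchMetadata: boolean | undefined,
	compressed: boolean,
): BatchPosition | undefined {
	if (batchMetadata === true && compressed) return "start"; // rule 1 (assumed)
	if (batchMetadata === false) return "end"; // rule 3
	if (batchMetadata === undefined && compressed) return "single"; // rule 4
	if (batchMetadata === undefined && !compressed) return "middle"; // rule 2
	return undefined;
}
```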
@@ -40,7 +40,6 @@ export function isGroupedBatch(op: ISequencedDocumentMessage): boolean {

export interface OpGroupingManagerConfig {
readonly groupedBatchingEnabled: boolean;
readonly opCountThreshold: number;
}

export class OpGroupingManager {
@@ -102,7 +101,6 @@ export class OpGroupingManager
this.logger.sendTelemetryEvent({
eventName: "GroupLargeBatch",
length: batch.messages.length,
threshold: this.config.opCountThreshold,
reentrant: batch.hasReentrantOps,
referenceSequenceNumber: batch.messages[0].referenceSequenceNumber,
});
@@ -159,10 +157,13 @@
return (
// Grouped batching must be enabled
this.config.groupedBatchingEnabled &&
// The number of ops in the batch must surpass the configured threshold
// The number of ops in the batch must be 2 or more
// or be empty (to allow for empty batches to be grouped)
(batch.messages.length === 0 || batch.messages.length >= this.config.opCountThreshold)
batch.messages.length !== 1
// Support for reentrant batches will be on by default
);
}
public groupedBatchingEnabled(): boolean {
return this.config.groupedBatchingEnabled;
}
}
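The shouldGroup change in this hunk reduces to a small predicate. This is a standalone sketch; the real method takes an IBatch and lives on OpGroupingManager:

```typescript
// Grouping decision after removing opCountThreshold: group whenever
// grouping is enabled and the batch is not exactly one message.
// An empty batch is still grouped (e.g. for empty grouped batches on resubmit).
function shouldGroupSketch(groupedBatchingEnabled: boolean, messageCount: number): boolean {
	return groupedBatchingEnabled && messageCount !== 1;
}
```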
23 changes: 13 additions & 10 deletions packages/runtime/container-runtime/src/opLifecycle/opSplitter.ts
@@ -99,25 +99,28 @@ export class OpSplitter
* Splits the first op of a compressed batch in chunks, sends the chunks separately and
* returns a new batch composed of the last chunk and the rest of the ops in the original batch.
*
* A compressed batch is formed by one large op at the first position, followed by a series of placeholder ops
* which are used in order to reserve the sequence numbers for when the first op gets unrolled into the original
* uncompressed ops at ingestion in the runtime.
* A compressed batch is formed by one large op at the first position.
*
* If the first op is too large, it can be chunked (split into smaller op) which can be sent individually over the wire
* If the op is too large, it can be chunked (split into smaller ops) which can be sent individually over the wire
* and accumulated at ingestion, until the last chunk is processed, when the original op is unrolled.
*
* This method will send the first N - 1 chunks separately and use the last chunk as the first message in the result batch
* and then appends the original placeholder ops. This will ensure that the batch semantics of the original (non-compressed) batch
* are preserved, as the original chunked op will be unrolled by the runtime when the first message in the batch is processed
* (as it is the last chunk).
* This method will send the first N - 1 chunks separately and use the last chunk as the first message in the result batch.
* This will ensure that the batch semantics of the original (non-compressed) batch are preserved, as the original chunked op
* will be unrolled by the runtime when the first message in the batch is processed (as it is the last chunk).
*
* To illustrate, if the input is `[largeOp, emptyOp, emptyOp]`, `largeOp` will be split into `[chunk1, chunk2, chunk3, chunk4]`.
* To handle legacy compressed batches with empty placeholders this method can attach the empty placeholder ops at the end
* of the result batch, ensuring that the batch semantics are preserved.
*
* To illustrate the current functionality, if the input is `[largeOp]`, `largeOp` will be split into `[chunk1, chunk2, chunk3, chunk4]`.
* `chunk1`, `chunk2` and `chunk3` will be sent individually and `[chunk4]` will be returned.
*
* With the legacy code, if the input is `[largeOp, emptyOp, emptyOp]`, `largeOp` will be split into `[chunk1, chunk2, chunk3, chunk4]`.
* `chunk1`, `chunk2` and `chunk3` will be sent individually and `[chunk4, emptyOp, emptyOp]` will be returned.
*
* @remarks - A side effect here is that 1 or more chunks are queued immediately for sending in next JS turn.
*
* @param batch - the compressed batch which needs to be processed
* @returns A new adjusted batch (last chunk + empty placeholders) which can be sent over the wire
* @returns A batch with the last chunk of the original message
*/
public splitFirstBatchMessage(batch: IBatch): IBatch {
assert(this.isBatchChunkingEnabled, 0x513 /* Chunking needs to be enabled */);
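The chunk-and-send flow described in the doc comment above can be sketched as follows. This is a hypothetical helper, not the actual OpSplitter API:

```typescript
// Split a large payload into fixed-size chunks, "send" all but the last
// immediately, and return the final chunk to ride along as the first
// message of the outgoing batch. The runtime unrolls the original op only
// when it processes that last chunk, preserving batch semantics.
function splitForSending(
	payload: string,
	chunkSize: number,
	send: (chunk: string) => void,
): string {
	const chunks: string[] = [];
	for (let i = 0; i < payload.length; i += chunkSize) {
		chunks.push(payload.slice(i, i + chunkSize));
	}
	// Chunks 1..N-1 go out now, in order.
	for (const chunk of chunks.slice(0, -1)) {
		send(chunk);
	}
	return chunks[chunks.length - 1];
}
```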
29 changes: 10 additions & 19 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
@@ -231,18 +231,6 @@ export class Outbox
this.maybeFlushPartialBatch();

this.addMessageToBatchManager(this.blobAttachBatch, message);

// If compression is enabled, we will always successfully receive
// blobAttach ops and compress then send them at the next JS turn, regardless
// of the overall size of the accumulated ops in the batch.
// However, it is more efficient to flush these ops faster, preferably
// after they reach a size which would benefit from compression.
if (
this.blobAttachBatch.contentSizeInBytes >=
this.params.config.compressionOptions.minimumBatchSizeInBytes
) {
this.flushInternal(this.blobAttachBatch);
}
}

public submitIdAllocation(message: BatchMessage): void {
@@ -359,9 +347,11 @@
// If so, do nothing, as pending state manager will resubmit it correctly on reconnect.
// Because flush() is a task that executes async (on clean stack), we can get here in disconnected state.
if (this.params.shouldSend()) {
const processedBatch = this.compressBatch(
shouldGroup ? this.params.groupingManager.groupBatch(rawBatch) : rawBatch,
);
const processedBatch = disableGroupedBatching
? rawBatch
: this.compressAndChunkBatch(
shouldGroup ? this.params.groupingManager.groupBatch(rawBatch) : rawBatch,
);
clientSequenceNumber = this.sendBatch(processedBatch);
assert(
clientSequenceNumber === undefined || clientSequenceNumber >= 0,
@@ -422,16 +412,17 @@
* @remarks - If chunking happens, a side effect here is that 1 or more chunks are queued immediately for sending in next JS turn.
*
* @param batch - Raw or Grouped batch to consider for compression/chunking
* @returns Either (A) the original batch, (B) a compressed batch (same length as original),
* or (C) a batch containing the last chunk (plus empty placeholders from compression if applicable).
* @returns Either (A) the original batch, (B) a compressed batch (same length as original)
* or (C) a batch containing the last chunk.
*/
private compressBatch(batch: IBatch): IBatch {
private compressAndChunkBatch(batch: IBatch): IBatch {
if (
batch.messages.length === 0 ||
this.params.config.compressionOptions === undefined ||
this.params.config.compressionOptions.minimumBatchSizeInBytes >
batch.contentSizeInBytes ||
this.params.submitBatchFn === undefined
this.params.submitBatchFn === undefined ||
!this.params.groupingManager.groupedBatchingEnabled()
) {
// Nothing to do if the batch is empty or if compression is disabled or not supported, or if we don't need to compress
return batch;
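The early-return conditions in this hunk can be restated as a predicate. Parameters are flattened out of `this.params`; this is a sketch of the gate, not the real method. Note the new final condition: compression now requires grouped batching:

```typescript
// Returns true only when compression/chunking should run on the batch.
// Any failed condition means the batch is sent as-is.
function shouldCompress(
	messageCount: number,
	contentSizeInBytes: number,
	minimumBatchSizeInBytes: number | undefined,
	canSubmitBatch: boolean,
	groupedBatchingEnabled: boolean,
): boolean {
	return (
		messageCount > 0 && // batch is not empty
		minimumBatchSizeInBytes !== undefined && // compression is configured
		contentSizeInBytes >= minimumBatchSizeInBytes && // batch is big enough
		canSubmitBatch && // the driver supports batch submission
		groupedBatchingEnabled // new: compression requires grouping
	);
}
```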
@@ -70,10 +70,13 @@ describe("OpGroupingManager", () => {
new OpGroupingManager(
{
groupedBatchingEnabled: option.enabled,
opCountThreshold: option.tooSmall === true ? 10 : 2,
},
mockLogger,
).shouldGroup(createBatch(5, option.reentrant)),
).shouldGroup(
option.tooSmall
? createBatch(1, option.reentrant)
: createBatch(5, option.reentrant),
),
option.expectedResult,
);
});
@@ -86,7 +89,6 @@
new OpGroupingManager(
{
groupedBatchingEnabled: false,
opCountThreshold: 2,
},
mockLogger,
).groupBatch(createBatch(5));
@@ -97,7 +99,6 @@
const result = new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
).groupBatch(createBatch(5));
@@ -117,7 +118,6 @@
const result = new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
).groupBatch(createBatch(5, false, false, batchId));
@@ -130,7 +130,6 @@
new OpGroupingManager(
{
groupedBatchingEnabled: false,
opCountThreshold: 2,
},
mockLogger,
).createEmptyGroupedBatch("resubmittingBatchId", 0);
@@ -142,7 +141,6 @@
const result = new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
).createEmptyGroupedBatch(batchId, 0);
@@ -160,7 +158,6 @@
const result = new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
).shouldGroup({
@@ -177,10 +174,9 @@
new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 10,
},
mockLogger,
).groupBatch(createBatch(5));
).groupBatch(createBatch(1));
});
});

@@ -189,7 +185,6 @@
new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
).groupBatch(createBatch(5, false, true));
@@ -201,7 +196,6 @@
new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
).groupBatch(createBatch(5, false, true, "batchId"));
@@ -214,7 +208,6 @@
const opGroupingManager = new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
);
@@ -266,7 +259,6 @@
const opGroupingManager = new OpGroupingManager(
{
groupedBatchingEnabled: true,
opCountThreshold: 2,
},
mockLogger,
);
@@ -284,7 +276,6 @@
const opGroupingManager = new OpGroupingManager(
{
groupedBatchingEnabled: false,
opCountThreshold: 2,
},
mockLogger,
);
@@ -328,7 +319,6 @@
const opGroupingManager = new OpGroupingManager(
{
groupedBatchingEnabled: false,
opCountThreshold: 2,
},
mockLogger,
);
@@ -0,0 +1,61 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { IsoBuffer } from "@fluid-internal/client-utils";
import { compress } from "lz4js";

import { CompressionAlgorithms } from "../../index.js";
import type { BatchMessage, IBatch } from "../../opLifecycle/index.js";

/**
* The code in this file recreates the legacy flow for batches that have multiple messages.
* It should not be removed since we still need to be able to read batches that were compressed with the old flow.
 */

/**
* Combine the batch's content strings into a single JSON string (a serialized array)
*/
function serializeBatchContents(batch: IBatch): string {
// Yields a valid JSON array, since each message.contents is already serialized to JSON
return `[${batch.messages.map(({ contents }) => contents).join(",")}]`;
}

/**
 * This is a helper function that replicates the now-deprecated process for compressing a batch, which creates empty placeholder messages.
 * It was added because the new process cannot compress a batch with multiple messages; it now only compresses individual messages (which can be a regular message or a grouped one).
 * But we still need to ensure the current code supports READING the old op format (where an old client compressed a multi-message batch).
 * @internal
 * @param batch - batch with messages that are going to be compressed
 * @returns compressed batch with empty placeholder messages
*/
export function compressMultipleMessageBatch(batch: IBatch): IBatch {
const contentsAsBuffer = new TextEncoder().encode(serializeBatchContents(batch));
const compressedContents = compress(contentsAsBuffer);
const compressedContent = IsoBuffer.from(compressedContents).toString("base64");

const messages: BatchMessage[] = [];
messages.push({
...batch.messages[0],
contents: JSON.stringify({ packedContents: compressedContent }),
metadata: batch.messages[0].metadata,
compression: CompressionAlgorithms.lz4,
});

// Add empty placeholder messages to reserve the sequence numbers
for (const message of batch.messages.slice(1)) {
messages.push({
localOpMetadata: message.localOpMetadata,
metadata: message.metadata,
referenceSequenceNumber: message.referenceSequenceNumber,
});
}

const compressedBatch: IBatch = {
contentSizeInBytes: compressedContent.length,
messages,
referenceSequenceNumber: batch.referenceSequenceNumber,
};
return compressedBatch;
}
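For reviewers, here is a usage-level sketch of the legacy shape this test helper reproduces. The lz4/base64 compression step is elided so the sketch is self-contained; it is not the actual helper above:

```typescript
interface LegacyMessage {
	contents?: string;
	referenceSequenceNumber: number;
}

// Legacy flow: pack all contents into message 0 and keep messages 1..N-1
// as empty placeholders that only reserve sequence numbers.
function legacyCompressSketch(messages: LegacyMessage[]): LegacyMessage[] {
	// Valid JSON array, since each contents string is already JSON.
	const serialized = `[${messages.map((m) => m.contents).join(",")}]`;
	return [
		{ ...messages[0], contents: JSON.stringify({ packedContents: serialized }) },
		...messages.slice(1).map((m) => ({
			referenceSequenceNumber: m.referenceSequenceNumber,
		})),
	];
}
```

The result has the same length as the input, which is exactly why the decompression and splitting paths must keep tolerating multi-message compressed batches.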