Skip to content

Commit

Permalink
[GOBBLIN-1957] GobblinOrcwriter improvements for large records (#3828)
Browse files Browse the repository at this point in the history
* WIP

* Optimization to limit batchsize based on large record sizes

* Address review
  • Loading branch information
Will-Lo committed Nov 21, 2023
1 parent 0801407 commit 7e0fe91
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
private int orcFileWriterRowsBetweenCheck;
private long orcStripeSize;
private int maxOrcBatchSize;
private int batchSizeRowCheckFactor;
private boolean enableLimitBufferSizeOrcStripe;

private int concurrentWriterTasks;
private long orcWriterStripeSizeBytes;
Expand All @@ -109,6 +111,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false);
this.batchSizeRowCheckFactor = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR);
this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
Expand All @@ -133,6 +136,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
GobblinOrcWriterConfigs.DEFAULT_MIN_ORC_WRITER_ROWCHECK);
this.orcFileWriterMaxRowsBetweenCheck = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MAX_ROWCHECK,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_ROWCHECK);
this.enableLimitBufferSizeOrcStripe = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE, false);
// Create file-writer
this.writerConfig = new Configuration();
// Populate job Configurations into Conf as well so that configurations related to ORC writer can be tuned easily.
Expand Down Expand Up @@ -312,11 +316,18 @@ void tuneBatchSize(long averageSizePerRecord) throws IOException {
this.currentOrcWriterMaxUnderlyingMemory = Math.max(this.currentOrcWriterMaxUnderlyingMemory, orcFileWriter.estimateMemory());
}
long maxMemoryInFileWriter = Math.max(currentOrcWriterMaxUnderlyingMemory, prevOrcWriterMaxUnderlyingMemory);

int newBatchSize = (int) ((this.availableMemory*1.0 / currentPartitionedWriters * this.rowBatchMemoryUsageFactor - maxMemoryInFileWriter
- this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);

if (this.enableLimitBufferSizeOrcStripe) {
// For large records, prevent the batch size from greatly exceeding the size of a stripe as the native ORC Writer will flush its buffer after a stripe is filled
int maxRecordsPerStripeSize = (int) Math.round(this.orcWriterStripeSizeBytes * 1.0 / averageSizePerRecord);
this.orcFileWriterMaxRowsBetweenCheck = Math.min(this.orcFileWriterMaxRowsBetweenCheck, maxRecordsPerStripeSize);
this.maxOrcBatchSize = Math.min(this.maxOrcBatchSize, maxRecordsPerStripeSize);
}
// Handle scenarios where new batch size can be 0 or less due to overestimating memory used by other components
newBatchSize = Math.min(Math.max(1, newBatchSize), this.maxOrcBatchSize);

if (Math.abs(newBatchSize - this.batchSize) > GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY * this.batchSize) {
// Add a factor when tuning up the batch size to prevent large sudden increases in memory usage
if (newBatchSize > this.batchSize) {
Expand All @@ -337,7 +348,7 @@ void tuneBatchSize(long averageSizePerRecord) throws IOException {
void initializeOrcFileWriter() {
try {
this.orcFileWriterRowsBetweenCheck = Math.max(
Math.min(this.batchSize * GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, this.orcFileWriterMaxRowsBetweenCheck),
Math.min(this.batchSize * this.batchSizeRowCheckFactor, this.orcFileWriterMaxRowsBetweenCheck),
this.orcFileWriterMinRowsBetweenCheck
);
this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.orcFileWriterRowsBetweenCheck));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public class GobblinOrcWriterConfigs {
* Max buffer size of the Gobblin ORC Writer that it can be tuned to
*/
public static final String ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE = ORC_WRITER_PREFIX + "auto.selfTune.max.batch.size";
/**
 * Multiplier applied to the Gobblin ORC writer batch size to derive the native ORC writer's
 * rows-between-memory-checks value (i.e. the ratio of native-writer row checks to the Gobblin
 * row batch size). Used in {@code initializeOrcFileWriter()} as
 * {@code batchSize * factor}, clamped between the min/max rowcheck configs.
 */
// Fix: key was missing ORC_WRITER_PREFIX, unlike every other config key in this class,
// so the property would not resolve under the ORC-writer namespace.
public static final String ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = ORC_WRITER_PREFIX + "auto.selfTune.rowCheck.factor";

/**
* How often should the Gobblin ORC Writer check for tuning
*/
Expand All @@ -60,6 +65,12 @@ public class GobblinOrcWriterConfigs {
*/
public static final String ORC_WRITER_MAX_ROWCHECK = ORC_WRITER_PREFIX + "max.rows.between.memory.checks";

/**
 * When true, caps the row-buffer size of both the native ORC writer (rows between memory checks)
 * and the Gobblin ORC writer (batch size) at roughly one stripe's worth of rows, computed as
 * stripe size in bytes divided by the estimated average record size. This prevents extremely
 * large records from inflating the buffers and letting buffer memory dominate total usage,
 * since the native writer flushes once a stripe is filled anyway.
 */
public static final String ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE = ORC_WRITER_PREFIX + "auto.selfTune.max.buffer.orc.stripe";

// Enables instrumented (metrics-emitting) variant of the ORC writer.
public static final String ORC_WRITER_INSTRUMENTED = ORC_WRITER_PREFIX + "instrumented";

public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
Expand All @@ -70,10 +81,9 @@ public class GobblinOrcWriterConfigs {
*/
public static final int DEFAULT_CONCURRENT_WRITERS = 3;
public static final double DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR = 0.3;
/**
* The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer size
*/

public static final int DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;

public static final int DEFAULT_MAX_ORC_WRITER_BATCH_SIZE = DEFAULT_ORC_WRITER_BATCH_SIZE;
public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,23 @@ public void testSelfTuneRowBatchCalculation() throws Exception {
// Force a larger initial batchSize that can be tuned down
orcWriter.batchSize = 10;
orcWriter.rowBatch.ensureSize(10);
// Given that the available memory is very high, the resulting batchsize should be maxed out
orcWriter.availableMemory = 100000000;
// Given the amount of available memory and a low stripe size, and estimated rowBatchSize, the resulting batchsize should be maxed out
// Consider that the batch size incrementally increases based on the difference between target and current batchsize (10)
orcWriter.tuneBatchSize(10);
System.out.println(orcWriter.batchSize);
// Take into account that increases in batchsize are multiplied by a factor to prevent large jumps in batchsize
Assert.assertTrue(orcWriter.batchSize == (GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE+10)/2);
Assert.assertEquals(orcWriter.batchSize, 505);
orcWriter.tuneBatchSize(10);
Assert.assertEquals(orcWriter.batchSize, 752);

orcWriter.availableMemory = 100;
orcWriter.tuneBatchSize(10);
// Given that the amount of available memory is low, the resulting batchsize should be 1
Assert.assertTrue(orcWriter.batchSize == 1);
Assert.assertEquals(orcWriter.batchSize,1);
orcWriter.availableMemory = 10000;
orcWriter.rowBatch.ensureSize(10000);
// Since the rowBatch is large, the resulting batchsize should still be 1 even with more memory
orcWriter.tuneBatchSize(10);
Assert.assertTrue(orcWriter.batchSize == 1);
Assert.assertEquals(orcWriter.batchSize, 1);
}

@Test
Expand Down Expand Up @@ -322,4 +324,42 @@ public void testStatePersistenceWhenClosingWriter() throws IOException {
Assert.assertNotNull(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY));
Assert.assertNotNull(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute());
}
}

@Test
public void testSelfTuneRowBatchCalculationWithStripeMax() throws Exception {
  // Verifies that when ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE is on, tuneBatchSize()
  // caps the batch size at stripeSizeBytes / averageRecordSize rather than the
  // memory-derived value, even when available memory is effectively unlimited.
  Schema schema =
      new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
  // Removed: an unused List<GenericRecord> deserialized from data_multi.json — this test
  // only exercises batch-size tuning and never writes records.

  // Mock WriterBuilder; mocking behaviors work around precondition checks in the writer builder.
  FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
      (FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
  when(mockBuilder.getSchema()).thenReturn(schema);

  State dummyState = new WorkUnit();
  String stagingDir = Files.createTempDir().getAbsolutePath();
  String outputDir = Files.createTempDir().getAbsolutePath();
  dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
  dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune");
  dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
  dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
  // Tiny stripe (100 bytes) makes the stripe-based cap the binding constraint below.
  dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
  dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE, "true");
  when(mockBuilder.getFileName(dummyState)).thenReturn("file");

  // Closer manages the life-cycle of the writer object.
  // NOTE(review): closer is never closed in this test, mirroring the sibling tests; the
  // temp dirs/writer are left for JVM cleanup — confirm this is intentional.
  Closer closer = Closer.create();
  GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState));
  // Force a larger initial batchSize that can be tuned down.
  orcWriter.batchSize = 10;
  orcWriter.rowBatch.ensureSize(10);
  orcWriter.availableMemory = 100000000;
  // With stripe size 100 and ~10 bytes/record, the cap is 100/10 = 10 rows per batch.
  orcWriter.tuneBatchSize(10);
  Assert.assertEquals(orcWriter.batchSize, 10);

  // A larger estimated record size (100 bytes) shrinks the cap to 100/100 = 1.
  orcWriter.tuneBatchSize(100);
  Assert.assertEquals(orcWriter.batchSize, 1);
}
}

0 comments on commit 7e0fe91

Please sign in to comment.