HDDS-11816. Ozone stream to support Hsync,Hflush. (#7592)
Co-authored-by: ashishk <[email protected]>
ashishkumar50 and ashishk authored Jan 9, 2025
1 parent 990b5bf commit 93dab91
Showing 21 changed files with 327 additions and 15 deletions.
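This change wires hflush()/hsync() through the Ozone streaming (datastream) write path: ByteBufferStreamOutput becomes Syncable, KeyDataStreamOutput and BlockDataStreamOutput implement the sync calls, and the key length is committed at the OM via omClient.hsyncKey(). A minimal usage sketch, assuming an ofs:// default filesystem and the ozone.fs.datastream.enabled flag (mirroring the configuration used by the TestHSync case further down; the OM address, volume and bucket names are placeholders):

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public class OzoneStreamHsyncSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("fs.defaultFS", "ofs://om-service/");          // placeholder OM address
    conf.setBoolean("ozone.fs.datastream.enabled", true);   // route writes through the datastream path

    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/vol1/bucket1/file"), true)) {
      out.write(new byte[]{1, 2, 3});
      if (out.hasCapability(StreamCapabilities.HSYNC)) {
        out.hsync();  // flush buffered data and persist it before returning
      }
    }
  }
}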
@@ -511,6 +511,22 @@ public void flush() throws IOException {
}
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
try {
if (!isClosed()) {
handleFlush(false);
}
} catch (Exception e) {
// Do not swallow failures; surface them to the caller.
throw new IOException("hsync failed", e);
}
}

public void waitFuturesComplete() throws IOException {
try {
CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
@@ -18,6 +18,8 @@

package org.apache.hadoop.hdds.scm.storage;

import org.apache.hadoop.fs.Syncable;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -26,7 +28,7 @@
* This interface is similar to {@link java.io.OutputStream}
* except that this class supports {@link ByteBuffer} instead of byte[].
*/
public interface ByteBufferStreamOutput extends Closeable {
public interface ByteBufferStreamOutput extends Closeable, Syncable {
/**
* Similar to {@link java.io.OutputStream#write(byte[])},
* except that the parameter of this method is a {@link ByteBuffer}.
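With ByteBufferStreamOutput extending Syncable, every implementation now has to provide hflush() and hsync(). The implementations in this patch follow one pattern, shown in this illustrative sketch (the class and the flushBuffered() helper are hypothetical):

import java.io.IOException;
import org.apache.hadoop.fs.Syncable;

abstract class SyncableSketch implements Syncable {
  // hflush() delegates to hsync(), which pushes everything written so far downstream.
  @Override
  public void hflush() throws IOException {
    hsync();
  }

  @Override
  public void hsync() throws IOException {
    flushBuffered();  // hypothetical helper: flush and persist buffered data
  }

  abstract void flushBuffered() throws IOException;
}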
@@ -116,6 +116,18 @@ public void flush() throws IOException {
}
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
if (this.byteBufferStreamOutput != null) {
this.byteBufferStreamOutput.hsync();
}
}

@Override
public void close() throws IOException {
if (this.byteBufferStreamOutput != null) {
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.client.io;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -62,6 +63,7 @@ public class BlockDataStreamOutputEntryPool implements KeyMetadataAware {
private final long openID;
private final ExcludeList excludeList;
private List<StreamBuffer> bufferList;
private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1);

@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockDataStreamOutputEntryPool(
@@ -152,6 +154,33 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
return locationInfoList;
}

void hsyncKey(long offset) throws IOException {
if (keyArgs != null) {
// in tests, keyArgs could be null
keyArgs.setDataSize(offset);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is a multipart upload part, we should not commit the
// key, because a part is not an actual key; it is just a piece of a
// larger file.
if (keyArgs.getIsMultipartKey()) {
throw new IOException("Hsync is unsupported for multipart keys.");
} else {
if (keyArgs.getLocationInfoList().isEmpty()) {
omClient.hsyncKey(keyArgs, openID);
} else {
ContainerBlockID lastBlockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
.getBlockID().getContainerBlockID();
if (!lastUpdatedBlockId.equals(lastBlockId)) {
omClient.hsyncKey(keyArgs, openID);
lastUpdatedBlockId = lastBlockId;
}
}
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
}
}

/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
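The lastUpdatedBlockId check above is a small optimization: every hsync() still pushes buffered data to the datanodes (via the HSYNC stream action added below), but the key metadata is only re-committed to the OM when the last block has changed since the previous sync. A hedged sketch of the resulting call pattern, assuming an FSDataOutputStream on the datastream path and placeholder byte arrays:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;

final class HsyncDedupSketch {
  static void pattern(FSDataOutputStream out, byte[] chunk, byte[] blockSpanningChunk)
      throws IOException {
    out.write(chunk);
    out.hsync();                     // new last block -> omClient.hsyncKey(...) commits the key
    out.write(chunk);                // stays within the same block
    out.hsync();                     // data reaches the datanodes, the OM commit is skipped
    out.write(blockSpanningChunk);   // large enough to allocate a new block
    out.hsync();                     // last block id changed -> omClient.hsyncKey(...) again
  }
}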
@@ -65,7 +65,7 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput
* Defines stream action while calling handleFlushOrClose.
*/
enum StreamAction {
FLUSH, CLOSE, FULL
FLUSH, HSYNC, CLOSE, FULL
}

public static final Logger LOG =
@@ -234,6 +234,21 @@ private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
return writeLen;
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
checkNotClosed();
final long hsyncPos = writeOffset;
handleFlushOrClose(KeyDataStreamOutput.StreamAction.HSYNC);
Preconditions.checkState(offset >= hsyncPos,
"offset = %s < hsyncPos = %s", offset, hsyncPos);
blockDataStreamOutputEntryPool.hsyncKey(hsyncPos);
}

/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
@@ -394,6 +409,9 @@ private void handleStreamAction(BlockDataStreamOutputEntry entry,
case FLUSH:
entry.flush();
break;
case HSYNC:
entry.hsync();
break;
default:
throw new IOException("Invalid Operation");
}
@@ -17,13 +17,16 @@
package org.apache.hadoop.ozone.client.io;

import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* OzoneDataStreamOutput is used to write data into Ozone.
@@ -32,14 +35,52 @@ public class OzoneDataStreamOutput extends ByteBufferOutputStream
implements KeyMetadataAware {

private final ByteBufferStreamOutput byteBufferStreamOutput;
private boolean enableHsync;
private final Syncable syncable;

/**
* Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
* Constructs an instance with a {@link Syncable} {@link OutputStream}.
*
* @param byteBufferStreamOutput the underlying ByteBufferStreamOutput
* @param outputStream an {@link OutputStream} which is {@link Syncable}.
* @param enableHsync if false, hsync() executes flush() instead.
*/
public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) {
this.byteBufferStreamOutput = byteBufferStreamOutput;
public OzoneDataStreamOutput(Syncable outputStream, boolean enableHsync) {
this(Optional.of(Objects.requireNonNull(outputStream,
"outputStream == null"))
.filter(s -> s instanceof ByteBufferStreamOutput)
.map(s -> (ByteBufferStreamOutput) s)
.orElseThrow(() -> new IllegalArgumentException(
"The parameter syncable is not a ByteBufferStreamOutput")),
outputStream, enableHsync);
}

/**
* Constructs an instance with a {@link ByteBufferStreamOutput}
* and an optional, separate {@link Syncable} object.
*
* @param byteBufferStreamOutput for writing data.
* @param syncable an optional parameter
* for accessing the {@link Syncable} feature.
*/
public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput, Syncable syncable) {
this(byteBufferStreamOutput, syncable, false);
}

/**
* Constructs an instance with a {@link ByteBufferStreamOutput}
* and an optional, separate {@link Syncable} object.
*
* @param byteBufferStreamOutput for writing data.
* @param syncable an optional parameter
* for accessing the {@link Syncable} feature.
* @param enableHsync if false, hsync() executes flush() instead.
*/
public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput, Syncable syncable,
boolean enableHsync) {
this.byteBufferStreamOutput = Objects.requireNonNull(byteBufferStreamOutput,
"byteBufferStreamOutput == null");
this.syncable = syncable != null ? syncable : byteBufferStreamOutput;
this.enableHsync = enableHsync;
}

@Override
@@ -93,6 +134,27 @@ public KeyDataStreamOutput getKeyDataStreamOutput() {
return null;
}

public void hflush() throws IOException {
hsync();
}

public void hsync() throws IOException {
// Disabling the feature flag restores the prior behavior.
if (!enableHsync) {
byteBufferStreamOutput.flush();
return;
}
if (syncable != null) {
if (byteBufferStreamOutput != syncable) {
byteBufferStreamOutput.flush();
}
syncable.hsync();
} else {
throw new UnsupportedOperationException(byteBufferStreamOutput.getClass()
+ " is not " + Syncable.class.getSimpleName());
}
}

public ByteBufferStreamOutput getByteBufStreamOutput() {
return byteBufferStreamOutput;
}
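The enableHsync flag gates the new behavior: when it is false, hsync() degrades to a plain flush() of the underlying ByteBufferStreamOutput, which keeps the prior semantics. A sketch of how a caller might wire the flag; the ozone.fs.hsync.enabled key is an assumption borrowed from the existing non-streaming hsync support, while the patch itself only takes the boolean:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

final class HsyncWiringSketch {
  static OzoneDataStreamOutput wrap(OzoneConfiguration conf, ByteBufferStreamOutput out) {
    // Hypothetical config lookup; disabled by default to preserve the old behavior.
    boolean enableHsync = conf.getBoolean("ozone.fs.hsync.enabled", false);
    // The same object serves as both the data sink and the Syncable
    // (ByteBufferStreamOutput extends Syncable after this change), as RpcClient does below.
    return new OzoneDataStreamOutput(out, out, enableHsync);
  }
}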
@@ -105,6 +105,12 @@ public synchronized void close() throws IOException {
outputStream.close();
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
// Disabling the feature flag restores the prior behavior.
if (!enableHsync) {
@@ -1996,7 +1996,7 @@ public OzoneDataStreamOutput createMultipartStreamKey(
} else {
out = createMultipartOutputStream(openKey, uploadID, partNumber);
}
return new OzoneDataStreamOutput(out);
return new OzoneDataStreamOutput(out, out);
}

@Override
@@ -2417,7 +2417,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
} else {
out = createOutputStream(openKey);
}
return new OzoneDataStreamOutput(out);
return new OzoneDataStreamOutput(out, out);
}

private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.client.io;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.ratis.util.function.CheckedFunction;

@@ -37,7 +38,7 @@
* @param <OUT> The underlying {@link OutputStream} type.
*/
public class SelectorOutputStream<OUT extends OutputStream>
extends OutputStream implements Syncable {
extends OutputStream implements Syncable, StreamCapabilities {
/** A buffer backed by a byte[]. */
static final class ByteArrayBuffer {
private byte[] array;
@@ -182,6 +183,20 @@ public void hsync() throws IOException {
}
}

@Override
public boolean hasCapability(String capability) {
try {
final OUT out = select();
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
} else {
return false;
}
} catch (Exception e) {
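// The underlying stream could not be selected; report the capability as unsupported.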
return false;
}
}

@Override
public void close() throws IOException {
select().close();
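Since SelectorOutputStream now advertises StreamCapabilities, callers can probe for sync support before depending on it, which is what the new test below does. A small caller-side sketch (the helper name is hypothetical):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.StreamCapabilities;

final class SyncIfSupported {
  static void syncOrFlush(FSDataOutputStream out) throws IOException {
    if (out.hasCapability(StreamCapabilities.HSYNC)) {
      out.hsync();   // durable: data is persisted before this returns
    } else {
      out.flush();   // best-effort fallback when hsync is not supported
    }
  }
}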
@@ -126,6 +126,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -175,6 +177,7 @@ public class TestHSync {
private static final int WAL_HEADER_LEN = 83;

private static OpenKeyCleanupService openKeyCleanupService;
private static final int AUTO_THRESHOLD = 0;

@BeforeAll
public static void init() throws Exception {
@@ -1059,6 +1062,32 @@ public void testStreamCapability() throws Exception {
testEncryptedStreamCapabilities(false);
}

@Test
public void testOzoneStreamCapabilityForHsyncHflush() throws Exception {
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
CONF.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, AUTO_THRESHOLD + "B");
CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final Path file = new Path(dir, "file");

try (FileSystem fs = FileSystem.get(CONF);
FSDataOutputStream os = fs.create(file, true)) {
os.write(100);
// Verify output stream supports hsync() and hflush().
assertTrue(os.hasCapability(StreamCapabilities.HFLUSH),
"KeyDataStreamOutput should support hflush()!");
assertTrue(os.hasCapability(StreamCapabilities.HSYNC),
"KeyDataStreamOutput should support hsync()!");
os.hsync();
}

CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, false);
}

@Test
public void testECStreamCapability() throws Exception {
// create EC bucket to be used by OzoneFileSystem
@@ -169,9 +169,9 @@ static void createFile(FileSystem fs, Path path, boolean overwrite,
assertNotNull(underlying);
LOG.info("underlying after close: {}", underlying.getClass());
if (belowThreshold) {
assertInstanceOf(OzoneFSOutputStream.class, underlying);
assertInstanceOf(CapableOzoneFSOutputStream.class, underlying);
} else {
assertEquals(OzoneFSDataStreamOutput.class, underlying.getClass());
assertEquals(CapableOzoneFSDataStreamOutput.class, underlying.getClass());
}
}

@@ -184,7 +184,7 @@ static void assertUnderlying(SelectorOutputStream<?> selector,
assertNull(underlying);
} else {
assertNotNull(underlying);
assertEquals(OzoneFSDataStreamOutput.class,
assertEquals(CapableOzoneFSDataStreamOutput.class,
underlying.getClass());
}
}
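The assertions above now expect the "capable" wrapper classes, which exist so that hasCapability() reports HFLUSH/HSYNC for both the regular and the datastream write paths. An illustrative sketch of what such a wrapper adds, not the actual CapableOzoneFSDataStreamOutput code:

import org.apache.hadoop.fs.StreamCapabilities;

final class CapabilityAdvertisingSketch implements StreamCapabilities {
  private final boolean hsyncEnabled;

  CapabilityAdvertisingSketch(boolean hsyncEnabled) {
    this.hsyncEnabled = hsyncEnabled;
  }

  @Override
  public boolean hasCapability(String capability) {
    switch (capability) {
      case StreamCapabilities.HFLUSH:
      case StreamCapabilities.HSYNC:
        return hsyncEnabled;   // advertise sync support only when the feature is enabled
      default:
        return false;
    }
  }
}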
