diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 48c77f2c863..5c2d4959ed9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -507,6 +507,22 @@ public void flush() throws IOException { } } + @Override + public void hflush() throws IOException { + hsync(); + } + + @Override + public void hsync() throws IOException { + try { + if (!isClosed()) { + handleFlush(false); + } + } catch (Exception e) { + + } + } + public void waitFuturesComplete() throws IOException { try { CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java index b213bb1f4c6..baaff09e6cf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.fs.Syncable; + import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -26,7 +28,7 @@ * This interface is similar to {@link java.io.OutputStream} * except that this class support {@link ByteBuffer} instead of byte[]. */ -public interface ByteBufferStreamOutput extends Closeable { +public interface ByteBufferStreamOutput extends Closeable, Syncable { /** * Similar to {@link java.io.OutputStream#write(byte[])}, * except that the parameter of this method is a {@link ByteBuffer}. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java index 4e5a35a539c..67fc205cbf7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java @@ -116,6 +116,18 @@ public void flush() throws IOException { } } + @Override + public void hflush() throws IOException { + hsync(); + } + + @Override + public void hsync() throws IOException { + if (this.byteBufferStreamOutput != null) { + this.byteBufferStreamOutput.hsync(); + } + } + @Override public void close() throws IOException { if (this.byteBufferStreamOutput != null) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index 8e80b381041..153d514cfef 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -62,6 +63,7 @@ public class BlockDataStreamOutputEntryPool implements KeyMetadataAware { private final long openID; private final ExcludeList excludeList; private List bufferList; + private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1); @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockDataStreamOutputEntryPool( @@ -152,6 +154,33 @@ public List getLocationInfoList() { return locationInfoList; } + void hsyncKey(long offset) throws IOException { + if (keyArgs != null) { + // in test, this could be null + keyArgs.setDataSize(offset); + keyArgs.setLocationInfoList(getLocationInfoList()); + // When the key is multipart upload part file upload, we should not + // commit the key, as this is not an actual key, this is a just a + // partial key of a large file. + if (keyArgs.getIsMultipartKey()) { + throw new IOException("Hsync is unsupported for multipart keys."); + } else { + if (keyArgs.getLocationInfoList().size() == 0) { + omClient.hsyncKey(keyArgs, openID); + } else { + ContainerBlockID lastBLockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1) + .getBlockID().getContainerBlockID(); + if (!lastUpdatedBlockId.equals(lastBLockId)) { + omClient.hsyncKey(keyArgs, openID); + lastUpdatedBlockId = lastBLockId; + } + } + } + } else { + LOG.warn("Closing KeyOutputStream, but key args is null"); + } + } + /** * Discards the subsequent pre allocated blocks and removes the streamEntries * from the streamEntries list for the container which is closed. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index e5a43819a3c..811435b8489 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -65,7 +65,7 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput * Defines stream action while calling handleFlushOrClose. */ enum StreamAction { - FLUSH, CLOSE, FULL + FLUSH, HSYNC, CLOSE, FULL } public static final Logger LOG = @@ -234,6 +234,21 @@ private int writeToDataStreamOutput(BlockDataStreamOutputEntry current, return writeLen; } + @Override + public void hflush() throws IOException { + hsync(); + } + + @Override + public void hsync() throws IOException { + checkNotClosed(); + final long hsyncPos = writeOffset; + handleFlushOrClose(KeyDataStreamOutput.StreamAction.HSYNC); + Preconditions.checkState(offset >= hsyncPos, + "offset = %s < hsyncPos = %s", offset, hsyncPos); + blockDataStreamOutputEntryPool.hsyncKey(hsyncPos); + } + /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in @@ -394,6 +409,9 @@ private void handleStreamAction(BlockDataStreamOutputEntry entry, case FLUSH: entry.flush(); break; + case HSYNC: + entry.hsync(); + break; default: throw new IOException("Invalid Operation"); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java index c0af1c53010..da61b3e30ef 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.client.io; import org.apache.hadoop.crypto.CryptoOutputStream; +import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; @@ -24,6 +25,8 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Objects; +import java.util.Optional; /** * OzoneDataStreamOutput is used to write data into Ozone. @@ -32,14 +35,52 @@ public class OzoneDataStreamOutput extends ByteBufferOutputStream implements KeyMetadataAware { private final ByteBufferStreamOutput byteBufferStreamOutput; + private boolean enableHsync; + private final Syncable syncable; /** - * Constructs OzoneDataStreamOutput with KeyDataStreamOutput. + * Constructs an instance with a {@link Syncable} {@link OutputStream}. * - * @param byteBufferStreamOutput the underlying ByteBufferStreamOutput + * @param outputStream an {@link OutputStream} which is {@link Syncable}. + * @param enableHsync if false, hsync() executes flush() instead. */ - public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) { - this.byteBufferStreamOutput = byteBufferStreamOutput; + public OzoneDataStreamOutput(Syncable outputStream, boolean enableHsync) { + this(Optional.of(Objects.requireNonNull(outputStream, + "outputStream == null")) + .filter(s -> s instanceof OzoneDataStreamOutput) + .map(s -> (OzoneDataStreamOutput)s) + .orElseThrow(() -> new IllegalArgumentException( + "The parameter syncable is not an OutputStream")), + outputStream, enableHsync); + } + + /** + * Constructs an instance with a (non-{@link Syncable}) {@link ByteBufferStreamOutput} + * with an optional {@link Syncable} object. + * + * @param byteBufferStreamOutput for writing data. + * @param syncable an optional parameter + * for accessing the {@link Syncable} feature. + */ + public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput, Syncable syncable) { + this(byteBufferStreamOutput, syncable, false); + } + + /** + * Constructs an instance with a (non-{@link Syncable}) {@link ByteBufferStreamOutput} + * with an optional {@link Syncable} object. + * + * @param byteBufferStreamOutput for writing data. + * @param syncable an optional parameter + * for accessing the {@link Syncable} feature. + * @param enableHsync if false, hsync() executes flush() instead. + */ + public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput, Syncable syncable, + boolean enableHsync) { + this.byteBufferStreamOutput = Objects.requireNonNull(byteBufferStreamOutput, + "byteBufferStreamOutput == null"); + this.syncable = syncable != null ? syncable : byteBufferStreamOutput; + this.enableHsync = enableHsync; } @Override @@ -93,6 +134,27 @@ public KeyDataStreamOutput getKeyDataStreamOutput() { return null; } + public void hflush() throws IOException { + hsync(); + } + + public void hsync() throws IOException { + // Disable the feature flag restores the prior behavior. + if (!enableHsync) { + byteBufferStreamOutput.flush(); + return; + } + if (syncable != null) { + if (byteBufferStreamOutput != syncable) { + byteBufferStreamOutput.flush(); + } + syncable.hsync(); + } else { + throw new UnsupportedOperationException(byteBufferStreamOutput.getClass() + + " is not " + Syncable.class.getSimpleName()); + } + } + public ByteBufferStreamOutput getByteBufStreamOutput() { return byteBufferStreamOutput; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java index bd056185e75..f161d80c834 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java @@ -105,6 +105,12 @@ public synchronized void close() throws IOException { outputStream.close(); } + @Override + public void hflush() throws IOException { + hsync(); + } + + @Override public void hsync() throws IOException { // Disable the feature flag restores the prior behavior. if (!enableHsync) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 93c675d9b90..7ce446d446e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1996,7 +1996,7 @@ public OzoneDataStreamOutput createMultipartStreamKey( } else { out = createMultipartOutputStream(openKey, uploadID, partNumber); } - return new OzoneDataStreamOutput(out); + return new OzoneDataStreamOutput(out, out); } @Override @@ -2417,7 +2417,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) } else { out = createOutputStream(openKey); } - return new OzoneDataStreamOutput(out); + return new OzoneDataStreamOutput(out, out); } private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java index c7e20fb7e8b..540efe1f88c 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/client/io/SelectorOutputStream.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.client.io; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.apache.ratis.util.function.CheckedFunction; @@ -37,7 +38,7 @@ * @param The underlying {@link OutputStream} type. */ public class SelectorOutputStream - extends OutputStream implements Syncable { + extends OutputStream implements Syncable, StreamCapabilities { /** A buffer backed by a byte[]. */ static final class ByteArrayBuffer { private byte[] array; @@ -182,6 +183,20 @@ public void hsync() throws IOException { } } + @Override + public boolean hasCapability(String capability) { + try { + final OUT out = select(); + if (out instanceof StreamCapabilities) { + return ((StreamCapabilities) out).hasCapability(capability); + } else { + return false; + } + } catch (Exception e) { + return false; + } + } + @Override public void close() throws IOException { select().close(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index f185addf6b8..376f5fc5059 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -126,6 +126,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; @@ -176,6 +178,7 @@ public class TestHSync { private static final int WAL_HEADER_LEN = 83; private static OpenKeyCleanupService openKeyCleanupService; + private static final int AUTO_THRESHOLD = 0; @BeforeAll public static void init() throws Exception { @@ -1061,6 +1064,32 @@ public void testStreamCapability() throws Exception { testEncryptedStreamCapabilities(false); } + @Test + public void testOzoneStreamCapabilityForHsyncHflush() throws Exception { + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + CONF.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, AUTO_THRESHOLD + "B"); + CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + final Path file = new Path(dir, "file"); + + try (FileSystem fs = FileSystem.get(CONF); + FSDataOutputStream os = fs.create(file, true)) { + os.write(100); + // Verify output stream supports hsync() and hflush(). + assertTrue(os.hasCapability(StreamCapabilities.HFLUSH), + "KeyOutputStream should support hflush()!"); + assertTrue(os.hasCapability(StreamCapabilities.HSYNC), + "KeyOutputStream should support hsync()!"); + os.hsync(); + } + + CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, false); + } + @Test public void testECStreamCapability() throws Exception { // create EC bucket to be used by OzoneFileSystem diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java index 059f7b3e03d..0926044ffc5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java @@ -171,9 +171,9 @@ static void createFile(FileSystem fs, Path path, boolean overwrite, assertNotNull(underlying); LOG.info("underlying after close: {}", underlying.getClass()); if (belowThreshold) { - assertInstanceOf(OzoneFSOutputStream.class, underlying); + assertInstanceOf(CapableOzoneFSOutputStream.class, underlying); } else { - assertEquals(OzoneFSDataStreamOutput.class, underlying.getClass()); + assertEquals(CapableOzoneFSDataStreamOutput.class, underlying.getClass()); } } @@ -186,7 +186,7 @@ static void assertUnderlying(SelectorOutputStream selector, assertNull(underlying); } else { assertNotNull(underlying); - assertEquals(OzoneFSDataStreamOutput.class, + assertEquals(CapableOzoneFSDataStreamOutput.class, underlying.getClass()); } } diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java index ed8d99d67fa..2a3f97038e8 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java @@ -301,7 +301,7 @@ private OutputStream selectOutputStream(String key, short replication, boolean overwrite, boolean recursive, int byteWritten) throws IOException { return isRatisStreamingEnabled && byteWritten > streamingAutoThreshold ? - adapter.createStreamFile(key, replication, overwrite, recursive) + createFSDataStreamOutput(adapter.createStreamFile(key, replication, overwrite, recursive)) : createFSOutputStream(adapter.createFile( key, replication, overwrite, recursive)); } @@ -327,6 +327,11 @@ protected OzoneFSOutputStream createFSOutputStream( return outputStream; } + protected OzoneFSDataStreamOutput createFSDataStreamOutput( + OzoneFSDataStreamOutput outputDataStream) { + return outputDataStream; + } + @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java index 66b0037cf33..2926088e19f 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java @@ -299,7 +299,7 @@ private OutputStream selectOutputStream(String key, short replication, boolean overwrite, boolean recursive, int byteWritten) throws IOException { return isRatisStreamingEnabled && byteWritten > streamingAutoThreshold ? - adapter.createStreamFile(key, replication, overwrite, recursive) + createFSDataStreamOutput(adapter.createStreamFile(key, replication, overwrite, recursive)) : createFSOutputStream(adapter.createFile( key, replication, overwrite, recursive)); } @@ -324,6 +324,11 @@ protected OzoneFSOutputStream createFSOutputStream( return outputStream; } + protected OzoneFSDataStreamOutput createFSDataStreamOutput( + OzoneFSDataStreamOutput outputDataStream) { + return outputDataStream; + } + @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSDataStreamOutput.java new file mode 100644 index 00000000000..7f723708e22 --- /dev/null +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSDataStreamOutput.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.ozone; + +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; +import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; +import org.apache.hadoop.util.StringUtils; + + +/** + * This class is used to workaround Hadoop2 compatibility issues. + * + * Hadoop 2 does not support StreamCapabilities, so we create different modules + * for Hadoop2 and Hadoop3 profiles. + * + * The OzoneFileSystem and RootedOzoneFileSystem in Hadoop3 profile uses + * CapableOzoneFSDataStreamOutput which implements StreamCapabilities interface, + * whereas the ones in Hadoop2 profile does not. + */ +public class CapableOzoneFSDataStreamOutput extends OzoneFSDataStreamOutput + implements StreamCapabilities { + private final boolean isHsyncEnabled; + public CapableOzoneFSDataStreamOutput(OzoneFSDataStreamOutput outputStream, + boolean enabled) { + super(outputStream.getByteBufferStreamOutput()); + this.isHsyncEnabled = enabled; + } + + @Override + public boolean hasCapability(String capability) { + ByteBufferStreamOutput os = getByteBufferStreamOutput(); + return hasWrappedCapability(os, capability); + } + + private boolean hasWrappedCapability(ByteBufferStreamOutput os, String capability) { + if (os instanceof KeyDataStreamOutput) { + switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.HFLUSH: + case StreamCapabilities.HSYNC: + return isHsyncEnabled; + default: + return false; + } + } + return false; + } +} diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java index dcb917f2f9b..44b838def84 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.ozone; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.client.io.ByteBufferOutputStream; import java.io.IOException; @@ -77,4 +78,19 @@ public void flush() throws IOException { public void close() throws IOException { byteBufferStreamOutput.close(); } + + @Override + public void hflush() throws IOException { + hsync(); + } + + @Override + public void hsync() throws IOException { + TracingUtil.executeInNewSpan("OzoneFSDataStreamOutput.hsync", + byteBufferStreamOutput::hsync); + } + + protected ByteBufferStreamOutput getByteBufferStreamOutput() { + return byteBufferStreamOutput; + } } diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java index 65e3145e7d4..62f4d3f0019 100644 --- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java @@ -122,6 +122,12 @@ protected OzoneFSOutputStream createFSOutputStream( return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled()); } + @Override + protected OzoneFSDataStreamOutput createFSDataStreamOutput( + OzoneFSDataStreamOutput outputDataStream) { + return new CapableOzoneFSDataStreamOutput(outputDataStream, isHsyncEnabled()); + } + @Override public boolean hasPathCapability(final Path path, final String capability) throws IOException { diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java index bae01eafdea..7d263b8fa73 100644 --- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java @@ -120,6 +120,12 @@ protected OzoneFSOutputStream createFSOutputStream( return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled()); } + @Override + protected OzoneFSDataStreamOutput createFSDataStreamOutput( + OzoneFSDataStreamOutput outputDataStream) { + return new CapableOzoneFSDataStreamOutput(outputDataStream, isHsyncEnabled()); + } + @Override public boolean hasPathCapability(final Path path, final String capability) throws IOException { diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java index 65e3145e7d4..62f4d3f0019 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java @@ -122,6 +122,12 @@ protected OzoneFSOutputStream createFSOutputStream( return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled()); } + @Override + protected OzoneFSDataStreamOutput createFSDataStreamOutput( + OzoneFSDataStreamOutput outputDataStream) { + return new CapableOzoneFSDataStreamOutput(outputDataStream, isHsyncEnabled()); + } + @Override public boolean hasPathCapability(final Path path, final String capability) throws IOException { diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java index bed3505fe9b..815113afe17 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java @@ -122,6 +122,12 @@ protected OzoneFSOutputStream createFSOutputStream( return new CapableOzoneFSOutputStream(outputStream, isHsyncEnabled()); } + @Override + protected OzoneFSDataStreamOutput createFSDataStreamOutput( + OzoneFSDataStreamOutput outputDataStream) { + return new CapableOzoneFSDataStreamOutput(outputDataStream, isHsyncEnabled()); + } + @Override public boolean hasPathCapability(final Path path, final String capability) throws IOException { diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 21f2414c0a7..b9b63bf9699 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -742,6 +742,16 @@ public void close() throws IOException { } + @Override + public void hflush() { + + } + + @Override + public void hsync() throws IOException { + + } + @Override public Map getMetadata() { return metadata; diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java index b472320b7fe..858c4236ed7 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java @@ -42,7 +42,7 @@ public class OzoneDataStreamOutputStub extends OzoneDataStreamOutput { public OzoneDataStreamOutputStub( ByteBufferStreamOutput byteBufferStreamOutput, String partName) { - super(byteBufferStreamOutput); + super(byteBufferStreamOutput, byteBufferStreamOutput); this.partName = partName; }