HDDS-11816. Ozone stream to support Hsync,Hflush. #7592

Merged: 2 commits, Jan 9, 2025
@@ -507,6 +507,22 @@ public void flush() throws IOException {
}
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
try {
if (!isClosed()) {
handleFlush(false);
}
} catch (Exception e) {
// Surface the failure to the caller instead of silently swallowing it.
throw new IOException("hsync failed", e);
}
}

public void waitFuturesComplete() throws IOException {
try {
CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
@@ -18,6 +18,8 @@

package org.apache.hadoop.hdds.scm.storage;

import org.apache.hadoop.fs.Syncable;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -26,7 +28,7 @@
* This interface is similar to {@link java.io.OutputStream}
* except that this class supports {@link ByteBuffer} instead of byte[].
*/
public interface ByteBufferStreamOutput extends Closeable {
public interface ByteBufferStreamOutput extends Closeable, Syncable {
/**
* Similar to {@link java.io.OutputStream#write(byte[])},
* except that the parameter of this method is a {@link ByteBuffer}.
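Note on the interface change above: because ByteBufferStreamOutput now extends Syncable, every implementation must provide hflush() and hsync(), and callers can sync through the interface without knowing the concrete stream type. A minimal caller-side sketch; the helper class and method names are illustrative, not part of this PR:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

final class SyncUtil {
  private SyncUtil() {
  }

  // Works for any ByteBufferStreamOutput now that the interface is Syncable.
  static void writeAndSync(ByteBufferStreamOutput out, ByteBuffer data)
      throws IOException {
    out.write(data);  // ByteBuffer variant of OutputStream#write(byte[])
    out.hsync();      // persist what has been written so far
  }
}
```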
@@ -116,6 +116,18 @@ public void flush() throws IOException {
}
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
if (this.byteBufferStreamOutput != null) {
this.byteBufferStreamOutput.hsync();
}
}

@Override
public void close() throws IOException {
if (this.byteBufferStreamOutput != null) {
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.client.io;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -62,6 +63,7 @@ public class BlockDataStreamOutputEntryPool implements KeyMetadataAware {
private final long openID;
private final ExcludeList excludeList;
private List<StreamBuffer> bufferList;
private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1);

@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockDataStreamOutputEntryPool(
@@ -152,6 +154,33 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
return locationInfoList;
}

void hsyncKey(long offset) throws IOException {
if (keyArgs != null) {
// in test, this could be null
keyArgs.setDataSize(offset);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is a multipart upload part, we should not commit it,
// as it is not an actual key but only a part of a larger file.
if (keyArgs.getIsMultipartKey()) {
throw new IOException("Hsync is unsupported for multipart keys.");
} else {
if (keyArgs.getLocationInfoList().isEmpty()) {
omClient.hsyncKey(keyArgs, openID);
} else {
ContainerBlockID lastBlockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
.getBlockID().getContainerBlockID();
if (!lastUpdatedBlockId.equals(lastBlockId)) {
omClient.hsyncKey(keyArgs, openID);
lastUpdatedBlockId = lastBlockId;
}
}
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
}
}

/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
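A note on the hsyncKey() path added above: it rejects multipart-upload parts, and it only calls omClient.hsyncKey() when the key has no block locations yet or when the last block has changed since the previous sync, so repeated hsync calls on the same block do not keep hitting OM. The de-duplication idea in isolation, as plain Java rather than Ozone API; the class and method names are illustrative:

```java
// Standalone illustration of the lastUpdatedBlockId check: only report an
// update when the identity of the last block differs from the previous sync.
final class LastBlockTracker {
  private long lastContainerId = -1;
  private long lastLocalId = -1;

  synchronized boolean shouldSync(long containerId, long localId) {
    if (containerId == lastContainerId && localId == lastLocalId) {
      return false;  // same last block as before, skip the OM round trip
    }
    lastContainerId = containerId;
    lastLocalId = localId;
    return true;
  }
}
```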
@@ -65,7 +65,7 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput
* Defines stream action while calling handleFlushOrClose.
*/
enum StreamAction {
FLUSH, CLOSE, FULL
FLUSH, HSYNC, CLOSE, FULL
}

public static final Logger LOG =
@@ -234,6 +234,21 @@ private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
return writeLen;
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
checkNotClosed();
final long hsyncPos = writeOffset;
handleFlushOrClose(KeyDataStreamOutput.StreamAction.HSYNC);
Preconditions.checkState(offset >= hsyncPos,
"offset = %s < hsyncPos = %s", offset, hsyncPos);
blockDataStreamOutputEntryPool.hsyncKey(hsyncPos);
}

/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
@@ -394,6 +409,9 @@ private void handleStreamAction(BlockDataStreamOutputEntry entry,
case FLUSH:
entry.flush();
break;
case HSYNC:
entry.hsync();
break;
default:
throw new IOException("Invalid Operation");
}
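Taken together, KeyDataStreamOutput.hsync() above records the current writeOffset, drives the new HSYNC action through every open block stream entry, and then commits the synced length to OM via hsyncKey(). From a client's point of view this surfaces through the usual FileSystem API; a hedged end-to-end sketch, where the ofs path, volume, and bucket names are examples:

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class HsyncExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("ofs://om/vol1/bucket1/app.log");
    try (FileSystem fs = FileSystem.get(file.toUri(), conf);
         FSDataOutputStream out = fs.create(file, true)) {
      out.write("record-1\n".getBytes(StandardCharsets.UTF_8));
      // Flushes the datastream pipeline and commits the current key length
      // to OM, so the synced bytes become visible to readers.
      out.hsync();
    }
  }
}
```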
@@ -17,13 +17,16 @@
package org.apache.hadoop.ozone.client.io;

import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* OzoneDataStreamOutput is used to write data into Ozone.
@@ -32,14 +35,52 @@ public class OzoneDataStreamOutput extends ByteBufferOutputStream
implements KeyMetadataAware {

private final ByteBufferStreamOutput byteBufferStreamOutput;
private boolean enableHsync;
private final Syncable syncable;

/**
* Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
* Constructs an instance with a {@link Syncable} {@link OutputStream}.
*
* @param byteBufferStreamOutput the underlying ByteBufferStreamOutput
* @param outputStream an {@link OutputStream} which is {@link Syncable}.
* @param enableHsync if false, hsync() executes flush() instead.
*/
public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) {
this.byteBufferStreamOutput = byteBufferStreamOutput;
public OzoneDataStreamOutput(Syncable outputStream, boolean enableHsync) {
this(Optional.of(Objects.requireNonNull(outputStream,
"outputStream == null"))
.filter(s -> s instanceof OzoneDataStreamOutput)
.map(s -> (OzoneDataStreamOutput)s)
.orElseThrow(() -> new IllegalArgumentException(
"The parameter outputStream is not an OzoneDataStreamOutput")),
outputStream, enableHsync);
}

/**
* Constructs an instance with a (non-{@link Syncable}) {@link ByteBufferStreamOutput}
* with an optional {@link Syncable} object.
*
* @param byteBufferStreamOutput for writing data.
* @param syncable an optional parameter
* for accessing the {@link Syncable} feature.
*/
public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput, Syncable syncable) {
this(byteBufferStreamOutput, syncable, false);
}

/**
* Constructs an instance with a (non-{@link Syncable}) {@link ByteBufferStreamOutput}
* with an optional {@link Syncable} object.
*
* @param byteBufferStreamOutput for writing data.
* @param syncable an optional parameter
* for accessing the {@link Syncable} feature.
* @param enableHsync if false, hsync() executes flush() instead.
*/
public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput, Syncable syncable,
boolean enableHsync) {
this.byteBufferStreamOutput = Objects.requireNonNull(byteBufferStreamOutput,
"byteBufferStreamOutput == null");
this.syncable = syncable != null ? syncable : byteBufferStreamOutput;
this.enableHsync = enableHsync;
}

@Override
@@ -93,6 +134,27 @@ public KeyDataStreamOutput getKeyDataStreamOutput() {
return null;
}

public void hflush() throws IOException {
hsync();
}

public void hsync() throws IOException {
// Disabling the feature flag restores the prior behavior.
if (!enableHsync) {
byteBufferStreamOutput.flush();
return;
}
if (syncable != null) {
if (byteBufferStreamOutput != syncable) {
byteBufferStreamOutput.flush();
}
syncable.hsync();
} else {
throw new UnsupportedOperationException(byteBufferStreamOutput.getClass()
+ " is not " + Syncable.class.getSimpleName());
}
}

public ByteBufferStreamOutput getByteBufStreamOutput() {
return byteBufferStreamOutput;
}
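The enableHsync flag above means OzoneDataStreamOutput.hsync() only performs a real sync when the client has opted in; otherwise it degrades to flush(). The `(out, out)` call sites later in this diff pass the same object as both the data sink and the Syncable. A small wiring sketch under those assumptions; the helper names are illustrative, and in practice the flag comes from client configuration:

```java
import java.io.IOException;

import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

final class HsyncWiring {
  private HsyncWiring() {
  }

  // KeyDataStreamOutput is both a ByteBufferStreamOutput and (after this PR)
  // a Syncable, so it can serve as both constructor arguments.
  static OzoneDataStreamOutput wrap(KeyDataStreamOutput keyStream,
      boolean hsyncEnabled) {
    return new OzoneDataStreamOutput(keyStream, keyStream, hsyncEnabled);
  }

  static void sync(OzoneDataStreamOutput out) throws IOException {
    out.hsync();  // real hsync when enabled, otherwise falls back to flush()
  }
}
```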
@@ -105,6 +105,12 @@ public synchronized void close() throws IOException {
outputStream.close();
}

@Override
public void hflush() throws IOException {
hsync();
}

@Override
public void hsync() throws IOException {
// Disabling the feature flag restores the prior behavior.
if (!enableHsync) {
@@ -1996,7 +1996,7 @@ public OzoneDataStreamOutput createMultipartStreamKey(
} else {
out = createMultipartOutputStream(openKey, uploadID, partNumber);
}
return new OzoneDataStreamOutput(out);
return new OzoneDataStreamOutput(out, out);
}

@Override
@@ -2417,7 +2417,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
} else {
out = createOutputStream(openKey);
}
return new OzoneDataStreamOutput(out);
return new OzoneDataStreamOutput(out, out);
}

private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.client.io;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.ratis.util.function.CheckedFunction;

@@ -37,7 +38,7 @@
* @param <OUT> The underlying {@link OutputStream} type.
*/
public class SelectorOutputStream<OUT extends OutputStream>
extends OutputStream implements Syncable {
extends OutputStream implements Syncable, StreamCapabilities {
/** A buffer backed by a byte[]. */
static final class ByteArrayBuffer {
private byte[] array;
@@ -182,6 +183,20 @@ public void hsync() throws IOException {
}
}

@Override
public boolean hasCapability(String capability) {
try {
final OUT out = select();
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
} else {
return false;
}
} catch (Exception e) {
return false;
}
}
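With hasCapability() delegating to the selected underlying stream, callers can probe for sync support before relying on it, which is exactly what the tests below assert. A caller-side probe sketch; the helper class name is illustrative:

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.StreamCapabilities;

final class CapabilityCheck {
  private CapabilityCheck() {
  }

  // FSDataOutputStream implements StreamCapabilities and forwards the query
  // to its wrapped stream, e.g. the SelectorOutputStream changed above.
  static boolean supportsHsync(FSDataOutputStream out) {
    return out.hasCapability(StreamCapabilities.HSYNC);
  }
}
```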

@Override
public void close() throws IOException {
select().close();
@@ -126,6 +126,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -176,6 +178,7 @@ public class TestHSync {
private static final int WAL_HEADER_LEN = 83;

private static OpenKeyCleanupService openKeyCleanupService;
private static final int AUTO_THRESHOLD = 0;

@BeforeAll
public static void init() throws Exception {
@@ -1061,6 +1064,32 @@ public void testStreamCapability() throws Exception {
testEncryptedStreamCapabilities(false);
}

@Test
public void testOzoneStreamCapabilityForHsyncHflush() throws Exception {
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
CONF.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, AUTO_THRESHOLD + "B");
CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final Path file = new Path(dir, "file");

try (FileSystem fs = FileSystem.get(CONF);
FSDataOutputStream os = fs.create(file, true)) {
os.write(100);
// Verify output stream supports hsync() and hflush().
assertTrue(os.hasCapability(StreamCapabilities.HFLUSH),
"KeyOutputStream should support hflush()!");
assertTrue(os.hasCapability(StreamCapabilities.HSYNC),
"KeyOutputStream should support hsync()!");
os.hsync();
}
CONF.setBoolean(OZONE_FS_DATASTREAM_ENABLED, false);
}

@Test
public void testECStreamCapability() throws Exception {
// create EC bucket to be used by OzoneFileSystem
@@ -171,9 +171,9 @@ static void createFile(FileSystem fs, Path path, boolean overwrite,
assertNotNull(underlying);
LOG.info("underlying after close: {}", underlying.getClass());
if (belowThreshold) {
assertInstanceOf(OzoneFSOutputStream.class, underlying);
assertInstanceOf(CapableOzoneFSOutputStream.class, underlying);
} else {
assertEquals(OzoneFSDataStreamOutput.class, underlying.getClass());
assertEquals(CapableOzoneFSDataStreamOutput.class, underlying.getClass());
}
}

@@ -186,7 +186,7 @@ static void assertUnderlying(SelectorOutputStream<?> selector,
assertNull(underlying);
} else {
assertNotNull(underlying);
assertEquals(OzoneFSDataStreamOutput.class,
assertEquals(CapableOzoneFSDataStreamOutput.class,
underlying.getClass());
}
}
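The updated expectations above reflect that the filesystem now hands back "Capable" wrappers, i.e. stream types that implement StreamCapabilities so hflush/hsync support can be advertised. A hedged sketch of what such a wrapper adds; the class name, field, and constructor here are illustrative, not the actual CapableOzoneFS* classes:

```java
import org.apache.hadoop.fs.StreamCapabilities;

// Illustrative wrapper: the only addition over the plain stream type is the
// ability to answer capability queries based on whether hsync is enabled.
class CapableStreamSketch implements StreamCapabilities {
  private final boolean hsyncEnabled;

  CapableStreamSketch(boolean hsyncEnabled) {
    this.hsyncEnabled = hsyncEnabled;
  }

  @Override
  public boolean hasCapability(String capability) {
    if (!hsyncEnabled) {
      return false;
    }
    return StreamCapabilities.HSYNC.equals(capability)
        || StreamCapabilities.HFLUSH.equals(capability);
  }
}
```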