HDDS-9322. Remove duplicate containers when loading volumes on a datanode (#5324)
sodonnel authored Dec 20, 2023
1 parent 24f6ea4 commit dc0a104
Showing 2 changed files with 202 additions and 16 deletions.
ContainerReader.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto.State.RECOVERING;
@@ -225,7 +226,15 @@ public void verifyAndFixupContainerData(ContainerData containerData)
cleanupContainer(hddsVolume, kvContainer);
return;
}
containerSet.addContainer(kvContainer);
try {
containerSet.addContainer(kvContainer);
} catch (StorageContainerException e) {
if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
throw e;
}
resolveDuplicate((KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID()), kvContainer);
}
} else {
throw new StorageContainerException("Container File is corrupted. " +
"ContainerType is KeyValueContainer but cast to " +
@@ -240,6 +249,80 @@ public void verifyAndFixupContainerData(ContainerData containerData)
}
}

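// Decides which of two copies of the same container to keep when a
// duplicate is found while loading volumes: a CLOSED replica wins over a
// non-CLOSED one, otherwise the replica with the higher BCSID is kept.
// EC replicas are never removed; both copies are left on disk.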
private void resolveDuplicate(KeyValueContainer existing,
KeyValueContainer toAdd) throws IOException {
if (existing.getContainerData().getReplicaIndex() != 0 ||
toAdd.getContainerData().getReplicaIndex() != 0) {
// This is an EC container. As EC containers don't have a BCSID, we can't
// know which one has the most recent data. Additionally, it is possible
// for both copies to have a different replica index for the same
// container. Therefore we just let whichever one is loaded first win and
// leave the other one on disk.
LOG.warn("Container {} is present at {} and at {}. Both are EC " +
"containers. Leaving both containers on disk.",
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
return;
}

long existingBCSID = existing.getBlockCommitSequenceId();
ContainerProtos.ContainerDataProto.State existingState
= existing.getContainerState();
long toAddBCSID = toAdd.getBlockCommitSequenceId();
ContainerProtos.ContainerDataProto.State toAddState
= toAdd.getContainerState();

if (existingState != toAddState) {
if (existingState == CLOSED) {
// If we have mis-matched states, always pick a closed one
LOG.warn("Container {} is present at {} with state CLOSED and at " +
"{} with state {}. Removing the latter container.",
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath(), toAddState);
KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
hddsVolume.getConf());
return;
} else if (toAddState == CLOSED) {
LOG.warn("Container {} is present at {} with state CLOSED and at " +
"{} with state {}. Removing the latter container.",
toAdd.getContainerData().getContainerID(),
toAdd.getContainerData().getContainerPath(),
existing.getContainerData().getContainerPath(), existingState);
swapAndRemoveContainer(existing, toAdd);
return;
}
}

if (existingBCSID >= toAddBCSID) {
// existing is newer or equal, so remove the one we have yet to load.
LOG.warn("Container {} is present at {} with a newer or equal BCSID " +
"than at {}. Removing the latter container.",
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
hddsVolume.getConf());
} else {
LOG.warn("Container {} is present at {} with a lesser BCSID " +
"than at {}. Removing the former container.",
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
swapAndRemoveContainer(existing, toAdd);
}
}

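// Replaces the already loaded container in the containerSet with toAdd
// and removes the existing container's data from disk.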
private void swapAndRemoveContainer(KeyValueContainer existing,
KeyValueContainer toAdd) throws IOException {
containerSet.removeContainer(
existing.getContainerData().getContainerID());
containerSet.addContainer(toAdd);
KeyValueContainerUtil.removeContainer(existing.getContainerData(),
hddsVolume.getConf());
}

private void cleanupContainer(
HddsVolume volume, KeyValueContainer kvContainer) {
try {
TestContainerReader.java
@@ -32,12 +32,14 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -49,8 +51,10 @@
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -321,6 +325,10 @@ public void testMultipleContainerReader(ContainerTestVersionInfo versionInfo)
MutableVolumeSet volumeSets =
new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
for (StorageVolume v : volumeSets.getVolumesList()) {
StorageVolumeUtil.checkVolume(v, clusterId, clusterId, conf,
null, null);
}
createDbInstancesForTestIfNeeded(volumeSets, clusterId, clusterId, conf);
ContainerCache cache = ContainerCache.getInstance(conf);
cache.shutdownCache();
@@ -330,24 +338,42 @@ public void testMultipleContainerReader(ContainerTestVersionInfo versionInfo)

final int containerCount = 100;
blockCount = containerCount;
for (int i = 0; i < containerCount; i++) {
KeyValueContainerData keyValueContainerData =
new KeyValueContainerData(i, layout,
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());

KeyValueContainer keyValueContainer =
new KeyValueContainer(keyValueContainerData,
conf);
keyValueContainer.create(volumeSets, policy, clusterId);
KeyValueContainer conflict01 = null;
KeyValueContainer conflict02 = null;
KeyValueContainer conflict11 = null;
KeyValueContainer conflict12 = null;
KeyValueContainer conflict21 = null;
KeyValueContainer conflict22 = null;
KeyValueContainer ec1 = null;
KeyValueContainer ec2 = null;
long baseBCSID = 10L;

List<Long> blkNames;
if (i % 2 == 0) {
blkNames = addBlocks(keyValueContainer, true);
markBlocksForDelete(keyValueContainer, true, blkNames, i);
for (int i = 0; i < containerCount; i++) {
if (i == 0) {
// Create a duplicate container with ID 0. Both have the same BCSID
conflict01 =
createContainerWithId(0, volumeSets, policy, baseBCSID, 0);
conflict02 =
createContainerWithId(0, volumeSets, policy, baseBCSID, 0);
} else if (i == 1) {
// Create a duplicate container with ID 1 so that the first one has a
// larger BCSID
conflict11 =
createContainerWithId(1, volumeSets, policy, baseBCSID, 0);
conflict12 = createContainerWithId(
1, volumeSets, policy, baseBCSID - 1, 0);
} else if (i == 2) {
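// Create a duplicate container with ID 2 where only the second copy
// (conflict22) is CLOSED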
conflict21 =
createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
conflict22 =
createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
conflict22.close();
} else if (i == 3) {
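// Create two EC copies (replica index 1) of the container with ID 3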
ec1 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1);
ec2 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1);
} else {
blkNames = addBlocks(keyValueContainer, false);
markBlocksForDelete(keyValueContainer, false, blkNames, i);
createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
}
}
// Close the RocksDB instance for this container and remove from the cache
@@ -374,11 +400,88 @@ public void testMultipleContainerReader(ContainerTestVersionInfo versionInfo)
" costs " + (System.currentTimeMillis() - startTime) / 1000 + "s");
Assertions.assertEquals(containerCount,
containerSet.getContainerMap().entrySet().size());
Assertions.assertEquals(volumeSet.getFailedVolumesList().size(), 0);

// One of conflict01 or conflict02 should have had its container path
// removed.
List<Path> paths = new ArrayList<>();
paths.add(Paths.get(conflict01.getContainerData().getContainerPath()));
paths.add(Paths.get(conflict02.getContainerData().getContainerPath()));
int exist = 0;
for (Path p : paths) {
if (Files.exists(p)) {
exist++;
}
}
Assertions.assertEquals(1, exist);
Assertions.assertTrue(paths.contains(Paths.get(
containerSet.getContainer(0).getContainerData().getContainerPath())));

// For conflict1, the one with the larger BCSID should win, which is
// conflict11.
Assertions.assertFalse(Files.exists(Paths.get(
conflict12.getContainerData().getContainerPath())));
Assertions.assertEquals(conflict11.getContainerData().getContainerPath(),
containerSet.getContainer(1).getContainerData().getContainerPath());
Assertions.assertEquals(baseBCSID, containerSet.getContainer(1)
.getContainerData().getBlockCommitSequenceId());

// For conflict2, the closed one (conflict22) should win.
Assertions.assertFalse(Files.exists(Paths.get(
conflict21.getContainerData().getContainerPath())));
Assertions.assertEquals(conflict22.getContainerData().getContainerPath(),
containerSet.getContainer(2).getContainerData().getContainerPath());
Assertions.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
containerSet.getContainer(2).getContainerData().getState());

// For the EC conflict, both containers should be left on disk
Assertions.assertTrue(Files.exists(Paths.get(
ec1.getContainerData().getContainerPath())));
Assertions.assertTrue(Files.exists(Paths.get(
ec2.getContainerData().getContainerPath())));
Assertions.assertNotNull(containerSet.getContainer(3));

// There should be no open containers cached by the ContainerReader as it
// opens and closes them, avoiding the cache.
Assertions.assertEquals(0, cache.size());
}

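// Helper that creates a KeyValueContainer with the given ID, BCSID and
// replica index, adds some blocks and marks them for delete.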
private KeyValueContainer createContainerWithId(int id, VolumeSet volSet,
VolumeChoosingPolicy policy, long bcsid, int replicaIndex)
throws Exception {
KeyValueContainerData keyValueContainerData =
new KeyValueContainerData(id, layout,
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainerData.setReplicaIndex(replicaIndex);

KeyValueContainer keyValueContainer =
new KeyValueContainer(keyValueContainerData,
conf);
keyValueContainer.create(volSet, policy, clusterId);

List<Long> blkNames;
if (id % 2 == 0) {
blkNames = addBlocks(keyValueContainer, true);
markBlocksForDelete(keyValueContainer, true, blkNames, id);
} else {
blkNames = addBlocks(keyValueContainer, false);
markBlocksForDelete(keyValueContainer, false, blkNames, id);
}
setBlockCommitSequence(keyValueContainerData, bcsid);
return keyValueContainer;
}

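// Persists the given BCSID in the container's metadata table and updates
// the in-memory container data to match.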
private void setBlockCommitSequence(KeyValueContainerData cData, long val)
throws IOException {
try (DBHandle metadataStore = BlockUtils.getDB(cData, conf)) {
metadataStore.getStore().getMetadataTable()
.put(cData.getBcsIdKey(), val);
metadataStore.getStore().flushDB();
}
cData.updateBlockCommitSequenceId(val);
}

@ContainerTestVersionInfo.ContainerTest
public void testMarkedDeletedContainerCleared(
ContainerTestVersionInfo versionInfo) throws Exception {
