From dc0a10403a069892aec8152a65d646eeccd43a1d Mon Sep 17 00:00:00 2001 From: Stephen O'Donnell Date: Wed, 20 Dec 2023 20:05:47 +0000 Subject: [PATCH] HDDS-9322. Remove duplicate containers when loading volumes on a datanode (#5324) --- .../container/ozoneimpl/ContainerReader.java | 85 ++++++++++- .../ozoneimpl/TestContainerReader.java | 133 ++++++++++++++++-- 2 files changed, 202 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 5f300a446d6..edbff14aca8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerDataProto.State.RECOVERING; @@ -225,7 +226,15 @@ public void verifyAndFixupContainerData(ContainerData containerData) cleanupContainer(hddsVolume, kvContainer); return; } - containerSet.addContainer(kvContainer); + try { + containerSet.addContainer(kvContainer); + } catch (StorageContainerException e) { + if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { + throw e; + } + resolveDuplicate((KeyValueContainer) containerSet.getContainer( + kvContainer.getContainerData().getContainerID()), kvContainer); + } } else { throw new StorageContainerException("Container File is corrupted. 
" + "ContainerType is KeyValueContainer but cast to " + @@ -240,6 +249,80 @@ public void verifyAndFixupContainerData(ContainerData containerData) } } + private void resolveDuplicate(KeyValueContainer existing, + KeyValueContainer toAdd) throws IOException { + if (existing.getContainerData().getReplicaIndex() != 0 || + toAdd.getContainerData().getReplicaIndex() != 0) { + // This is an EC container. As EC Containers don't have a BSCID, we can't + // know which one has the most recent data. Additionally, it is possible + // for both copies to have a different replica index for the same + // container. Therefore we just let whatever one is loaded first win AND + // leave the other one on disk. + LOG.warn("Container {} is present at {} and at {}. Both are EC " + + "containers. Leaving both containers on disk.", + existing.getContainerData().getContainerID(), + existing.getContainerData().getContainerPath(), + toAdd.getContainerData().getContainerPath()); + return; + } + + long existingBCSID = existing.getBlockCommitSequenceId(); + ContainerProtos.ContainerDataProto.State existingState + = existing.getContainerState(); + long toAddBCSID = toAdd.getBlockCommitSequenceId(); + ContainerProtos.ContainerDataProto.State toAddState + = toAdd.getContainerState(); + + if (existingState != toAddState) { + if (existingState == CLOSED) { + // If we have mis-matched states, always pick a closed one + LOG.warn("Container {} is present at {} with state CLOSED and at " + + "{} with state {}. Removing the latter container.", + existing.getContainerData().getContainerID(), + existing.getContainerData().getContainerPath(), + toAdd.getContainerData().getContainerPath(), toAddState); + KeyValueContainerUtil.removeContainer(toAdd.getContainerData(), + hddsVolume.getConf()); + return; + } else if (toAddState == CLOSED) { + LOG.warn("Container {} is present at {} with state CLOSED and at " + + "{} with state {}. 
Removing the latter container.", + toAdd.getContainerData().getContainerID(), + toAdd.getContainerData().getContainerPath(), + existing.getContainerData().getContainerPath(), existingState); + swapAndRemoveContainer(existing, toAdd); + return; + } + } + + if (existingBCSID >= toAddBCSID) { + // existing is newer or equal, so remove the one we have yet to load. + LOG.warn("Container {} is present at {} with a newer or equal BCSID " + + "than at {}. Removing the latter container.", + existing.getContainerData().getContainerID(), + existing.getContainerData().getContainerPath(), + toAdd.getContainerData().getContainerPath()); + KeyValueContainerUtil.removeContainer(toAdd.getContainerData(), + hddsVolume.getConf()); + } else { + LOG.warn("Container {} is present at {} with a lesser BCSID " + + "than at {}. Removing the former container.", + existing.getContainerData().getContainerID(), + existing.getContainerData().getContainerPath(), + toAdd.getContainerData().getContainerPath()); + swapAndRemoveContainer(existing, toAdd); + } + } + + private void swapAndRemoveContainer(KeyValueContainer existing, + KeyValueContainer toAdd) throws IOException { + containerSet.removeContainer( + existing.getContainerData().getContainerID()); + containerSet.addContainer(toAdd); + KeyValueContainerUtil.removeContainer(existing.getContainerData(), + hddsVolume.getConf()); + } + private void cleanupContainer( HddsVolume volume, KeyValueContainer kvContainer) { try { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java index 3e947e135b9..5248caaf65b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java @@ -32,12 +32,14 
@@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; +import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.utils.ContainerCache; import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -49,8 +51,10 @@ import org.mockito.Mockito; import java.io.File; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -321,6 +325,10 @@ public void testMultipleContainerReader(ContainerTestVersionInfo versionInfo) MutableVolumeSet volumeSets = new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null, StorageVolume.VolumeType.DATA_VOLUME, null); + for (StorageVolume v : volumeSets.getVolumesList()) { + StorageVolumeUtil.checkVolume(v, clusterId, clusterId, conf, + null, null); + } createDbInstancesForTestIfNeeded(volumeSets, clusterId, clusterId, conf); ContainerCache cache = ContainerCache.getInstance(conf); cache.shutdownCache(); @@ -330,24 +338,42 @@ public void testMultipleContainerReader(ContainerTestVersionInfo versionInfo) final int containerCount = 100; blockCount = containerCount; - for (int i = 0; i < 
containerCount; i++) { - KeyValueContainerData keyValueContainerData = - new KeyValueContainerData(i, layout, - (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), - datanodeId.toString()); - KeyValueContainer keyValueContainer = - new KeyValueContainer(keyValueContainerData, - conf); - keyValueContainer.create(volumeSets, policy, clusterId); + KeyValueContainer conflict01 = null; + KeyValueContainer conflict02 = null; + KeyValueContainer conflict11 = null; + KeyValueContainer conflict12 = null; + KeyValueContainer conflict21 = null; + KeyValueContainer conflict22 = null; + KeyValueContainer ec1 = null; + KeyValueContainer ec2 = null; + long baseBCSID = 10L; - List blkNames; - if (i % 2 == 0) { - blkNames = addBlocks(keyValueContainer, true); - markBlocksForDelete(keyValueContainer, true, blkNames, i); + for (int i = 0; i < containerCount; i++) { + if (i == 0) { + // Create a duplicate container with ID 0. Both have the same BSCID + conflict01 = + createContainerWithId(0, volumeSets, policy, baseBCSID, 0); + conflict02 = + createContainerWithId(0, volumeSets, policy, baseBCSID, 0); + } else if (i == 1) { + // Create a duplicate container with ID 1 so that the one has a + // larger BCSID + conflict11 = + createContainerWithId(1, volumeSets, policy, baseBCSID, 0); + conflict12 = createContainerWithId( + 1, volumeSets, policy, baseBCSID - 1, 0); + } else if (i == 2) { + conflict21 = + createContainerWithId(i, volumeSets, policy, baseBCSID, 0); + conflict22 = + createContainerWithId(i, volumeSets, policy, baseBCSID, 0); + conflict22.close(); + } else if (i == 3) { + ec1 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1); + ec2 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1); } else { - blkNames = addBlocks(keyValueContainer, false); - markBlocksForDelete(keyValueContainer, false, blkNames, i); + createContainerWithId(i, volumeSets, policy, baseBCSID, 0); } } // Close the RocksDB instance for this container and remove from the cache @@ 
-374,11 +400,88 @@ public void testMultipleContainerReader(ContainerTestVersionInfo versionInfo) " costs " + (System.currentTimeMillis() - startTime) / 1000 + "s"); Assertions.assertEquals(containerCount, containerSet.getContainerMap().entrySet().size()); + Assertions.assertEquals(volumeSet.getFailedVolumesList().size(), 0); + + // One of the conflict01 or conflict02 should have had its container path + // removed. + List paths = new ArrayList<>(); + paths.add(Paths.get(conflict01.getContainerData().getContainerPath())); + paths.add(Paths.get(conflict02.getContainerData().getContainerPath())); + int exist = 0; + for (Path p : paths) { + if (Files.exists(p)) { + exist++; + } + } + Assertions.assertEquals(1, exist); + Assertions.assertTrue(paths.contains(Paths.get( + containerSet.getContainer(0).getContainerData().getContainerPath()))); + + // For conflict1, the one with the larger BCSID should win, which is + // conflict11. + Assertions.assertFalse(Files.exists(Paths.get( + conflict12.getContainerData().getContainerPath()))); + Assertions.assertEquals(conflict11.getContainerData().getContainerPath(), + containerSet.getContainer(1).getContainerData().getContainerPath()); + Assertions.assertEquals(baseBCSID, containerSet.getContainer(1) + .getContainerData().getBlockCommitSequenceId()); + + // For conflict2, the closed one (conflict22) should win. 
+ Assertions.assertFalse(Files.exists(Paths.get( + conflict21.getContainerData().getContainerPath()))); + Assertions.assertEquals(conflict22.getContainerData().getContainerPath(), + containerSet.getContainer(2).getContainerData().getContainerPath()); + Assertions.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + containerSet.getContainer(2).getContainerData().getState()); + + // For the EC conflict, both containers should be left on disk + Assertions.assertTrue(Files.exists(Paths.get( + ec1.getContainerData().getContainerPath()))); + Assertions.assertTrue(Files.exists(Paths.get( + ec2.getContainerData().getContainerPath()))); + Assertions.assertNotNull(containerSet.getContainer(3)); + // There should be no open containers cached by the ContainerReader as it // opens and closed them avoiding the cache. Assertions.assertEquals(0, cache.size()); } + private KeyValueContainer createContainerWithId(int id, VolumeSet volSet, + VolumeChoosingPolicy policy, long bcsid, int replicaIndex) + throws Exception { + KeyValueContainerData keyValueContainerData = + new KeyValueContainerData(id, layout, + (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), + datanodeId.toString()); + keyValueContainerData.setReplicaIndex(replicaIndex); + + KeyValueContainer keyValueContainer = + new KeyValueContainer(keyValueContainerData, + conf); + keyValueContainer.create(volSet, policy, clusterId); + + List blkNames; + if (id % 2 == 0) { + blkNames = addBlocks(keyValueContainer, true); + markBlocksForDelete(keyValueContainer, true, blkNames, id); + } else { + blkNames = addBlocks(keyValueContainer, false); + markBlocksForDelete(keyValueContainer, false, blkNames, id); + } + setBlockCommitSequence(keyValueContainerData, bcsid); + return keyValueContainer; + } + + private void setBlockCommitSequence(KeyValueContainerData cData, long val) + throws IOException { + try (DBHandle metadataStore = BlockUtils.getDB(cData, conf)) { + metadataStore.getStore().getMetadataTable() + 
.put(cData.getBcsIdKey(), val); + metadataStore.getStore().flushDB(); + } + cData.updateBlockCommitSequenceId(val); + } + @ContainerTestVersionInfo.ContainerTest public void testMarkedDeletedContainerCleared( ContainerTestVersionInfo versionInfo) throws Exception {