Skip to content

Commit

Permalink
HDDS-9976. Memory leak for DeleteBlocksCommand when queue is full (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen01 authored Jan 2, 2024
1 parent 72e231e commit 16589ce
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,22 @@ public void handle(SCMCommand command, OzoneContainer container,
SCMCommandProto.Type.deleteBlocksCommand, command.getType());
return;
}

DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command,
container, context, connectionManager);
try {
DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command,
container, context, connectionManager);
deleteCommandQueues.add(cmd);
} catch (IllegalStateException e) {
String dnId = context.getParent().getDatanodeDetails().getUuidString();
Consumer<CommandStatus> updateFailure = (cmdStatus) -> {
cmdStatus.markAsFailed();
ContainerBlocksDeletionACKProto emptyACK =
ContainerBlocksDeletionACKProto
.newBuilder()
.setDnId(dnId)
.build();
((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(emptyACK);
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), updateFailure, LOG);
LOG.warn("Command is discarded because of the command queue is full");
}
}
Expand Down Expand Up @@ -382,9 +392,13 @@ private void processCmd(DeleteCmdInfo cmd) {
} finally {
final ContainerBlocksDeletionACKProto deleteAck =
blockDeletionACK;
final boolean status = cmdExecuted;
final boolean executedStatus = cmdExecuted;
Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
cmdStatus.setStatus(status);
if (executedStatus) {
cmdStatus.markAsExecuted();
} else {
cmdStatus.markAsFailed();
}
((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(deleteAck);
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,22 @@ public String getMsg() {
*
* @param status
*/
public void setStatus(Status status) {
private void setStatus(Status status) {
this.status = status;
}

public void setStatus(boolean cmdExecuted) {
setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
/**
 * Marks the command status as executed.
 *
 * Delegates to {@code setStatus(Status)} with {@code Status.EXECUTED};
 * counterpart of {@code markAsFailed()}.
 */
public void markAsExecuted() {
setStatus(Status.EXECUTED);
}

/**
 * Marks the command status as failed.
 *
 * Delegates to {@code setStatus(Status)} with {@code Status.FAILED};
 * counterpart of {@code markAsExecuted()}.
 */
public void markAsFailed() {
setStatus(Status.FAILED);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,35 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.SchemaHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand All @@ -41,6 +53,7 @@
.DeleteBlockTransactionResult;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -51,6 +64,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.BLOCK_DELETE_COMMAND_WORKER_INTERVAL;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.BLOCK_DELETE_COMMAND_WORKER_INTERVAL_DEFAULT;
Expand All @@ -70,7 +84,8 @@
*/
@Timeout(300)
public class TestDeleteBlocksCommandHandler {

@TempDir
private Path folder;
private OzoneConfiguration conf;
private ContainerLayoutVersion layout;
private OzoneContainer ozoneContainer;
Expand Down Expand Up @@ -278,6 +293,43 @@ public void testDeleteCmdWorkerInterval(
Assertions.assertEquals(deleteCmdWorker.getInterval(), 4000);
}

/**
 * Verifies the command status reported by
 * {@code DeleteBlocksCommandHandler#handle} as the delete command queue
 * fills up.
 *
 * The queue limit is set to a small value and more commands than the limit
 * are submitted. Commands submitted while the queue still has room are
 * expected to stay {@code PENDING}; commands submitted after the limit is
 * reached are expected to be marked {@code FAILED} and to carry a block
 * deletion ACK with no results.
 *
 * @throws IOException if the test {@code OzoneContainer} cannot be created
 */
@Test
public void testDeleteBlockCommandHandleWhenDeleteCommandQueuesFull()
    throws IOException {
  int blockDeleteQueueLimit = 5;
  // Setting up the test environment
  OzoneConfiguration configuration = new OzoneConfiguration();
  configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.toString());
  DatanodeDetails datanodeDetails =
      MockDatanodeDetails.randomDatanodeDetails();
  DatanodeConfiguration dnConf =
      configuration.getObject(DatanodeConfiguration.class);
  OzoneContainer container =
      ContainerTestUtils.getOzoneContainer(datanodeDetails, configuration);
  DatanodeStateMachine stateMachine =
      Mockito.mock(DatanodeStateMachine.class);
  Mockito.when(stateMachine.getDatanodeDetails())
      .thenReturn(datanodeDetails);
  StateContext context = new StateContext(configuration,
      Mockito.mock(DatanodeStateMachine.DatanodeStates.class),
      stateMachine, "");

  // Set Queue limit
  dnConf.setBlockDeleteQueueLimit(blockDeleteQueueLimit);
  handler = new DeleteBlocksCommandHandler(
      container, configuration, dnConf, "");

  // Check if the command status is as expected:
  // PENDING when queue is not full, FAILED when queue is full
  for (int i = 0; i < blockDeleteQueueLimit + 2; i++) {
    DeleteBlocksCommand deleteBlocksCommand =
        new DeleteBlocksCommand(emptyList());
    context.addCommand(deleteBlocksCommand);
    handler.handle(deleteBlocksCommand, container, context,
        Mockito.mock(SCMConnectionManager.class));
    CommandStatus cmdStatus =
        context.getCmdStatus(deleteBlocksCommand.getId());
    // JUnit's assertEquals takes (expected, actual); keeping that order
    // makes the failure message report the right value as "expected".
    if (i < blockDeleteQueueLimit) {
      Assertions.assertEquals(Status.PENDING, cmdStatus.getStatus(),
          "command " + i + " should be queued while the queue has room");
    } else {
      Assertions.assertEquals(Status.FAILED, cmdStatus.getStatus(),
          "command " + i + " should fail once the queue is full");
      Assertions.assertEquals(0,
          cmdStatus.getProtoBufMessage().getBlockDeletionAck()
              .getResultsCount(),
          "a discarded command should report an empty deletion ACK");
    }
  }
}

private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID,
long containerID) {
return DeletedBlocksTransaction.newBuilder()
Expand Down

0 comments on commit 16589ce

Please sign in to comment.