Skip to content

Commit

Permalink
PR comments WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
DevenAhluwalia committed Dec 20, 2024
1 parent bdd7024 commit 1e6cefa
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,73 +1,77 @@
package com.github.ambry.store;

/**
* The types of hydration protocol transitions that can happen.
* The possible options are :- Blob based and File copy based hydration
*/
public enum ReplicationProtocolTransitionType {
/**
* Pre restart protocol: NA
* Bootstrap status: NA
* Post restart protocol: Blob based
*/
NEW_PARTITION_TO_BLOB_BASED_BOOTSTRAP,
NEW_PARTITION_TO_BLOB_BASED_HYDRATION,

/**
* Pre restart protocol: NA
* Bootstrap status: NA
* Post restart protocol: File based
*/
NEW_PARTITION_TO_FILE_BASED_BOOTSTRAP,
NEW_PARTITION_TO_FILE_BASED_HYDRATION,

/**
* Pre restart protocol: Blob based
* Bootstrap status: Complete
* Post restart protocol: File based
*/
BLOB_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP,
BLOB_BASED_HYDRATION_COMPLETE_TO_FILE_BASED_HYDRATION,

/**
* Pre restart protocol: File based
* Bootstrap status: Complete
* Post restart protocol: Blob based
*/
FILE_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP,
FILE_BASED_HYDRATION_COMPLETE_TO_BLOB_BASED_HYDRATION,

/**
* Pre restart protocol: Blob based
* Bootstrap status: Complete
* Post restart protocol: Blob based
*/
BLOB_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP,
BLOB_BASED_HYDRATION_COMPLETE_TO_BLOB_BASED_HYDRATION,

/**
* Pre restart protocol: File based
* Bootstrap status: Complete
* Post restart protocol: File based
*/
FILE_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP,
FILE_BASED_HYDRATION_COMPLETE_TO_FILE_BASED_HYDRATION,

/**
* Pre restart protocol: Blob based
* Bootstrap status: InComplete
* Post restart protocol: Blob based
*/
BLOB_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP,
BLOB_BASED_HYDRATION_INCOMPLETE_TO_BLOB_BASED_HYDRATION,

/**
* Pre restart protocol: File based
* Bootstrap status: InComplete
* Post restart protocol: File based
*/
FILE_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP,
FILE_BASED_HYDRATION_INCOMPLETE_TO_FILE_BASED_HYDRATION,

/**
* Pre restart protocol: Blob based
* Bootstrap status: InComplete
* Post restart protocol: File based
*/
BLOB_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP,
BLOB_BASED_HYDRATION_INCOMPLETE_TO_FILE_BASED_HYDRATION,

/**
* Pre restart protocol: File based
* Bootstrap status: InComplete
* Post restart protocol: Blob based
*/
FILE_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP
FILE_BASED_HYDRATION_INCOMPLETE_TO_BLOB_BASED_HYDRATION
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.clustermap;

import org.json.JSONException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import com.github.ambry.rest.ServerSecurityServiceFactory;
import com.github.ambry.rest.StorageServerNettyFactory;
import com.github.ambry.server.storagestats.AggregatedAccountStorageStats;
import com.github.ambry.store.BootstrapController;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.StorageManager;
import com.github.ambry.store.StoreKeyConverterFactory;
Expand Down Expand Up @@ -328,10 +327,6 @@ public void startup() throws InstantiationException {
new BlobStoreHardDelete(), clusterParticipants, time, new BlobStoreRecovery(), accountService);
storageManager.start();

BootstrapController bootstrapController = new BootstrapController(
storageManager, storeConfig, serverConfig, clusterParticipant);
bootstrapController.start();

// if there are more than one participant on local node, we create a consistency checker to monitor and alert any
// mismatch in sealed/stopped replica lists that maintained by each participant.
if (clusterParticipants != null && clusterParticipants.size() > 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@
import static com.github.ambry.clustermap.StateTransitionException.TransitionErrorCode.*;


/**
* BootstrapController is responsible for managing the hydration protocol
* for Offline -> Bootstrap state transition of partitions.
*/
public class BootstrapController {

private boolean isRunning = false;
private final String BOOTSTRAP_IN_PROGRESS_FILE_NAME;
private final String FILECOPY_IN_PROGRESS_FILE_NAME;
private final Pattern allLogSegmentFilesPattern = Pattern.compile(".*_log.*");
Expand Down Expand Up @@ -70,12 +74,29 @@ public BootstrapController(
partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener);
}

public void start() {
void start() {
if(!isRunning) {
//Start the FileStore
isRunning = true;
}
}

void shutdown() {
//Implement shutdown Hook.
isRunning = false;
}

boolean isRunning() {
return isRunning;
}

class BootstrapControllerImpl implements PartitionStateChangeListener {
EnumSet<ReplicationProtocolTransitionType> replicationProtocolTransitionType;

/**
* TODO: Add context
* @param partitionName of the partition.
*/
@Override
public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName) {
logger.info("Bootstrap Controller's state change listener invoked for partition `{}`, state change `{}`",
Expand All @@ -85,24 +106,21 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
PartitionStateChangeListener listenerToInvoke = null;

if (null == replica) {
// there can be two scenarios:
// 1. this is the first time to add new replica onto current node;
// 2. last replica addition failed at some point before updating InstanceConfig in Helix
if (isFileCopyFeatureEnabled()) {
// "New partition -> FC"
// This is a new partition placement and FileCopy bootstrap protocol is enabled.
listenerToInvoke = fileCopyManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.NEW_PARTITION_TO_FILE_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.NEW_PARTITION_TO_FILE_BASED_HYDRATION);
logStateChange("New partition -> FC", partitionName);
} else {
// "New partition -> R"
// This is a new partition placement and FileCopy bootstrap protocol is disabled.
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.NEW_PARTITION_TO_BLOB_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.NEW_PARTITION_TO_BLOB_BASED_HYDRATION);
logStateChange("New partition -> R", partitionName);
}
} else {
Expand All @@ -114,7 +132,7 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.BLOB_BASED_HYDRATION_INCOMPLETE_TO_FILE_BASED_HYDRATION);
logStateChange("R.Incomplete -> FC", partitionName);
} else if (isAnyLogSegmentExists(replica.getPartitionId())) {
if (isFileExists(replica.getPartitionId(), FILECOPY_IN_PROGRESS_FILE_NAME)) {
Expand All @@ -124,7 +142,7 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
listenerToInvoke = fileCopyManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.FILE_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.FILE_BASED_HYDRATION_INCOMPLETE_TO_FILE_BASED_HYDRATION);
logStateChange("FC.Incomplete -> FC", partitionName);
} else {
// R.complete -> FC or FC.complete -> FC
Expand All @@ -134,8 +152,8 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP,
ReplicationProtocolTransitionType.FILE_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.BLOB_BASED_HYDRATION_COMPLETE_TO_FILE_BASED_HYDRATION,
ReplicationProtocolTransitionType.FILE_BASED_HYDRATION_COMPLETE_TO_FILE_BASED_HYDRATION);
logStateChange("R.complete -> FC or FC.complete -> FC", partitionName);
}
}
Expand All @@ -147,7 +165,7 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.BLOB_BASED_HYDRATION_INCOMPLETE_TO_BLOB_BASED_HYDRATION);
logStateChange("R.Incomplete -> R", partitionName);
} else if (isAnyLogSegmentExists(replica.getPartitionId())) {
if (isFileExists(replica.getPartitionId(), FILECOPY_IN_PROGRESS_FILE_NAME)) {
Expand All @@ -164,7 +182,7 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.FILE_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.FILE_BASED_HYDRATION_INCOMPLETE_TO_BLOB_BASED_HYDRATION);
logStateChange("FC.Incomplete -> R", partitionName);
} else {
// R.complete -> R or FC.complete -> R
Expand All @@ -174,8 +192,8 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP,
ReplicationProtocolTransitionType.FILE_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP);
ReplicationProtocolTransitionType.BLOB_BASED_HYDRATION_COMPLETE_TO_BLOB_BASED_HYDRATION,
ReplicationProtocolTransitionType.FILE_BASED_HYDRATION_COMPLETE_TO_BLOB_BASED_HYDRATION);
logStateChange("R.complete -> R or FC.complete -> R", partitionName);
}
}
Expand Down Expand Up @@ -238,13 +256,13 @@ private boolean isFileCopyFeatureEnabled() {
}

// Helper method to check if a file exists in the partition.
private boolean isFileExists(
boolean isFileExists(
@Nonnull PartitionId partitionId, @Nonnull String fileName) {
return storeManager.isFileExists(partitionId, fileName);
}

// Helper method to check if any log segment files exist in the partition.
private boolean isAnyLogSegmentExists(@Nonnull PartitionId partitionId) {
boolean isAnyLogSegmentExists(@Nonnull PartitionId partitionId) {
try {
return storeManager.isFilesExistForPattern(partitionId, allLogSegmentFilesPattern);
} catch (IOException e) {
Expand All @@ -255,10 +273,12 @@ private boolean isAnyLogSegmentExists(@Nonnull PartitionId partitionId) {
}

// Helper method to delete the file copy data.
private void deleteFileCopyData(@Nonnull PartitionId partitionId) throws IOException, StoreException {
void deleteFileCopyData(@Nonnull PartitionId partitionId) throws IOException, StoreException {
// Currently we’ll delete all datasets by removing this partition's BlobStore
// TODO: An optimisation could be explored to only delete incomplete datasets.
storeManager.removeBlobStore(partitionId);
// TODO: Write logic to cleanup the files inside the directory

throw new UnsupportedOperationException("Not implemented yet");
}
}
}
Loading

0 comments on commit 1e6cefa

Please sign in to comment.