diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java
index 79da773511..b718058e88 100644
--- a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java
+++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java
@@ -1,12 +1,12 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -35,6 +35,7 @@
import com.github.ambry.store.StoreKeyConverterFactory;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.SystemTime;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -49,234 +50,239 @@
 * {@link VcrReplicationManager} is used to back up partitions to Cloud. Partition assignment is handled by Helix.
*/
public class VcrReplicationManager extends ReplicationEngine {
- private final CloudConfig cloudConfig;
- private final StoreConfig storeConfig;
- private final VcrMetrics vcrMetrics;
- private final VirtualReplicatorCluster virtualReplicatorCluster;
- private final CloudStorageCompactor cloudStorageCompactor;
- private final CloudContainerCompactor cloudContainerCompactor;
- private final Map<String, Store> partitionStoreMap = new HashMap<>();
- private final boolean trackPerDatacenterLagInMetric;
+ private final CloudConfig cloudConfig;
+ private final StoreConfig storeConfig;
+ private final VcrMetrics vcrMetrics;
+ private final VirtualReplicatorCluster virtualReplicatorCluster;
+ private final CloudStorageCompactor cloudStorageCompactor;
+ private final CloudContainerCompactor cloudContainerCompactor;
+ private final Map<String, Store> partitionStoreMap = new HashMap<>();
+ private final boolean trackPerDatacenterLagInMetric;
- public VcrReplicationManager(CloudConfig cloudConfig, ReplicationConfig replicationConfig,
- ClusterMapConfig clusterMapConfig, StoreConfig storeConfig, StoreManager storeManager,
- StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, VirtualReplicatorCluster virtualReplicatorCluster,
- CloudDestination cloudDestination, ScheduledExecutorService scheduler, ConnectionPool connectionPool,
- VcrMetrics vcrMetrics, NotificationSystem requestNotification, StoreKeyConverterFactory storeKeyConverterFactory,
- String transformerClassName) throws ReplicationException, IllegalStateException {
- super(replicationConfig, clusterMapConfig, storeKeyFactory, clusterMap, scheduler,
- virtualReplicatorCluster.getCurrentDataNodeId(), Collections.emptyList(), connectionPool,
- vcrMetrics.getMetricRegistry(), requestNotification, storeKeyConverterFactory, transformerClassName, null,
- storeManager, null);
- this.cloudConfig = cloudConfig;
- this.storeConfig = storeConfig;
- this.virtualReplicatorCluster = virtualReplicatorCluster;
- this.vcrMetrics = vcrMetrics;
- this.persistor =
- new CloudTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap,
- tokenHelper, cloudDestination);
- this.cloudStorageCompactor =
- cloudConfig.cloudBlobCompactionEnabled ? new CloudStorageCompactor(cloudDestination, cloudConfig,
- partitionToPartitionInfo.keySet(), vcrMetrics) : null;
- this.cloudContainerCompactor = cloudDestination.getContainerCompactor();
- trackPerDatacenterLagInMetric = replicationConfig.replicationTrackPerDatacenterLagFromLocal;
- // We need a datacenter to replicate from, which should be specified in the cloud config.
- if (cloudConfig.vcrSourceDatacenters.isEmpty()) {
- throw new IllegalStateException("One or more VCR cross colo replication peer datacenter should be specified");
+ public VcrReplicationManager(CloudConfig cloudConfig, ReplicationConfig replicationConfig,
+ ClusterMapConfig clusterMapConfig, StoreConfig storeConfig, StoreManager storeManager,
+ StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, VirtualReplicatorCluster virtualReplicatorCluster,
+ CloudDestination cloudDestination, ScheduledExecutorService scheduler, ConnectionPool connectionPool,
+ VcrMetrics vcrMetrics, NotificationSystem requestNotification, StoreKeyConverterFactory storeKeyConverterFactory,
+ String transformerClassName) throws ReplicationException, IllegalStateException {
+ super(replicationConfig, clusterMapConfig, storeKeyFactory, clusterMap, scheduler,
+ virtualReplicatorCluster.getCurrentDataNodeId(), Collections.emptyList(), connectionPool,
+ vcrMetrics.getMetricRegistry(), requestNotification, storeKeyConverterFactory, transformerClassName, null,
+ storeManager, null);
+ this.cloudConfig = cloudConfig;
+ this.storeConfig = storeConfig;
+ this.virtualReplicatorCluster = virtualReplicatorCluster;
+ this.vcrMetrics = vcrMetrics;
+ this.persistor =
+ new CloudTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap,
+ tokenHelper, cloudDestination);
+ this.cloudStorageCompactor =
+ cloudConfig.cloudBlobCompactionEnabled ? new CloudStorageCompactor(cloudDestination, cloudConfig,
+ partitionToPartitionInfo.keySet(), vcrMetrics) : null;
+ this.cloudContainerCompactor = cloudDestination.getContainerCompactor();
+ trackPerDatacenterLagInMetric = replicationConfig.replicationTrackPerDatacenterLagFromLocal;
+ // We need a datacenter to replicate from, which should be specified in the cloud config.
+ if (cloudConfig.vcrSourceDatacenters.isEmpty()) {
+ throw new IllegalStateException("At least one VCR cross-colo replication peer datacenter must be specified");
+ }
}
- }
- @Override
- public void start() throws ReplicationException {
- // Add listener for new coming assigned partition
- virtualReplicatorCluster.addListener(new VirtualReplicatorClusterListener() {
- @Override
- public void onPartitionAdded(PartitionId partitionId) {
- try {
- addReplica(partitionId);
- logger.info("Partition {} added to {}", partitionId, dataNodeId);
- } catch (ReplicationException e) {
- vcrMetrics.addPartitionErrorCount.inc();
- logger.error("Exception on adding Partition {} to {}: ", partitionId, dataNodeId, e);
- } catch (Exception e) {
- // Helix will run into error state if exception throws in Helix context.
- vcrMetrics.addPartitionErrorCount.inc();
- logger.error("Unknown Exception on adding Partition {} to {}: ", partitionId, dataNodeId, e);
- }
- }
+ @Override
+ public void start() throws ReplicationException {
+ // Add a listener for newly assigned partitions
+ virtualReplicatorCluster.addListener(new VirtualReplicatorClusterListener() {
+ @Override
+ public void onPartitionAdded(PartitionId partitionId) {
+ try {
+ addReplica(partitionId);
+ logger.info("Partition {} added to {}", partitionId, dataNodeId);
+ } catch (ReplicationException e) {
+ vcrMetrics.addPartitionErrorCount.inc();
+ logger.error("Exception on adding Partition {} to {}: ", partitionId, dataNodeId, e);
+ } catch (Exception e) {
+ // Helix will run into an error state if an exception is thrown in the Helix context.
+ vcrMetrics.addPartitionErrorCount.inc();
+ logger.error("Unknown Exception on adding Partition {} to {}: ", partitionId, dataNodeId, e);
+ }
+ }
+
+ @Override
+ public void onPartitionRemoved(PartitionId partitionId) {
+ try {
+ removeReplica(partitionId);
+ } catch (Exception e) {
+ // Helix will run into an error state if an exception is thrown in the Helix context.
+ vcrMetrics.removePartitionErrorCount.inc();
+ logger.error("Exception on removing Partition {} from {}: ", partitionId, dataNodeId, e);
+ }
+ }
+ });
- @Override
- public void onPartitionRemoved(PartitionId partitionId) {
try {
- removeReplica(partitionId);
+ virtualReplicatorCluster.participate();
} catch (Exception e) {
- // Helix will run into error state if exception throws in Helix context.
- vcrMetrics.removePartitionErrorCount.inc();
- logger.error("Exception on removing Partition {} from {}: ", partitionId, dataNodeId, e);
+ throw new ReplicationException("Cluster participate failed.", e);
}
- }
- });
- try {
- virtualReplicatorCluster.participate();
- } catch (Exception e) {
- throw new ReplicationException("Cluster participate failed.", e);
- }
+ // Start a background scheduler thread to persist replica tokens periodically.
+ scheduleTask(persistor, true, replicationConfig.replicationTokenFlushDelaySeconds,
+ replicationConfig.replicationTokenFlushIntervalSeconds, "replica token persistor");
- // start background persistent thread
- // start scheduler thread to persist index in the background
- scheduleTask(persistor, true, replicationConfig.replicationTokenFlushDelaySeconds,
- replicationConfig.replicationTokenFlushIntervalSeconds, "replica token persistor");
+ // Schedule a thread to purge dead blobs for this VCR's partitions
+ // after a delay to allow startup to finish.
+ scheduleTask(cloudStorageCompactor, cloudConfig.cloudBlobCompactionEnabled,
+ cloudConfig.cloudBlobCompactionStartupDelaySecs,
+ TimeUnit.HOURS.toSeconds(cloudConfig.cloudBlobCompactionIntervalHours), "cloud blob compaction");
- // Schedule thread to purge dead blobs for this VCR's partitions
- // after delay to allow startup to finish.
- scheduleTask(cloudStorageCompactor, cloudConfig.cloudBlobCompactionEnabled,
- cloudConfig.cloudBlobCompactionStartupDelaySecs,
- TimeUnit.HOURS.toSeconds(cloudConfig.cloudBlobCompactionIntervalHours), "cloud blob compaction");
-
- // Schedule thread to purge blobs belonging to deprecated containers for this VCR's partitions
- // after delay to allow startup to finish.
- scheduleTask(() -> cloudContainerCompactor.compactAssignedDeprecatedContainers(
- virtualReplicatorCluster.getAssignedPartitionIds()), cloudConfig.cloudContainerCompactionEnabled,
- cloudConfig.cloudContainerCompactionStartupDelaySecs,
- TimeUnit.HOURS.toSeconds(cloudConfig.cloudContainerCompactionIntervalHours), "cloud container compaction");
- }
-
- /**
- * Schedule the specified task if enabled with the specified delay and interval.
- * @param task {@link Runnable} task to be scheduled.
- * @param isEnabled flag indicating if the task is enabled. If false the task is not scheduled.
- * @param delaySec initial delay to allow startup to finish before starting task.
- * @param intervalSec period between successive executions.
- * @param taskName name of the task being scheduled.
- */
- private void scheduleTask(Runnable task, boolean isEnabled, long delaySec, long intervalSec, String taskName) {
- if (isEnabled) {
- scheduler.scheduleAtFixedRate(task, delaySec, intervalSec, TimeUnit.SECONDS);
- logger.info("Scheduled {} task to run every {} seconds starting in {} seconds.", taskName, intervalSec, delaySec);
- } else {
- logger.warn("Running with {} turned off!", taskName);
+ // Schedule a thread to purge blobs belonging to deprecated containers for this VCR's partitions
+ // after a delay to allow startup to finish.
+ scheduleTask(() -> cloudContainerCompactor.compactAssignedDeprecatedContainers(
+ virtualReplicatorCluster.getAssignedPartitionIds()), cloudConfig.cloudContainerCompactionEnabled,
+ cloudConfig.cloudContainerCompactionStartupDelaySecs,
+ TimeUnit.HOURS.toSeconds(cloudConfig.cloudContainerCompactionIntervalHours), "cloud container compaction");
}
- }
- /**
- * Add a replica of given {@link PartitionId} and its {@link RemoteReplicaInfo}s to backup list.
- * @param partitionId the {@link PartitionId} of the replica to add.
- * @throws ReplicationException if replicas initialization failed.
- */
- void addReplica(PartitionId partitionId) throws ReplicationException {
- if (partitionToPartitionInfo.containsKey(partitionId)) {
- throw new ReplicationException("Partition " + partitionId + " already exists on " + dataNodeId);
- }
- ReplicaId cloudReplica = new CloudReplica(partitionId, virtualReplicatorCluster.getCurrentDataNodeId());
- if (!storeManager.addBlobStore(cloudReplica)) {
- logger.error("Can't start cloudstore for replica {}", cloudReplica);
- throw new ReplicationException("Can't start cloudstore for replica " + cloudReplica);
- }
- List<? extends ReplicaId> peerReplicas = cloudReplica.getPeerReplicaIds();
- List<RemoteReplicaInfo> remoteReplicaInfos = new ArrayList<>();
- Store store = storeManager.getStore(partitionId);
- if (peerReplicas != null) {
- for (ReplicaId peerReplica : peerReplicas) {
- if (!shouldReplicateFromDc(peerReplica.getDataNodeId().getDatacenterName())) {
- continue;
+ /**
+ * Schedule the specified task, if enabled, with the specified delay and interval.
+ * @param task {@link Runnable} task to be scheduled.
+ * @param isEnabled flag indicating if the task is enabled. If false the task is not scheduled.
+ * @param delaySec initial delay to allow startup to finish before starting task.
+ * @param intervalSec period between successive executions.
+ * @param taskName name of the task being scheduled.
+ */
+ private void scheduleTask(Runnable task, boolean isEnabled, long delaySec, long intervalSec, String taskName) {
+ if (isEnabled) {
+ scheduler.scheduleAtFixedRate(task, delaySec, intervalSec, TimeUnit.SECONDS);
+ logger.info("Scheduled {} task to run every {} seconds starting in {} seconds.", taskName, intervalSec, delaySec);
+ } else {
+ logger.warn("Running with {} turned off!", taskName);
}
- // We need to ensure that a replica token gets persisted only after the corresponding data in the
- // store gets flushed to cloud. We use the store flush interval multiplied by a constant factor
- // to determine the token flush interval
- FindTokenFactory findTokenFactory =
- tokenHelper.getFindTokenFactoryFromReplicaType(peerReplica.getReplicaType());
- RemoteReplicaInfo remoteReplicaInfo =
- new RemoteReplicaInfo(peerReplica, cloudReplica, store, findTokenFactory.getNewFindToken(),
- storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier,
- SystemTime.getInstance(), peerReplica.getDataNodeId().getPortToConnectTo());
- replicationMetrics.addMetricsForRemoteReplicaInfo(remoteReplicaInfo, trackPerDatacenterLagInMetric);
- remoteReplicaInfos.add(remoteReplicaInfo);
- }
- PartitionInfo partitionInfo = new PartitionInfo(remoteReplicaInfos, partitionId, store, cloudReplica);
- partitionToPartitionInfo.put(partitionId, partitionInfo);
- // For CloudBackupManager, at most one PartitionInfo in the set.
- mountPathToPartitionInfos.computeIfAbsent(cloudReplica.getMountPath(), key -> ConcurrentHashMap.newKeySet())
- .add(partitionInfo);
- partitionStoreMap.put(partitionId.toPathString(), store);
- } else {
- try {
- storeManager.shutdownBlobStore(partitionId);
- storeManager.removeBlobStore(partitionId);
- } finally {
- throw new ReplicationException(
- "Failed to add Partition " + partitionId + " on " + dataNodeId + " , because no peer replicas found.");
- }
}
- // Reload replication token if exist.
- int tokenReloadFailCount = reloadReplicationTokenIfExists(cloudReplica, remoteReplicaInfos);
- vcrMetrics.tokenReloadWarnCount.inc(tokenReloadFailCount);
- // Add remoteReplicaInfos to {@link ReplicaThread}.
- addRemoteReplicaInfoToReplicaThread(remoteReplicaInfos, true);
- if (replicationConfig.replicationTrackPerPartitionLagFromRemote) {
- replicationMetrics.addLagMetricForPartition(partitionId);
+ /**
+ * Add a replica of given {@link PartitionId} and its {@link RemoteReplicaInfo}s to backup list.
+ * @param partitionId the {@link PartitionId} of the replica to add.
+ * @throws ReplicationException if replica initialization fails.
+ */
+ void addReplica(PartitionId partitionId) throws ReplicationException {
+ if (partitionToPartitionInfo.containsKey(partitionId)) {
+ throw new ReplicationException("Partition " + partitionId + " already exists on " + dataNodeId);
+ }
+ ReplicaId cloudReplica = new CloudReplica(partitionId, virtualReplicatorCluster.getCurrentDataNodeId());
+ if (!storeManager.addBlobStore(cloudReplica)) {
+ logger.error("Can't start cloudstore for replica {}", cloudReplica);
+ throw new ReplicationException("Can't start cloudstore for replica " + cloudReplica);
+ }
+ List<? extends ReplicaId> peerReplicas = cloudReplica.getPeerReplicaIds();
+ List<RemoteReplicaInfo> remoteReplicaInfos = new ArrayList<>();
+ Store store = storeManager.getStore(partitionId);
+ if (peerReplicas != null) {
+ for (ReplicaId peerReplica : peerReplicas) {
+ if (!shouldReplicateFromDc(peerReplica.getDataNodeId().getDatacenterName())) {
+ continue;
+ }
+ // We need to ensure that a replica token gets persisted only after the corresponding data in the
+ // store gets flushed to cloud. We use the store flush interval multiplied by a constant factor
+ // to determine the token flush interval
+ FindTokenFactory findTokenFactory =
+ tokenHelper.getFindTokenFactoryFromReplicaType(peerReplica.getReplicaType());
+ RemoteReplicaInfo remoteReplicaInfo =
+ new RemoteReplicaInfo(peerReplica, cloudReplica, store, findTokenFactory.getNewFindToken(),
+ storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier,
+ SystemTime.getInstance(), peerReplica.getDataNodeId().getPortToConnectTo());
+ replicationMetrics.addMetricsForRemoteReplicaInfo(remoteReplicaInfo, trackPerDatacenterLagInMetric);
+ remoteReplicaInfos.add(remoteReplicaInfo);
+ }
+ PartitionInfo partitionInfo = new PartitionInfo(remoteReplicaInfos, partitionId, store, cloudReplica);
+ partitionToPartitionInfo.put(partitionId, partitionInfo);
+ // For VcrReplicationManager, at most one PartitionInfo in the set.
+ mountPathToPartitionInfos.computeIfAbsent(cloudReplica.getMountPath(), key -> ConcurrentHashMap.newKeySet())
+ .add(partitionInfo);
+ partitionStoreMap.put(partitionId.toPathString(), store);
+ } else {
+ try {
+ storeManager.shutdownBlobStore(partitionId);
+ storeManager.removeBlobStore(partitionId);
+ } finally {
+ throw new ReplicationException(
+ "Failed to add Partition " + partitionId + " on " + dataNodeId + ", because no peer replicas were found.");
+ }
+ }
+ // Reload replication tokens if they exist.
+ int tokenReloadFailCount = reloadReplicationTokenIfExists(cloudReplica, remoteReplicaInfos);
+ vcrMetrics.tokenReloadWarnCount.inc(tokenReloadFailCount);
+
+ // Add remoteReplicaInfos to {@link ReplicaThread}.
+ addRemoteReplicaInfoToReplicaThread(remoteReplicaInfos, true);
+ if (replicationConfig.replicationTrackPerPartitionLagFromRemote) {
+ replicationMetrics.addLagMetricForPartition(partitionId);
+ }
}
- }
- /**
- * Remove a replica of given {@link PartitionId} and its {@link RemoteReplicaInfo}s from the backup list.
- * @param partitionId the {@link PartitionId} of the replica to removed.
- */
- void removeReplica(PartitionId partitionId) {
- stopPartitionReplication(partitionId);
- Store cloudStore = partitionStoreMap.get(partitionId.toPathString());
- if (cloudStore != null) {
- storeManager.shutdownBlobStore(partitionId);
- storeManager.removeBlobStore(partitionId);
- } else {
- logger.warn("Store not found for partition {}", partitionId);
+ /**
+ * Remove a replica of given {@link PartitionId} and its {@link RemoteReplicaInfo}s from the backup list.
+ * @param partitionId the {@link PartitionId} of the replica to be removed.
+ */
+ void removeReplica(PartitionId partitionId) {
+ stopPartitionReplication(partitionId);
+ Store cloudStore = partitionStoreMap.get(partitionId.toPathString());
+ if (cloudStore != null) {
+ storeManager.shutdownBlobStore(partitionId);
+ storeManager.removeBlobStore(partitionId);
+ } else {
+ logger.warn("Store not found for partition {}", partitionId);
+ }
+ logger.info("Partition {} removed from {}", partitionId, dataNodeId);
+ // We don't close the cloudBlobStore because replication in ReplicaThread uses a copy of
+ // remoteReplicaInfo, which needs the CloudBlobStore.
}
- logger.info("Partition {} removed from {}", partitionId, dataNodeId);
- // We don't close cloudBlobStore because because replicate in ReplicaThread is using a copy of
- // remoteReplicaInfo which needs CloudBlobStore.
- }
- @Override
- public void shutdown() throws ReplicationException {
- // TODO: can do these in parallel
- if (cloudStorageCompactor != null) {
- cloudStorageCompactor.shutdown();
+ @Override
+ public void shutdown() throws ReplicationException {
+ // TODO: can do these in parallel
+ if (cloudStorageCompactor != null) {
+ cloudStorageCompactor.shutdown();
+ }
+ if (cloudContainerCompactor != null) {
+ cloudContainerCompactor.shutdown();
+ }
+ super.shutdown();
}
- if (cloudContainerCompactor != null) {
- cloudContainerCompactor.shutdown();
+
+ public VcrMetrics getVcrMetrics() {
+ return vcrMetrics;
}
- super.shutdown();
- }
- public VcrMetrics getVcrMetrics() {
- return vcrMetrics;
- }
+ /** For testing only */
+ CloudStorageCompactor getCloudStorageCompactor() {
+ return cloudStorageCompactor;
+ }
- /** For testing only */
- CloudStorageCompactor getCloudStorageCompactor() {
- return cloudStorageCompactor;
- }
+ @Override
+ public void updateTotalBytesReadByRemoteReplica(PartitionId partitionId, String hostName, String replicaPath,
+ long totalBytesRead) {
+ // Since replica metadata requests for a single partition can go to multiple VCR nodes, totalBytesReadByRemoteReplica
+ // cannot be populated locally on any single VCR node.
+ }
- @Override
- public void updateTotalBytesReadByRemoteReplica(PartitionId partitionId, String hostName, String replicaPath,
- long totalBytesRead) {
- // Since replica metadata request for a single partition can goto multiple vcr nodes, totalBytesReadByRemoteReplica
- // cannot be populated locally on any vcr node.
- }
+ @Override
+ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String hostName, String replicaPath) {
+ // TODO get replica lag from cosmos?
+ return -1;
+ }
- @Override
- public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String hostName, String replicaPath) {
- // TODO get replica lag from cosmos?
- return -1;
- }
+ @Override
+ protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
+ return "Vcr" + super.getReplicaThreadName(datacenterToReplicateFrom, threadIndexWithinPool);
+ }
- /**
- * Check if replication is allowed from given datacenter.
- * @param datacenterName datacenter name to check.
- * @return true if replication is allowed. false otherwise.
- */
- private boolean shouldReplicateFromDc(String datacenterName) {
- return cloudConfig.vcrSourceDatacenters.contains(datacenterName);
- }
+ /**
+ * Check if replication is allowed from given datacenter.
+ * @param datacenterName datacenter name to check.
+ * @return true if replication is allowed, false otherwise.
+ */
+ private boolean shouldReplicateFromDc(String datacenterName) {
+ return cloudConfig.vcrSourceDatacenters.contains(datacenterName);
+ }
}
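
For reference, the `scheduleTask` helper introduced above wraps a guard-and-schedule pattern around `ScheduledExecutorService.scheduleAtFixedRate`: schedule the task only when its feature flag is on, otherwise log that it is disabled. A minimal standalone sketch of that pattern follows (the class, task, and log wording here are illustrative, not from the patch):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleTaskSketch {
  private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  // Mirrors the guard-and-schedule logic in VcrReplicationManager.scheduleTask.
  static void scheduleTask(Runnable task, boolean isEnabled, long delaySec, long intervalSec, String taskName) {
    if (isEnabled) {
      scheduler.scheduleAtFixedRate(task, delaySec, intervalSec, TimeUnit.SECONDS);
      System.out.printf("Scheduled %s task to run every %d seconds starting in %d seconds.%n", taskName,
          intervalSec, delaySec);
    } else {
      System.out.printf("Running with %s turned off!%n", taskName);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Hypothetical stand-in for the token persistor / compaction tasks scheduled in start().
    scheduleTask(() -> System.out.println("compaction pass"), true, 1, 2, "cloud blob compaction");
    Thread.sleep(5000);
    scheduler.shutdownNow();
  }
}
```
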
diff --git a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java
index b86e227a12..120c56f14d 100644
--- a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java
+++ b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java
@@ -343,9 +343,7 @@ private List<ReplicaThread> createThreadPool(String datacenter, int numberOfThre
ResponseHandler responseHandler = new ResponseHandler(clusterMap);
for (int i = 0; i < numberOfThreads; i++) {
boolean replicatingOverSsl = sslEnabledDatacenters.contains(datacenter);
- String threadIdentity =
- (startThread ? "Vcr" : "") + "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenter) ? "Intra-"
- : "Inter-") + i + "-" + datacenter;
+ String threadIdentity = getReplicaThreadName(datacenter, i);
try {
StoreKeyConverter threadSpecificKeyConverter = storeKeyConverterFactory.getStoreKeyConverter();
Transformer threadSpecificTransformer =
@@ -370,6 +368,17 @@ private List<ReplicaThread> createThreadPool(String datacenter, int numberOfThre
return replicaThreads;
}
+ /**
+ * Chooses a name for a new replica thread.
+ * @param datacenterToReplicateFrom The datacenter that we replicate from in this thread.
+ * @param threadIndexWithinPool The index of the thread within the thread pool.
+ * @return The name of the thread.
+ */
+ protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
+ return "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenterToReplicateFrom) ? "Intra-"
+ : "Inter-") + threadIndexWithinPool + "-" + datacenterToReplicateFrom;
+ }
+
/**
* Reads the replica tokens from storage, populates the Remote replica info,
* and re-persists the tokens if they have been reset.
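
Taken together, the two hunks above turn replica-thread naming into a template method: the base engine builds names of the form `ReplicaThread-{Intra|Inter}-{index}-{datacenter}`, and the VCR subclass prepends `Vcr`, reproducing the behavior of the removed `(startThread ? "Vcr" : "")` check. A minimal sketch of that arrangement (class names here are hypothetical):

```java
// Base class owns the naming scheme, as in ReplicationEngine.getReplicaThreadName.
class BaseEngine {
  private final String localDatacenter;

  BaseEngine(String localDatacenter) {
    this.localDatacenter = localDatacenter;
  }

  protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
    return "ReplicaThread-" + (localDatacenter.equals(datacenterToReplicateFrom) ? "Intra-" : "Inter-")
        + threadIndexWithinPool + "-" + datacenterToReplicateFrom;
  }
}

// Subclass customizes the name, as in VcrReplicationManager's override.
class VcrEngine extends BaseEngine {
  VcrEngine(String localDatacenter) {
    super(localDatacenter);
  }

  @Override
  protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
    return "Vcr" + super.getReplicaThreadName(datacenterToReplicateFrom, threadIndexWithinPool);
  }
}

public class ThreadNameSketch {
  public static void main(String[] args) {
    System.out.println(new BaseEngine("DC1").getReplicaThreadName("DC1", 0)); // ReplicaThread-Intra-0-DC1
    System.out.println(new VcrEngine("DC1").getReplicaThreadName("DC2", 3));  // VcrReplicaThread-Inter-3-DC2
  }
}
```
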