Skip to content

Commit

Permalink
Address comment to add a new system store type filter
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Jan 15, 2025
1 parent 393a7ab commit d1611a0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,6 @@ private static void runClusterCommand(CommandLine cmd) {
System.out.println(
"[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: "
+ checkpointFile + ", Parallelism: " + parallelism);

// Create child data center controller client map.
ChildAwareResponse childAwareResponse = controllerClient.listChildControllers(clusterName);
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName, childAwareResponse);
Expand Down Expand Up @@ -912,7 +911,19 @@ private static void runClusterCommand(CommandLine cmd) {
// Validate task type. For now, we only has one task, if we have more task in the future, we can extend this logic.
Supplier<Function<String, Boolean>> functionSupplier;
if (SystemStorePushTask.TASK_NAME.equals(task)) {
functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName);
String systemStoreType = getOptionalArgument(cmd, Arg.SYSTEM_STORE_TYPE);
if (systemStoreType != null) {
if (!(systemStoreType.equalsIgnoreCase(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.toString())
|| systemStoreType.equalsIgnoreCase(VeniceSystemStoreType.META_STORE.toString()))) {
printErrAndExit("System store type: " + systemStoreType + " is not supported.");
}
}
System.out.println(
functionSupplier = () -> new SystemStorePushTask(
controllerClient,
controllerClientMap,
clusterName,
systemStoreType == null ? Optional.empty() : Optional.of(systemStoreType)));
} else {
System.out.println("Undefined task: " + task);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.Utils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
Expand All @@ -33,8 +35,7 @@ public class SystemStorePushTask implements Function<String, Boolean> {
private static final int JOB_POLLING_RETRY_COUNT = 200;
private static final int JOB_POLLING_RETRY_PERIOD_IN_SECONDS = 5;
private static final String SYSTEM_STORE_PUSH_TASK_LOG_PREFIX = "[**** SYSTEM STORE PUSH ****]";
private static final List<VeniceSystemStoreType> SYSTEM_STORE_TYPE =
Arrays.asList(VeniceSystemStoreType.META_STORE, VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE);
private final List<VeniceSystemStoreType> systemStoreTypeList;

private final ControllerClient parentControllerClient;
private final String clusterName;
Expand All @@ -43,10 +44,17 @@ public class SystemStorePushTask implements Function<String, Boolean> {
public SystemStorePushTask(
ControllerClient parentControllerClient,
Map<String, ControllerClient> controllerClientMap,
String clusterName) {
String clusterName,
Optional<String> systemStoreTypeFilter) {
this.parentControllerClient = parentControllerClient;
this.childControllerClientMap = controllerClientMap;
this.clusterName = clusterName;
if (systemStoreTypeFilter.isPresent()) {
systemStoreTypeList = Collections.singletonList(VeniceSystemStoreType.valueOf(systemStoreTypeFilter.get()));
} else {
systemStoreTypeList =
Arrays.asList(VeniceSystemStoreType.META_STORE, VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE);
}
}

public Boolean apply(String storeName) {
Expand All @@ -64,7 +72,7 @@ public Boolean apply(String storeName) {
return false;
}

for (VeniceSystemStoreType type: SYSTEM_STORE_TYPE) {
for (VeniceSystemStoreType type: systemStoreTypeList) {
String systemStoreName = type.getSystemStoreName(storeName);
/**
* In current implementation, a push to system store will flip the flag to true, which can introduce unexpected
Expand Down

0 comments on commit d1611a0

Please sign in to comment.