Skip to content

Commit

Permalink
refactor and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Jan 22, 2025
1 parent e52f281 commit 74ad877
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public MetaSession(
}
} else {
for (String addr : addrList) {
rpc_address rpcAddr = new rpc_address();
if (rpcAddr.fromString(addr)) {
rpc_address rpcAddr = rpc_address.fromIpPort(addr);
if (rpcAddr != null) {
logger.info("add {} as meta server", addr);
metaList.add(clusterManager.getReplicaSession(rpcAddr));
} else {
Expand Down Expand Up @@ -105,9 +105,14 @@ public static rpc_address getMetaServiceForwardAddress(client_operator metaQuery
}

java.util.List<partition_configuration> partitions = op.get_response().getPartitions();
if (partitions == null || partitions.isEmpty()) return null;
if (partitions == null || partitions.isEmpty()) {
return null;
}

addr = partitions.get(0).getPrimary();
if (addr == null || addr.isInvalid()) return null;
if (addr == null || addr.isInvalid()) {
return null;
}
}

return addr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.time.Duration;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand Down Expand Up @@ -56,9 +55,7 @@ public void before() throws Exception {}

@AfterEach
public void after() throws Exception {
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34602");
Toollet.tryStartServer(addr);
Toollet.tryStartServer(Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34602")));
}

private static void ensureNotLeader(rpc_address addr) {
Expand Down Expand Up @@ -87,22 +84,18 @@ public void testMetaConnect() throws Exception {
new ClusterManager(ClientOptions.builder().metaServers(address_list).build());
MetaSession session = manager.getMetaSession();

rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34602");
rpc_address addr = Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34602"));
ensureNotLeader(addr);

ArrayList<FutureTask<Void>> callbacks = new ArrayList<FutureTask<Void>>();
ArrayList<FutureTask<Void>> callbacks = new ArrayList<>();
for (int i = 0; i < 1000; ++i) {
query_cfg_request req = new query_cfg_request("temp", new ArrayList<Integer>());
final client_operator op = new query_cfg_operator(new gpid(-1, -1), req);
FutureTask<Void> callback =
new FutureTask<Void>(
new Callable<Void>() {
@Override
public Void call() throws Exception {
assertEquals(error_code.error_types.ERR_OK, op.rpc_error.errno);
return null;
}
new FutureTask<>(
() -> {
assertEquals(error_code.error_types.ERR_OK, op.rpc_error.errno);
return null;
});
callbacks.add(callback);
session.asyncExecute(op, callback, 10);
Expand Down Expand Up @@ -257,17 +250,9 @@ public void testMetaForwardUnknownPrimary() throws Exception {
op.get_response().err = new error_code();
op.get_response().err.errno = error_code.error_types.ERR_FORWARD_TO_OTHERS;
op.get_response().partitions = Collections.singletonList(new partition_configuration());
op.get_response().partitions.set(0, new partition_configuration());
op.get_response().partitions.get(0).primary = rpc_address.fromIpPort("172.0.0.3:34601");
MetaSession.MetaRequestRound round =
new MetaSession.MetaRequestRound(
op,
new Runnable() {
@Override
public void run() {}
},
10,
meta.getMetaList().get(0));
new MetaSession.MetaRequestRound(op, () -> {}, 10, meta.getMetaList().get(0));

// do not retry after a failed QueryMeta.
Mockito.doNothing().when(meta).retryQueryMeta(round, false);
Expand Down

0 comments on commit 74ad877

Please sign in to comment.