From 74ad8779e432a1195e00c3bb0ca5ee17958dc372 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 22 Jan 2025 17:08:35 +0800 Subject: [PATCH] refactor and fix tests --- .../apache/pegasus/rpc/async/MetaSession.java | 13 +++++--- .../pegasus/rpc/async/MetaSessionTest.java | 31 +++++-------------- 2 files changed, 17 insertions(+), 27 deletions(-) diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java b/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java index 8931afc993..cbd22d68b7 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java @@ -54,8 +54,8 @@ public MetaSession( } } else { for (String addr : addrList) { - rpc_address rpcAddr = new rpc_address(); - if (rpcAddr.fromString(addr)) { + rpc_address rpcAddr = rpc_address.fromIpPort(addr); + if (rpcAddr != null) { logger.info("add {} as meta server", addr); metaList.add(clusterManager.getReplicaSession(rpcAddr)); } else { @@ -105,9 +105,14 @@ public static rpc_address getMetaServiceForwardAddress(client_operator metaQuery } java.util.List partitions = op.get_response().getPartitions(); - if (partitions == null || partitions.isEmpty()) return null; + if (partitions == null || partitions.isEmpty()) { + return null; + } + addr = partitions.get(0).getPrimary(); - if (addr == null || addr.isInvalid()) return null; + if (addr == null || addr.isInvalid()) { + return null; + } } return addr; diff --git a/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java b/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java index 01261757a5..75fc4ee976 100644 --- a/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java +++ b/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java @@ -24,7 +24,6 @@ import java.time.Duration; import java.util.*; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import org.apache.commons.lang3.reflect.FieldUtils; @@ -56,9 +55,7 @@ public void before() throws Exception {} @AfterEach public void after() throws Exception { - rpc_address addr = new rpc_address(); - addr.fromString("127.0.0.1:34602"); - Toollet.tryStartServer(addr); + Toollet.tryStartServer(Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34602"))); } private static void ensureNotLeader(rpc_address addr) { @@ -87,22 +84,18 @@ public void testMetaConnect() throws Exception { new ClusterManager(ClientOptions.builder().metaServers(address_list).build()); MetaSession session = manager.getMetaSession(); - rpc_address addr = new rpc_address(); - addr.fromString("127.0.0.1:34602"); + rpc_address addr = Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34602")); ensureNotLeader(addr); - ArrayList> callbacks = new ArrayList>(); + ArrayList> callbacks = new ArrayList<>(); for (int i = 0; i < 1000; ++i) { query_cfg_request req = new query_cfg_request("temp", new ArrayList()); final client_operator op = new query_cfg_operator(new gpid(-1, -1), req); FutureTask callback = - new FutureTask( - new Callable() { - @Override - public Void call() throws Exception { - assertEquals(error_code.error_types.ERR_OK, op.rpc_error.errno); - return null; - } + new FutureTask<>( + () -> { + assertEquals(error_code.error_types.ERR_OK, op.rpc_error.errno); + return null; }); callbacks.add(callback); session.asyncExecute(op, callback, 10); @@ -257,17 +250,9 @@ public void testMetaForwardUnknownPrimary() throws Exception { op.get_response().err = new error_code(); op.get_response().err.errno = error_code.error_types.ERR_FORWARD_TO_OTHERS; op.get_response().partitions = Collections.singletonList(new partition_configuration()); - op.get_response().partitions.set(0, new partition_configuration()); op.get_response().partitions.get(0).primary = rpc_address.fromIpPort("172.0.0.3:34601"); MetaSession.MetaRequestRound round = - new MetaSession.MetaRequestRound( - op, - new Runnable() { - @Override - public void run() {} - }, - 10, - meta.getMetaList().get(0)); + new MetaSession.MetaRequestRound(op, () -> {}, 10, meta.getMetaList().get(0)); // do not retry after a failed QueryMeta. Mockito.doNothing().when(meta).retryQueryMeta(round, false);