diff --git a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java index e42d86a1ef..df8acb9d9b 100644 --- a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java +++ b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java @@ -167,7 +167,7 @@ public void start() throws Exception { try { server.start(); } catch (BindException e) { - ExitUtils.terminate(1, "Fail to start jetty http server", e, LOG); + ExitUtils.terminate(1, "Fail to start jetty http server with port " + getHttpPort(), e, LOG); } LOG.info("Jetty http server started, listening on port {}", httpPort); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 847afda837..74d34bac60 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -72,6 +72,7 @@ public class CoordinatorServer { private GRPCMetrics grpcMetrics; private MetricReporter metricReporter; private String id; + private int rpcListenPort; public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception { this.startTimeMs = System.currentTimeMillis(); @@ -106,7 +107,7 @@ public void start() throws Exception { LOG.info( "{} version: {}", this.getClass().getSimpleName(), Constants.VERSION_AND_REVISION_SHORT); jettyServer.start(); - server.start(); + rpcListenPort = server.start(); if (metricReporter != null) { metricReporter.start(); } @@ -280,4 +281,8 @@ protected void blockUntilShutdown() throws InterruptedException { public long getStartTimeMs() { return startTimeMs; } + + public int getRpcListenPort() { + return rpcListenPort; + } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java index 35037470b1..4fc51ee339 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java @@ -85,32 +85,44 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { .readBufferSize(1000); } - public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception { + public static MockedShuffleServer createServer(int id, File tmpDir, int coordinatorRpcPort) throws Exception { ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); + shuffleServerConf.setInteger("rss.rpc.server.port", 0); shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000); shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000); File dataDir1 = new File(tmpDir, id + "_1"); File dataDir2 = new File(tmpDir, id + "_2"); String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name()); - shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100); + shuffleServerConf.setInteger("rss.jetty.http.port", 0); shuffleServerConf.setString("rss.storage.basePath", basePath); + shuffleServerConf.setString("rss.coordinator.quorum", LOCALHOST + ":" + coordinatorRpcPort); return new MockedShuffleServer(shuffleServerConf); } @BeforeEach public void initCluster(@TempDir File tmpDir) throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); + coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0); + coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0); createCoordinatorServer(coordinatorConf); + for (CoordinatorServer coordinator : coordinators) { + coordinator.start(); + } + ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000); - grpcShuffleServers.add(createServer(0, tmpDir)); - grpcShuffleServers.add(createServer(1, tmpDir)); - grpcShuffleServers.add(createServer(2, tmpDir)); - grpcShuffleServers.add(createServer(3, tmpDir)); - grpcShuffleServers.add(createServer(4, tmpDir)); + grpcShuffleServers.add(createServer(0, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(1, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(2, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(3, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(4, tmpDir, coordinators.get(0).getRpcListenPort())); + + for (ShuffleServer shuffleServer : grpcShuffleServers) { + shuffleServer.start(); + } shuffleServerInfo0 = new ShuffleServerInfo( @@ -137,12 +149,6 @@ public void initCluster(@TempDir File tmpDir) throws Exception { String.format("127.0.0.1-%s", grpcShuffleServers.get(4).getGrpcPort()), grpcShuffleServers.get(4).getIp(), grpcShuffleServers.get(4).getGrpcPort()); - for (CoordinatorServer coordinator : coordinators) { - coordinator.start(); - } - for (ShuffleServer shuffleServer : grpcShuffleServers) { - shuffleServer.start(); - } // simulator of failed servers fakedShuffleServerInfo0 = @@ -643,7 +649,7 @@ public void case5(@TempDir File tmpDir) throws Exception { // when one server is restarted, getShuffleResult should success grpcShuffleServers.get(1).stopServer(); - grpcShuffleServers.set(1, createServer(1, tmpDir)); + grpcShuffleServers.set(1, createServer(1, tmpDir, coordinators.get(0).getRpcListenPort())); grpcShuffleServers.get(1).start(); report = shuffleWriteClientImpl.getShuffleResult( @@ -656,7 +662,7 @@ public void case5(@TempDir File tmpDir) throws Exception { // when two servers are restarted, getShuffleResult should fail grpcShuffleServers.get(2).stopServer(); - grpcShuffleServers.set(2, createServer(2, tmpDir)); + grpcShuffleServers.set(2, createServer(2, tmpDir, coordinators.get(0).getRpcListenPort())); grpcShuffleServers.get(2).start(); try { report =