Skip to content

Commit

Permalink
Fix QuorumTest to use random port.
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Aug 20, 2024
1 parent 13fc984 commit 1335e5d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class CoordinatorServer {
private GRPCMetrics grpcMetrics;
private MetricReporter metricReporter;
private String id;
private int rpcListenPort;

public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
this.startTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -106,7 +107,7 @@ public void start() throws Exception {
LOG.info(
"{} version: {}", this.getClass().getSimpleName(), Constants.VERSION_AND_REVISION_SHORT);
jettyServer.start();
server.start();
rpcListenPort = server.start();
if (metricReporter != null) {
metricReporter.start();
}
Expand Down Expand Up @@ -280,4 +281,8 @@ protected void blockUntilShutdown() throws InterruptedException {
public long getStartTimeMs() {
return startTimeMs;
}

public int getRpcListenPort() {
return rpcListenPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,32 +85,45 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
.readBufferSize(1000);
}

public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception {
public static MockedShuffleServer createServer(int id, File tmpDir, int coordinatorRpcPort)
throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
File dataDir1 = new File(tmpDir, id + "_1");
File dataDir2 = new File(tmpDir, id + "_2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name());
shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
shuffleServerConf.setInteger("rss.jetty.http.port", 0);
shuffleServerConf.setString("rss.storage.basePath", basePath);
shuffleServerConf.setString("rss.coordinator.quorum", LOCALHOST + ":" + coordinatorRpcPort);
return new MockedShuffleServer(shuffleServerConf);
}

@BeforeEach
public void initCluster(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0);
createCoordinatorServer(coordinatorConf);

for (CoordinatorServer coordinator : coordinators) {
coordinator.start();
}

ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);

grpcShuffleServers.add(createServer(0, tmpDir));
grpcShuffleServers.add(createServer(1, tmpDir));
grpcShuffleServers.add(createServer(2, tmpDir));
grpcShuffleServers.add(createServer(3, tmpDir));
grpcShuffleServers.add(createServer(4, tmpDir));
grpcShuffleServers.add(createServer(0, tmpDir, coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.add(createServer(1, tmpDir, coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.add(createServer(2, tmpDir, coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.add(createServer(3, tmpDir, coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.add(createServer(4, tmpDir, coordinators.get(0).getRpcListenPort()));

for (ShuffleServer shuffleServer : grpcShuffleServers) {
shuffleServer.start();
}

shuffleServerInfo0 =
new ShuffleServerInfo(
Expand All @@ -137,12 +150,6 @@ public void initCluster(@TempDir File tmpDir) throws Exception {
String.format("127.0.0.1-%s", grpcShuffleServers.get(4).getGrpcPort()),
grpcShuffleServers.get(4).getIp(),
grpcShuffleServers.get(4).getGrpcPort());
for (CoordinatorServer coordinator : coordinators) {
coordinator.start();
}
for (ShuffleServer shuffleServer : grpcShuffleServers) {
shuffleServer.start();
}

// simulator of failed servers
fakedShuffleServerInfo0 =
Expand Down Expand Up @@ -643,7 +650,7 @@ public void case5(@TempDir File tmpDir) throws Exception {

// when one server is restarted, getShuffleResult should success
grpcShuffleServers.get(1).stopServer();
grpcShuffleServers.set(1, createServer(1, tmpDir));
grpcShuffleServers.set(1, createServer(1, tmpDir, coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.get(1).start();
report =
shuffleWriteClientImpl.getShuffleResult(
Expand All @@ -656,7 +663,7 @@ public void case5(@TempDir File tmpDir) throws Exception {

// when two servers are restarted, getShuffleResult should fail
grpcShuffleServers.get(2).stopServer();
grpcShuffleServers.set(2, createServer(2, tmpDir));
grpcShuffleServers.set(2, createServer(2, tmpDir, coordinators.get(0).getRpcListenPort()));
grpcShuffleServers.get(2).start();
try {
report =
Expand Down

0 comments on commit 1335e5d

Please sign in to comment.