Skip to content

Commit

Permalink
[#1472][part-6] fix(netty): Make UTs truly test Netty mode (#1540)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Fix [#1008](#1008), which does not actually test `GRPC_NETTY` mode, because it uses `ShuffleServerGrpcClient` everywhere instead of `ShuffleServerGrpcNettyClient`.
Setting the shuffle server's tags to `GRPC_NETTY,GRPC` has no effect, because `ShuffleServerGrpcNettyClient` is never used.

### Why are the changes needed?

This is a sub-PR of #1519.
It is also a follow-up to #1008.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
  • Loading branch information
rickyma authored Feb 26, 2024
1 parent 903113b commit 2c3761b
Show file tree
Hide file tree
Showing 64 changed files with 1,938 additions and 795 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private RssShuffleWriter(
Function<String, Boolean> taskFailureCallback,
ShuffleHandleInfo shuffleHandleInfo,
TaskContext context) {
LOG.warn("RssShuffle start write taskAttemptId data" + taskAttemptId);
LOG.info("RssShuffle start write taskAttemptId data" + taskAttemptId);
this.shuffleManager = shuffleManager;
this.appId = appId;
this.shuffleId = shuffleId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testAbandonEventWhenTaskFailed() {
List<ShuffleBlockInfo> shuffleBlockInfoList =
Lists.newArrayList(
new ShuffleBlockInfo(
0, 0, 10, 10, 10, new byte[] {1}, shuffleServerInfoList, 10, 100, 0));
0, 0, 10, 10, 10, new byte[] {10}, shuffleServerInfoList, 10, 100, 0));

// It should exit directly and won't send an RPC request.
Awaitility.await()
Expand Down Expand Up @@ -123,7 +123,7 @@ public void testSendData() {
List<ShuffleBlockInfo> shuffleBlockInfoList =
Lists.newArrayList(
new ShuffleBlockInfo(
0, 0, 10, 10, 10, new byte[] {1}, shuffleServerInfoList, 10, 100, 0));
0, 0, 10, 10, 10, new byte[] {10}, shuffleServerInfoList, 10, 100, 0));
SendShuffleDataResult result =
spyClient.sendShuffleData("appId", shuffleBlockInfoList, () -> false);

Expand Down Expand Up @@ -202,7 +202,7 @@ public void testSendDataWithDefectiveServers() {
List<ShuffleBlockInfo> shuffleBlockInfoList =
Lists.newArrayList(
new ShuffleBlockInfo(
0, 0, 10, 10, 10, new byte[] {1}, shuffleServerInfoList, 10, 100, 0));
0, 0, 10, 10, 10, new byte[] {10}, shuffleServerInfoList, 10, 100, 0));
SendShuffleDataResult result =
spyClient.sendShuffleData(appId, shuffleBlockInfoList, () -> false);
assertEquals(0, result.getFailedBlockIds().size());
Expand Down Expand Up @@ -248,7 +248,7 @@ public void testSendDataWithDefectiveServers() {
List<ShuffleBlockInfo> shuffleBlockInfoList2 =
Lists.newArrayList(
new ShuffleBlockInfo(
0, 0, 10, 10, 10, new byte[] {1}, shuffleServerInfoList2, 10, 100, 0));
0, 0, 10, 10, 10, new byte[] {10}, shuffleServerInfoList2, 10, 100, 0));
result = spyClient.sendShuffleData(appId, shuffleBlockInfoList2, () -> false);
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(1, spyClient.getDefectiveServers().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;

import org.apache.uniffle.proto.RssProtos;

public class ShuffleServerInfo implements Serializable {
Expand All @@ -33,13 +35,21 @@ public class ShuffleServerInfo implements Serializable {

private int nettyPort = -1;

// Only for test
@VisibleForTesting
public ShuffleServerInfo(String host, int port) {
this.id = host + "-" + port;
this.host = host;
this.grpcPort = port;
}

@VisibleForTesting
public ShuffleServerInfo(String host, int grpcPort, int nettyPort) {
this.id = host + "-" + grpcPort + "-" + nettyPort;
this.host = host;
this.grpcPort = grpcPort;
this.nettyPort = nettyPort;
}

public ShuffleServerInfo(String id, String host, int port) {
this.id = id;
this.host = host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.LinkedList;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
Expand Down Expand Up @@ -81,6 +82,7 @@ public void channelRead(ChannelHandlerContext ctx, Object data) {
}
}

@VisibleForTesting
static boolean shouldRelease(Message msg) {
if (msg == null || msg.body() == null || msg.body().byteBuf() == null) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void test(@TempDir File tempDir) throws Exception {
+ "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
createCoordinatorServer(coordinatorConf);

ShuffleServerConf shuffleServerConf = getShuffleServerConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
createShuffleServer(shuffleServerConf);
startServers();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
Expand All @@ -166,8 +167,10 @@ public void test(@TempDir File tempDir) throws Exception {
assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by AccessClusterLoadChecker"));

shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2);
shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
shuffleServerConf.setInteger(
"rss.rpc.server.port", shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 2);
shuffleServerConf.setInteger(
"rss.jetty.http.port", shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
shuffleServer.start();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
Expand Down Expand Up @@ -79,7 +80,7 @@ private static List<Integer> findAvailablePorts(int num) throws IOException {

private static void createAndStartShuffleServerWithTags(Set<String> tags, File tmpDir)
throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);

File dataDir1 = new File(tmpDir, "data1");
Expand Down Expand Up @@ -108,7 +109,7 @@ private static void createAndStartShuffleServerWithTags(Set<String> tags, File t
ports.get(1));

ShuffleServer server = new ShuffleServer(shuffleServerConf);
shuffleServers.add(server);
grpcShuffleServers.add(server);
server.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.ReconfigurableBase;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.SimpleClusterManager;
import org.apache.uniffle.server.ShuffleServer;
Expand Down Expand Up @@ -83,9 +84,9 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
createCoordinatorServer(coordinatorConf2);

for (int i = 0; i < SERVER_NUM; i++) {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
File dataDir1 = new File(tmpDir, "data1");
String basePath = dataDir1.getAbsolutePath();
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
File dataDir = new File(tmpDir, "data" + i);
String basePath = dataDir.getAbsolutePath();
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
Expand Down Expand Up @@ -252,7 +253,7 @@ public void testGetReShuffleAssignments() {
shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
Set<String> excludeServer = Sets.newConcurrentHashSet();
List<ShuffleServer> excludeShuffleServer =
shuffleServers.stream().limit(3).collect(Collectors.toList());
grpcShuffleServers.stream().limit(3).collect(Collectors.toList());
excludeShuffleServer.stream().map(ss -> ss.getId()).peek(excludeServer::add);
ShuffleAssignmentsInfo shuffleAssignmentsInfo =
shuffleWriteClient.getShuffleAssignments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
Expand Down Expand Up @@ -73,11 +74,13 @@ public static void setupServers() throws Exception {
coordinatorConf.setLong("rss.coordinator.app.expired", 2000);
coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
shuffleServerConf.setInteger(
"rss.rpc.server.port", shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
shuffleServerConf.setInteger(
"rss.jetty.http.port", shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
createShuffleServer(shuffleServerConf);
startServers();
}
Expand Down Expand Up @@ -146,8 +149,8 @@ public void getShuffleAssignmentsTest() throws Exception {
// When shuffleServerHeartbeatTest completes before the current test,
// the servers' tags will be [ss_v4, GRPC_NETTY] and [ss_v4, GRPC], respectively.
// We need to change the first machine's tag from GRPC_NETTY to GRPC.
shuffleServers.get(0).stopServer();
RssConf shuffleServerConf = shuffleServers.get(0).getShuffleServerConf();
grpcShuffleServers.get(0).stopServer();
RssConf shuffleServerConf = grpcShuffleServers.get(0).getShuffleServerConf();
Class<RssConf> clazz = RssConf.class;
Field field = clazz.getDeclaredField("settings");
field.setAccessible(true);
Expand All @@ -161,7 +164,7 @@ public void getShuffleAssignmentsTest() throws Exception {
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
ShuffleServer ss = new ShuffleServer((ShuffleServerConf) shuffleServerConf);
ss.start();
shuffleServers.set(0, ss);
grpcShuffleServers.set(0, ss);
});
Thread.sleep(5000);
CoordinatorServer coordinatorServer = coordinators.get(0);
Expand Down Expand Up @@ -287,7 +290,7 @@ public void appHeartbeatTest() throws Exception {
@Test
public void shuffleServerHeartbeatTest() throws Exception {
CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
shuffleServers.get(0).stopServer();
grpcShuffleServers.get(0).stopServer();
Thread.sleep(5000);
SimpleClusterManager scm = (SimpleClusterManager) coordinators.get(0).getClusterManager();
List<ServerNode> nodes = scm.getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
Expand All @@ -296,15 +299,16 @@ public void shuffleServerHeartbeatTest() throws Exception {
assertEquals(1, node.getStorageInfo().size());
StorageInfo infoHead = node.getStorageInfo().values().iterator().next();
final StorageInfo expectedStorageInfo =
shuffleServers.get(1).getStorageManager().getStorageInfo().values().iterator().next();
grpcShuffleServers.get(1).getStorageManager().getStorageInfo().values().iterator().next();
assertEquals(expectedStorageInfo, infoHead);
assertEquals(StorageStatus.NORMAL, infoHead.getStatus());
assertTrue(node.getTags().contains(Constants.SHUFFLE_SERVER_VERSION));
assertTrue(scm.getTagToNodes().get(Constants.SHUFFLE_SERVER_VERSION).contains(node));
ShuffleServerConf shuffleServerConf = shuffleServers.get(0).getShuffleServerConf();
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2);
shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
shuffleServerConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, SHUFFLE_SERVER_PORT + 5);
ShuffleServerConf shuffleServerConf = grpcShuffleServers.get(0).getShuffleServerConf();
shuffleServerConf.setInteger(
"rss.rpc.server.port", shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 2);
shuffleServerConf.setInteger(
"rss.jetty.http.port", shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
shuffleServerConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY, "RSS_ENV_KEY");
String baseDir = shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0);
String storageTypeJsonSource = String.format("{\"%s\": \"ssd\"}", baseDir);
Expand All @@ -315,15 +319,14 @@ public void shuffleServerHeartbeatTest() throws Exception {
shuffleServerConf.set(ShuffleServerConf.TAGS, Lists.newArrayList("SSD"));
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
shuffleServers.set(0, ss);
grpcShuffleServers.set(0, ss);
});
Thread.sleep(3000);
assertEquals(2, coordinators.get(0).getClusterManager().getNodesNum());
nodes = scm.getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION, "SSD"));
assertEquals(1, nodes.size());
ServerNode ssdNode = nodes.get(0);
infoHead = ssdNode.getStorageInfo().values().iterator().next();
assertEquals(SHUFFLE_SERVER_PORT + 5, ssdNode.getNettyPort());
assertEquals(StorageMedia.SSD, infoHead.getType());

scm.close();
Expand Down
Loading

0 comments on commit 2c3761b

Please sign in to comment.