Skip to content

Commit

Permalink
[#2354] feat(client): Explicitly setting grpc netty based event loop …
Browse files Browse the repository at this point in the history
…threads to avoid too much threads
  • Loading branch information
Junfan Zhang committed Jan 27, 2025
1 parent 3590940 commit 9f8ab64
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,10 @@ public class RssClientConf {
.asList()
.noDefaultValue()
.withDescription("the report include properties could be configured by this option");

public static final ConfigOption<Integer> RSS_CLIENT_GRPC_EVENT_LOOP_THREADS =
ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
.intType()
.defaultValue(-1)
.withDescription("the event loop threads of netty impl for grpc");
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class GrpcClient {
protected ManagedChannel channel;

protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0);
this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0, -1);
}

protected GrpcClient(
Expand All @@ -47,12 +47,18 @@ protected GrpcClient(
boolean usePlaintext,
int pageSize,
int maxOrder,
int smallCacheSize) {
int smallCacheSize,
int nettyEventLoopThreads) {
this.host = host;
this.port = port;
this.maxRetryAttempts = maxRetryAttempts;
this.usePlaintext = usePlaintext;

if (nettyEventLoopThreads > 0) {
System.setProperty(
"io.grpc.netty.shaded.io.netty.eventLoopThreads", String.valueOf(nettyEventLoopThreads));
}

NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port)
.withOption(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.uniffle.proto.ShuffleServerGrpc;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub;

import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
import static org.apache.uniffle.proto.RssProtos.StatusCode.NO_BUFFER;

public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServerClient {
Expand Down Expand Up @@ -155,7 +156,8 @@ public ShuffleServerGrpcClient(String host, int port) {
true,
0,
0,
0);
0,
-1);
}

public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
Expand All @@ -171,7 +173,8 @@ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
true,
0,
0,
0);
0,
rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
}

public ShuffleServerGrpcClient(
Expand All @@ -182,8 +185,17 @@ public ShuffleServerGrpcClient(
boolean usePlaintext,
int pageSize,
int maxOrder,
int smallCacheSize) {
super(host, port, maxRetryAttempts, usePlaintext, pageSize, maxOrder, smallCacheSize);
int smallCacheSize,
int nettyEventLoopThreads) {
super(
host,
port,
maxRetryAttempts,
usePlaintext,
pageSize,
maxOrder,
smallCacheSize,
nettyEventLoopThreads);
blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
rpcTimeout = rpcTimeoutMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;

import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;

public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerGrpcNettyClient.class);
private int nettyPort;
Expand Down Expand Up @@ -107,7 +109,16 @@ public ShuffleServerGrpcNettyClient(
int pageSize,
int maxOrder,
int smallCacheSize) {
super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs, true, pageSize, maxOrder, smallCacheSize);
super(
host,
grpcPort,
maxRetryAttempts,
rpcTimeoutMs,
true,
pageSize,
maxOrder,
smallCacheSize,
rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
this.nettyPort = nettyPort;
TransportContext transportContext = new TransportContext(new TransportConf(rssConf));
this.clientFactory = new TransportClientFactory(transportContext);
Expand Down

0 comments on commit 9f8ab64

Please sign in to comment.