diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 36d80b6e57..63b4666d14 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -317,4 +317,10 @@ public class RssClientConf { .asList() .noDefaultValue() .withDescription("the report include properties could be configured by this option"); + + public static final ConfigOption RSS_CLIENT_GRPC_EVENT_LOOP_THREADS = + ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads") + .intType() + .defaultValue(-1) + .withDescription("the event loop threads of netty impl for grpc"); } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java index 2f5b09c789..1d5d167f10 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java @@ -37,7 +37,7 @@ public abstract class GrpcClient { protected ManagedChannel channel; protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) { - this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0); + this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0, -1); } protected GrpcClient( @@ -47,12 +47,18 @@ protected GrpcClient( boolean usePlaintext, int pageSize, int maxOrder, - int smallCacheSize) { + int smallCacheSize, + int nettyEventLoopThreads) { this.host = host; this.port = port; this.maxRetryAttempts = maxRetryAttempts; this.usePlaintext = usePlaintext; + if (nettyEventLoopThreads > 0) { + System.setProperty( + "io.grpc.netty.shaded.io.netty.eventLoopThreads", String.valueOf(nettyEventLoopThreads)); + } + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port) .withOption( diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 399c215479..4e2aee1c6b 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -118,6 +118,7 @@ import org.apache.uniffle.proto.ShuffleServerGrpc; import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub; +import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS; import static org.apache.uniffle.proto.RssProtos.StatusCode.NO_BUFFER; public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServerClient { @@ -155,7 +156,8 @@ public ShuffleServerGrpcClient(String host, int port) { true, 0, 0, - 0); + 0, + -1); } public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) { @@ -171,7 +173,8 @@ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) { true, 0, 0, - 0); + 0, + rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS)); } public ShuffleServerGrpcClient( @@ -182,8 +185,17 @@ public ShuffleServerGrpcClient( boolean usePlaintext, int pageSize, int maxOrder, - int smallCacheSize) { - super(host, port, maxRetryAttempts, usePlaintext, pageSize, maxOrder, smallCacheSize); + int smallCacheSize, + int nettyEventLoopThreads) { + super( + host, + port, + maxRetryAttempts, + usePlaintext, + pageSize, + maxOrder, + smallCacheSize, + nettyEventLoopThreads); blockingStub = ShuffleServerGrpc.newBlockingStub(channel); rpcTimeout = rpcTimeoutMs; } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index fbc4e363bc..78732733b2 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -64,6 +64,8 @@ import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.RetryUtils; +import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS; + public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerGrpcNettyClient.class); private int nettyPort; @@ -107,7 +109,16 @@ public ShuffleServerGrpcNettyClient( int pageSize, int maxOrder, int smallCacheSize) { - super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs, true, pageSize, maxOrder, smallCacheSize); + super( + host, + grpcPort, + maxRetryAttempts, + rpcTimeoutMs, + true, + pageSize, + maxOrder, + smallCacheSize, + rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS)); this.nettyPort = nettyPort; TransportContext transportContext = new TransportContext(new TransportConf(rssConf)); this.clientFactory = new TransportClientFactory(transportContext);