Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recent netty bundle changes #29205

Open
wants to merge 4 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2011, 2023 IBM Corporation and others.
* Copyright (c) 2011, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -85,10 +85,6 @@ public class NettyInboundChain implements InboundChain{

private ChainConfiguration _currentConfig;

/**
* The TCP based bootstrap.
*/
// private ServerBootstrapExtended serverBootstrap;
/** The bootstrap this object wraps */
private ServerBootstrapExtended bootstrap;
private Channel serverChan;
Expand Down Expand Up @@ -176,9 +172,7 @@ public void stopChannel(boolean closeGroups) {
//stopchain() first quiesce's(invokes chainQuiesced) depending on the chainQuiesceTimeOut
//Once the chain is quiesced StopChainTask is initiated.Hence we block until the actual stopChain is invoked
try {
ChannelFuture future = _nettyFramework.stop(serverChan);
if(future != null)
future.await(_nettyFramework.getDefaultChainQuiesceTimeout(), TimeUnit.MILLISECONDS); //BLOCK till stopChain actually completes from StopChainTask
_nettyFramework.stop(serverChan, -1);
} catch (Exception e) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled())
SibTr.debug(tc, "Failed in successfully cleaning(i.e stopping/destorying/removing) chain: ", e);
Expand Down Expand Up @@ -299,7 +293,7 @@ public void update() {
}
bootstrap.childHandler(new JMSServerInitializer(bootstrap.getBaseInitializer(), this));
NettyInboundChain parent = this;
this.channelFuture = _nettyFramework.start(bootstrap, ep.getHost(), ep.getPort(), f ->{
this.serverChan = _nettyFramework.start(bootstrap, ep.getHost(), ep.getPort(), f ->{
if (f.isCancelled() || !f.isSuccess()) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
SibTr.debug(this, tc, "Channel exception during connect: " + f.cause().getMessage());
Expand All @@ -309,7 +303,6 @@ public void update() {
}else {
if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) SibTr.entry(parent, tc, "ready", f);
Channel chan = f.channel();
parent.serverChan = chan;
f.addListener(innerFuture -> {
if (innerFuture.isCancelled() || !innerFuture.isSuccess()) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 IBM Corporation and others.
* Copyright (c) 2021, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -28,5 +28,7 @@ public interface NettyConstants {
public final String MAX_OPEN_CONNECTIONS_HANDLER_NAME = "maxConnectionHandler";
/** Max Connections Handler Name */
public final String ACCESSLIST_HANDLER_NAME = "accessListHandler";
/** Netty enablement */
String USE_NETTY = "useNettyTransport";

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 IBM Corporation and others.
* Copyright (c) 2021, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -14,6 +14,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class NettyFrameworkImpl implements ServerQuiesceListener, NettyFramework

/** server started logic borrowed from CHFWBundle */
private static AtomicBoolean serverCompletelyStarted = new AtomicBoolean(false);
private static Queue<FutureTask<?>> serverStartedTasks = new LinkedBlockingQueue<>();
private static Queue<FutureTask<ChannelFuture>> serverStartedTasks = new LinkedBlockingQueue<>();
private static Object syncStarted = new Object() {
}; // use brackets/inner class to make lock appear in dumps using class name

Expand Down Expand Up @@ -309,27 +310,67 @@ protected void setServerStarted(ServiceReference<ServerStarted> ref) {
// output. Use this signal to run tasks, mostly likely tasks that will
// finish the port listening logic, that need to run at the end of server
// startup

FutureTask<?> task;
FutureTask<ChannelFuture> task;
CountDownLatch latch = new CountDownLatch(serverStartedTasks.size());
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Netty Framework signaled- Server Completely Started signal received");
}
while ((task = serverStartedTasks.poll()) != null) {
try {
if(!task.isCancelled())
executorService.submit(task);
} catch (Exception e) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "caught exception performing late cycle server startup task: " + e);
synchronized (syncStarted) {
while ((task = serverStartedTasks.poll()) != null) {
try {
if(!task.isCancelled()) {
executorService.submit(new StartTaskRunnable(task, latch));
}else
latch.countDown();
} catch (Exception e) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "caught exception performing late cycle server startup task: " + e);
}
}
}
}

synchronized (syncStarted) {

try {
latch.await();
} catch (InterruptedException e) {
// Verify how to handle exception
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "caught exception performing late cycle server startup task: " + e);
}
throw new RuntimeException(e);
}

serverCompletelyStarted.set(true);
isActive = true;
syncStarted.notifyAll();
}
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Netty Framework - Server Completely Started completed");
}
}

private class StartTaskRunnable implements Runnable{

private FutureTask<ChannelFuture> task;
private CountDownLatch latch;

public StartTaskRunnable(FutureTask<ChannelFuture> task, CountDownLatch latch) {
this.task = task;
this.latch = latch;
}

@Override
public void run() {
task.run();
try {
task.get(getDefaultChainQuiesceTimeout(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "caught exception performing startup task: " + e);
}
}
latch.countDown();
}

}

/**
Expand All @@ -342,13 +383,13 @@ protected void setServerStarted(ServiceReference<ServerStarted> ref) {
* the task to denote it has ran.
* @throws Exception
*/
public <T> FutureTask<T> runWhenServerStarted(Callable<T> callable) throws Exception {
public FutureTask<ChannelFuture> runWhenServerStarted(Callable<ChannelFuture> callable) throws Exception {
synchronized (syncStarted) {
FutureTask<T> future = new FutureTask<T>(callable);
FutureTask<ChannelFuture> future = new FutureTask<ChannelFuture>(callable);
if (!serverCompletelyStarted.get()) {
serverStartedTasks.add(future);
}else {
this.executorService.submit(future);
} else {
this.executorService.submit(future);
}
return future;
}
Expand Down Expand Up @@ -388,14 +429,16 @@ public static boolean isServerCompletelyStarted() {

@Override
public void registerEndpointQuiesce(Channel chan, Callable quiesce) {
if(chan != null && getActiveChannelsMap().containsKey(chan)) {
ChannelHandler quiesceHandler = new QuiesceHandler(quiesce);
chan.pipeline().addLast(quiesceHandler);
}else {
if (TraceComponent.isAnyTracingEnabled() && tc.isWarningEnabled()) {
Tr.warning(tc, "Attempted to add a Quiesce Task to a channel which is not an endpoint. Quiesce will not be added and will be ignored.");
synchronized (activeChannelMap) {
if(chan != null && getActiveChannelsMap().containsKey(chan)) {
ChannelHandler quiesceHandler = new QuiesceHandler(quiesce);
chan.pipeline().addLast(quiesceHandler);
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isWarningEnabled()) {
Tr.warning(tc, "Attempted to add a Quiesce Task to a channel which is not an endpoint. Quiesce will not be added and will be ignored.");
}
}
}
}
}

/**
Expand Down Expand Up @@ -429,19 +472,20 @@ public BootstrapExtended createUDPBootstrapOutbound(Map<String, Object> options)
}

@Override
public FutureTask<ChannelFuture> start(ServerBootstrapExtended bootstrap, String inetHost, int inetPort,
public Channel start(ServerBootstrapExtended bootstrap, String inetHost, int inetPort,
ChannelFutureListener bindListener) throws NettyException {
return TCPUtils.start(this, bootstrap, inetHost, inetPort, bindListener);
}


@Override
public FutureTask<ChannelFuture> start(BootstrapExtended bootstrap, String inetHost, int inetPort,
public Channel start(BootstrapExtended bootstrap, String inetHost, int inetPort,
ChannelFutureListener bindListener) throws NettyException {
return UDPUtils.start(this, bootstrap, inetHost, inetPort, bindListener);
}

@Override
public FutureTask<ChannelFuture> startOutbound(BootstrapExtended bootstrap, String inetHost, int inetPort,
public Channel startOutbound(BootstrapExtended bootstrap, String inetHost, int inetPort,
ChannelFutureListener bindListener) throws NettyException {
if (bootstrap.getConfiguration() instanceof TCPConfigurationImpl) {
return TCPUtils.startOutbound(this, bootstrap, inetHost, inetPort, bindListener);
Expand All @@ -452,35 +496,41 @@ public FutureTask<ChannelFuture> startOutbound(BootstrapExtended bootstrap, Stri

@Override
public ChannelFuture stop(Channel channel) {
ChannelGroup group = activeChannelMap.get(channel);
if(group != null) {
group.close().addListener(innerFuture -> {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "channel group" + group + " has closed...");
}
});
activeChannelMap.remove(channel);
}
return channel.close();
synchronized (activeChannelMap) {
ChannelGroup group = activeChannelMap.get(channel);
if(group != null) {
group.close().addListener(innerFuture -> {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "channel group" + group + " has closed...");
}
});
activeChannelMap.remove(channel);
}
return channel.close();
}
}

@Override
public void stop(Channel channel, long timeout) {
if (timeout == -1) {
timeout = getDefaultChainQuiesceTimeout();
}
ChannelFuture future = stop(channel);
if (future != null) {
future.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
}
}
if (timeout == -1) {
timeout = getDefaultChainQuiesceTimeout();
}
ChannelFuture future;

synchronized(activeChannelMap) {
future = stop(channel);
}
if (future != null) {
future.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
}
}


@Override
public Set<Channel> getActiveChannels() {
return activeChannelMap.keySet();
}


public Map<Channel, ChannelGroup> getActiveChannelsMap() {
return activeChannelMap;
Expand All @@ -506,45 +556,45 @@ public void destroy() {

@Override
public String toString() {
StringBuffer buf = new StringBuffer();
buf.append("NettyFrameworkImpl@").append(Integer.toHexString(System.identityHashCode(this)));
buf.append(": {");
buf.append("Parent Group: ");
buf.append(getParentGroup());
if(getParentGroup() != null) {
buf.append(" isShuttingDown? ");
buf.append(getParentGroup().isShuttingDown());
buf.append(" isShutDown? ");
buf.append(getParentGroup().isShutdown());
buf.append(" isTerminated? ");
buf.append(getParentGroup().isTerminated());
}
buf.append(", Child Group: ");
buf.append(getChildGroup());
if(getChildGroup() != null) {
buf.append(" isShuttingDown? ");
buf.append(getChildGroup().isShuttingDown());
buf.append(" isShutDown? ");
buf.append(getChildGroup().isShutdown());
buf.append(" isTerminated? ");
buf.append(getChildGroup().isTerminated());
}
buf.append(", EndpointManager: ");
buf.append(getEndpointManager());
buf.append(", Default Chain Quiesce Timeout: ");
buf.append(getDefaultChainQuiesceTimeout());
buf.append(", Outbound Connections: ");
buf.append(getOutboundConnections());
buf.append(", Active Endpoints: ");
buf.append(getActiveChannels());
buf.append(", Active endpoint maps: ");
buf.append(getActiveChannelsMap());
buf.append(", Is Active: ");
buf.append(isActive());
buf.append(", Is Stopping: ");
buf.append(isStopping());
buf.append("}");
return buf.toString();
StringBuffer buf = new StringBuffer();
buf.append("NettyFrameworkImpl@").append(Integer.toHexString(System.identityHashCode(this)));
buf.append(": {");
buf.append("Parent Group: ");
buf.append(getParentGroup());
if(getParentGroup() != null) {
buf.append(" isShuttingDown? ");
buf.append(getParentGroup().isShuttingDown());
buf.append(" isShutDown? ");
buf.append(getParentGroup().isShutdown());
buf.append(" isTerminated? ");
buf.append(getParentGroup().isTerminated());
}
buf.append(", Child Group: ");
buf.append(getChildGroup());
if(getChildGroup() != null) {
buf.append(" isShuttingDown? ");
buf.append(getChildGroup().isShuttingDown());
buf.append(" isShutDown? ");
buf.append(getChildGroup().isShutdown());
buf.append(" isTerminated? ");
buf.append(getChildGroup().isTerminated());
}
buf.append(", EndpointManager: ");
buf.append(getEndpointManager());
buf.append(", Default Chain Quiesce Timeout: ");
buf.append(getDefaultChainQuiesceTimeout());
buf.append(", Outbound Connections: ");
buf.append(getOutboundConnections());
buf.append(", Active Endpoints: ");
buf.append(getActiveChannels());
buf.append(", Active endpoint maps: ");
buf.append(getActiveChannelsMap());
buf.append(", Is Active: ");
buf.append(isActive());
buf.append(", Is Stopping: ");
buf.append(isStopping());
buf.append("}");
return buf.toString();
}

public EventLoopGroup getParentGroup() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021 IBM Corporation and others.
* Copyright (c) 2021, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -44,6 +44,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "connection closed due to idle timeout: " + ctx.channel());
}
return;
}
ctx.fireUserEventTriggered(evt);
}
}
Loading