diff --git a/src/main/java/netty/listeners/ChannelLifecycleListener.java b/src/main/java/netty/listeners/ChannelLifecycleListener.java new file mode 100644 index 0000000..597364e --- /dev/null +++ b/src/main/java/netty/listeners/ChannelLifecycleListener.java @@ -0,0 +1,13 @@ +package netty.listeners; + +import io.netty.channel.Channel; +import org.jgroups.stack.IpAddress; + +/*** + * @author Baizel Mathew + */ +public interface ChannelLifecycleListener { + void channelInactive(Channel channel); + + void channelRead(Channel channel, IpAddress sender); +} diff --git a/src/main/java/org/jgroups/blocks/cs/netty/NettyReceiverCallback.java b/src/main/java/netty/listeners/NettyReceiverListener.java similarity index 67% rename from src/main/java/org/jgroups/blocks/cs/netty/NettyReceiverCallback.java rename to src/main/java/netty/listeners/NettyReceiverListener.java index 5367d25..46d2977 100644 --- a/src/main/java/org/jgroups/blocks/cs/netty/NettyReceiverCallback.java +++ b/src/main/java/netty/listeners/NettyReceiverListener.java @@ -1,11 +1,12 @@ -package org.jgroups.blocks.cs.netty; +package netty.listeners; import org.jgroups.Address; /*** * @author Baizel Mathew */ -public interface NettyReceiverCallback { +public interface NettyReceiverListener { void onReceive(Address sender, byte[] msg, int offset, int length); + void onError(Throwable ex); -} +} \ No newline at end of file diff --git a/src/main/java/netty/utils/PipelineChannelInitializer.java b/src/main/java/netty/utils/PipelineChannelInitializer.java new file mode 100644 index 0000000..46a270f --- /dev/null +++ b/src/main/java/netty/utils/PipelineChannelInitializer.java @@ -0,0 +1,36 @@ +package netty.utils; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.flush.FlushConsolidationHandler; +import io.netty.util.concurrent.EventExecutorGroup; +import netty.listeners.ChannelLifecycleListener; +import netty.listeners.NettyReceiverListener; + +/*** + * @author Baizel Mathew + */ +public class PipelineChannelInitializer extends ChannelInitializer { + private NettyReceiverListener nettyReceiverListener; + private ChannelLifecycleListener lifecycleListener; + private EventExecutorGroup separateWorkerGroup; + + public final int MAX_FRAME_LENGTH = Integer.MAX_VALUE; // not sure if this is a great idea + public final int LENGTH_OF_FIELD = Integer.BYTES; + + public PipelineChannelInitializer(NettyReceiverListener nettyReceiverListener, ChannelLifecycleListener lifecycleListener, EventExecutorGroup separateWorkerGroup) { + this.nettyReceiverListener = nettyReceiverListener; + this.lifecycleListener = lifecycleListener; + this.separateWorkerGroup = separateWorkerGroup; + } + + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addFirst(new FlushConsolidationHandler(1000 * 32, true));//outbound and inbound (1) + ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, LENGTH_OF_FIELD));//inbound head (2) + ch.pipeline().addLast(separateWorkerGroup, "handlerThread", new ReceiverHandler(nettyReceiverListener, lifecycleListener)); // (4) + // inbound ---> 1, 2, 4 + // outbound --> 1 + } +} diff --git a/src/main/java/netty/utils/ReceiverHandler.java b/src/main/java/netty/utils/ReceiverHandler.java new file mode 100644 index 0000000..11d8664 --- /dev/null +++ b/src/main/java/netty/utils/ReceiverHandler.java @@ -0,0 +1,60 @@ +package netty.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import netty.listeners.ChannelLifecycleListener; +import netty.listeners.NettyReceiverListener; +import org.jgroups.stack.IpAddress; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; + +/*** + * @author Baizel Mathew + */ +@ChannelHandler.Sharable +public class ReceiverHandler extends ChannelInboundHandlerAdapter { + private NettyReceiverListener nettyReceiverListener; + private ChannelLifecycleListener lifecycleListener; + private byte[] buffer = new byte[65200]; + + public ReceiverHandler(NettyReceiverListener nettyReceiverListener, ChannelLifecycleListener lifecycleListener) { + super(); + this.nettyReceiverListener = nettyReceiverListener; + this.lifecycleListener = lifecycleListener; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf msgbuf = (ByteBuf) msg; + int totalLength = msgbuf.readInt(); + int addrLen = msgbuf.readInt(); + int dataLen = totalLength - Integer.BYTES - addrLen; + + msgbuf.readBytes(buffer, 0, addrLen); + msgbuf.readBytes(buffer, addrLen, dataLen); + + IpAddress sender = new IpAddress(); + sender.readFrom(new DataInputStream(new ByteArrayInputStream(buffer, 0, addrLen))); + synchronized (this) { + nettyReceiverListener.onReceive(sender, buffer, addrLen, dataLen); + } + msgbuf.release(); + lifecycleListener.channelRead(ctx.channel(), sender); + + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + lifecycleListener.channelInactive(ctx.channel()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + nettyReceiverListener.onError(cause); + } + +} + diff --git a/src/main/java/org/jgroups/blocks/cs/netty/NettyClient.java b/src/main/java/org/jgroups/blocks/cs/netty/NettyClient.java deleted file mode 100644 index bd65fe8..0000000 --- a/src/main/java/org/jgroups/blocks/cs/netty/NettyClient.java +++ /dev/null @@ -1,140 +0,0 @@ -//package org.jgroups.blocks.cs.netty; -// -//import io.netty.bootstrap.Bootstrap; -//import io.netty.buffer.PooledByteBufAllocator; -//import io.netty.channel.*; -//import io.netty.channel.epoll.EpollServerDomainSocketChannel; -//import io.netty.channel.epoll.EpollSocketChannel; -//import io.netty.channel.group.ChannelGroup; -//import io.netty.channel.group.DefaultChannelGroup; -//import io.netty.channel.socket.SocketChannel; -//import io.netty.channel.socket.SocketChannelConfig; -//import io.netty.channel.socket.nio.NioSocketChannel; -//import io.netty.handler.codec.bytes.ByteArrayDecoder; -//import io.netty.handler.codec.bytes.ByteArrayEncoder; -//import io.netty.handler.flush.FlushConsolidationHandler; -//import io.netty.util.concurrent.FutureListener; -//import io.netty.util.concurrent.GlobalEventExecutor; -//import org.jgroups.protocols.netty.Netty; -//import org.jgroups.stack.IpAddress; -// -//import java.io.IOException; -//import java.net.InetAddress; -//import java.net.InetSocketAddress; -//import java.net.SocketAddress; -//import java.nio.ByteBuffer; -//import java.util.HashMap; -//import java.util.Map; -// -///*** -// * @author Baizel Mathew -// */ -//public class NettyClient { -//// protected final Log log = LogFactory.getLog(this.getClass()); -// -// ChannelGroup connections; -// private Map channelIds; -// private Bootstrap bootstrap; -// -// public NettyClient(EventLoopGroup group, InetAddress local_addr, int max_timeout_interval, boolean isNativeTransport) { -// -// channelIds = new HashMap<>(); -// connections = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); -// -// bootstrap = new Bootstrap(); -// bootstrap.group(group) -// .localAddress(local_addr, 0) -// .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, max_timeout_interval) -//// .option(ChannelOption.SO_REUSEADDR, true) -//// .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)) -//// .option(ChannelOption.SO_SNDBUF,64*1024) // default is 16 * 1024 -// .option(ChannelOption.SO_RCVBUF,0) // default is 128 * 1024 -// .option(ChannelOption.AUTO_READ,false) -// .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) -// .option(ChannelOption.TCP_NODELAY, true) -// .handler(new ChannelInitializer() { -// @Override -// protected void initChannel(SocketChannel ch) throws Exception { -// ch.pipeline().addFirst(new FlushConsolidationHandler(512)); -// ch.pipeline().addLast(new ByteArrayEncoder()); -// ch.pipeline().addLast(new ByteArrayDecoder()); -// ch.pipeline().addLast(new ClientHandler()); -// } -// }); -// if (isNativeTransport) -// bootstrap.channel(EpollSocketChannel.class); -// else -// bootstrap.channel(NioSocketChannel.class); -// } -// -// public NettyClient(EventLoopGroup group, InetAddress local_addr, boolean isNativeTransport) { -// this(group, local_addr, 1000, isNativeTransport); -// } -// -// public Bootstrap getBootstrap(){ -// return bootstrap; -// } -//// public void send(IpAddress dest, byte[] data, int offset, int length) throws InterruptedException { -//// send(dest.getIpAddress(), dest.getPort(), data, offset, length); -//// } -// -//// public void send(InetAddress remote_addr, int remote_port, byte[] data, int offset, int length) throws InterruptedException { -//// InetSocketAddress dest = new InetSocketAddress(remote_addr, remote_port); -//// Channel ch = connect(dest); -//// if (ch != null && ch.isOpen()) { -//// byte[] packedData = Netty.pack(data, offset, length); -//// ch.eventLoop().execute(() -> { -//// ch.writeAndFlush(packedData, ch.voidPromise()); -//// }); -//// } -//// } -// -// -//// public Channel connect(InetSocketAddress remote_addr) throws InterruptedException { -//// ChannelId chId = channelIds.get(remote_addr); -//// if (chId != null) -//// return connections.find(chId); -//// -//// ChannelFuture cf = bootstrap.connect(remote_addr); -//// cf.addListener( -//// (ChannelFutureListener) -//// future1 -> { -//// if (future1.isSuccess()) { -//// } else { -//// } -//// }); -//// cf.awaitUninterruptibly(); // Wait max_timeout_interval seconds for conn -//// if (cf.isDone()) { -//// Channel ch = cf.channel(); -//// if (cf.isSuccess()) { -//// connections.add(ch); -//// channelIds.put(remote_addr, ch.id()); -//// return ch; -//// } else { -//// ch.close().sync(); -//// } -//// } -//// return null; -//// } -// -//// @ChannelHandler.Sharable -//// private class ClientHandler extends ChannelInboundHandlerAdapter { -//// -//// @Override -//// public void channelInactive(ChannelHandlerContext ctx) throws Exception { -//// Channel ch = ctx.channel(); -//// connections.remove(ch); -//// channelIds.remove(ch.remoteAddress()); -//// ch.close(); -//// } -//// -//// @Override -//// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { -//// if (!(cause instanceof IOException)) { -////// log.error("Error caught in client " + cause.toString()); -//// cause.printStackTrace(); -//// } -//// } -//// -//// } -//} diff --git a/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java b/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java new file mode 100644 index 0000000..df2705f --- /dev/null +++ b/src/main/java/org/jgroups/blocks/cs/netty/NettyConnection.java @@ -0,0 +1,185 @@ +package org.jgroups.blocks.cs.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.unix.Errors; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import netty.listeners.ChannelLifecycleListener; +import netty.listeners.NettyReceiverListener; +import netty.utils.PipelineChannelInitializer; +import org.jgroups.Address; +import org.jgroups.stack.IpAddress; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +/*** + * @author Baizel Mathew + */ +public class NettyConnection { + + private final EventExecutorGroup separateWorkerGroup = new DefaultEventExecutorGroup(4); + private final Bootstrap outgoingBootstrap = new Bootstrap(); + private final Map ipAddressChannelMap = new HashMap<>(); + private byte[] replyAdder = null; + private int port; + private InetAddress bind_addr; + private EventLoopGroup boss_group; // Handles incoming connections + private EventLoopGroup worker_group; + private boolean isNativeTransport; + private NettyReceiverListener callback; + private ChannelLifecycleListener lifecycleListener; + + public NettyConnection(InetAddress bind_addr, int port, NettyReceiverListener callback, boolean isNativeTransport) { + this.port = port; + this.bind_addr = bind_addr; + this.callback = callback; + this.isNativeTransport = isNativeTransport; + boss_group = isNativeTransport ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1); + worker_group = isNativeTransport ? new EpollEventLoopGroup() : new NioEventLoopGroup(); + + lifecycleListener = new ChannelLifecycleListener() { + @Override + public void channelInactive(Channel channel) { + ipAddressChannelMap.values().remove(channel); + } + + @Override + public void channelRead(Channel channel, IpAddress sender) { + updateMap(channel, sender); + } + }; + + outgoingBootstrap.group(worker_group) + .handler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup)) + .localAddress(bind_addr, 0) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.TCP_NODELAY, true); + if (isNativeTransport) + outgoingBootstrap.channel(EpollSocketChannel.class); + else + outgoingBootstrap.channel(NioSocketChannel.class); + } + + public void run() throws InterruptedException, BindException, Errors.NativeIoException { + ServerBootstrap inboundBootstrap = new ServerBootstrap(); + inboundBootstrap.group(boss_group, worker_group) + .localAddress(bind_addr, port) + .childHandler(new PipelineChannelInitializer(this.callback, lifecycleListener, separateWorkerGroup)) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, true); + if (isNativeTransport) { + inboundBootstrap.channel(EpollServerSocketChannel.class); + } else { + inboundBootstrap.channel(NioServerSocketChannel.class); + } + inboundBootstrap.bind().sync(); + + try { + ByteArrayOutputStream replyAddByteStream = new ByteArrayOutputStream(); + DataOutputStream dStream = new DataOutputStream(replyAddByteStream); + new IpAddress(bind_addr, port).writeTo(dStream); + replyAdder = replyAddByteStream.toByteArray(); + } catch (IOException e) { + //Nodes will have to use two channels per connection + e.printStackTrace(); + } + } + + public final void send(IpAddress destAddr, byte[] data, int offset, int length) { + Channel opened = ipAddressChannelMap.getOrDefault(destAddr, null); + if (opened != null) { + writeToChannel(opened, data, offset, length); + } else + connectAndSend(destAddr, data, offset, length); + + } + + public final void connectAndSend(IpAddress addr, byte[] data, int offset, int length) { + ChannelFuture cf = outgoingBootstrap.connect(new InetSocketAddress(addr.getIpAddress(), addr.getPort())); + // Putting pack(...) inside the lambda causes unexpected behaviour. + // Both send and receive works fine but it does not get passed up properly, might be something to do with the buffer + ByteBuf packed = pack(cf.channel().alloc(), data, offset, length, replyAdder); + cf.addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + Channel ch = channelFuture.channel(); + writeToChannel(ch, packed); + updateMap(ch, addr); + } + }); + } + + public final void connectAndSend(IpAddress addr) { + //Send an empty message so receiver knows reply addr. otherwise Receiver will make another connection + connectAndSend(addr, null, 0, 0); + } + + public Address getLocalAddress() { + return new IpAddress(bind_addr, port); + } + + public void shutdown() throws InterruptedException { + boss_group.shutdownGracefully(); + worker_group.shutdownGracefully(); + separateWorkerGroup.shutdownGracefully(); + } + + private void writeToChannel(Channel ch, byte[] data, int offset, int length) { + ByteBuf packed = pack(ch.alloc(), data, offset, length, replyAdder); + writeToChannel(ch, packed); + } + + private void writeToChannel(Channel ch, ByteBuf data) { + ch.eventLoop().execute(() -> { + ch.writeAndFlush(data, ch.voidPromise()); + }); + } + + private void updateMap(Channel connected, IpAddress destAddr) { + Channel channel = ipAddressChannelMap.get(destAddr); + if (channel != null && channel.id() == connected.id()) + return; + + if (channel != null) { + //if we already have a connection and then this will only be true in one of the nodes thus only closing one connection instead of two + if (connected.remoteAddress().equals(new InetSocketAddress(destAddr.getIpAddress(), destAddr.getPort()))) { + connected.close(); + } + return; + } + ipAddressChannelMap.put(destAddr, connected); + } + + private static ByteBuf pack(ByteBufAllocator allocator, byte[] data, int offset, int length, byte[] replyAdder) { + int allocSize = Integer.BYTES + length + Integer.BYTES + replyAdder.length; + ByteBuf buf = allocator.buffer(allocSize); + // size of data + size replyAddr.length field + space for reply addr bytes = total frame size + buf.writeInt(length + replyAdder.length + Integer.BYTES); //encode frame size and data length + buf.writeInt(replyAdder.length); + buf.writeBytes(replyAdder); + if (data != null) + buf.writeBytes(data, offset, length); + return buf; + } +} + diff --git a/src/main/java/org/jgroups/blocks/cs/netty/NettyServer.java b/src/main/java/org/jgroups/blocks/cs/netty/NettyServer.java deleted file mode 100644 index 5870774..0000000 --- a/src/main/java/org/jgroups/blocks/cs/netty/NettyServer.java +++ /dev/null @@ -1,262 +0,0 @@ -package org.jgroups.blocks.cs.netty; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.channel.unix.Errors; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.bytes.ByteArrayEncoder; -import io.netty.handler.flush.FlushConsolidationHandler; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; -import io.netty.util.concurrent.GlobalEventExecutor; -import org.jgroups.Address; -import org.jgroups.stack.IpAddress; - -import java.io.*; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - -/*** - * @author Baizel Mathew - */ -public class NettyServer { - - private int port; - private InetAddress bind_addr; - - private static final int CORES = Runtime.getRuntime().availableProcessors(); - //TODO: decide the optimal amount of threads for each loop - private EventLoopGroup boss_group; // Handles incoming connections - private EventLoopGroup worker_group; - private final EventExecutorGroup separateWorkerGroup = new DefaultEventExecutorGroup(4); - private boolean isNativeTransport; - private NettyReceiverCallback callback; - private Bootstrap outgoingBootstrap; - private ChannelInactiveListener inactive; - private byte[] replyAdder = null; -// private ByteBuffer sendBuffer = ByteBuffer.allocate(6500); - - private ChannelGroup allChannels; - private Map ipAddressChannelIdMap; - - public NettyServer(InetAddress bind_addr, int port, NettyReceiverCallback callback, boolean isNativeTransport) { - this.port = port; - this.bind_addr = bind_addr; - this.callback = callback; - ipAddressChannelIdMap = new HashMap<>(); - allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - - this.isNativeTransport = isNativeTransport; - boss_group = isNativeTransport ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1); - worker_group = isNativeTransport ? new EpollEventLoopGroup(4) : new NioEventLoopGroup(4); - inactive = channel -> { - allChannels.remove(channel); - ipAddressChannelIdMap.values().remove(channel.id()); - }; - outgoingBootstrap = new Bootstrap(); - outgoingBootstrap.group(worker_group) - .handler(new PipeLine(this.callback, inactive)) - .localAddress(bind_addr, 0) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.TCP_NODELAY, true); - if (isNativeTransport) - outgoingBootstrap.channel(EpollSocketChannel.class); - else - outgoingBootstrap.channel(NioSocketChannel.class); -// } - } - - public Address getLocalAddress() { - return new IpAddress(bind_addr, port); - } - - public void shutdown() throws InterruptedException { - boss_group.shutdownGracefully(); - worker_group.shutdownGracefully(); - separateWorkerGroup.shutdownGracefully(); - } - - public void run() throws InterruptedException, BindException, Errors.NativeIoException { - ServerBootstrap inboundBootstrap = new ServerBootstrap(); - inboundBootstrap.group(boss_group, worker_group) - .localAddress(bind_addr, port) - .childHandler(new PipeLine(this.callback, inactive)) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.SO_BACKLOG, 128) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childOption(ChannelOption.TCP_NODELAY, true); - if (isNativeTransport) { - inboundBootstrap.channel(EpollServerSocketChannel.class); - } else { - inboundBootstrap.channel(NioServerSocketChannel.class); - } - inboundBootstrap.bind().sync(); - - try { - ByteArrayOutputStream replyAddByteStream = new ByteArrayOutputStream(); - DataOutputStream dStream = new DataOutputStream(replyAddByteStream); - new IpAddress(bind_addr, port).writeTo(dStream); - replyAdder = replyAddByteStream.toByteArray(); - } catch (IOException e) { - //Nodes will have to use two channels per connection - e.printStackTrace(); - } - } - - public void send(IpAddress destAddr, byte[] data, int offset, int length) throws InterruptedException, IOException { - byte[] packed = pack(data, offset, length, replyAdder); - //TODO: check if this is right behavior - if (destAddr == null) { - allChannels.writeAndFlush(packed); - return; - } - ChannelId opened = ipAddressChannelIdMap.getOrDefault(destAddr, null); - if (opened != null) - writeToChannel(allChannels.find(opened), packed); - else - connectAndSend(destAddr, packed); - - } - - private void writeToChannel(Channel ch, byte[] data) { - ch.eventLoop().execute(() -> ch.writeAndFlush(data, ch.voidPromise())); - } - - public void connectAndSend(IpAddress addr, byte[] data) { - ChannelFuture cf = outgoingBootstrap.connect(new InetSocketAddress(addr.getIpAddress(), addr.getPort())); - cf.addListener((ChannelFutureListener) channelFuture -> { - if (channelFuture.isSuccess()) { - Channel ch = channelFuture.channel(); - writeToChannel(ch, data); - updateMap(ch, addr); - } - }); - } - - public void connectAndSend(IpAddress addr) throws IOException { - //Send an empty message so receiver knows reply addr - connectAndSend(addr, pack(new byte[0], 0, 0, replyAdder)); - } - - private void updateMap(Channel connected, IpAddress destAddr) { - ChannelId id = ipAddressChannelIdMap.get(destAddr); - if (id != null && id == connected.id()) - return; - - if (id != null) { - //if we already have a connection and then this will only be true in one of the nodes thus only closing one connection instead of two - if (connected.remoteAddress().equals(new InetSocketAddress(destAddr.getIpAddress(), destAddr.getPort()))) { - connected.close(); - } - return; - } - ipAddressChannelIdMap.put(destAddr, connected.id()); - allChannels.add(connected); - } - - @ChannelHandler.Sharable - private class ReceiverHandler extends ChannelInboundHandlerAdapter { - private NettyReceiverCallback cb; - private ChannelInactiveListener lifecycleListener; - private byte[] buffer = new byte[65200]; - - public ReceiverHandler(NettyReceiverCallback cb, ChannelInactiveListener lifecycleListener) { - super(); - this.cb = cb; - this.lifecycleListener = lifecycleListener; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - InetSocketAddress soc = (InetSocketAddress) ctx.channel().remoteAddress(); - Address sender = new IpAddress(soc.getAddress(), soc.getPort()); - - ByteBuf msgbuf = (ByteBuf) msg; - int length = msgbuf.readInt(); - int addrLen = msgbuf.readInt(); - - if (buffer.length < length + addrLen) { - buffer = new byte[length + addrLen]; - } - - msgbuf.readBytes(buffer, 0, addrLen); - msgbuf.readBytes(buffer, addrLen, length); - - IpAddress ad = new IpAddress(); - ad.readFrom(new DataInputStream(new ByteArrayInputStream(buffer, 0, addrLen))); - //TODO: should the sender be the client or server address from remote - cb.onReceive(sender, buffer, addrLen, length); - msgbuf.release(); - updateMap(ctx.channel(), ad); - - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - lifecycleListener.channelInactive(ctx.channel()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cb.onError(cause); - } - - } - - private class PipeLine extends ChannelInitializer { - private NettyReceiverCallback cb; - private ChannelInactiveListener lifecycleListener; - - public final int MAX_FRAME_LENGTH = Integer.MAX_VALUE; // not sure if this is a great idea - public final int LENGTH_OF_FIELD = Integer.BYTES; - - public PipeLine(NettyReceiverCallback cb, ChannelInactiveListener lifecycleListener) { - this.cb = cb; - this.lifecycleListener = lifecycleListener; - } - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new FlushConsolidationHandler(1000 * 32, true));//outbound and inbound (1) - ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, LENGTH_OF_FIELD, 0, LENGTH_OF_FIELD));//inbound head (2) - ch.pipeline().addLast(new ByteArrayEncoder()); //outbound tail (3) - ch.pipeline().addLast(separateWorkerGroup, "handlerThread", new ReceiverHandler(cb, lifecycleListener)); // (4) - // inbound ---> 1, 2, 4 - // outbound --> 3,1 - } - } - - private interface ChannelInactiveListener { - void channelInactive(Channel channel); - } - - private static byte[] pack(byte[] data, int offset, int length, byte[] replyAdder) throws IOException { - int allocSize = Integer.BYTES + Integer.BYTES + length + Integer.BYTES + replyAdder.length; - ByteBuffer buf = ByteBuffer.allocate(allocSize); - buf.putInt(allocSize - Integer.BYTES); -// buf.putInt( Integer.BYTES + length + Integer.BYTES + replyAdder.length); - buf.putInt(length); - buf.putInt(replyAdder.length); - buf.put(replyAdder); - buf.put(data, offset, length); - return buf.array(); - } -} - diff --git a/src/main/java/org/jgroups/protocols/netty/Netty.java b/src/main/java/org/jgroups/protocols/netty/Netty.java index 6f0f288..bf53e35 100644 --- a/src/main/java/org/jgroups/protocols/netty/Netty.java +++ b/src/main/java/org/jgroups/protocols/netty/Netty.java @@ -1,11 +1,12 @@ package org.jgroups.protocols.netty; import io.netty.channel.unix.Errors; -import org.jgroups.blocks.cs.netty.NettyReceiverCallback; -import org.jgroups.blocks.cs.netty.NettyServer; +import io.netty.util.ResourceLeakDetector; +import netty.listeners.NettyReceiverListener; import org.jgroups.Address; import org.jgroups.PhysicalAddress; import org.jgroups.annotations.Property; +import org.jgroups.blocks.cs.netty.NettyConnection; import org.jgroups.protocols.TP; import org.jgroups.stack.IpAddress; @@ -15,10 +16,13 @@ * @author Baizel Mathew */ public class Netty extends TP { - @Property(description="Use INative packages when available") + @Property(description = "Use Native packages when available") protected boolean use_native_transport; - private NettyServer server; + @Property(description = "Use Native packages when available") + protected String resource_leak_detector_level; + + private NettyConnection server; private IpAddress selfAddress = null; @@ -29,7 +33,7 @@ public boolean supportsMulticasting() { @Override public void sendMulticast(byte[] data, int offset, int length) throws Exception { - _send(null, data, offset, length); + sendToMembers(members, data, offset, length); } @Override @@ -44,6 +48,8 @@ public String getInfo() { @Override public void start() throws Exception { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(resource_leak_detector_level)); + boolean isServerCreated = createServer(); while (!isServerCreated && bind_port < bind_port + port_range) { //Keep trying to create server until @@ -86,13 +92,11 @@ private void _send(Address dest, byte[] data, int offset, int length) throws Exc private boolean createServer() throws InterruptedException { try { - server = new NettyServer(bind_addr, bind_port, new NettyReceiverCallback() { + server = new NettyConnection(bind_addr, bind_port, new NettyReceiverListener() { @Override public void onReceive(Address sender, byte[] msg, int offset, int length) { //This method is called from a non IO thread. it should be safe for this to block without affecting netty receive - synchronized (this) { - receive(sender, msg, offset, length); - } + receive(sender, msg, offset, length); } @Override diff --git a/src/main/resources/netty.xml b/src/main/resources/netty.xml index 7d9688d..c895257 100644 --- a/src/main/resources/netty.xml +++ b/src/main/resources/netty.xml @@ -1,22 +1,19 @@ diff --git a/src/main/resources/tcp.xml b/src/main/resources/tcp.xml index a7c691b..612e952 100644 --- a/src/main/resources/tcp.xml +++ b/src/main/resources/tcp.xml @@ -9,13 +9,12 @@ xmlns="urn:org:jgroups" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">