Skip to content

Commit

Permalink
Merge pull request #3 from jgroups-extras/gccloud-config
Browse files Browse the repository at this point in the history
Throughput optimisations
  • Loading branch information
baizel authored May 17, 2020
2 parents 1729974 + 057404f commit c8447dc
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 425 deletions.
13 changes: 13 additions & 0 deletions src/main/java/netty/listeners/ChannelLifecycleListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package netty.listeners;

import io.netty.channel.Channel;
import org.jgroups.stack.IpAddress;

/***
* @author Baizel Mathew
*/
public interface ChannelLifecycleListener {
void channelInactive(Channel channel);

void channelRead(Channel channel, IpAddress sender);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.jgroups.blocks.cs.netty;
package netty.listeners;

import org.jgroups.Address;

/***
* @author Baizel Mathew
*/
public interface NettyReceiverCallback {
public interface NettyReceiverListener {
void onReceive(Address sender, byte[] msg, int offset, int length);

void onError(Throwable ex);
}
}
36 changes: 36 additions & 0 deletions src/main/java/netty/utils/PipelineChannelInitializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package netty.utils;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.util.concurrent.EventExecutorGroup;
import netty.listeners.ChannelLifecycleListener;
import netty.listeners.NettyReceiverListener;

/***
* @author Baizel Mathew
*/
public class PipelineChannelInitializer extends ChannelInitializer<SocketChannel> {
private NettyReceiverListener nettyReceiverListener;
private ChannelLifecycleListener lifecycleListener;
private EventExecutorGroup separateWorkerGroup;

public final int MAX_FRAME_LENGTH = Integer.MAX_VALUE; // not sure if this is a great idea
public final int LENGTH_OF_FIELD = Integer.BYTES;

public PipelineChannelInitializer(NettyReceiverListener nettyReceiverListener, ChannelLifecycleListener lifecycleListener, EventExecutorGroup separateWorkerGroup) {
this.nettyReceiverListener = nettyReceiverListener;
this.lifecycleListener = lifecycleListener;
this.separateWorkerGroup = separateWorkerGroup;
}

@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addFirst(new FlushConsolidationHandler(1000 * 32, true));//outbound and inbound (1)
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, LENGTH_OF_FIELD));//inbound head (2)
ch.pipeline().addLast(separateWorkerGroup, "handlerThread", new ReceiverHandler(nettyReceiverListener, lifecycleListener)); // (4)
// inbound ---> 1, 2, 4
// outbound --> 1
}
}
60 changes: 60 additions & 0 deletions src/main/java/netty/utils/ReceiverHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package netty.utils;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import netty.listeners.ChannelLifecycleListener;
import netty.listeners.NettyReceiverListener;
import org.jgroups.stack.IpAddress;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

/***
* @author Baizel Mathew
*/
@ChannelHandler.Sharable
public class ReceiverHandler extends ChannelInboundHandlerAdapter {
private NettyReceiverListener nettyReceiverListener;
private ChannelLifecycleListener lifecycleListener;
private byte[] buffer = new byte[65200];

public ReceiverHandler(NettyReceiverListener nettyReceiverListener, ChannelLifecycleListener lifecycleListener) {
super();
this.nettyReceiverListener = nettyReceiverListener;
this.lifecycleListener = lifecycleListener;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msgbuf = (ByteBuf) msg;
int totalLength = msgbuf.readInt();
int addrLen = msgbuf.readInt();
int dataLen = totalLength - Integer.BYTES - addrLen;

msgbuf.readBytes(buffer, 0, addrLen);
msgbuf.readBytes(buffer, addrLen, dataLen);

IpAddress sender = new IpAddress();
sender.readFrom(new DataInputStream(new ByteArrayInputStream(buffer, 0, addrLen)));
synchronized (this) {
nettyReceiverListener.onReceive(sender, buffer, addrLen, dataLen);
}
msgbuf.release();
lifecycleListener.channelRead(ctx.channel(), sender);

}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
lifecycleListener.channelInactive(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
nettyReceiverListener.onError(cause);
}

}

140 changes: 0 additions & 140 deletions src/main/java/org/jgroups/blocks/cs/netty/NettyClient.java

This file was deleted.

Loading

0 comments on commit c8447dc

Please sign in to comment.