Skip to content

Commit

Permalink
added a length param to start_proxy, to enable partial proxying.
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Gupta <[email protected]>
  • Loading branch information
vadims authored and tmm1 committed Apr 4, 2010
1 parent 24135cb commit feaf12a
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 14 deletions.
4 changes: 2 additions & 2 deletions ext/cmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -796,12 +796,12 @@ extern "C" int evma_send_file_data_to_connection (const unsigned long binding, c
evma_start_proxy
*****************/

extern "C" void evma_start_proxy (const unsigned long from, const unsigned long to, const unsigned long bufsize)
extern "C" void evma_start_proxy (const unsigned long from, const unsigned long to, const unsigned long bufsize, const unsigned long length)
{
ensure_eventmachine("evma_start_proxy");
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from));
if (ed)
ed->StartProxy(to, bufsize);
ed->StartProxy(to, bufsize, length);
}


Expand Down
23 changes: 19 additions & 4 deletions ext/ed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@ bool EventableDescriptor::IsCloseScheduled()
EventableDescriptor::StartProxy
*******************************/

void EventableDescriptor::StartProxy(const unsigned long to, const unsigned long bufsize)
void EventableDescriptor::StartProxy(const unsigned long to, const unsigned long bufsize, const unsigned long length)
{
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (to));
if (ed) {
StopProxy();
ProxyTarget = ed;
BytesToProxy = length;
ed->SetProxiedFrom(this, bufsize);
return;
}
Expand Down Expand Up @@ -236,10 +237,24 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
{
assert(EventCallback);

if (ProxyTarget)
ProxyTarget->SendOutboundData(buf, size);
else
if (ProxyTarget) {
if (BytesToProxy > 0) {
unsigned long proxied = std::min(BytesToProxy, (unsigned long) size);
ProxyTarget->SendOutboundData(buf, proxied);
BytesToProxy -= proxied;
if (BytesToProxy == 0) {
StopProxy();
(*EventCallback)(GetBinding(), EM_PROXY_COMPLETED, NULL, 0);
if (proxied < size) {
(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf + proxied, size - proxied);
}
}
} else {
ProxyTarget->SendOutboundData(buf, size);
}
} else {
(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf, size);
}
}


Expand Down
4 changes: 3 additions & 1 deletion ext/ed.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class EventableDescriptor: public Bindable_t
struct epoll_event *GetEpollEvent() { return &EpollEvent; }
#endif

virtual void StartProxy(const unsigned long, const unsigned long);
virtual void StartProxy(const unsigned long, const unsigned long, const unsigned long);
virtual void StopProxy();
virtual void SetProxiedFrom(EventableDescriptor*, const unsigned long);
virtual int SendOutboundData(const char*,int){ return -1; }
Expand All @@ -109,6 +109,8 @@ class EventableDescriptor: public Bindable_t
uint64_t CreatedAt;
bool bCallbackUnbind;
int UnbindReasonCode;

unsigned long BytesToProxy;
EventableDescriptor *ProxyTarget;
EventableDescriptor *ProxiedFrom;

Expand Down
5 changes: 3 additions & 2 deletions ext/eventmachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ extern "C" {
EM_CONNECTION_NOTIFY_WRITABLE = 107,
EM_SSL_HANDSHAKE_COMPLETED = 108,
EM_SSL_VERIFY = 109,
EM_PROXY_TARGET_UNBOUND = 110
EM_PROXY_TARGET_UNBOUND = 110,
EM_PROXY_COMPLETED = 111

};

Expand Down Expand Up @@ -105,7 +106,7 @@ extern "C" {
const unsigned long evma_watch_pid (int);
void evma_unwatch_pid (const unsigned long);

void evma_start_proxy(const unsigned long, const unsigned long, const unsigned long);
void evma_start_proxy(const unsigned long, const unsigned long, const unsigned long, const unsigned long);
void evma_stop_proxy(const unsigned long);

int evma_set_rlimit_nofile (int n_files);
Expand Down
14 changes: 11 additions & 3 deletions ext/rubymain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ static VALUE Intern_ssl_verify_peer;
static VALUE Intern_notify_readable;
static VALUE Intern_notify_writable;
static VALUE Intern_proxy_target_unbound;
static VALUE Intern_proxy_completed;
static VALUE Intern_connection_completed;

static VALUE rb_cProcStatus;
Expand Down Expand Up @@ -156,6 +157,12 @@ static inline void event_callback (struct em_event* e)
rb_funcall (conn, Intern_proxy_target_unbound, 0);
return;
}
case EM_PROXY_COMPLETED:
{
VALUE conn = ensure_conn(signature);
rb_funcall (conn, Intern_proxy_completed, 0);
return;
}
}
}

Expand Down Expand Up @@ -1019,9 +1026,9 @@ static VALUE t_get_loop_time (VALUE self)
t_start_proxy
**************/

static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to, VALUE bufsize)
static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to, VALUE bufsize, VALUE length)
{
evma_start_proxy(NUM2ULONG (from), NUM2ULONG (to), NUM2ULONG(bufsize));
evma_start_proxy(NUM2ULONG (from), NUM2ULONG (to), NUM2ULONG(bufsize), NUM2ULONG(length));
return Qnil;
}

Expand Down Expand Up @@ -1086,6 +1093,7 @@ extern "C" void Init_rubyeventmachine()
Intern_notify_readable = rb_intern ("notify_readable");
Intern_notify_writable = rb_intern ("notify_writable");
Intern_proxy_target_unbound = rb_intern ("proxy_target_unbound");
Intern_proxy_completed = rb_intern ("proxy_completed");
Intern_connection_completed = rb_intern ("connection_completed");

// INCOMPLETE, we need to define class Connections inside module EventMachine
Expand Down Expand Up @@ -1130,7 +1138,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "resume_connection", (VALUE (*)(...))t_resume, 1);
rb_define_module_function (EmModule, "connection_paused?", (VALUE (*)(...))t_paused_p, 1);

rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 3);
rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 4);
rb_define_module_function (EmModule, "stop_proxy", (VALUE (*)(...))t_stop_proxy, 1);

rb_define_module_function (EmModule, "watch_filename", (VALUE (*)(...))t_watch_filename, 1);
Expand Down
1 change: 1 addition & 0 deletions java/src/com/rubyeventmachine/EmReactor.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class EmReactor {
public final int EM_SSL_HANDSHAKE_COMPLETED = 108;
public final int EM_SSL_VERIFY = 109;
public final int EM_PROXY_TARGET_UNBOUND = 110;
public final int EM_PROXY_COMPLETED = 111

private Selector mySelector;
private TreeMap<Long, ArrayList<Long>> Timers;
Expand Down
5 changes: 5 additions & 0 deletions lib/em/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ def unbind
def proxy_target_unbound
end

# EventMachine::Connection#proxy_completed is called when the reactor finished proxying all
# of the requested bytes.
def proxy_completed
end

# EventMachine::Connection#proxy_incoming_to is called only by user code. It sets up
# a low-level proxy relay for all data inbound for this connection, to the connection given
# as the argument. This is essentially just a helper method for enable_proxy.
Expand Down
4 changes: 2 additions & 2 deletions lib/eventmachine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1369,8 +1369,8 @@ def self.error_handler cb = nil, &blk
# EM.run {
# EM.start_server("127.0.0.1", 8080, ProxyServer)
# }
def self.enable_proxy(from, to, bufsize=0)
EM::start_proxy(from.signature, to.signature, bufsize)
def self.enable_proxy(from, to, bufsize=0, length=0)
EM::start_proxy(from.signature, to.signature, bufsize, length)
end

# disable_proxy takes just one argument, a Connection that has proxying enabled via enable_proxy.
Expand Down
52 changes: 52 additions & 0 deletions tests/test_proxy_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,40 @@ def unbind
end
end

module PartialProxyConnection
def initialize(client, request, length)
@client, @request, @length = client, request, length
end

def post_init
EM::enable_proxy(self, @client, 0, @length)
end

def receive_data(data)
$unproxied_data = data
@client.send_data(data)
end

def connection_completed
EM.next_tick {
send_data @request
}
end

def proxy_target_unbound
$unbound_early = true
EM.stop
end

def proxy_completed
$proxy_completed = true
end

def unbind
@client.close_connection_after_writing
end
end

module Client
def connection_completed
send_data "EventMachine rocks!"
Expand Down Expand Up @@ -61,6 +95,12 @@ def receive_data(data)
end
end

module PartialProxyServer
def receive_data(data)
EM.connect("127.0.0.1", 54321, PartialProxyConnection, self, data, 1)
end
end

module EarlyClosingProxy
def receive_data(data)
EM.connect("127.0.0.1", 54321, ProxyConnection, self, data)
Expand All @@ -78,6 +118,18 @@ def test_proxy_connection
assert_equal("I know!", $client_data)
end

def test_partial_proxy_connection
EM.run {
EM.start_server("127.0.0.1", 54321, Server)
EM.start_server("127.0.0.1", 12345, PartialProxyServer)
EM.connect("127.0.0.1", 12345, Client)
}

assert_equal("I know!", $client_data)
assert_equal(" know!", $unproxied_data)
assert($proxy_completed)
end

def test_early_close
$client_data = nil
EM.run {
Expand Down

0 comments on commit feaf12a

Please sign in to comment.