Skip to content

Commit

Permalink
update pyport
Browse files Browse the repository at this point in the history
tiny log
add ignore
  • Loading branch information
absolute8511 committed Dec 20, 2012
1 parent 2849159 commit 8522be4
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 23 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.so
*.so.*
*.dylib
*.o
*.dSYM
*.DS_Store
Expand Down
8 changes: 8 additions & 0 deletions PyPort/NetMsgBusDataDef.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def PackData(self):

def UnPackBody(self, data):
(self.service_name, ) = struct.unpack_from('!' + MAX_SERVICE_NAME_STR + 's', data, 0)
self.service_name = self.service_name.strip('\0')
self.service_host.unpack(data[MAX_SERVICE_NAME:])

def UnPackData(self, data):
Expand Down Expand Up @@ -126,6 +127,7 @@ def PackData(self):

def UnPackBody(self, data):
(self.ret_code, self.service_name, self.err_msg_len) = struct.unpack_from('!H' + MAX_SERVICE_NAME_STR + 'sH', data, 0)
self.service_name = self.service_name.strip('\0')
used_size = struct.calcsize('!H' + MAX_SERVICE_NAME_STR + 'sH')
(self.err_msg, ) = struct.unpack_from('!' + str(self.err_msg_len) + 's', data, used_size)

Expand Down Expand Up @@ -163,6 +165,7 @@ def PackData(self):

def UnPackBody(self, data):
(self.service_name, ) = struct.unpack_from('!' + MAX_SERVICE_NAME_STR + 's', data, 0)
self.service_name = self.service_name.strip('\0')
self.service_host.unpack(data[MAX_SERVICE_NAME:])

def UnPackData(self, data):
Expand Down Expand Up @@ -230,6 +233,7 @@ def PackData(self):

def UnPackBody(self, data):
(self.dest_name, ) = struct.unpack_from('!' + MAX_SERVICE_NAME_STR + 's', data, 0)
self.dest_name = self.dest_name.strip('\0')

def UnPackData(self, data):
self.UnPackReqHead(data)
Expand All @@ -254,6 +258,7 @@ def PackData(self):

def UnPackBody(self, data):
(self.ret_code, self.dest_name) = struct.unpack_from('!H' + MAX_SERVICE_NAME_STR + 's', data, 0)
self.dest_name = self.dest_name.strip('\0')
used_size = struct.calcsize('!H' + MAX_SERVICE_NAME_STR + 's')
self.dest_host.unpack(data[used_size:])

Expand Down Expand Up @@ -284,6 +289,8 @@ def PackData(self):
def UnPackBody(self, data):
(self.dest_name, self.from_name, self.msg_id, self.msg_len) = struct.unpack_from(
'!' + MAX_SERVICE_NAME_STR + 's' + MAX_SERVICE_NAME_STR + 'sII', data, 0)
self.dest_name = self.dest_name.strip('\0')
self.from_name = self.from_name.strip('\0')
used_size = struct.calcsize('!' + MAX_SERVICE_NAME_STR + 's' + MAX_SERVICE_NAME_STR + 'sII')
(self.msg_content, ) = struct.unpack_from( '!' + str(self.msg_len) + 's', data, used_size)

Expand Down Expand Up @@ -377,6 +384,7 @@ def UnPackBody(self, data):
(self.pbtype_len, self.pbdata_len) = struct.unpack_from('!ii', data, 0)
used_size = struct.calcsize('!ii')
(self.pbtype, self.pbdata) = struct.unpack_from('!' + str(self.pbtype_len) + 's' + str(self.pbdata_len) + 's', data, used_size)
self.pbtype = self.pbtype.rstrip('\0')

def UnPackData(self, data):
self.UnPackHead(data)
Expand Down
4 changes: 2 additions & 2 deletions PyPort/NetMsgBusReceiverMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def handle_write(self):

def handle_error(self):
self.close()
log.error('client connection has error : %s', self.addr)
log.error('!!!!! client connection has error : %s !!!!!!', self.addr)

def ReceivePack(self):
msg_pack = ReceiverSendMsgReq()
Expand Down Expand Up @@ -138,7 +138,7 @@ def handle_close(self):
def handle_error(self):
self.close()
self.is_closed = True
log.error('receiver server connection has error')
log.error('!!!!! receiver server connection has error.!!!!!!')

class ReceiverMgrServerRunner(threading.Thread):
def __init__(self, receivermgr):
Expand Down
37 changes: 24 additions & 13 deletions PyPort/NetMsgBusReq2ReceiverMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def handle_close(self):
log.info('receiver connection to %s:%d closed', self.dest_ip, self.dest_port)

def handle_read(self):
if self.need_stop:
self.close()
self.is_closed = True
return
log.debug('begin read data from receiver')
self.ReceivePack()

Expand All @@ -45,6 +49,10 @@ def writable(self):
return (len(self.buffer) > 0)

def handle_write(self):
if self.need_stop:
self.close()
self.is_closed = True
return
log.debug('begin write data to netmsgbus receiver')
sent = self.send(self.buffer)
self.buffer = self.buffer[sent:]
Expand All @@ -53,7 +61,7 @@ def handle_error(self):
self.req2receivermgr.handle_channel_close()
self.close()
self.is_closed = True
log.error('netmsgbus receiver connection has error')
log.error('!!!! receiver channel connection has error !!!!!')

def ReceivePack(self):
rsp = ReceiverSendMsgRsp()
Expand All @@ -79,10 +87,10 @@ def set_result(self, rsp):
with self.lock:
self.rsp = rsp
self.ready = True
if callable(self.callback):
log.debug('future ready callback')
self.callback(self)
self.cond.notify_all()
if callable(self.callback):
log.debug('future ready callback')
self.callback(self)

def get(self, timeout):
with self.lock:
Expand Down Expand Up @@ -117,7 +125,7 @@ def CreateTcpConn(self, ipport, num=5):
def ClearAll(self):
for ipport,host_channels in self.channels.items():
for channel in host_channels:
channel.close()
channel.need_stop = True
self.channels.clear()

# req2task : (syncflag, retry, futurepair, timeout, (destip, desthost)/destname, data)
Expand All @@ -126,6 +134,7 @@ def __init__(self, server_conn_mgr):
MsgBusHandlerBase.__init__(self)
self.server_conn_mgr = server_conn_mgr
self.wait2send_task = {}
self.wait2send_task_lock = threading.Lock()
self.task_queue = []
self.task_queue_lock = threading.Lock()
self.task_queue_cond = threading.Condition(self.task_queue_lock)
Expand All @@ -137,7 +146,7 @@ def __init__(self, server_conn_mgr):
self.sockmap = {}
self.tcp_conn_pool = TcpClientPool(self.sockmap, self)
self.stop = False
self.AddHandler("netmsgbus.server.getclient.rsp", self.HandleRspGetClient)
self.AddHandler("netmsg.sever.rsp.getclient", self.HandleRspGetClient)

def Stop(self):
with self.task_queue_lock:
Expand All @@ -150,17 +159,17 @@ def SendMsgDirectToClient(self, ipport_or_name, data, timeout):
task = {'sync':True, 'retry':False, 'future':self.GetFuture(), 'timeout':timeout, 'dest':ipport_or_name, 'data':data}
return self.ProcessReqToReceiver(task)

def PostMsgDirectToClient(self, ipport_or_name, data):
def PostMsgDirectToClient(self, ipport_or_name, data, callback = None):
if self.stop:
return None
future_pair = self.GetFuture()
future_pair = self.GetFuture(callback)
retry = True
if isinstance(ipport_or_name, tuple):
# using (ip, port) no retry getting host info need.
retry = False
task = {'sync':False, 'retry':retry, 'future':future_pair, 'timeout':None, 'dest':ipport_or_name, 'data':data}
self.QueueReqTaskToReceiver(task)
log.debug('post task and return :%d ', future_pair[0])
log.debug('post task and return futureid:%d ', future_pair[0])
return future_pair[1]

def ClearData(self):
Expand All @@ -169,13 +178,13 @@ def ClearData(self):
self.future_map.clear()
self.tcp_conn_pool.ClearAll()

def GetFuture(self):
def GetFuture(self, callback = None):
with self.future_map_lock:
self.futureid += 1
if self.futureid == 0:
self.futureid += 1
futureid = self.futureid
new_future = Future()
new_future = Future(callback)
self.future_map[futureid] = new_future
return (futureid, new_future)

Expand Down Expand Up @@ -210,8 +219,6 @@ def HandleRspGetClient(self, msgid, msgparam):
else:
log.debug('pending task client rsp:%s, but no pending task in client', clientname)
if ret_code == 0:
hostinfo.host_ip = ip;
hostinfo.host_port = ntohs(rsp.dest_host.server_port);
log.debug('get client info returned. ret name : %s, ip:port : %s:%d', clientname, hostinfo[0],
hostinfo[1]);
with self.cache_lock:
Expand All @@ -234,6 +241,8 @@ def QueueReqTaskToReceiver(self, task):

def QueueWaitingTask(self, task):
with self.wait2send_task_lock:
if task['dest'] not in self.wait2send_task.keys():
self.wait2send_task[task['dest']] = []
self.wait2send_task[task['dest']].append(task)
self.server_conn_mgr.ReqReceiverInfo(task['dest'])

Expand All @@ -253,6 +262,8 @@ def ProcessReqToReceiver(self, task):
destclient = task['dest']
else:
destclient = self.GetCachedClient(task['dest'])
log.debug('use cached client info : %s,', task['dest'])
print destclient
if destclient is None:
if task['retry']:
self.QueueWaitingTask(task)
Expand Down
8 changes: 4 additions & 4 deletions PyPort/NetMsgBusServerConnMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import logging
from NetMsgBusDataDef import *
from LocalMsgBus import *
import LocalMsgBus
from NetMsgBus import * # for protobuf Type

logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(msecs)d %(levelname)8s %(thread)d %(name)s %(message)s")
Expand Down Expand Up @@ -98,7 +98,7 @@ def handle_write(self):
def handle_error(self):
self.close()
self.is_closed = True
log.error('netmsgbus server connection has error')
log.error('!!!!netmsgbus server connection has error!!!!!')

def ReceivePack(self):
head = MsgBusPackHead()
Expand Down Expand Up @@ -140,7 +140,7 @@ def HandleRspGetClient(self, bodybuffer):
dest_port = rsp.dest_host.server_port
dest_clientname = rsp.dest_name
log.info('get client info returned. ret name: %s, ip:port : %s:%d', dest_clientname, dest_ip, dest_port)
LocalMsgBus.SendMsg('netmsg.sever.rsp.getclient', [dest_clientname, (dest_ip, dest_port)])
LocalMsgBus.SendMsg('netmsg.sever.rsp.getclient', (rsp.ret_code, dest_clientname, (dest_ip, dest_port)))
else:
log.info('msgbus server return error while get client info, ret_code: %d.', rsp.ret_code)

Expand Down Expand Up @@ -172,7 +172,7 @@ def HandleRspPBBody(self, bodybuffer):
pbpack = MsgBusPackPBType()
pbpack.UnPackBody(bodybuffer)
log.debug('got pbrsp, pbtype:%s.', pbpack.GetPBType())
self.pbdata_handlers.get(pbpack.GetPBType().rstrip('\0'), self.HandlePBUnknown)(pbpack.GetPBData())
self.pbdata_handlers.get(pbpack.GetPBType(), self.HandlePBUnknown)(pbpack.GetPBData())

def HandleUnknown(self, bodybuffer):
log.error('got unknown body from netmsgbus server.')
Expand Down
15 changes: 12 additions & 3 deletions PyPort/msgbus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ def test_localmsg_handler(msgid, msgparam):
print 'handler in test local msgbus: param' + msgparam
return ('retparam', True)

def test_future_call_back(future):
if future.ready:
log.info('callback from future, ready for rsp: %s', future.rsp)
else:
log.info('callback from future, failed to get rsp')

class TestHandler:
def OnMsg(self, msgid, msgparam):
print 'handle in OnMsg object'
Expand Down Expand Up @@ -57,14 +63,17 @@ def StaticOnMsg(msgid, msgparam):
serverconn_bg.join(3)

# test for req2receivermgr
rsp = req2receivermgr.SendMsgDirectToClient(('127.0.0.1', 9101), 'msgid=msg_netmsgbus_testmsg1&msgparam=datafrompython', 3)
rsp = req2receivermgr.SendMsgDirectToClient(('127.0.0.1', 9101), 'msgid=msg_netmsgbus_testmsg1&msgparam={"testkey":11111, "testlongdata": "frompythondata"}', 3)
if rsp[0]:
print 'sync get data from receiver: ' + rsp[1]
else:
print 'sync get data from receiver failed'

future = req2receivermgr.PostMsgDirectToClient(('127.0.0.1', 9101), 'msgid=msg_netmsgbus_testmsg1&msgparam=datafrompython')
print 'async get data from receiver: ' + future.get(3)
#future = req2receivermgr.PostMsgDirectToClient(('127.0.0.1', 9101), 'msgid=msg_netmsgbus_testmsg1&msgparam={"testkey":11112, "testlongdata": "frompythondata"}', test_future_call_back)
#print 'async get data from receiver: ' + future.get(3)

future = req2receivermgr.PostMsgDirectToClient('test.receiverclient_B', 'msgid=msg_netmsgbus_testmsg1&msgparam={"testkey":11113, "testlongdata": "frompythondata"}', test_future_call_back)
print 'async get data using name : ' + future.get(5)

# wait for test receiver server, wait for data from other client. and test for long no active
receiver_bg.join(30)
Expand Down

0 comments on commit 8522be4

Please sign in to comment.