diff --git a/Core/test.cpp b/Core/test.cpp index 3c7c31e..8f57f5d 100644 --- a/Core/test.cpp +++ b/Core/test.cpp @@ -119,8 +119,8 @@ class MyMsgHandlerClass : public MsgHandler } bool testMsgBus1(const std::string& msgid, MsgBusParam& param, bool& is_continue) { - printMsg(msgid, param, __FUNCTION__); - GenerateNextTestParam(param); + //printMsg(msgid, param, __FUNCTION__); + //GenerateNextTestParam(param); //NetMsgBusSendMsg("test.receiverclient_C", "rsp_msg_netmsgbus_testmsg1", param, SendDirectToClient); //NetMsgBusSendMsg("", "rsp_msg_netmsgbus_testmsg1", param, SendUseServerRelay); //sleep(1); @@ -137,7 +137,7 @@ class MyMsgHandlerClass : public MsgHandler } bool testMsgBus3(const std::string& msgid, MsgBusParam& param, bool& is_continue) { - printMsg(msgid, param, __FUNCTION__); + //printMsg(msgid, param, __FUNCTION__); std::string rspstr("Yeah! I send the rsp data to you."); param = CustomType2Param(rspstr); //sleep(3); @@ -983,8 +983,8 @@ int main() //threadpool::queue_work_task(boost::bind(testlocalmsgbus), 0); //threadpool::queue_work_task(boost::bind(testlocalmsgbus), 1); //testconcurrent_local(); - testremotemsgbus(); - //testremotemsgbus_without_server(); + //testremotemsgbus(); + testremotemsgbus_without_server(); MsgHandlerMgr::DropAllInstance(); EventLoopPool::DestroyEventLoopPool(); NetMsgBusDisConnectAll(); diff --git a/PyPort/NetMsgBusReq2ReceiverMgr.py b/PyPort/NetMsgBusReq2ReceiverMgr.py index 7df8d32..bbde7bd 100755 --- a/PyPort/NetMsgBusReq2ReceiverMgr.py +++ b/PyPort/NetMsgBusReq2ReceiverMgr.py @@ -87,7 +87,7 @@ def ReceivePack(self): return rsp.data = self.read_buffer[rsp.HeadSize():rsp.data_len + rsp.HeadSize()] self.read_buffer = self.read_buffer[rsp.data_len + rsp.HeadSize():] - log.debug('reading receiver rsp : future_id :%d, data:%s ', rsp.sync_sid, rsp.data) + #log.debug('reading receiver rsp : future_id :%d, data:%s ', rsp.sync_sid, rsp.data) self.req2receivermgr.handle_channel_rsp(rsp.sync_sid, rsp.data) class TcpClientPool: @@ -150,7 +150,7 @@ def SendMsgDirectToClient(self, ipport_or_name, data, timeout): if self.GetCachedClient(ipport_or_name) is None: destclient = self.server_conn_mgr.ReqReceiverInfo(ipport_or_name) if destclient is None: - return False + return (False, None) with self.cache_lock: self.cached_client_info[ipport_or_name] = destclient; ipport_or_name = destclient diff --git a/PyPort/NetMsgBusTest.py b/PyPort/NetMsgBusTest.py index 6645084..1290e2a 100755 --- a/PyPort/NetMsgBusTest.py +++ b/PyPort/NetMsgBusTest.py @@ -24,6 +24,21 @@ def StaticOnMsg(msgid, msgparam): return test_localmsg_handler(msgid, msgparam) +class ConcurrentTest(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + + def run(self): + times = 10000 + while times > 0: + times = times - 1; + rsp = NetMsgBus.NetSyncGetData(('127.0.0.1', 9101), 'msg_netmsgbus_testmsg1', '{"testkey":11111, "testlongdata": "frompythondata"}', 3) + if rsp[0] and rsp[1]: + pass + #print 'sync get data from receiver: ' + rsp[1] + else: + print 'sync get data from receiver failed.' + LocalMsgBus.InitMsgBus() test_handler = TestHandler() @@ -93,7 +108,19 @@ def StaticOnMsg(msgid, msgparam): else: print 'async get data from receiver using name failed' +log.debug('begin ConcurrentTest') +concurrent_tests = [] +for i in range(10): + concurrent_tests.append(ConcurrentTest()) + concurrent_tests[i].daemon = True + +for i in range(10): + concurrent_tests[i].start() # wait for test receiver server, wait for data from other client. and test for long no active +for i in range(10): + concurrent_tests[i].join() + +log.debug('concurrent_tests finished.') log.debug('waiting 30s to quit ...') NetMsgBus.Wait(30) NetMsgBus.Destroy()