From: sage Date: Thu, 16 Jun 2005 16:01:22 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: v0.1~2055 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=478b50d7a39b4a1d5b2ee0f52c258a1bbb496497;p=ceph.git *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@326 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index dd66b5858be96..d9b74424a5246 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -41,6 +41,9 @@ hash_map directory; // local list incoming; Mutex incoming_lock; Cond incoming_cond; +list outgoing; +Mutex outgoing_lock; +Cond outgoing_cond; struct sockaddr_in *remote_addr; int *in_sd; // incoming sockets @@ -58,6 +61,7 @@ int mpi_rank; bool tcp_done = false; // set this flag to stop the event loop pthread_t dispatch_thread_id = 0; // thread id of the event loop. init value == nobody +pthread_t out_thread_id = 0; // thread id of the event loop. init value == nobody pthread_t listen_thread_id = 0; Mutex sender_lock; @@ -460,6 +464,23 @@ int tcp_send(Message *m) // recv event loop, for unsolicited messages. +void* tcp_sendthread(void*) +{ + outgoing_lock.Lock(); + while (!tcp_done) { + + while (outgoing.size()) { + Message *m = outgoing.front(); + outgoing.pop_front(); + tcp_send(m); + } + + outgoing_cond.Wait(outgoing_lock); + + } + outgoing_lock.Unlock(); +} + void* tcpmessenger_loop(void*) { dout(5) << "tcpmessenger_loop start pid " << getpid() << endl; @@ -490,6 +511,8 @@ void* tcpmessenger_loop(void*) while (incoming.size()) { Message *m = incoming.front(); incoming.pop_front(); + + incoming_lock.Unlock(); int dest = m->get_dest(); if (directory.count(dest)) { @@ -507,6 +530,8 @@ void* tcpmessenger_loop(void*) assert(0); break; } + + incoming_lock.Lock(); } } @@ -535,6 +560,13 @@ int tcpmessenger_start() tcpmessenger_loop, 0); + + dout(5) << "starting outgoing thread" << endl; + pthread_create(&out_thread_id, + NULL, + tcp_sendthread, + 0); + } @@ -642,6 +674,7 @@ int TCPMessenger::shutdown() tcp_done = true; incoming_cond.Signal(); + outgoing_cond.Signal(); /* dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl; @@ -680,7 +713,14 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo m->set_source(myaddr, fromport); m->set_dest(dest, port); - tcp_send(m); + if (0) { + tcp_send(m); + } else { + outgoing_lock.Lock(); + outgoing.push_back(m); + outgoing_lock.Unlock(); + outgoing_cond.Signal(); + } }