]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 16 Jun 2005 16:01:22 +0000 (16:01 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 16 Jun 2005 16:01:22 +0000 (16:01 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@326 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/msg/TCPMessenger.cc

index dd66b5858be967e2bc02bc17d199f69b6f7684d3..d9b74424a5246e41fe0fb867f1aa66a12198a7b2 100644 (file)
@@ -41,6 +41,9 @@ hash_map<int, TCPMessenger*>  directory;  // local
 list<Message*>                incoming;
 Mutex                         incoming_lock;
 Cond                          incoming_cond;
+list<Message*>                outgoing;
+Mutex                         outgoing_lock;
+Cond                          outgoing_cond;
 
 struct sockaddr_in *remote_addr;
 int                *in_sd;     // incoming sockets
@@ -58,6 +61,7 @@ int mpi_rank;
 bool tcp_done = false;     // set this flag to stop the event loop
 
 pthread_t dispatch_thread_id = 0;   // thread id of the event loop.  init value == nobody
+pthread_t out_thread_id = 0;   // thread id of the event loop.  init value == nobody
 pthread_t listen_thread_id = 0;
 Mutex sender_lock;
 
@@ -460,6 +464,23 @@ int tcp_send(Message *m)
 
 // recv event loop, for unsolicited messages.
 
+void* tcp_sendthread(void*)
+{
+  outgoing_lock.Lock();
+  while (!tcp_done) {
+       
+       while (outgoing.size()) {
+         Message *m = outgoing.front();
+         outgoing.pop_front();
+         tcp_send(m);
+       }
+
+       outgoing_cond.Wait(outgoing_lock);
+       
+  }
+  outgoing_lock.Unlock();
+}
+
 void* tcpmessenger_loop(void*)
 {
   dout(5) << "tcpmessenger_loop start pid " << getpid() << endl;
@@ -490,6 +511,8 @@ void* tcpmessenger_loop(void*)
        while (incoming.size()) {
          Message *m = incoming.front();
          incoming.pop_front();
+
+         incoming_lock.Unlock();
          
          int dest = m->get_dest();
          if (directory.count(dest)) {
@@ -507,6 +530,8 @@ void* tcpmessenger_loop(void*)
                assert(0);
                break;
          }
+
+         incoming_lock.Lock();
        }
   }
 
@@ -535,6 +560,13 @@ int tcpmessenger_start()
                                 tcpmessenger_loop, 
                                 0);
 
+
+  dout(5) << "starting outgoing thread" << endl;
+  pthread_create(&out_thread_id, 
+                                NULL, 
+                                tcp_sendthread,
+                                0);
+
 }
 
 
@@ -642,6 +674,7 @@ int TCPMessenger::shutdown()
 
        tcp_done = true;
        incoming_cond.Signal();
+       outgoing_cond.Signal();
        /*
 
        dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl;
@@ -680,7 +713,14 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo
   m->set_source(myaddr, fromport);
   m->set_dest(dest, port);
 
-  tcp_send(m);
+  if (0) {
+       tcp_send(m);
+  } else {
+       outgoing_lock.Lock();
+       outgoing.push_back(m);
+       outgoing_lock.Unlock();
+       outgoing_cond.Signal();
+  }
 }