]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
minor simplemessenger bugs
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 30 Jan 2007 20:52:22 +0000 (20:52 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 30 Jan 2007 20:52:22 +0000 (20:52 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1055 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/cephmds2/msg/SimpleMessenger.cc

index 4b3e3944abe18f9da44686c69977ea9e857e0d75..10a937762c03bc99048aaa703ad45eb0883c8872 100644 (file)
@@ -117,8 +117,10 @@ void *Rank::Accepter::entry()
       dout(10) << "accepted incoming on sd " << sd << endl;
       
       rank.lock.Lock();
-      Pipe *p = new Pipe(sd);
-      rank.pipes.insert(p);
+      if (!rank.local.empty()) {
+       Pipe *p = new Pipe(sd);
+       rank.pipes.insert(p);
+      }
       rank.lock.Unlock();
     } else {
       dout(10) << "no incoming connection?" << endl;
@@ -271,6 +273,9 @@ void Rank::Pipe::close()
   // queue close message.
   if (socket_error) {
     dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl;
+  } 
+  else if (!writer_running) {
+    dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, no writer running" << endl;  
   } else {
     dout(10) << "pipe(" << peer_inst << ' ' << this << ").close queueing MSG_CLOSE" << endl;
     lock.Lock();
@@ -943,6 +948,8 @@ void Rank::wait()
     if (local.empty()) {
       dout(10) << "wait: everything stopped" << endl;
       break;   // everything stopped.
+    } else {
+      dout(10) << "wait: local still has " << local.size() << " items, waiting" << endl;
     }
     
     wait_cond.Wait(lock);
@@ -951,6 +958,9 @@ void Rank::wait()
   
   // done!  clean up.
 
+  //dout(10) << "wait: stopping accepter thread" << endl;
+  //accepter.stop();
+
   // stop dispatch thread
   if (g_conf.ms_single_dispatch) {
     dout(10) << "wait: stopping dispatch thread" << endl;
@@ -1018,6 +1028,11 @@ void Rank::EntityMessenger::dispatch_entry()
       {
         // deliver
         while (!ls.empty()) {
+         if (stop) {
+           dout(1) << "dispatch: stop=true, discarding " << ls.size() 
+                   << " messages in dispatch queue" << endl;
+           break;
+         }
           Message *m = ls.front();
           ls.pop_front();
           dout(1) << m->get_dest() 
@@ -1034,6 +1049,9 @@ void Rank::EntityMessenger::dispatch_entry()
     cond.Wait(lock);
   }
   lock.Unlock();
+
+  // deregister
+  rank.unregister_entity(this);
 }
 
 void Rank::EntityMessenger::ready()
@@ -1060,9 +1078,6 @@ int Rank::EntityMessenger::shutdown()
 {
   dout(10) << "shutdown " << get_myaddr() << endl;
   
-  // deregister
-  rank.unregister_entity(this);
-  
   // stop my dispatch thread
   if (dispatch_thread.am_self()) {
     dout(1) << "shutdown i am dispatch, setting stop flag" << endl;