]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pipe close bugfix
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 17 Feb 2007 19:02:38 +0000 (19:02 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 17 Feb 2007 19:02:38 +0000 (19:02 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1102 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/cephmds2/msg/SimpleMessenger.cc
branches/sage/cephmds2/msg/SimpleMessenger.h

index c9cef7239e4cb3386a31c62fb2f38bf6ab66c7b9..1c9fd7ebca4a6f4e41fb2f508b7c5104673904b8 100644 (file)
@@ -279,10 +279,6 @@ int Rank::Pipe::connect()
 
 void Rank::Pipe::close()
 {
-  if (sent_close) {
-    dout(10) << "pipe(" << peer_addr << ' ' << this << ").close already closing" << endl;
-    return;
-  }
   dout(10) << "pipe(" << peer_addr << ' ' << this << ").close" << endl;
 
   // unreg ourselves
@@ -290,24 +286,30 @@ void Rank::Pipe::close()
   {
     if (rank.rank_pipe.count(peer_addr) &&
         rank.rank_pipe[peer_addr] == this) {
-      dout(10) << "pipe(" << peer_addr << ' ' << this << ").close unregistering pipe" << endl;
+      dout(10) << "pipe(" << peer_addr << ' ' << this
+              << ").close unregistering pipe" << endl;
       rank.rank_pipe.erase(peer_addr);
     }
   }
   rank.lock.Unlock();
 
-  // queue close message.
-  if (socket_error) {
-    dout(10) << "pipe(" << peer_addr << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl;
-  } 
-  else if (!writer_running) {
-    dout(10) << "pipe(" << peer_addr << ' ' << this << ").close not queueing MSG_CLOSE, no writer running" << endl;  
+  // queue close message?
+  if (!need_to_send_close) {
+    dout(10) << "pipe(" << peer_addr << ' ' << this
+            << ").close already closing/closed" << endl;
+    return;
+  }
+  
+  if (!writer_running) {
+    dout(10) << "pipe(" << peer_addr << ' ' << this
+            << ").close not queueing MSG_CLOSE, no writer running" << endl;  
   } else {
-    dout(10) << "pipe(" << peer_addr << ' ' << this << ").close queueing MSG_CLOSE" << endl;
+    dout(10) << "pipe(" << peer_addr << ' ' << this
+            << ").close queueing MSG_CLOSE" << endl;
     lock.Lock();
     q.push_back(new MGenericMessage(MSG_CLOSE));
     cond.Signal();
-    sent_close = true;
+    need_to_send_close = false;
     lock.Unlock();  
   }
 }
@@ -329,12 +331,12 @@ void Rank::Pipe::reader()
       if (m) {
        delete m;
        dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader read MSG_CLOSE message" << endl;
+       need_to_send_close = false;
       } else {
        derr(10) << "pipe(" << peer_addr << ' ' << this << ").reader read null message" << endl;
       }
 
-      if (!sent_close)
-       close();
+      close();
 
       done = true;
       cond.Signal();  // wake up writer too.
@@ -500,7 +502,7 @@ Message *Rank::Pipe::read_message()
   
   msg_envelope_t env; 
   if (!tcp_read( sd, (char*)&env, sizeof(env) )) {
-    socket_error = true;
+    need_to_send_close = false;
     return 0;
   }
   
@@ -514,7 +516,7 @@ Message *Rank::Pipe::read_message()
   for (int i=0; i<env.nchunks; i++) {
     int size;
     if (!tcp_read( sd, (char*)&size, sizeof(size) )) {
-      socket_error = true;
+      need_to_send_close = false;
       return 0;
     }
     
@@ -523,7 +525,7 @@ Message *Rank::Pipe::read_message()
     bufferptr bp(size);
     
     if (!tcp_read( sd, bp.c_str(), size )) {
-      socket_error = true;
+      need_to_send_close = false;
       return 0;
     }
     
@@ -567,7 +569,7 @@ int Rank::Pipe::write_message(Message *m)
   if (r < 0) { 
     derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending envelope for " << *m
              << " to " << m->get_dest() << endl; 
-    socket_error = true;
+    need_to_send_close = false;
     return -1;
   }
 
@@ -583,13 +585,13 @@ int Rank::Pipe::write_message(Message *m)
     r = tcp_write( sd, (char*)&size, sizeof(size) );
     if (r < 0) { 
       derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << endl; 
-      socket_error = true;
+      need_to_send_close = false;
       return -1;
     }
     r = tcp_write( sd, (*it).c_str(), size );
     if (r < 0) { 
       derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << endl; 
-      socket_error = true;
+      need_to_send_close = false;
       return -1;
     }
     i++;
@@ -600,7 +602,7 @@ int Rank::Pipe::write_message(Message *m)
   r = tcp_write( sd, (char*)&size, sizeof(size) );
   if (r < 0) { 
     derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data len for " << *m << " to " << m->get_dest() << endl; 
-    socket_error = true;
+    need_to_send_close = false;
     return -1;
   }
   dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer data len is " << size << " in " << blist.buffers().size() << " buffers" << endl;
@@ -612,7 +614,7 @@ int Rank::Pipe::write_message(Message *m)
     r = tcp_write( sd, (char*)(*it).c_str(), (*it).length() );
     if (r < 0) { 
       derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data megachunk for " << *m << " to " << m->get_dest() << " : len " << (*it).length() << endl; 
-      socket_error = true;
+      need_to_send_close = false;
       return -1;
     }
   }
index fb8e3dcee9fe13181dac85ecc096aaeb4beaecf7..e1265423edb1344d61475c57a82a9c218a39b1b0 100644 (file)
@@ -74,8 +74,7 @@ private:
     bool done;
     entity_addr_t peer_addr;
     bool server;
-    bool sent_close;
-    bool socket_error;
+    bool need_to_send_close;
 
     bool reader_running;
     bool writer_running;
@@ -109,11 +108,11 @@ private:
       void *entry() { pipe->writer(); return 0; }
     } writer_thread;
     friend class Writer;
-
+    
   public:
     Pipe(int s) : sd(s),
                  done(false), server(true), 
-                 sent_close(false), socket_error(false),
+                 need_to_send_close(true),
                  reader_running(false), writer_running(false),
                  reader_thread(this), writer_thread(this) {
       // server
@@ -121,10 +120,10 @@ private:
       reader_thread.create();
     }
     Pipe(const entity_addr_t &pi) : sd(0),
-      done(false), peer_addr(pi), server(false), 
-      sent_close(false),
-      reader_running(false), writer_running(false),
-      reader_thread(this), writer_thread(this) {
+                                   done(false), peer_addr(pi), server(false), 
+                                   need_to_send_close(true),
+                                   reader_running(false), writer_running(false),
+                                   reader_thread(this), writer_thread(this) {
       // client
       writer_running = true;
       writer_thread.create();