]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: don't close close_on_empty until outgoing messages are acked
authorSage Weil <sage.weil@dreamhost.com>
Fri, 20 May 2011 21:43:57 +0000 (14:43 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Fri, 20 May 2011 22:15:12 +0000 (15:15 -0700)
Otherwise, if we close the socket, we may lose in-flight data.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 5734555a09097c7878a86f3f639dcf801e892882..a96363ee8ab5c7145e136a3377084e8d40677e55 100644 (file)
@@ -1714,7 +1714,7 @@ void SimpleMessenger::Pipe::writer()
       Message *m = _get_next_outgoing();
       if (m) {
        m->set_seq(++out_seq);
-       if (!policy.lossy) {
+       if (!policy.lossy || close_on_empty) {
          // put on sent list
          sent.push_back(m); 
          m->get();
@@ -1743,10 +1743,9 @@ void SimpleMessenger::Pipe::writer()
       continue;
     }
     
-    if (close_on_empty) {
+    if (sent.empty() && close_on_empty) {
       // this is slightly hacky
-      dout(10) << "writer queue empty, closing" << dendl;
-      policy.lossy = true;
+      dout(10) << "writer out and sent queues empty, closing" << dendl;
       fault();
       continue;
     }
index fcf72043dfb73c668b9d80538b361e44957975b2..228ff7321ed629125f92461a9cc7bb631268f3d0 100644 (file)
@@ -187,6 +187,13 @@ private:
             << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
         m->put();
       }
+
+      if (sent.empty() && close_on_empty) {
+       // this is slightly hacky
+       dout(10) << "reader got last ack, queue empty, closing" << dendl;
+       policy.lossy = true;
+       fault();
+      }
     }
 
     // threads