git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: fix ACK ordering on resent ops
author: Sage Weil <sage@inktank.com>
Mon, 23 Jul 2012 23:51:03 +0000 (16:51 -0700)
committer: Sage Weil <sage@inktank.com>
Mon, 23 Jul 2012 23:51:03 +0000 (16:51 -0700)
The waiting_for_ondisk handling fixed COMMIT ordering, but the ACKs need to
go back in the same order too.  For example:

 - op A is queued
 - client disconnects, both ACK and COMMIT replies are lost
 - client reconnects
 - op A and B are sent
 - op A is queued
 - op B is applied, ACK is sent
 - op A and B COMMITs are sent
 -> client's ack callbacks will see B and then A.

Fix this by creating a waiting_for_ack queue as well, and sending ACK
responses as needed.  Also handle the case where the ACK should be sent
immediately when the retry event is received.

Fixes: #2823
Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Mike Ryan <mike.ryan@inktank.com>
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 1f7bb6285c7844032e3f441ea17a2b6b452e2dcb..53358215de7c58c4d60e4c2042615666fa896ddc 100644 (file)
@@ -627,7 +627,7 @@ protected:
   list<OpRequestRef>            waiting_for_all_missing;
   map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
                                         waiting_for_degraded_object;
-  map<eversion_t,list<OpRequestRef> > waiting_for_ondisk;
+  map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
   map<eversion_t,OpRequestRef>   replay_queue;
 
   void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
index 16af52ea0229e200c9fedfee07142a6b48acfd6f..79efb7cff81c5961898f40c6a122c8ae8d3b875a 100644 (file)
@@ -849,8 +849,18 @@ void ReplicatedPG::do_op(OpRequestRef op)
       if (already_complete(oldv)) {
        osd->reply_op_error(op, 0, oldv);
       } else {
+       if (m->wants_ack()) {
+         if (already_ack(oldv)) {
+           MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+           reply->add_flags(CEPH_OSD_FLAG_ACK);
+           osd->client_messenger->send_message(reply, m->get_connection());
+         } else {
+           dout(10) << " waiting for " << oldv << " to ack" << dendl;
+           waiting_for_ack[oldv].push_back(op);
+         }
+       }
        dout(10) << " waiting for " << oldv << " to commit" << dendl;
-       waiting_for_ondisk[oldv].push_back(op);
+       waiting_for_ondisk[oldv].push_back(op);  // always queue ondisk waiters, so that we can requeue if needed
        op->mark_delayed();
       }
       return;
@@ -3533,6 +3543,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
       log_op_stats(repop->ctx);
       update_stats();
 
+      // send dup commits, in order
       if (waiting_for_ondisk.count(repop->v)) {
        assert(waiting_for_ondisk.begin()->first == repop->v);
        for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
@@ -3560,6 +3571,21 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
     // applied?
     if (repop->waitfor_ack.empty()) {
+
+      // send dup acks, in order
+      if (waiting_for_ack.count(repop->v)) {
+       assert(waiting_for_ack.begin()->first == repop->v);
+       for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
+            i != waiting_for_ack[repop->v].end();
+            ++i) {
+         MOSDOp *m = (MOSDOp*)(*i)->request;
+         MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+         reply->add_flags(CEPH_OSD_FLAG_ACK);
+         osd->client_messenger->send_message(reply, m->get_connection());
+       }
+       waiting_for_ack.erase(repop->v);
+      }
+
       if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
        // send ack
        MOSDOpReply *reply = repop->ctx->reply;
@@ -5785,6 +5811,7 @@ void ReplicatedPG::on_role_change()
        p++)
     requeue_ops(p->second);
   waiting_for_ondisk.clear();
+  waiting_for_ack.clear();
 }
 
 
index fb9861b04057fa673923c3b4b391f55be34a3f08..a96f62df01e6e8c178cbd365d7578be76ba75e64 100644 (file)
@@ -470,17 +470,6 @@ protected:
   // replica ops
   // [primary|tail]
   xlist<RepGather*> repop_queue;
-  bool already_complete(eversion_t v) {
-    for (xlist<RepGather*>::iterator i = repop_queue.begin();
-        !i.end();
-        ++i) {
-      if ((*i)->v > v)
-        break;
-      if (!(*i)->waitfor_disk.empty())
-       return false;
-    }
-    return true;
-  }
   map<tid_t, RepGather*> repop_map;
 
   void apply_repop(RepGather *repop);
@@ -495,6 +484,31 @@ protected:
                  int result, int ack_type,
                  int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
 
+  /// true if we can send an ondisk/commit for v
+  bool already_complete(eversion_t v) {
+    for (xlist<RepGather*>::iterator i = repop_queue.begin();
+        !i.end();
+        ++i) {
+      if ((*i)->v > v)
+        break;
+      if (!(*i)->waitfor_disk.empty())
+       return false;
+    }
+    return true;
+  }
+  /// true if we can send an ack for v
+  bool already_ack(eversion_t v) {
+    for (xlist<RepGather*>::iterator i = repop_queue.begin();
+        !i.end();
+        ++i) {
+      if ((*i)->v > v)
+        break;
+      if (!(*i)->waitfor_ack.empty())
+       return false;
+    }
+    return true;
+  }
+
   friend class C_OSD_OpCommit;
   friend class C_OSD_OpApplied;