git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: fix replay op ordering
author: Samuel Just <sam.just@inktank.com>
Mon, 9 Jul 2012 22:53:31 +0000 (15:53 -0700)
committer: Sage Weil <sage@inktank.com>
Thu, 26 Jul 2012 22:03:35 +0000 (15:03 -0700)
After a client reconnect, the client replays outstanding ops.  The
OSD then immediately responds with success if the op has already
committed (version < ReplicatedPG::get_first_in_progress).
Otherwise, we stick it in waiting_for_ondisk to be replied to when
eval_repop concludes that waitfor_disk is empty.

Fixes #2508

Signed-off-by: Samuel Just <sam.just@inktank.com>
Conflicts:

src/osd/ReplicatedPG.cc

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index fa6527433b7a13dae7a47a5ca0b2332d765ae3c1..58c197d7f859b6d81224f84acf73233c881bca8b 100644 (file)
@@ -845,7 +845,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
       delete ctx;
       put_object_context(obc);
       put_object_contexts(src_obc);
-      if (oldv <= last_update_ondisk) {
+      if (already_complete(oldv)) {
        osd->reply_op_error(op, 0, oldv);
       } else {
        dout(10) << " waiting for " << oldv << " to commit" << dendl;
@@ -3457,10 +3457,6 @@ void ReplicatedPG::op_commit(RepGather *repop)
     repop->waitfor_ack.erase(whoami);
     
     last_update_ondisk = repop->v;
-    if (waiting_for_ondisk.count(repop->v)) {
-      osd->requeue_ops(this, waiting_for_ondisk[repop->v]);
-      waiting_for_ondisk.erase(repop->v);
-    }
 
     last_complete_ondisk = repop->pg_local_last_complete;
     eval_repop(repop);
@@ -3509,6 +3505,16 @@ void ReplicatedPG::eval_repop(RepGather *repop)
       log_op_stats(repop->ctx);
       update_stats();
 
+      if (waiting_for_ondisk.count(repop->v)) {
+       assert(waiting_for_ondisk.begin()->first == repop->v);
+       for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
+            i != waiting_for_ondisk[repop->v].end();
+            ++i) {
+         osd->reply_op_error(*i, 0, repop->v);
+       }
+       waiting_for_ondisk.erase(repop->v);
+      }
+
       if (m->wants_ondisk() && !repop->sent_disk) {
        // send commit.
        MOSDOpReply *reply = repop->ctx->reply;
index f09d5d05a4ce9fc05e6da7e88edd43b8fffed5dd..6387cf889cba942af9db77f44434bf3c94f2258a 100644 (file)
@@ -470,6 +470,17 @@ protected:
   // replica ops
   // [primary|tail]
   xlist<RepGather*> repop_queue;
+  bool already_complete(eversion_t v) {  // true iff no repop in repop_queue with version <= v still has entries in waitfor_disk
+    for (xlist<RepGather*>::iterator i = repop_queue.begin();
+        !i.end();
+        ++i) {
+      if ((*i)->v > v)  // stop scanning at the first repop newer than v; NOTE(review): presumably repop_queue is in ascending version order — confirm
+        break;
+      if (!(*i)->waitfor_disk.empty())  // a repop at or before v has a non-empty waitfor_disk, so v is not complete yet
+       return false;
+    }
+    return true;  // reached when the queue is empty or every repop up to v has an empty waitfor_disk
+  }
   map<tid_t, RepGather*> repop_map;
 
   void apply_repop(RepGather *repop);