git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: on dups, reply if committed, or wait until ondisk
author Sage Weil <sage@newdream.net>
Sat, 24 Jul 2010 16:50:31 +0000 (09:50 -0700)
committer Sage Weil <sage@newdream.net>
Sat, 24 Jul 2010 16:50:31 +0000 (09:50 -0700)
src/osd/PG.h
src/osd/ReplicatedPG.cc

index 28bade7ccca72cf267e81f2ede8a3c4805bdda92..d03da957eb690de88be5c7eaec45ab0ee5de0126 100644 (file)
@@ -778,6 +778,7 @@ public:
   list<class Message*>            waiting_for_active;
   hash_map<sobject_t, 
            list<class Message*> > waiting_for_missing_object, waiting_for_degraded_object;   
+  map<eversion_t,list<Message*> > waiting_for_ondisk;
   map<eversion_t,class MOSDOp*>   replay_queue;
 
   void take_object_waiters(hash_map<sobject_t, list<Message*> >& m);
@@ -975,10 +976,6 @@ public:
 
   void queue_snap_trim();
 
-  bool is_dup(osd_reqid_t rid) {
-    return log.logged_req(rid);
-  }
-
 
 
   // abstract bits
index da052bcd1e2c08f5bc95fc1e7f0305a653e63727..fe416b31bccee9d27dc8ee8447894f5a0a599a9c 100644 (file)
@@ -565,9 +565,18 @@ void ReplicatedPG::do_op(MOSDOp *op)
       return;
     }
 
-    if (is_dup(ctx->reqid)) {
-      dout(3) << "do_op dup " << ctx->reqid << ", doing WRNOOP" << dendl;
-      noop = true;
+    eversion_t oldv = log.get_request_version(ctx->reqid);
+    if (oldv != eversion_t()) {
+      dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
+      delete ctx;
+      put_object_context(obc);
+      if (oldv >= last_update_ondisk) {
+       osd->reply_op_error(op, 0);
+      } else {
+       dout(10) << " waiting for " << oldv << " to commit" << dendl;
+       waiting_for_ondisk[oldv].push_back(op);
+      }
+      return;
     }
 
     // version
@@ -2179,7 +2188,13 @@ void ReplicatedPG::op_commit(RepGather *repop)
     dout(10) << "op_commit " << *repop << dendl;
     repop->waitfor_disk.erase(osd->get_nodeid());
     //repop->waitfor_nvram.erase(osd->get_nodeid());
+
     last_update_ondisk = repop->v;
+    if (waiting_for_ondisk.count(repop->v)) {
+      osd->take_waiters(waiting_for_ondisk[repop->v]);
+      waiting_for_ondisk.erase(repop->v);
+    }
+
     last_complete_ondisk = repop->pg_local_last_complete;
     eval_repop(repop);
   }
@@ -3783,6 +3798,13 @@ void ReplicatedPG::on_role_change()
 {
   dout(10) << "on_role_change" << dendl;
 
+  // take commit waiters
+  for (map<eversion_t, list<Message*> >::iterator p = waiting_for_ondisk.begin();
+       p != waiting_for_ondisk.end();
+       p++)
+    osd->take_waiters(p->second);
+  waiting_for_ondisk.clear();
+
   // take object waiters
   take_object_waiters(waiting_for_missing_object);
   take_object_waiters(waiting_for_degraded_object);