]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: preserve write order when waiting on src_oids
authorSage Weil <sage@newdream.net>
Thu, 15 Dec 2011 02:41:10 +0000 (18:41 -0800)
committerSage Weil <sage@newdream.net>
Thu, 15 Dec 2011 03:08:27 +0000 (19:08 -0800)
We need to preserve the order of write operations on each object.  If we
have a write on X that needs to read from Y, and Y is degraded, then we
need to wait for Y to repair.  Doing so blindly will allow other writes
to X to proceed while our clone op is still waiting, violating the
ordering.

Fix this by adding blocked_by and blocking vars to the ObjectContext.  If
we wait on a src_oid, the oid is "blocked" by that object, and any
subsequent writes should also wait on the same queue.

Use a helper to do the cleanup when we complete recovery, or when the
pg resets.

Signed-off-by: Sage Weil <sage@newdream.net>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index dc0892fde4a8b2b71c576cf85d75de6db66051b7..19ad90d6fdbf67085d90232a102c944e5cab0e96 100644 (file)
@@ -519,6 +519,15 @@ void ReplicatedPG::do_op(MOSDOp *op)
 
   dout(10) << "do_op mode now " << mode << dendl;
 
+  // are writes blocked by another object?
+  if (obc->blocked_by) {
+    dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by "
+            << obc->blocked_by->obs.oi.soid << dendl;
+    wait_for_degraded_object(obc->blocked_by->obs.oi.soid, op);
+    put_object_context(obc);
+    return;
+  }
+
   // src_oids
   map<hobject_t,ObjectContext*> src_obc;
   for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
@@ -543,6 +552,10 @@ void ReplicatedPG::do_op(MOSDOp *op)
          osd->reply_op_error(op, r);
        } else if (is_degraded_object(sobc->obs.oi.soid)) { 
          wait_for_degraded_object(sobc->obs.oi.soid, op);
+         dout(10) << " writes for " << obc->obs.oi.soid << " now blocked by "
+                  << sobc->obs.oi.soid << dendl; 
+         obc->blocked_by = sobc;
+         sobc->blocking.insert(obc);
        } else if (sobc->obs.oi.oloc.key != obc->obs.oi.oloc.key &&
                   sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
                   sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
@@ -4181,11 +4194,7 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
          osd->requeue_ops(this, waiting_for_degraded_object[soid]);
          waiting_for_degraded_object.erase(soid);
        }
-       map<hobject_t, ObjectContext *>::iterator i =
-         object_contexts.find(soid);
-       if (i != object_contexts.end()) {
-         populate_obc_watchers(i->second);
-       }
+       finish_degraded_object(soid);
       } else {
        dout(10) << "pushed " << soid << ", still waiting for push ack from " 
                 << pushing[soid].size() << " others" << dendl;
@@ -4195,6 +4204,20 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
   reply->put();
 }
 
+void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
+{
+  dout(10) << "finish_degraded_object " << oid << dendl;
+  map<hobject_t, ObjectContext *>::iterator i = object_contexts.find(oid);
+  if (i != object_contexts.end()) {
+    populate_obc_watchers(i->second);
+    for (set<ObjectContext*>::iterator j = i->second->blocking.begin();
+        j != i->second->blocking.end();
+        i->second->blocking.erase(j++)) {
+      dout(10) << " no longer blocking writes for " << (*j)->obs.oi.soid << dendl;
+      (*j)->blocked_by = NULL;
+    }
+  }
+}
 
 /** op_pull
  * process request to pull an entire object.
@@ -4942,7 +4965,12 @@ void ReplicatedPG::on_change()
 
   // take object waiters
   requeue_object_waiters(waiting_for_missing_object);
-  requeue_object_waiters(waiting_for_degraded_object);
+  for (map<hobject_t,list<Message*> >::iterator p = waiting_for_degraded_object.begin();
+       p != waiting_for_degraded_object.end();
+       waiting_for_degraded_object.erase(p++)) {
+    osd->requeue_ops(this, p->second);
+    finish_degraded_object(p->first);
+  }
   osd->requeue_ops(this, waiting_for_all_missing);
   waiting_for_all_missing.clear();
 
index 1f0d83d93573219be2bfe4f7d2d778d119846ec8..f9eda15ffcc9d2621dbd3016e74022797ddf4069 100644 (file)
@@ -271,6 +271,10 @@ public:
     Cond cond;
     int unstable_writes, readers, writers_waiting, readers_waiting;
 
+    // set if writes for this object are blocked on another objects recovery
+    ObjectContext *blocked_by;      // object blocking our writes
+    set<ObjectContext*> blocking;   // objects whose writes we block
+
     // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
     map<entity_name_t, OSD::Session *> watchers;
     map<entity_name_t, Context *> unconnected_watchers;
@@ -584,6 +588,8 @@ protected:
                   map<hobject_t, interval_set<uint64_t> >& clone_subsets);
   void send_push_op_blank(const hobject_t& soid, int peer);
 
+  void finish_degraded_object(const hobject_t& oid);
+
   // Cancels/resets pulls from peer
   void check_recovery_op_pulls(const OSDMapRef map);
   int pull(const hobject_t& oid, eversion_t v);