]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add infrastructure to block io on an obc
authorSage Weil <sage@inktank.com>
Wed, 11 Sep 2013 22:10:47 +0000 (15:10 -0700)
committerSage Weil <sage@inktank.com>
Tue, 17 Sep 2013 18:06:27 +0000 (11:06 -0700)
Add an is_blocked() method for the obc, and add infrastructure to block
any operations if it returns true.  Clean up on_change(), and add a helper
to kick an obc when whatever condition leading to it being blocked is no
longer true.

For now, is_blocked() is always false...

Signed-off-by: Sage Weil <sage@inktank.com>
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

index cbafd0f43d90bb64be4c673c77dbd2d5dfe3bd29..cdbe827a4a96e27ac9a2711dae28c33e9cf6ac71 100644 (file)
@@ -523,7 +523,8 @@ protected:
   list<OpRequestRef>            waiting_for_active;
   list<OpRequestRef>            waiting_for_all_missing;
   map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
-                                        waiting_for_degraded_object;
+                            waiting_for_degraded_object,
+                            waiting_for_blocked_object;
   // Callbacks should assume pg (and nothing else) is locked
   map<hobject_t, list<Context*> > callbacks_for_degraded_object;
   map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
index c42825259b6d587aa66229a2864f6bed9e0a8eaf..c9f4cb624deabe2daaf49e271409b22cfd5ffaa5 100644 (file)
@@ -192,6 +192,13 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
   op->mark_delayed("waiting for degraded object");
 }
 
+void ReplicatedPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef op)
+{
+  dout(10) << __func__ << " " << soid << " " << op << dendl;
+  waiting_for_blocked_object[soid].push_back(op);
+  op->mark_delayed("waiting for blocked object");
+}
+
 void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op)
 {
   waiting_for_backfill_pos.push_back(op);
@@ -735,7 +742,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
     osd->reply_op_error(op, r);
     return;
   }
-  
+
   // make sure locator is consistent
   object_locator_t oloc(obc->obs.oi.soid);
   if (m->get_object_locator() != oloc) {
@@ -746,6 +753,12 @@ void ReplicatedPG::do_op(OpRequestRef op)
                     << " op " << *m << "\n";
   }
 
+  // io blocked on obc?
+  if (obc->is_blocked()) {
+    wait_for_blocked_object(obc->obs.oi.soid, op);
+    return;
+  }
+
   if ((op->may_read()) && (obc->obs.oi.lost)) {
     // This object is lost. Reading from it returns an error.
     dout(20) << __func__ << ": object " << obc->obs.oi.soid
@@ -5140,6 +5153,24 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t
     pgstat->stats.cat_sum[oi.category].add(stat);
 }
 
+void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc)
+{
+  const hobject_t& soid = obc->obs.oi.soid;
+  map<hobject_t, list<OpRequestRef> >::iterator p = waiting_for_blocked_object.find(soid);
+  if (p == waiting_for_blocked_object.end())
+    return;
+
+  if (obc->is_blocked()) {
+    dout(10) << __func__ << " " << soid << " still blocked" << dendl;
+    return;
+  }
+
+  list<OpRequestRef>& ls = waiting_for_blocked_object[soid];
+  dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl;
+  requeue_ops(ls);
+  waiting_for_blocked_object.erase(soid);
+}
+
 SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
 {
   Mutex::Locker l(snapset_contexts_lock);
@@ -7095,6 +7126,14 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
       p->second.clear();
     finish_degraded_object(p->first);
   }
+  for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_blocked_object.begin();
+       p != waiting_for_blocked_object.end();
+       waiting_for_blocked_object.erase(p++)) {
+    if (is_primary())
+      requeue_ops(p->second);
+    else
+      p->second.clear();
+  }
 
   if (is_primary())
     requeue_ops(waiting_for_all_missing);
index 80ee9cf8d298e6e397a7ee6d4ad727effa9f0d9c..a58e10707390684a947918da06ec2e91a6acef80 100644 (file)
@@ -936,6 +936,9 @@ public:
   bool is_degraded_object(const hobject_t& oid);
   void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
 
+  void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
+  void kick_object_context_blocked(ObjectContextRef obc);
+
   void mark_all_unfound_lost(int what);
   eversion_t pick_newest_available(const hobject_t& oid);
   ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
index da139b853b16e567e87b6d93fcd3e3884772e39e..45937a91dd898e50a9cca75777630aa9e891d6a9 100644 (file)
@@ -2151,6 +2151,11 @@ public:
     if (destructor_callback)
       destructor_callback->complete(0);
   }
+
+  bool is_blocked() const {
+    return false;
+  }
+
   // do simple synchronous mutual exclusion, for now.  now waitqueues or anything fancy.
   void ondisk_write_lock() {
     lock.Lock();