ReplicatedPG,osd_types: move rw tracking from its own map to ObjectContext
author    Samuel Just <sam.just@inktank.com>
          Sat, 26 Oct 2013 23:52:16 +0000 (16:52 -0700)
committer Samuel Just <sam.just@inktank.com>
          Mon, 28 Oct 2013 20:32:56 +0000 (13:32 -0700)

We also modify recovering to hold a reference to the recovering obc
in order to ensure that our backfill_read_lock doesn't outlive the
obc.

ReplicatedPG::op_applied no longer clears repop->obc since we need
it to live until the op is finally cleaned up.  This is fine since
repop->obc is now an ObjectContextRef and can clean itself up.

Signed-off-by: Samuel Just <sam.just@inktank.com>
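
For context, a minimal standalone sketch (not the actual Ceph code; ObjectContext, recovering, and the hobject_t alias below are simplified stand-ins) of why the recovering map must now hold the ObjectContextRef: the backfill read lock lives inside the ObjectContext, so the context has to stay referenced until drop_backfill_read() is called on it.

    #include <cassert>
    #include <list>
    #include <map>
    #include <memory>
    #include <string>

    // Simplified stand-in for Ceph's ObjectContext: only the backfill read lock.
    struct ObjectContext {
      bool backfill_read_held = false;
      bool get_backfill_read() { backfill_read_held = true; return true; }
      void drop_backfill_read(std::list<int> *requeue) {
        (void)requeue;               // the real code moves waiting ops into *requeue
        backfill_read_held = false;
      }
    };
    using ObjectContextRef = std::shared_ptr<ObjectContext>;
    using hobject_t = std::string;   // placeholder for the real object id type

    int main() {
      // recovering used to be a set<hobject_t>; holding the obc in the map keeps
      // the lock's owner alive for as long as the push is in flight.
      std::map<hobject_t, ObjectContextRef> recovering;
      ObjectContextRef obc = std::make_shared<ObjectContext>();
      if (obc->get_backfill_read())
        recovering.emplace("soid", obc);

      // ... push completes; the obc is guaranteed to still exist here ...
      std::list<int> requeue;
      recovering.at("soid")->drop_backfill_read(&requeue);
      recovering.erase("soid");
      assert(recovering.empty());
      return 0;
    }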
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index fc9ce53d27d0cac4e42262d2541ad4341b874761..3aaf912d7e0583e15a38f94b27f8c7a63e83d9c4 100644 (file)
@@ -15,6 +15,7 @@
  * 
  */
 
+#include "boost/tuple/tuple.hpp"
 #include "PG.h"
 #include "ReplicatedPG.h"
 #include "OSD.h"
@@ -202,8 +203,9 @@ void ReplicatedPG::on_global_recover(
 {
   publish_stats_to_osd();
   dout(10) << "pushed " << soid << " to all replicas" << dendl;
-  assert(recovering.count(soid));
-  recovering.erase(soid);
+  map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
+  assert(i != recovering.end());
+  recovering.erase(i);
   finish_recovery_op(soid);
   if (waiting_for_degraded_object.count(soid)) {
     requeue_ops(waiting_for_degraded_object[soid]);
@@ -223,10 +225,12 @@ void ReplicatedPG::on_peer_recover(
   // done!
   peer_missing[peer].got(soid, recovery_info.version);
   if (peer == backfill_target && backfills_in_flight.count(soid)) {
-    backfills_in_flight.erase(soid);
+    map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
+    assert(i != recovering.end());
     list<OpRequestRef> requeue_list;
-    rw_manager.drop_backfill_read(soid, &requeue_list);
+    i->second->drop_backfill_read(&requeue_list);
     requeue_ops(requeue_list);
+    backfills_in_flight.erase(soid);
   }
 }
 
@@ -275,7 +279,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o
   assert(g != missing.missing.end());
   const eversion_t &v(g->second.need);
 
-  set<hobject_t>::const_iterator p = recovering.find(soid);
+  map<hobject_t, ObjectContextRef>::const_iterator p = recovering.find(soid);
   if (p != recovering.end()) {
     dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl;
   }
@@ -794,7 +798,6 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
                           const hobject_t& ioid) :
   PG(o, curmap, _pool, p, oid, ioid),
   pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
-  rw_manager(),
   snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
   temp_seq(0),
   snap_trimmer_machine(this)
@@ -1121,6 +1124,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
                                 &obc->obs, obc->ssc, 
                                 this);
+  ctx->obc = obc;
   if (!get_rw_locks(ctx)) {
     op->mark_delayed("waiting for rw locks");
     close_op_ctx(ctx);
@@ -1143,7 +1147,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
   }
 
   op->mark_started();
-  ctx->obc = obc;
   ctx->src_obc = src_obc;
 
   execute_ctx(ctx);
@@ -4661,7 +4664,6 @@ void ReplicatedPG::op_applied(RepGather *repop)
   }
 
   repop->src_obc.clear();
-  repop->obc = ObjectContextRef();
 
   if (!repop->aborted) {
     assert(repop->waitfor_ack.count(whoami) ||
@@ -6049,7 +6051,7 @@ int ReplicatedPG::recover_missing(
   }
   start_recovery_op(soid);
   assert(!recovering.count(soid));
-  recovering.insert(soid);
+  recovering.insert(make_pair(soid, obc));
   pgbackend->recover_object(
     soid,
     head_obc,
@@ -7422,7 +7424,8 @@ void ReplicatedPG::_clear_recovery_state()
   list<OpRequestRef> blocked_ops;
   set<hobject_t>::iterator i = backfills_in_flight.begin();
   while (i != backfills_in_flight.end()) {
-    rw_manager.drop_backfill_read(*i, &blocked_ops);
+    assert(recovering.count(*i));
+    recovering[*i]->drop_backfill_read(&blocked_ops);
     requeue_ops(blocked_ops);
     backfills_in_flight.erase(i++);
   }
@@ -7834,7 +7837,7 @@ int ReplicatedPG::prep_object_replica_pushes(
 
   start_recovery_op(soid);
   assert(!recovering.count(soid));
-  recovering.insert(soid);
+  recovering.insert(make_pair(soid, obc));
 
   /* We need this in case there is an in progress write on the object.  In fact,
    * the only possible write is an update to the xattr due to a lost_revert --
@@ -7982,7 +7985,8 @@ int ReplicatedPG::recover_backfill(
   update_range(&backfill_info, handle);
 
   int ops = 0;
-  map<hobject_t, pair<eversion_t, eversion_t> > to_push;
+  map<hobject_t,
+      boost::tuple<eversion_t, eversion_t, ObjectContextRef> > to_push;
   map<hobject_t, eversion_t> to_remove;
   set<hobject_t> add_to_stat;
 
@@ -8040,10 +8044,13 @@ int ReplicatedPG::recover_backfill(
     } else if (pbi.begin == backfill_info.begin) {
       eversion_t& obj_v = backfill_info.objects.begin()->second;
       if (pbi.objects.begin()->second != obj_v) {
-       if (rw_manager.get_backfill_read(backfill_info.begin)) {
+       ObjectContextRef obc = get_object_context(backfill_info.begin, false);
+       assert(obc);
+       if (obc->get_backfill_read()) {
          dout(20) << " replacing peer " << pbi.begin << " with local "
                   << obj_v << dendl;
-         to_push[pbi.begin] = make_pair(obj_v, pbi.objects.begin()->second);
+         to_push[pbi.begin] = boost::make_tuple(
+           obj_v, pbi.objects.begin()->second, obc);
          ops++;
        } else {
          *work_started = true;
@@ -8064,13 +8071,17 @@ int ReplicatedPG::recover_backfill(
       backfill_info.pop_front();
       pbi.pop_front();
     } else {
-      if (rw_manager.get_backfill_read(backfill_info.begin)) {
+      ObjectContextRef obc = get_object_context(backfill_info.begin, false);
+      assert(obc);
+      if (obc->get_backfill_read()) {
        dout(20) << " pushing local " << backfill_info.begin << " "
                 << backfill_info.objects.begin()->second
                 << " to peer osd." << backfill_target << dendl;
        to_push[backfill_info.begin] =
-         make_pair(backfill_info.objects.begin()->second,
-                   eversion_t());
+         boost::make_tuple(
+           backfill_info.objects.begin()->second,
+           eversion_t(),
+           obc);
        add_to_stat.insert(backfill_info.begin);
        backfill_info.pop_front();
        ops++;
@@ -8101,12 +8112,15 @@ int ReplicatedPG::recover_backfill(
 
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
   map<int, vector<PushOp> > pushes;
-  for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
+  for (map<hobject_t,
+          boost::tuple<eversion_t, eversion_t, ObjectContextRef> >::iterator i =
+            to_push.begin();
        i != to_push.end();
        ++i) {
     handle.reset_tp_timeout();
     prep_backfill_object_push(
-      i->first, i->second.first, i->second.second, backfill_target, h);
+      i->first, i->second.get<0>(), i->second.get<1>(), i->second.get<2>(),
+      backfill_target, h);
   }
   pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
 
@@ -8155,7 +8169,9 @@ int ReplicatedPG::recover_backfill(
 }
 
 void ReplicatedPG::prep_backfill_object_push(
-  hobject_t oid, eversion_t v, eversion_t have, int peer,
+  hobject_t oid, eversion_t v, eversion_t have,
+  ObjectContextRef obc,
+  int peer,
   PGBackend::RecoveryHandle *h)
 {
   dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
@@ -8168,8 +8184,7 @@ void ReplicatedPG::prep_backfill_object_push(
   assert(!recovering.count(oid));
 
   start_recovery_op(oid);
-  recovering.insert(oid);
-  ObjectContextRef obc = get_object_context(oid, false);
+  recovering.insert(make_pair(oid, obc));
 
   // We need to take the read_lock here in order to flush in-progress writes
   obc->ondisk_read_lock();
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index e897925f091be012e74ba160c875f97a56e53b6f..455aaf60482571a617e3187b23532a49a3529a34 100644 (file)
@@ -474,7 +474,6 @@ public:
     void put() {
       assert(nref > 0);
       if (--nref == 0) {
-       assert(!obc);
        assert(src_obc.empty());
        delete ctx; // must already be unlocked
        delete this;
@@ -495,144 +494,6 @@ public:
 
 protected:
 
-  /// Tracks pending readers or writers on an object
-  class RWTracker {
-    struct ObjState {
-      enum State {
-       NONE,
-       READ,
-       WRITE
-      };
-      State state;                 /// rw state
-      uint64_t count;              /// number of readers or writers
-      list<OpRequestRef> waiters;  /// ops waiting on state change
-
-      /// if set, restart backfill when we can get a read lock
-      bool backfill_read_marker;
-
-      ObjState() : state(NONE), count(0), backfill_read_marker(false) {}
-      bool get_read(OpRequestRef op) {
-       if (get_read_lock()) {
-         return true;
-       } // else
-       waiters.push_back(op);
-       return false;
-      }
-      /// this function adjusts the counts if necessary
-      bool get_read_lock() {
-       // don't starve anybody!
-       if (!waiters.empty()) {
-         return false;
-       }
-       switch (state) {
-       case NONE:
-         assert(count == 0);
-         state = READ;
-         // fall through
-       case READ:
-         count++;
-         return true;
-       case WRITE:
-         return false;
-       default:
-         assert(0 == "unhandled case");
-         return false;
-       }
-      }
-
-      bool get_write(OpRequestRef op) {
-       if (get_write_lock()) {
-         return true;
-       } // else
-       waiters.push_back(op);
-       return false;
-      }
-      bool get_write_lock() {
-       // don't starve anybody!
-       if (!waiters.empty() ||
-           backfill_read_marker) {
-         return false;
-       }
-       switch (state) {
-       case NONE:
-         assert(count == 0);
-         state = WRITE;
-         // fall through
-       case WRITE:
-         count++;
-         return true;
-       case READ:
-         return false;
-       default:
-         assert(0 == "unhandled case");
-         return false;
-       }
-      }
-      void dec(list<OpRequestRef> *requeue) {
-       assert(count > 0);
-       assert(requeue);
-       assert(requeue->empty());
-       count--;
-       if (count == 0) {
-         state = NONE;
-         requeue->swap(waiters);
-       }
-      }
-      void put_read(list<OpRequestRef> *requeue) {
-       assert(state == READ);
-       dec(requeue);
-      }
-      void put_write(list<OpRequestRef> *requeue) {
-       assert(state == WRITE);
-       dec(requeue);
-      }
-      bool empty() const { return state == NONE; }
-    };
-    map<hobject_t, ObjState > obj_state; ///< map of rw_lock states
-  public:
-    RWTracker() {}
-
-    bool get_read(const hobject_t &hoid, OpRequestRef op) {
-      return obj_state[hoid].get_read(op);
-    }
-    bool get_write(const hobject_t &hoid, OpRequestRef op) {
-      return obj_state[hoid].get_write(op);
-    }
-    void put_read(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
-      obj_state[hoid].put_read(to_wake);
-      if (obj_state[hoid].empty()) {
-       obj_state.erase(hoid);
-      }
-    }
-    void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake,
-                   bool *requeue_recovery) {
-      obj_state[hoid].put_write(to_wake);
-      if (obj_state[hoid].empty()) {
-       if (obj_state[hoid].backfill_read_marker)
-         *requeue_recovery = true;
-       obj_state.erase(hoid);
-      }
-    }
-    bool get_backfill_read(const hobject_t &hoid) {
-      ObjState& obj_locker = obj_state[hoid];
-      obj_locker.backfill_read_marker = true;
-      if (obj_locker.get_read_lock()) {
-       return true;
-      } // else
-      return false;
-    }
-    void drop_backfill_read(const hobject_t &hoid, list<OpRequestRef> *ls) {
-      map<hobject_t, ObjState>::iterator i = obj_state.find(hoid);
-      ObjState& obj_locker = i->second;
-      assert(obj_locker.backfill_read_marker = true);
-      obj_locker.put_read(ls);
-      if (obj_locker.empty())
-       obj_state.erase(i);
-      else
-       obj_locker.backfill_read_marker = false;
-    }
-  } rw_manager;
-
   /**
    * Grabs locks for OpContext, should be cleaned up in close_op_ctx
    *
@@ -641,7 +502,7 @@ protected:
    */
   bool get_rw_locks(OpContext *ctx) {
     if (ctx->op->may_write()) {
-      if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) {
+      if (ctx->obc->get_write(ctx->op)) {
        ctx->lock_to_release = OpContext::W_LOCK;
        return true;
       } else {
@@ -649,7 +510,7 @@ protected:
       }
     } else {
       assert(ctx->op->may_read());
-      if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) {
+      if (ctx->obc->get_read(ctx->op)) {
        ctx->lock_to_release = OpContext::R_LOCK;
        return true;
       } else {
@@ -678,12 +539,12 @@ protected:
     bool requeue_recovery = false;
     switch (ctx->lock_to_release) {
     case OpContext::W_LOCK:
-      rw_manager.put_write(ctx->obs->oi.soid, &to_req, &requeue_recovery);
+      ctx->obc->put_write(&to_req, &requeue_recovery);
       if (requeue_recovery)
        osd->recovery_wq.queue(this);
       break;
     case OpContext::R_LOCK:
-      rw_manager.put_read(ctx->obs->oi.soid, &to_req);
+      ctx->obc->put_read(&to_req);
       break;
     case OpContext::NONE:
       break;
@@ -803,7 +664,7 @@ protected:
   }
   void put_snapset_context(SnapSetContext *ssc);
 
-  set<hobject_t> recovering;
+  map<hobject_t, ObjectContextRef> recovering;
 
   /*
    * Backfill
@@ -847,10 +708,10 @@ protected:
     }
     {
       f->open_array_section("recovering");
-      for (set<hobject_t>::const_iterator i = recovering.begin();
+      for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
           i != recovering.end();
           ++i) {
-       f->dump_stream("object") << *i;
+       f->dump_stream("object") << i->first;
       }
       f->close_section();
     }
@@ -938,7 +799,8 @@ protected:
     );
 
   void prep_backfill_object_push(
-    hobject_t oid, eversion_t v, eversion_t have, int peer,
+    hobject_t oid, eversion_t v, eversion_t have, ObjectContextRef obc,
+    int peer,
     PGBackend::RecoveryHandle *h);
   void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
 
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 293870d09f6ada45b99cd2ce5b7e2a04ebf520b7..351d050c476a6652c1cc4404a11a1ab407db4cfc 100644 (file)
@@ -2210,6 +2210,128 @@ public:
   // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
   map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
 
+  struct RWState {
+    enum State {
+      RWNONE,
+      RWREAD,
+      RWWRITE
+    };
+    State state;                 /// rw state
+    uint64_t count;              /// number of readers or writers
+    list<OpRequestRef> waiters;  /// ops waiting on state change
+
+    /// if set, restart backfill when we can get a read lock
+    bool backfill_read_marker;
+
+    RWState() : state(RWNONE), count(0), backfill_read_marker(false) {}
+    bool get_read(OpRequestRef op) {
+      if (get_read_lock()) {
+       return true;
+      } // else
+      waiters.push_back(op);
+      return false;
+    }
+    /// this function adjusts the counts if necessary
+    bool get_read_lock() {
+      // don't starve anybody!
+      if (!waiters.empty()) {
+       return false;
+      }
+      switch (state) {
+      case RWNONE:
+       assert(count == 0);
+       state = RWREAD;
+       // fall through
+      case RWREAD:
+       count++;
+       return true;
+      case RWWRITE:
+       return false;
+      default:
+       assert(0 == "unhandled case");
+       return false;
+      }
+    }
+
+    bool get_write(OpRequestRef op) {
+      if (get_write_lock()) {
+       return true;
+      } // else
+      waiters.push_back(op);
+      return false;
+    }
+    bool get_write_lock() {
+      // don't starve anybody!
+      if (!waiters.empty() ||
+         backfill_read_marker) {
+       return false;
+      }
+      switch (state) {
+      case RWNONE:
+       assert(count == 0);
+       state = RWWRITE;
+       // fall through
+      case RWWRITE:
+       count++;
+       return true;
+      case RWREAD:
+       return false;
+      default:
+       assert(0 == "unhandled case");
+       return false;
+      }
+    }
+    void dec(list<OpRequestRef> *requeue) {
+      assert(count > 0);
+      assert(requeue);
+      assert(requeue->empty());
+      count--;
+      if (count == 0) {
+       state = RWNONE;
+       requeue->swap(waiters);
+      }
+    }
+    void put_read(list<OpRequestRef> *requeue) {
+      assert(state == RWREAD);
+      dec(requeue);
+    }
+    void put_write(list<OpRequestRef> *requeue) {
+      assert(state == RWWRITE);
+      dec(requeue);
+    }
+    bool empty() const { return state == RWNONE; }
+  } rwstate;
+
+  bool get_read(OpRequestRef op) {
+    return rwstate.get_read(op);
+  }
+  bool get_write(OpRequestRef op) {
+    return rwstate.get_write(op);
+  }
+  bool get_backfill_read() {
+    rwstate.backfill_read_marker = true;
+    if (rwstate.get_read_lock()) {
+      return true;
+    }
+    return false;
+  }
+  void drop_backfill_read(list<OpRequestRef> *ls) {
+    assert(rwstate.backfill_read_marker);
+    rwstate.put_read(ls);
+    rwstate.backfill_read_marker = false;
+  }
+  void put_read(list<OpRequestRef> *to_wake) {
+    rwstate.put_read(to_wake);
+  }
+  void put_write(list<OpRequestRef> *to_wake,
+                bool *requeue_recovery) {
+    rwstate.put_write(to_wake);
+    if (rwstate.empty() && rwstate.backfill_read_marker) {
+      rwstate.backfill_read_marker = false;
+      *requeue_recovery = true;
+    }
+  }
+
   ObjectContext()
     : ssc(NULL),
       destructor_callback(0),
@@ -2218,6 +2340,7 @@ public:
       copyfrom_readside(0) {}
 
   ~ObjectContext() {
+    assert(rwstate.empty());
     if (destructor_callback)
       destructor_callback->complete(0);
   }