]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedBackend: take read locks for clone sources during recovery 12844/head
authorSamuel Just <sjust@redhat.com>
Wed, 23 Nov 2016 23:41:13 +0000 (15:41 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 12 Jan 2017 20:43:18 +0000 (12:43 -0800)
Otherwise, we run the risk of a clone source which hasn't actually
come into existence yet being used if we grab a clone which *just*
got added the the ssc, but has not yet actually had time to be
created (can't rely on message ordering here since recovery messages
don't necessarily order with client IO!).

Fixes: http://tracker.ceph.com/issues/17831
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PGBackend.h
src/osd/PrimaryLogPG.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/osd_types.h

index d4e0091ca735b219ce2588ce4c4d6579ce437224..46d66e7194edfa1d6440e1f2b3f6e2c7f3eb714e 100644 (file)
@@ -186,6 +186,12 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
        const hobject_t &hoid,
        map<string, bufferlist> &attrs) = 0;
 
+     virtual bool try_lock_for_read(
+       const hobject_t &hoid,
+       ObcLockManager &manager) = 0;
+
+     virtual void release_locks(ObcLockManager &manager) = 0;
+
      virtual void op_applied(
        const eversion_t &applied_version) = 0;
 
index 019e871e12d7e37693d5a31ad5e090919b054bc5..114a21c66255157f40b5146742a756cc064bba70 100644 (file)
@@ -335,11 +335,26 @@ public:
   const pg_pool_t &get_pool() const override {
     return pool.info;
   }
+
   ObjectContextRef get_obc(
     const hobject_t &hoid,
     map<string, bufferlist> &attrs) override {
     return get_object_context(hoid, true, &attrs);
   }
+
+  bool try_lock_for_read(
+    const hobject_t &hoid,
+    ObcLockManager &manager) override {
+    auto obc = get_object_context(hoid, false, nullptr);
+    if (!obc)
+      return false;
+    return manager.try_get_read_lock(hoid, obc);
+  }
+
+  void release_locks(ObcLockManager &manager) {
+    release_object_locks(manager);
+  }
+
   void pgb_set_object_snap_mapping(
     const hobject_t &soid,
     const set<snapid_t> &snaps,
@@ -352,7 +367,6 @@ public:
     return clear_object_snap_mapping(t, soid);
   }
 
-
   void log_operation(
     const vector<pg_log_entry_t> &logv,
     boost::optional<pg_hit_set_history_t> &hset_history,
index c051db8828efa814bce5b73cf4d767393e96bf9a..070bbbf0150ef6e74110ffeb781a9541e3a9a4d0 100644 (file)
@@ -166,9 +166,8 @@ void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
       for (set<hobject_t, hobject_t::BitwiseComparator>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
-       assert(pulling.count(*j) == 1);
        get_parent()->cancel_pull(*j);
-       pulling.erase(*j);
+       clear_pull(pulling.find(*j), false);
       }
       pull_from_peer.erase(i++);
     } else {
@@ -274,7 +273,16 @@ bool ReplicatedBackend::handle_message(
 void ReplicatedBackend::clear_recovery_state()
 {
   // clear pushing/pulling maps
+  for (auto &&i: pushing) {
+    for (auto &&j: i.second) {
+      get_parent()->release_locks(j.second.lock_manager);
+    }
+  }
   pushing.clear();
+
+  for (auto &&i: pulling) {
+    get_parent()->release_locks(i.second.lock_manager);
+  }
   pulling.clear();
   pull_from_peer.clear();
 }
@@ -859,25 +867,18 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
 
 struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
   ReplicatedBackend *bc;
-  list<hobject_t> to_continue;
+  list<ReplicatedBackend::pull_complete_info> to_continue;
   int priority;
   C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
     : bc(bc), priority(priority) {}
 
   void finish(ThreadPool::TPHandle &handle) {
     ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
-    for (list<hobject_t>::iterator i =
-          to_continue.begin();
-        i != to_continue.end();
-        ++i) {
-      map<hobject_t, ReplicatedBackend::PullInfo, hobject_t::BitwiseComparator>::iterator j =
-       bc->pulling.find(*i);
-      assert(j != bc->pulling.end());
-      if (!bc->start_pushes(*i, j->second.obc, h)) {
+    for (auto &&i: to_continue) {
+      if (!bc->start_pushes(i.hoid, i.obc, h)) {
        bc->get_parent()->on_global_recover(
-         *i, j->second.stat);
+         i.hoid, i.stat);
       }
-      bc->pulling.erase(*i);
       handle.reset_tp_timeout();
     }
     bc->run_recovery_op(h, priority);
@@ -894,7 +895,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
 
   vector<PullOp> replies(1);
   ObjectStore::Transaction t;
-  list<hobject_t> to_continue;
+  list<pull_complete_info> to_continue;
   for (vector<PushOp>::iterator i = m->pushes.begin();
        i != m->pushes.end();
        ++i) {
@@ -1255,7 +1256,8 @@ void ReplicatedBackend::calc_head_subsets(
   const pg_missing_t& missing,
   const hobject_t &last_backfill,
   interval_set<uint64_t>& data_subset,
-  map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
+  map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+  ObcLockManager &manager)
 {
   dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;
@@ -1285,7 +1287,8 @@ void ReplicatedBackend::calc_head_subsets(
     c.snap = snapset.clones[j];
     prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
     if (!missing.is_missing(c) &&
-       cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+       cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+       get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
       clone_subsets[c] = prev;
@@ -1299,6 +1302,7 @@ void ReplicatedBackend::calc_head_subsets(
 
   if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
     dout(10) << "skipping clone, too many holes" << dendl;
+    get_parent()->release_locks(manager);
     clone_subsets.clear();
     cloning.clear();
   }
@@ -1316,7 +1320,8 @@ void ReplicatedBackend::calc_clone_subsets(
   const pg_missing_t& missing,
   const hobject_t &last_backfill,
   interval_set<uint64_t>& data_subset,
-  map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
+  map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+  ObcLockManager &manager)
 {
   dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;
@@ -1350,7 +1355,8 @@ void ReplicatedBackend::calc_clone_subsets(
     c.snap = snapset.clones[j];
     prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
     if (!missing.is_missing(c) &&
-       cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+       cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+       get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
       clone_subsets[c] = prev;
@@ -1370,7 +1376,8 @@ void ReplicatedBackend::calc_clone_subsets(
     c.snap = snapset.clones[j];
     next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
     if (!missing.is_missing(c) &&
-       cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+       cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+       get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
       clone_subsets[c] = next;
@@ -1383,6 +1390,7 @@ void ReplicatedBackend::calc_clone_subsets(
 
   if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
     dout(10) << "skipping clone, too many holes" << dendl;
+    get_parent()->release_locks(manager);
     clone_subsets.clear();
     cloning.clear();
   }
@@ -1444,6 +1452,7 @@ void ReplicatedBackend::prepare_pull(
   }
 
   ObjectRecoveryInfo recovery_info;
+  ObcLockManager lock_manager;
 
   if (soid.is_snap()) {
     assert(!get_parent()->get_local_missing().is_missing(
@@ -1455,10 +1464,12 @@ void ReplicatedBackend::prepare_pull(
     SnapSetContext *ssc = headctx->ssc;
     assert(ssc);
     dout(10) << " snapset " << ssc->snapset << dendl;
-    calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
-                      get_info().last_backfill,
-                      recovery_info.copy_subset,
-                      recovery_info.clone_subset);
+    calc_clone_subsets(
+      ssc->snapset, soid, get_parent()->get_local_missing(),
+      get_info().last_backfill,
+      recovery_info.copy_subset,
+      recovery_info.clone_subset,
+      lock_manager);
     // FIXME: this may overestimate if we are pulling multiple clones in parallel...
     dout(10) << " pulling " << recovery_info << dendl;
 
@@ -1486,10 +1497,13 @@ void ReplicatedBackend::prepare_pull(
   assert(!pulling.count(soid));
   pull_from_peer[fromshard].insert(soid);
   PullInfo &pi = pulling[soid];
+  pi.from = fromshard;
+  pi.soid = soid;
   pi.head_ctx = headctx;
   pi.recovery_info = op.recovery_info;
   pi.recovery_progress = op.recovery_progress;
   pi.cache_dont_need = h->cache_dont_need;
+  pi.lock_manager = std::move(lock_manager);
 }
 
 /*
@@ -1509,6 +1523,7 @@ void ReplicatedBackend::prep_push_to_replica(
   map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator> clone_subsets;
   interval_set<uint64_t> data_subset;
 
+  ObcLockManager lock_manager;
   // are we doing a clone on the replica?
   if (soid.snap && soid.snap < CEPH_NOSNAP) {
     hobject_t head = soid;
@@ -1537,10 +1552,12 @@ void ReplicatedBackend::prep_push_to_replica(
     map<pg_shard_t, pg_info_t>::const_iterator pi =
       get_parent()->get_shard_info().find(peer);
     assert(pi != get_parent()->get_shard_info().end());
-    calc_clone_subsets(ssc->snapset, soid,
-                      pm->second,
-                      pi->second.last_backfill,
-                      data_subset, clone_subsets);
+    calc_clone_subsets(
+      ssc->snapset, soid,
+      pm->second,
+      pi->second.last_backfill,
+      data_subset, clone_subsets,
+      lock_manager);
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
     // base this on partially on replica's clones?
@@ -1551,10 +1568,20 @@ void ReplicatedBackend::prep_push_to_replica(
       obc,
       ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
       get_parent()->get_shard_info().find(peer)->second.last_backfill,
-      data_subset, clone_subsets);
+      data_subset, clone_subsets,
+      lock_manager);
   }
 
-  prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need);
+  prep_push(
+    obc,
+    soid,
+    peer,
+    oi.version,
+    data_subset,
+    clone_subsets,
+    pop,
+    cache_dont_need,
+    std::move(lock_manager));
 }
 
 void ReplicatedBackend::prep_push(ObjectContextRef obc,
@@ -1568,7 +1595,7 @@ void ReplicatedBackend::prep_push(ObjectContextRef obc,
 
   prep_push(obc, soid, peer,
            obc->obs.oi.version, data_subset, clone_subsets,
-           pop, cache_dont_need);
+           pop, cache_dont_need, ObcLockManager());
 }
 
 void ReplicatedBackend::prep_push(
@@ -1578,7 +1605,8 @@ void ReplicatedBackend::prep_push(
   interval_set<uint64_t> &data_subset,
   map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
   PushOp *pop,
-  bool cache_dont_need)
+  bool cache_dont_need,
+  ObcLockManager &&lock_manager)
 {
   get_parent()->begin_peer_recover(peer, soid);
   // take note.
@@ -1594,6 +1622,7 @@ void ReplicatedBackend::prep_push(
   pi.recovery_progress.data_recovered_to = 0;
   pi.recovery_progress.data_complete = 0;
   pi.recovery_progress.omap_complete = 0;
+  pi.lock_manager = std::move(lock_manager);
 
   ObjectRecoveryProgress new_progress;
   int r = build_push_op(pi.recovery_info,
@@ -1731,7 +1760,8 @@ void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
 
 ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
   const ObjectRecoveryInfo& recovery_info,
-  SnapSetContext *ssc)
+  SnapSetContext *ssc,
+  ObcLockManager &manager)
 {
   if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
     return recovery_info;
@@ -1739,17 +1769,19 @@ ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
   new_info.copy_subset.clear();
   new_info.clone_subset.clear();
   assert(ssc);
-  calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
-                    get_info().last_backfill,
-                    new_info.copy_subset, new_info.clone_subset);
+  get_parent()->release_locks(manager); // might already have locks
+  calc_clone_subsets(
+    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+    get_info().last_backfill,
+    new_info.copy_subset, new_info.clone_subset,
+    manager);
   return new_info;
 }
 
 bool ReplicatedBackend::handle_pull_response(
   pg_shard_t from, PushOp &pop, PullOp *response,
-  list<hobject_t> *to_continue,
-  ObjectStore::Transaction *t
-  )
+  list<pull_complete_info> *to_continue,
+  ObjectStore::Transaction *t)
 {
   interval_set<uint64_t> data_included = pop.data_included;
   bufferlist data;
@@ -1794,7 +1826,10 @@ bool ReplicatedBackend::handle_pull_response(
     }
     pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
     pi.recovery_info.oi = pi.obc->obs.oi;
-    pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
+    pi.recovery_info = recalc_subsets(
+      pi.recovery_info,
+      pi.obc->ssc,
+      pi.lock_manager);
   }
 
 
@@ -1830,12 +1865,10 @@ bool ReplicatedBackend::handle_pull_response(
 
   if (complete) {
     pi.stat.num_objects_recovered++;
-    to_continue->push_back(hoid);
+    to_continue->push_back({hoid, pi.obc, pi.stat});
     get_parent()->on_local_recover(
       hoid, pi.recovery_info, pi.obc, t);
-    pull_from_peer[from].erase(hoid);
-    if (pull_from_peer[from].empty())
-      pull_from_peer.erase(from);
+    clear_pull(pulling.find(hoid));
     return false;
   } else {
     response->soid = pop.soid;
@@ -2182,6 +2215,7 @@ bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, Push
       stat.num_keys_recovered = reply->omap_entries.size();
       stat.num_objects_recovered = 1;
 
+      get_parent()->release_locks(pi->lock_manager);
       pushing[soid].erase(peer);
       pi = NULL;
 
@@ -2328,7 +2362,7 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
   if (is_primary()) {
     PullOp resp;
     RPGHandle *h = _open_recovery_op();
-    list<hobject_t> to_continue;
+    list<pull_complete_info> to_continue;
     bool more = handle_pull_response(
       m->from, pop, &resp,
       &to_continue, &t);
@@ -2369,10 +2403,22 @@ void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
 {
   list<pg_shard_t> fl = { from };
   get_parent()->failed_push(fl, soid);
-  pull_from_peer[from].erase(soid);
-  if (pull_from_peer[from].empty())
-    pull_from_peer.erase(from);
-  pulling.erase(soid);
+
+  clear_pull(pulling.find(soid));
+}
+
+void ReplicatedBackend::clear_pull(
+  map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
+  bool clear_pull_from_peer)
+{
+  auto from = piter->second.from;
+  if (clear_pull_from_peer) {
+    pull_from_peer[from].erase(piter->second.soid);
+    if (pull_from_peer[from].empty())
+      pull_from_peer.erase(from);
+  }
+  get_parent()->release_locks(piter->second.lock_manager);
+  pulling.erase(piter);
 }
 
 int ReplicatedBackend::start_pushes(
index b9431cc0a8d01f8b6a34eed4474af5f3f2e3c365..67e8784238cfeb0d38f403fc3072ca05c8e7f4db 100644 (file)
@@ -169,6 +169,7 @@ private:
     ObjectRecoveryInfo recovery_info;
     ObjectContextRef obc;
     object_stat_sum_t stat;
+    ObcLockManager lock_manager;
 
     void dump(Formatter *f) const {
       {
@@ -187,12 +188,15 @@ private:
 
   // pull
   struct PullInfo {
+    pg_shard_t from;
+    hobject_t soid;
     ObjectRecoveryProgress recovery_progress;
     ObjectRecoveryInfo recovery_info;
     ObjectContextRef head_ctx;
     ObjectContextRef obc;
     object_stat_sum_t stat;
     bool cache_dont_need;
+    ObcLockManager lock_manager;
 
     void dump(Formatter *f) const {
       {
@@ -216,6 +220,9 @@ private:
 
   // Reverse mapping from osd peer to objects beging pulled from that peer
   map<pg_shard_t, set<hobject_t, hobject_t::BitwiseComparator> > pull_from_peer;
+  void clear_pull(
+    map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
+    bool clear_pull_from_peer = true);
 
   void sub_op_push(OpRequestRef op);
   void sub_op_push_reply(OpRequestRef op);
@@ -235,9 +242,15 @@ private:
 
   bool handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply);
   void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);
+
+  struct pull_complete_info {
+    hobject_t hoid;
+    ObjectContextRef obc;
+    object_stat_sum_t stat;
+  };
   bool handle_pull_response(
     pg_shard_t from, PushOp &op, PullOp *response,
-    list<hobject_t> *to_continue,
+    list<pull_complete_info> *to_continue,
     ObjectStore::Transaction *t);
   void handle_push(pg_shard_t from, PushOp &op, PushReplyOp *response,
                   ObjectStore::Transaction *t);
@@ -283,7 +296,8 @@ private:
     SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
     const hobject_t &last_backfill,
     interval_set<uint64_t>& data_subset,
-    map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
+    map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+    ObcLockManager &lock_manager);
   void prepare_pull(
     eversion_t v,
     const hobject_t& soid,
@@ -296,26 +310,31 @@ private:
   void prep_push_to_replica(
     ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
     PushOp *pop, bool cache_dont_need = true);
-  void prep_push(ObjectContextRef obc,
-                const hobject_t& oid, pg_shard_t dest,
-                PushOp *op,
-                bool cache_dont_need);
-  void prep_push(ObjectContextRef obc,
-                const hobject_t& soid, pg_shard_t peer,
-                eversion_t version,
-                interval_set<uint64_t> &data_subset,
-                map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
-                PushOp *op,
-                 bool cache = false);
-  void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
-                        const pg_missing_t& missing,
-                        const hobject_t &last_backfill,
-                        interval_set<uint64_t>& data_subset,
-                        map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
+  void prep_push(
+    ObjectContextRef obc,
+    const hobject_t& oid, pg_shard_t dest,
+    PushOp *op,
+    bool cache_dont_need);
+  void prep_push(
+    ObjectContextRef obc,
+    const hobject_t& soid, pg_shard_t peer,
+    eversion_t version,
+    interval_set<uint64_t> &data_subset,
+    map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+    PushOp *op,
+    bool cache,
+    ObcLockManager &&lock_manager);
+  void calc_head_subsets(
+    ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+    const pg_missing_t& missing,
+    const hobject_t &last_backfill,
+    interval_set<uint64_t>& data_subset,
+    map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+    ObcLockManager &lock_manager);
   ObjectRecoveryInfo recalc_subsets(
     const ObjectRecoveryInfo& recovery_info,
-    SnapSetContext *ssc
-    );
+    SnapSetContext *ssc,
+    ObcLockManager &lock_manager);
 
   /**
    * Client IO
index f8aeb083fc1b797cc8e0586eda1236d85e302258..212a32ef5a1c176ebc6aad92504ca1b67eae425f 100644 (file)
@@ -4345,6 +4345,9 @@ public:
     }
     return false;
   }
+  bool try_get_read_lock() {
+    return rwstate.get_read_lock();
+  }
   void drop_recovery_read(list<OpRequestRef> *ls) {
     assert(rwstate.recovery_read_marker);
     rwstate.put_read(ls);
@@ -4504,6 +4507,7 @@ public:
   ObcLockManager() = default;
   ObcLockManager(ObcLockManager &&) = default;
   ObcLockManager(const ObcLockManager &) = delete;
+  ObcLockManager &operator=(ObcLockManager &&) = default;
   bool empty() const {
     return locks.empty();
   }
@@ -4564,6 +4568,23 @@ public:
       return false;
     }
   }
+
+  /// try get read lock
+  bool try_get_read_lock(
+    const hobject_t &hoid,
+    ObjectContextRef obc) {
+    assert(locks.find(hoid) == locks.end());
+    if (obc->try_get_read_lock()) {
+      locks.insert(
+       make_pair(
+         hoid,
+         ObjectLockState(obc, ObjectContext::RWState::RWREAD)));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   void put_locks(
     list<pair<hobject_t, list<OpRequestRef> > > *to_requeue,
     bool *requeue_recovery,