]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG/Backend: split recover_missing out of prepare_pull
authorSamuel Just <sam.just@inktank.com>
Tue, 3 Sep 2013 22:39:18 +0000 (15:39 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:27 +0000 (11:24 -0700)
Also, move prepare_pull to PGBackend.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/hobject.h
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 4b6a33c66973005c3d98ddebf12b80a5341468bd..a2e7e5a9215c608e53897fcd9a065768be24d0e7 100644 (file)
@@ -79,9 +79,28 @@ public:
     return ret;
   }
 
+  /// @return head version of this hobject_t
+  hobject_t get_head() const {
+    hobject_t ret(*this);
+    ret.snap = CEPH_NOSNAP;
+    return ret;
+  }
+
+  /// @return snapdir version of this hobject_t
+  hobject_t get_snapdir() const {
+    hobject_t ret(*this);
+    ret.snap = CEPH_SNAPDIR;
+    return ret;
+  }
+
+  /// @return true if object is neither head nor snapdir
+  bool is_snap() const {
+    return (snap != CEPH_NOSNAP) && (snap != CEPH_SNAPDIR);
+  }
+
   /// @return true iff the object should have a snapset in it's attrs
   bool has_snapset() const {
-    return (snap == CEPH_NOSNAP) || (snap == CEPH_SNAPDIR);
+    return !is_snap();
   }
 
   /* Do not use when a particular hash function is needed */
index 3115afab956e2bc9776b46db29547c7a3f1c9778..e703d4c333c0972b4d0d445748b89dffea2f0494 100644 (file)
@@ -256,6 +256,17 @@ private:
                        ObjectStore::Transaction *t);
   void submit_push_complete(ObjectRecoveryInfo &recovery_info,
                            ObjectStore::Transaction *t);
+
+  void calc_clone_subsets(
+    SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
+    const hobject_t &last_backfill,
+    interval_set<uint64_t>& data_subset,
+    map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+  void prepare_pull(
+    const hobject_t& soid,
+    ObjectContextRef headctx,
+    int priority,
+    RPGHandle *h);
 };
 
 #endif
index 36da431ad8c9ee14d1412f03519dfc149f17b1fc..462aed0c3eefa6c6c2b230ac2b925c4272ec7a9f 100644 (file)
@@ -278,10 +278,9 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o
   }
   else {
     dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
-    map<int, vector<PullOp> > pulls;
-    prepare_pull(soid, v, cct->_conf->osd_client_op_priority, &pulls);
-    // TODOSAM: replace
-    //send_pulls(g_conf->osd_client_op_priority, pulls);
+    PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
+    recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+    pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
   }
   waiting_for_missing_object[soid].push_back(op);
   op->mark_delayed("waiting for missing object");
@@ -5214,13 +5213,11 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
     obc->obs.oi = oi;
     obc->obs.exists = true;
 
-    if (can_create) {
-      obc->ssc = get_snapset_context(
-       soid.oid, soid.get_key(), soid.hash,
-       true, soid.get_namespace(),
-       soid.has_snapset() ? attrs : 0);
-      register_snapset_context(obc->ssc);
-    }
+    obc->ssc = get_snapset_context(
+      soid.oid, soid.get_key(), soid.hash,
+      true, soid.get_namespace(),
+      soid.has_snapset() ? attrs : 0);
+    register_snapset_context(obc->ssc);
 
     populate_obc_watchers(obc);
     dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
@@ -5773,11 +5770,12 @@ void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, con
           << "  clone_subsets " << clone_subsets << dendl;
 }
 
-void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
-                                     const pg_missing_t& missing,
-                                     const hobject_t &last_backfill,
-                                     interval_set<uint64_t>& data_subset,
-                                     map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedBackend::calc_clone_subsets(
+  SnapSet& snapset, const hobject_t& soid,
+  const pg_missing_t& missing,
+  const hobject_t &last_backfill,
+  interval_set<uint64_t>& data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
 {
   dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;
@@ -5862,95 +5860,70 @@ void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
  */
 enum { PULL_NONE, PULL_OTHER, PULL_YES };
 
-int ReplicatedPG::prepare_pull(
-  const hobject_t& soid, eversion_t v,
+void ReplicatedBackend::prepare_pull(
+  const hobject_t& soid,
+  ObjectContextRef headctx,
   int priority,
-  map<int, vector<PullOp> > *pulls)
-{
+  RPGHandle *h)
+{
+  assert(get_parent()->get_local_missing().missing.count(soid));
+  eversion_t v = get_parent()->get_local_missing().missing.find(
+    soid)->second.need;
+  const map<hobject_t, set<int> > &missing_loc(
+    get_parent()->get_missing_loc());
+  const map<int, pg_missing_t > &peer_missing(
+    get_parent()->get_peer_missing());
   int fromosd = -1;
-  map<hobject_t,set<int> >::iterator q = missing_loc.find(soid);
-  if (q != missing_loc.end()) {
-    // randomize the list of possible sources
-    // should we take weights into account?
-    vector<int> shuffle(q->second.begin(), q->second.end());
-    random_shuffle(shuffle.begin(), shuffle.end());
-    for (vector<int>::iterator p = shuffle.begin();
-        p != shuffle.end();
-        ++p) {
-      if (get_osdmap()->is_up(*p)) {
-       fromosd = *p;
-       break;
-      }
-    }
-  }
-  if (fromosd < 0) {
-    dout(7) << "pull " << soid
-           << " v " << v 
-           << " but it is unfound" << dendl;
-    return PULL_NONE;
-  }
+  map<hobject_t,set<int> >::const_iterator q = missing_loc.find(soid);
+  assert(q != missing_loc.end());
+  assert(!q->second.empty());
+
+  // pick a pullee
+  vector<int> shuffle(q->second.begin(), q->second.end());
+  random_shuffle(shuffle.begin(), shuffle.end());
+  vector<int>::iterator p = shuffle.begin();
+  assert(get_osdmap()->is_up(*p));
+  fromosd = *p;
+  assert(fromosd >= 0);
+
+  dout(7) << "pull " << soid
+         << "v " << v
+         << " on osds " << *p
+         << " from osd." << fromosd
+         << dendl;
 
   assert(peer_missing.count(fromosd));
-  if (peer_missing[fromosd].is_missing(soid, v)) {
-    assert(peer_missing[fromosd].missing[soid].have != v);
+  const pg_missing_t &pmissing = peer_missing.find(fromosd)->second;
+  if (pmissing.is_missing(soid, v)) {
+    assert(pmissing.missing.find(soid)->second.have != v);
     dout(10) << "pulling soid " << soid << " from osd " << fromosd
-            << " at version " << peer_missing[fromosd].missing[soid].have
+            << " at version " << pmissing.missing.find(soid)->second.have
             << " rather than at version " << v << dendl;
-    v = peer_missing[fromosd].missing[soid].have;
-    assert(pg_log.get_log().objects.count(soid) &&
-          pg_log.get_log().objects.find(soid)->second->op == pg_log_entry_t::LOST_REVERT &&
-          pg_log.get_log().objects.find(soid)->second->reverting_to == v);
+    v = pmissing.missing.find(soid)->second.have;
+    assert(get_parent()->get_log().get_log().objects.count(soid) &&
+          (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
+           pg_log_entry_t::LOST_REVERT) &&
+          (get_parent()->get_log().get_log().objects.find(
+            soid)->second->reverting_to ==
+           v));
   }
   
-  dout(7) << "pull " << soid
-         << " v " << v 
-         << " on osds " << missing_loc[soid]
-         << " from osd." << fromosd
-         << dendl;
-
   ObjectRecoveryInfo recovery_info;
 
-  // is this a snapped object?  if so, consult the snapset.. we may not need the entire object!
-  if (soid.snap && soid.snap < CEPH_NOSNAP) {
-    // do we have the head and/or snapdir?
-    hobject_t head = soid;
-    head.snap = CEPH_NOSNAP;
-    if (pg_log.get_missing().is_missing(head)) {
-      if (pulling.count(head)) {
-       dout(10) << " missing but already pulling head " << head << dendl;
-       return PULL_NONE;
-      } else {
-       int r = prepare_pull(
-         head, pg_log.get_missing().missing.find(head)->second.need, priority,
-         pulls);
-       if (r != PULL_NONE)
-         return PULL_OTHER;
-       return PULL_NONE;
-      }
-    }
-    head.snap = CEPH_SNAPDIR;
-    if (pg_log.get_missing().is_missing(head)) {
-      if (pulling.count(head)) {
-       dout(10) << " missing but already pulling snapdir " << head << dendl;
-       return PULL_NONE;
-      } else {
-       int r = prepare_pull(
-         head, pg_log.get_missing().missing.find(head)->second.need, priority,
-         pulls);
-       if (r != PULL_NONE)
-         return PULL_OTHER;
-       return PULL_NONE;
-      }
-    }
-
+  if (soid.is_snap()) {
+    assert(!get_parent()->get_local_missing().is_missing(
+            soid.get_head()) ||
+          !get_parent()->get_local_missing().is_missing(
+            soid.get_snapdir()));
+    assert(headctx);
     // check snapset
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
+    SnapSetContext *ssc = headctx->ssc;
     assert(ssc);
     dout(10) << " snapset " << ssc->snapset << dendl;
-    calc_clone_subsets(ssc->snapset, soid, pg_log.get_missing(), info.last_backfill,
+    calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
+                      get_info().last_backfill,
                       recovery_info.copy_subset,
                       recovery_info.clone_subset);
-    put_snapset_context(ssc);
     // FIXME: this may overestimate if we are pulling multiple clones in parallel...
     dout(10) << " pulling " << recovery_info << dendl;
   } else {
@@ -5960,8 +5933,8 @@ int ReplicatedPG::prepare_pull(
     recovery_info.size = ((uint64_t)-1);
   }
 
-  (*pulls)[fromosd].push_back(PullOp());
-  PullOp &op = (*pulls)[fromosd].back();
+  h->pulls[fromosd].push_back(PullOp());
+  PullOp &op = h->pulls[fromosd].back();
   op.soid = soid;
 
   op.recovery_info = recovery_info;
@@ -5979,7 +5952,74 @@ int ReplicatedPG::prepare_pull(
   pi.recovery_progress = op.recovery_progress;
   pi.priority = priority;
 
+  // TODOSAM: do something??
+}
+
+int ReplicatedPG::recover_missing(
+  const hobject_t &soid, eversion_t v,
+  int priority,
+  PGBackend::RecoveryHandle *h)
+{
+  map<hobject_t,set<int> >::iterator q = missing_loc.find(soid);
+  if (q == missing_loc.end()) {
+    dout(7) << "pull " << soid
+           << " v " << v 
+           << " but it is unfound" << dendl;
+    return PULL_NONE;
+  }
+
+  // is this a snapped object?  if so, consult the snapset.. we may not need the entire object!
+  ObjectContextRef obc;
+  ObjectContextRef head_obc;
+  if (soid.snap && soid.snap < CEPH_NOSNAP) {
+    // do we have the head and/or snapdir?
+    hobject_t head = soid.get_head();
+    if (pg_log.get_missing().is_missing(head)) {
+      if (recovering.count(head)) {
+       dout(10) << " missing but already recovering head " << head << dendl;
+       return PULL_NONE;
+      } else {
+       int r = recover_missing(
+         head, pg_log.get_missing().missing.find(head)->second.need, priority,
+         h);
+       if (r != PULL_NONE)
+         return PULL_OTHER;
+       return PULL_NONE;
+      }
+    }
+    head = soid.get_snapdir();
+    if (pg_log.get_missing().is_missing(head)) {
+      if (recovering.count(head)) {
+       dout(10) << " missing but already recovering snapdir " << head << dendl;
+       return PULL_NONE;
+      } else {
+       int r = recover_missing(
+         head, pg_log.get_missing().missing.find(head)->second.need, priority,
+         h);
+       if (r != PULL_NONE)
+         return PULL_OTHER;
+       return PULL_NONE;
+      }
+    }
+
+    // we must have one or the other
+    head_obc = get_object_context(
+      soid.get_head(),
+      false,
+      0);
+    if (!head_obc)
+      head_obc = get_object_context(
+       soid.get_snapdir(),
+       false,
+       0);
+    assert(head_obc);
+  }
   start_recovery_op(soid);
+  pgbackend->recover_object(
+    soid,
+    head_obc,
+    obc,
+    h);
   return PULL_YES;
 }
 
@@ -6038,9 +6078,12 @@ void ReplicatedPG::prep_push_to_replica(
     SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
     assert(ssc);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
+// TODOSAM: fix
+#if 0
     calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
                       peer_info[peer].last_backfill,
                       data_subset, clone_subsets);
+#endif
     put_snapset_context(ssc);
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
@@ -6227,8 +6270,11 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
   new_info.copy_subset.clear();
   new_info.clone_subset.clear();
   assert(ssc);
+// TODOSAM: fix
+#if 0
   calc_clone_subsets(ssc->snapset, new_info.soid, pg_log.get_missing(), info.last_backfill,
                     new_info.copy_subset, new_info.clone_subset);
+#endif
   put_snapset_context(ssc);
   return new_info;
 }
@@ -7571,7 +7617,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
   int started = 0;
   int skipped = 0;
 
-  map<int, vector<PullOp> > pulls;
+  PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
   map<version_t, hobject_t>::const_iterator p =
     missing.rmissing.lower_bound(pg_log.get_log().last_requested);
   while (p != missing.rmissing.end()) {
@@ -7678,14 +7724,14 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
       }
     }
    
-    if (!pulling.count(soid)) {
-      if (pulling.count(head)) {
+    if (!recovering.count(soid)) {
+      if (recovering.count(head)) {
        ++skipped;
       } else if (unfound) {
        ++skipped;
       } else {
-       int r = prepare_pull(
-         soid, need, cct->_conf->osd_recovery_op_priority, &pulls);
+       int r = recover_missing(
+         soid, need, cct->_conf->osd_recovery_op_priority, h);
        switch (r) {
        case PULL_YES:
          ++started;
@@ -7708,8 +7754,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
       pg_log.set_last_requested(v);
   }
  
-  // TODOSAM: replace
-  //send_pulls(g_conf->osd_recovery_op_priority, pulls);
+  pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
   return started;
 }
 
index 7d5013394bb3ce43f4b772c1687bf4b2f07a1362..fa32e8bf8706fa27a11c47698b4c6b7891bda12d 100644 (file)
@@ -543,6 +543,7 @@ protected:
     }
   };
   map<hobject_t, PullInfo> pulling;
+  set<hobject_t> recovering;
 
   ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info);
     
@@ -631,10 +632,6 @@ protected:
                         const hobject_t &last_backfill,
                         interval_set<uint64_t>& data_subset,
                         map<hobject_t, interval_set<uint64_t> >& clone_subsets);
-  void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
-                         const hobject_t &last_backfill,
-                         interval_set<uint64_t>& data_subset,
-                         map<hobject_t, interval_set<uint64_t> >& clone_subsets);
   void prep_push_to_replica(
     ObjectContextRef obc,
     const hobject_t& oid,
@@ -658,11 +655,11 @@ protected:
   // Cancels/resets pulls from peer
   void check_recovery_sources(const OSDMapRef map);
 
-  int prepare_pull(
-    const hobject_t& oid, eversion_t v,
+  int recover_missing(
+    const hobject_t& oid,
+    eversion_t v,
     int priority,
-    map<int, vector<PullOp> > *pulls
-    );
+    PGBackend::RecoveryHandle *h);
 
   // low level ops