git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG/Backend: move prep_push and friends to ReplicatedBackend
author Samuel Just <sam.just@inktank.com>
Mon, 9 Sep 2013 18:25:10 +0000 (11:25 -0700)
committer Samuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:27 +0000 (11:24 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 10d743d95e46eaaa89aab69b81b4f0a85faecec1..6193f2e0e780d689d2e9d9498c2670623306ceef 100644 (file)
@@ -33,32 +33,37 @@ ReplicatedBackend::ReplicatedBackend(
   coll(coll), osd(osd), cct(osd->cct) {}
 
 void ReplicatedBackend::run_recovery_op(
-  PGBackend::RecoveryHandle *h,
+  PGBackend::RecoveryHandle *_h,
   int priority)
 {
+  RPGHandle *h = static_cast<RPGHandle *>(_h);
+  send_pushes(priority, h->pushes);
+  send_pulls(priority, h->pulls);
+  delete h;
 }
 
 void ReplicatedBackend::recover_object(
   const hobject_t &hoid,
   ObjectContextRef head,
   ObjectContextRef obc,
-  RecoveryHandle *h
+  RecoveryHandle *_h
   )
 {
-#if 0
-  op.recovery_progress.data_complete = false;
-  op.recovery_progress.omap_complete = false;
-  op.recovery_progress.data_recovered_to = 0;
-  op.recovery_progress.first = true;
-
-  assert(!pulling.count(soid));
-  pull_from_peer[fromosd].insert(soid);
-  PullInfo &pi = pulling[soid];
-  pi.recovery_info = op.recovery_info;
-  pi.recovery_progress = op.recovery_progress;
-  pi.priority = priority;
-#endif
-  dout(10) << __func__ << dendl;
+  dout(10) << __func__ << ": " << hoid << dendl;
+  RPGHandle *h = static_cast<RPGHandle *>(_h);
+  if (get_parent()->get_local_missing().is_missing(hoid)) {
+    assert(!obc);
+    // pull
+    prepare_pull(
+      hoid,
+      head,
+      h);
+    return;
+  } else {
+    assert(obc);
+    assert(head);
+    // TODOSAM: handle recovering replicas
+  }
 }
 
 void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap)
index 44ff3bc62a8386d47d531d12860dd14b0faca21f..c57b10cbf1ce56ba5bb500f52c69d12949ecb7b8 100644 (file)
@@ -267,8 +267,28 @@ private:
   void prepare_pull(
     const hobject_t& soid,
     ObjectContextRef headctx,
-    int priority,
     RPGHandle *h);
+  void start_pushes(
+    const hobject_t &soid,
+    ObjectContextRef obj,
+    RPGHandle *h);
+  void prep_push_to_replica(
+    ObjectContextRef obc, const hobject_t& soid, int peer,
+    PushOp *pop);
+  void prep_push(ObjectContextRef obc,
+                const hobject_t& oid, int dest,
+                PushOp *op);
+  void prep_push(ObjectContextRef obc,
+                const hobject_t& soid, int peer,
+                eversion_t version,
+                interval_set<uint64_t> &data_subset,
+                map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+                PushOp *op);
+  void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+                        const pg_missing_t& missing,
+                        const hobject_t &last_backfill,
+                        interval_set<uint64_t>& data_subset,
+                        map<hobject_t, interval_set<uint64_t> >& clone_subsets);
 };
 
 #endif
index 3a7aa14f53c27e86c7266cb29bfb348940912c8b..bf514671dcece830ecc5e9c89b9b5ea309dcb2ea 100644 (file)
@@ -230,6 +230,7 @@ void ReplicatedPG::begin_peer_recover(
   int peer,
   const hobject_t soid)
 {
+  peer_missing[peer].revise_have(soid, eversion_t());
 }
 
 // =======================
@@ -337,10 +338,9 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
        break;
       }
     }
-    map<int, vector<PushOp> > pushes;
-    prep_object_replica_pushes(soid, v, cct->_conf->osd_client_op_priority, &pushes);
-    // TODOSAM: replace
-    //send_pushes(g_conf->osd_client_op_priority, pushes);
+    PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
+    prep_object_replica_pushes(soid, v, h);
+    pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
   }
   waiting_for_degraded_object[soid].push_back(op);
   op->mark_delayed("waiting for degraded object");
@@ -5717,11 +5717,12 @@ void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
 
 // ===========================================================
 
-void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
-                                    pg_missing_t& missing,
-                                    const hobject_t &last_backfill,
-                                    interval_set<uint64_t>& data_subset,
-                                    map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedBackend::calc_head_subsets(
+  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+  const pg_missing_t& missing,
+  const hobject_t &last_backfill,
+  interval_set<uint64_t>& data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
 {
   dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;
@@ -5864,7 +5865,6 @@ enum { PULL_NONE, PULL_OTHER, PULL_YES };
 void ReplicatedBackend::prepare_pull(
   const hobject_t& soid,
   ObjectContextRef headctx,
-  int priority,
   RPGHandle *h)
 {
   assert(get_parent()->get_local_missing().missing.count(soid));
@@ -5951,9 +5951,6 @@ void ReplicatedBackend::prepare_pull(
   PullInfo &pi = pulling[soid];
   pi.recovery_info = op.recovery_info;
   pi.recovery_progress = op.recovery_progress;
-  pi.priority = priority;
-
-  // TODOSAM: do something??
 }
 
 int ReplicatedPG::recover_missing(
@@ -6046,9 +6043,8 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
  * intelligently push an object to a replica.  make use of existing
  * clones/heads and dup data ranges where possible.
  */
-void ReplicatedPG::prep_push_to_replica(
+void ReplicatedBackend::prep_push_to_replica(
   ObjectContextRef obc, const hobject_t& soid, int peer,
-  int prio,
   PushOp *pop)
 {
   const object_info_t& oi = obc->obs.oi;
@@ -6067,44 +6063,48 @@ void ReplicatedPG::prep_push_to_replica(
 
     // try to base push off of clones that succeed/preceed poid
     // we need the head (and current SnapSet) locally to do that.
-    if (pg_log.get_missing().is_missing(head)) {
+    if (get_parent()->get_local_missing().is_missing(head)) {
       dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
-      return prep_push(prio, obc, soid, peer, pop);
+      return prep_push(obc, soid, peer, pop);
     }
     hobject_t snapdir = head;
     snapdir.snap = CEPH_SNAPDIR;
-    if (pg_log.get_missing().is_missing(snapdir)) {
-      dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
-      return prep_push(prio, obc, soid, peer, pop);
+    if (get_parent()->get_local_missing().is_missing(snapdir)) {
+      dout(15) << "push_to_replica missing snapdir " << snapdir
+              << ", pushing raw clone" << dendl;
+      return prep_push(obc, soid, peer, pop);
     }
     
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
+    SnapSetContext *ssc = obc->ssc;
     assert(ssc);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
-// TODOSAM: fix
-#if 0
-    calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
-                      peer_info[peer].last_backfill,
+    map<int, pg_missing_t>::const_iterator pm =
+      get_parent()->get_peer_missing().find(peer);
+    assert(pm != get_parent()->get_peer_missing().end());
+    map<int, pg_info_t>::const_iterator pi =
+      get_parent()->get_peer_info().find(peer);
+    assert(pi != get_parent()->get_peer_info().end());
+    calc_clone_subsets(ssc->snapset, soid,
+                      pm->second,
+                      pi->second.last_backfill,
                       data_subset, clone_subsets);
-#endif
-    put_snapset_context(ssc);
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
     // base this on partially on replica's clones?
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
+    SnapSetContext *ssc = obc->ssc;
     assert(ssc);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
-    calc_head_subsets(obc, ssc->snapset, soid, peer_missing[peer],
-                     peer_info[peer].last_backfill,
-                     data_subset, clone_subsets);
-    put_snapset_context(ssc);
+    calc_head_subsets(
+      obc,
+      ssc->snapset, soid, get_parent()->get_peer_missing().find(peer)->second,
+      get_parent()->get_peer_info().find(peer)->second.last_backfill,
+      data_subset, clone_subsets);
   }
 
-  prep_push(prio, obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
+  prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
 }
 
-void ReplicatedPG::prep_push(int prio,
-                            ObjectContextRef obc,
+void ReplicatedBackend::prep_push(ObjectContextRef obc,
                             const hobject_t& soid, int peer,
                             PushOp *pop)
 {
@@ -6113,13 +6113,12 @@ void ReplicatedPG::prep_push(int prio,
     data_subset.insert(0, obc->obs.oi.size);
   map<hobject_t, interval_set<uint64_t> > clone_subsets;
 
-  prep_push(prio, obc, soid, peer,
+  prep_push(obc, soid, peer,
            obc->obs.oi.version, data_subset, clone_subsets,
            pop);
 }
 
-void ReplicatedPG::prep_push(
-  int prio,
+void ReplicatedBackend::prep_push(
   ObjectContextRef obc,
   const hobject_t& soid, int peer,
   eversion_t version,
@@ -6127,7 +6126,7 @@ void ReplicatedPG::prep_push(
   map<hobject_t, interval_set<uint64_t> >& clone_subsets,
   PushOp *pop)
 {
-  peer_missing[peer].revise_have(soid, eversion_t());
+  get_parent()->begin_peer_recover(peer, soid);
   // take note.
   PushInfo &pi = pushing[soid][peer];
   pi.recovery_info.size = obc->obs.oi.size;
@@ -6140,16 +6139,13 @@ void ReplicatedPG::prep_push(
   pi.recovery_progress.data_recovered_to = 0;
   pi.recovery_progress.data_complete = 0;
   pi.recovery_progress.omap_complete = 0;
-  pi.priority = prio;
-// TODOSAM: replace
-#if 0
+
   ObjectRecoveryProgress new_progress;
   build_push_op(pi.recovery_info,
                pi.recovery_progress,
                &new_progress,
                pop);
   pi.recovery_progress = new_progress;
-#endif
 }
 
 int ReplicatedBackend::send_pull_legacy(int prio, int peer,
@@ -7750,8 +7746,8 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
 }
 
 int ReplicatedPG::prep_object_replica_pushes(
-  const hobject_t& soid, eversion_t v, int prio,
-  map<int, vector<PushOp> > *pushes)
+  const hobject_t& soid, eversion_t v,
+  PGBackend::RecoveryHandle *h)
 {
   dout(10) << __func__ << ": on " << soid << dendl;
 
@@ -7778,30 +7774,37 @@ int ReplicatedPG::prep_object_replica_pushes(
     return 0;
   }
 
-  dout(10) << " ondisk_read_lock for " << soid << dendl;
-  obc->ondisk_read_lock();
-  
+  start_recovery_op(soid);
+  assert(!recovering.count(soid));
+  recovering.insert(soid);
+  pgbackend->recover_object(
+    soid,
+    ObjectContextRef(),
+    obc, // has snapset context
+    h);
+  return 1;
+}
+
+void ReplicatedBackend::start_pushes(
+  const hobject_t &soid,
+  ObjectContextRef obc,
+  RPGHandle *h)
+{
   // who needs it?  
-  bool started = false;
-  for (unsigned i=1; i<acting.size(); i++) {
-    int peer = acting[i];
-    if (peer_missing.count(peer) &&
-       peer_missing[peer].is_missing(soid)) {
-      if (!started) {
-       start_recovery_op(soid);
-       started = true;
-      }
-      (*pushes)[peer].push_back(PushOp());
-      prep_push_to_replica(obc, soid, peer, prio,
-                     &((*pushes)[peer].back())
+  for (unsigned i=1; i<get_parent()->get_acting().size(); i++) {
+    int peer = get_parent()->get_acting()[i];
+    map<int, pg_missing_t>::const_iterator j =
+      get_parent()->get_peer_missing().find(peer);
+    assert(j != get_parent()->get_peer_missing().end());
+    if (j->second.is_missing(soid)) {
+      h->pushes[peer].push_back(PushOp());
+      prep_push_to_replica(obc, soid, peer,
+                          &(h->pushes[peer].back())
        );
     }
   }
   
   dout(10) << " ondisk_read_unlock on " << soid << dendl;
-  obc->ondisk_read_unlock();
-
-  return 1;
 }
 
 int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
@@ -7809,7 +7812,7 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
   dout(10) << __func__ << "(" << max << ")" << dendl;
   int started = 0;
 
-  map<int, vector<PushOp> > pushes;
+  PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
 
   // this is FAR from an optimal recovery order.  pretty lame, really.
   for (unsigned i=1; i<acting.size(); i++) {
@@ -7845,14 +7848,11 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
       dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
       map<hobject_t,pg_missing_t::item>::const_iterator r = m.missing.find(soid);
       started += prep_object_replica_pushes(soid, r->second.need,
-                                           cct->_conf->osd_recovery_op_priority,
-                                           &pushes);
+                                           h);
     }
   }
 
-  // TODOSAM: replace
-  //send_pushes(g_conf->osd_recovery_op_priority, pushes);
-
+  pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
   return started;
 }
 
@@ -8079,11 +8079,14 @@ void ReplicatedPG::prep_backfill_object_push(
   if (!recovering.count(oid))
     start_recovery_op(oid);
   ObjectContextRef obc = get_object_context(oid, false);
+// TODOSAM: fix
+#if 0
   obc->ondisk_read_lock();
   (*pushes)[peer].push_back(PushOp());
   prep_push_to_replica(obc, oid, peer, cct->_conf->osd_recovery_op_priority,
                       &((*pushes)[peer].back()));
   obc->ondisk_read_unlock();
+#endif
 }
 
 void ReplicatedPG::scan_range(
index 11e7153c0a33b2be92d224942fdb2c283eeac90b..8b2be5ecdb8d388a7680bc91380b70b4436892bd 100644 (file)
@@ -578,30 +578,7 @@ protected:
   hobject_t backfill_pos;
 
   int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
-                                int priority,
-                                map<int, vector<PushOp> > *pushes);
-  void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
-                        pg_missing_t& missing,
-                        const hobject_t &last_backfill,
-                        interval_set<uint64_t>& data_subset,
-                        map<hobject_t, interval_set<uint64_t> >& clone_subsets);
-  void prep_push_to_replica(
-    ObjectContextRef obc,
-    const hobject_t& oid,
-    int dest,
-    int priority,
-    PushOp *push_op);
-  void prep_push(int priority,
-                ObjectContextRef obc,
-                const hobject_t& oid, int dest,
-                PushOp *op);
-  void prep_push(int priority,
-                ObjectContextRef obc,
-                const hobject_t& soid, int peer,
-                eversion_t version,
-                interval_set<uint64_t> &data_subset,
-                map<hobject_t, interval_set<uint64_t> >& clone_subsets,
-                PushOp *op);
+                                PGBackend::RecoveryHandle *h);
 
   void finish_degraded_object(const hobject_t& oid);