git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: send pushes en masse in recover_replicas, recover_backfill
author Samuel Just <sam.just@inktank.com>
Fri, 14 Jun 2013 20:44:34 +0000 (13:44 -0700)
committer Samuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:31 +0000 (16:43 -0700)
This way, the pushes might later be merged into a smaller number of
messages.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 237b80f119a2844f0b2e173a64d125c673e40247..7d44e56ba9d61b0e67da4b5fdec5d96318bca0f7 100644 (file)
@@ -175,7 +175,9 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
        break;
       }
     }
-    recover_object_replicas(soid, v, g_conf->osd_client_op_priority);
+    map<int, vector<PushOp> > pushes;
+    prep_object_replica_pushes(soid, v, g_conf->osd_client_op_priority, &pushes);
+    send_pushes(g_conf->osd_client_op_priority, pushes);
   }
   waiting_for_degraded_object[soid].push_back(op);
   op->mark_delayed("waiting for degraded object");
@@ -5258,14 +5260,16 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
  * intelligently push an object to a replica.  make use of existing
  * clones/heads and dup data ranges where possible.
  */
-void ReplicatedPG::push_to_replica(
-                                  ObjectContext *obc, const hobject_t& soid, int peer,
-                                  int prio)
+void ReplicatedPG::prep_push_to_replica(
+  ObjectContext *obc, const hobject_t& soid, int peer,
+  int prio,
+  PushOp *pop)
 {
   const object_info_t& oi = obc->obs.oi;
   uint64_t size = obc->obs.oi.size;
 
-  dout(10) << "push_to_replica " << soid << " v" << oi.version << " size " << size << " to osd." << peer << dendl;
+  dout(10) << __func__ << soid << " v" << oi.version
+          << " size " << size << " to osd." << peer << dendl;
 
   map<hobject_t, interval_set<uint64_t> > clone_subsets;
   interval_set<uint64_t> data_subset;
@@ -5279,13 +5283,13 @@ void ReplicatedPG::push_to_replica(
     // we need the head (and current SnapSet) locally to do that.
     if (pg_log.get_missing().is_missing(head)) {
       dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
-      return push_start(prio, obc, soid, peer);
+      return prep_push(prio, obc, soid, peer, pop);
     }
     hobject_t snapdir = head;
     snapdir.snap = CEPH_SNAPDIR;
     if (pg_log.get_missing().is_missing(snapdir)) {
       dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
-      return push_start(prio, obc, soid, peer);
+      return prep_push(prio, obc, soid, peer, pop);
     }
     
     SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
@@ -5307,29 +5311,32 @@ void ReplicatedPG::push_to_replica(
     put_snapset_context(ssc);
   }
 
-  push_start(prio, obc, soid, peer, oi.version, data_subset, clone_subsets);
+  prep_push(prio, obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
 }
 
-void ReplicatedPG::push_start(int prio,
-                             ObjectContext *obc,
-                             const hobject_t& soid, int peer)
+void ReplicatedPG::prep_push(int prio,
+                            ObjectContext *obc,
+                            const hobject_t& soid, int peer,
+                            PushOp *pop)
 {
   interval_set<uint64_t> data_subset;
   if (obc->obs.oi.size)
     data_subset.insert(0, obc->obs.oi.size);
   map<hobject_t, interval_set<uint64_t> > clone_subsets;
 
-  push_start(prio, obc, soid, peer,
-            obc->obs.oi.version, data_subset, clone_subsets);
+  prep_push(prio, obc, soid, peer,
+           obc->obs.oi.version, data_subset, clone_subsets,
+           pop);
 }
 
-void ReplicatedPG::push_start(
+void ReplicatedPG::prep_push(
   int prio,
   ObjectContext *obc,
   const hobject_t& soid, int peer,
   eversion_t version,
   interval_set<uint64_t> &data_subset,
-  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+  PushOp *pop)
 {
   peer_missing[peer].revise_have(soid, eversion_t());
   // take note.
@@ -5347,9 +5354,10 @@ void ReplicatedPG::push_start(
   pi.priority = prio;
 
   ObjectRecoveryProgress new_progress;
-  send_push(pi.priority,
-           peer, pi.recovery_info,
-           pi.recovery_progress, &new_progress);
+  build_push_op(pi.recovery_info,
+               pi.recovery_progress,
+               &new_progress,
+               pop);
   pi.recovery_progress = new_progress;
 }
 
@@ -5692,6 +5700,19 @@ void ReplicatedPG::handle_push(
       info.last_complete));
 }
 
+void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
+{
+  for (map<int, vector<PushOp> >::iterator i = pushes.begin();
+       i != pushes.end();
+       ++i) {
+    for (vector<PushOp>::iterator j = i->second.begin();
+        j != i->second.end();
+        ++j) {
+      send_push_op(prio, i->first, *j);
+    }
+  }
+}
+
 int ReplicatedPG::send_push(int prio, int peer,
                            const ObjectRecoveryInfo &recovery_info,
                            const ObjectRecoveryProgress &progress,
@@ -6918,10 +6939,11 @@ int ReplicatedPG::recover_primary(int max)
   return started;
 }
 
-int ReplicatedPG::recover_object_replicas(
-  const hobject_t& soid, eversion_t v, int prio)
+int ReplicatedPG::prep_object_replica_pushes(
+  const hobject_t& soid, eversion_t v, int prio,
+  map<int, vector<PushOp> > *pushes)
 {
-  dout(10) << "recover_object_replicas " << soid << dendl;
+  dout(10) << __func__ << ": on " << soid << dendl;
 
   // NOTE: we know we will get a valid oloc off of disk here.
   ObjectContext *obc = get_object_context(soid, OLOC_BLANK, false);
@@ -6959,7 +6981,10 @@ int ReplicatedPG::recover_object_replicas(
        start_recovery_op(soid);
        started = true;
       }
-      push_to_replica(obc, soid, peer, prio);
+      (*pushes)[peer].push_back(PushOp());
+      prep_push_to_replica(obc, soid, peer, prio,
+                     &((*pushes)[peer].back())
+       );
     }
   }
   
@@ -6975,6 +7000,8 @@ int ReplicatedPG::recover_replicas(int max)
   dout(10) << __func__ << "(" << max << ")" << dendl;
   int started = 0;
 
+  map<int, vector<PushOp> > pushes;
+
   // this is FAR from an optimal recovery order.  pretty lame, really.
   for (unsigned i=1; i<acting.size(); i++) {
     int peer = acting[i];
@@ -7007,11 +7034,14 @@ int ReplicatedPG::recover_replicas(int max)
 
       dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
       map<hobject_t,pg_missing_t::item>::const_iterator r = m.missing.find(soid);
-      started += recover_object_replicas(soid, r->second.need,
-                                        g_conf->osd_recovery_op_priority);
+      started += prep_object_replica_pushes(soid, r->second.need,
+                                           g_conf->osd_recovery_op_priority,
+                                           &pushes);
     }
   }
 
+  send_pushes(g_conf->osd_recovery_op_priority, pushes);
+
   return started;
 }
 
@@ -7170,11 +7200,15 @@ int ReplicatedPG::recover_backfill(int max)
        ++i) {
     send_remove_op(i->first, i->second, backfill_target);
   }
+
+  map<int, vector<PushOp> > pushes;
   for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
        i != to_push.end();
        ++i) {
-    push_backfill_object(i->first, i->second.first, i->second.second, backfill_target);
+    prep_backfill_object_push(
+      i->first, i->second.first, i->second.second, backfill_target, &pushes);
   }
+  send_pushes(g_conf->osd_recovery_op_priority, pushes);
 
   release_waiting_for_backfill_pos();
   dout(5) << "backfill_pos is " << backfill_pos << " and pinfo.last_backfill is "
@@ -7218,7 +7252,9 @@ int ReplicatedPG::recover_backfill(int max)
   return ops;
 }
 
-void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, eversion_t have, int peer)
+void ReplicatedPG::prep_backfill_object_push(
+  hobject_t oid, eversion_t v, eversion_t have, int peer,
+  map<int, vector<PushOp> > *pushes)
 {
   dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
 
@@ -7228,7 +7264,9 @@ void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, eversion_t
     start_recovery_op(oid);
   ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
   obc->ondisk_read_lock();
-  push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority);
+  (*pushes)[peer].push_back(PushOp());
+  prep_push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority,
+                      &((*pushes)[peer].back()));
   obc->ondisk_read_unlock();
   put_object_context(obc);
 }
index a20ebad9cff10ca1fc21c74b05f18351227c3d7f..6121227c9b884b9b0da5dcfaf1761e202a1c5ced 100644 (file)
@@ -562,6 +562,7 @@ protected:
   void handle_push(
     int from, PushOp &op, PushReplyOp *response,
     ObjectStore::Transaction *t);
+  void send_pushes(int prio, map<int, vector<PushOp> > &pushes);
   int send_push(int priority, int peer,
                const ObjectRecoveryInfo& recovery_info,
                const ObjectRecoveryProgress &progress,
@@ -687,8 +688,9 @@ protected:
   // Reverse mapping from osd peer to objects beging pulled from that peer
   map<int, set<hobject_t> > pull_from_peer;
 
-  int recover_object_replicas(const hobject_t& soid, eversion_t v,
-                             int priority);
+  int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
+                                int priority,
+                                map<int, vector<PushOp> > *pushes);
   void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head,
                         pg_missing_t& missing,
                         const hobject_t &last_backfill,
@@ -698,20 +700,23 @@ protected:
                          const hobject_t &last_backfill,
                          interval_set<uint64_t>& data_subset,
                          map<hobject_t, interval_set<uint64_t> >& clone_subsets);
-  void push_to_replica(
+  void prep_push_to_replica(
     ObjectContext *obc,
     const hobject_t& oid,
     int dest,
-    int priority);
-  void push_start(int priority,
-                 ObjectContext *obc,
-                 const hobject_t& oid, int dest);
-  void push_start(int priority,
-                 ObjectContext *obc,
-                 const hobject_t& soid, int peer,
-                 eversion_t version,
-                 interval_set<uint64_t> &data_subset,
-                 map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+    int priority,
+    PushOp *push_op);
+  void prep_push(int priority,
+                ObjectContext *obc,
+                const hobject_t& oid, int dest,
+                PushOp *op);
+  void prep_push(int priority,
+                ObjectContext *obc,
+                const hobject_t& soid, int peer,
+                eversion_t version,
+                interval_set<uint64_t> &data_subset,
+                map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+                PushOp *op);
   void send_push_op_blank(const hobject_t& soid, int peer);
 
   void finish_degraded_object(const hobject_t& oid);
@@ -758,7 +763,9 @@ protected:
    */
   void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
 
-  void push_backfill_object(hobject_t oid, eversion_t v, eversion_t have, int peer);
+  void prep_backfill_object_push(
+    hobject_t oid, eversion_t v, eversion_t have, int peer,
+    map<int, vector<PushOp> > *pushes);
   void send_remove_op(const hobject_t& oid, eversion_t v, int peer);