]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: send pulls en masse in recover_primary
authorSamuel Just <sam.just@inktank.com>
Fri, 14 Jun 2013 21:58:39 +0000 (14:58 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:31 +0000 (16:43 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 7d44e56ba9d61b0e67da4b5fdec5d96318bca0f7..fa5fa36ffecf35159077446587606676dc7e59af 100644 (file)
@@ -119,7 +119,9 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o
   }
   else {
     dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
-    pull(soid, v, g_conf->osd_client_op_priority);
+    map<int, vector<PullOp> > pulls;
+    prepare_pull(soid, v, g_conf->osd_client_op_priority, &pulls);
+    send_pulls(g_conf->osd_client_op_priority, pulls);
   }
   waiting_for_missing_object[soid].push_back(op);
   op->mark_delayed("waiting for missing object");
@@ -5127,9 +5129,10 @@ void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
  */
 enum { PULL_NONE, PULL_OTHER, PULL_YES };
 
-int ReplicatedPG::pull(
+int ReplicatedPG::prepare_pull(
   const hobject_t& soid, eversion_t v,
-  int priority)
+  int priority,
+  map<int, vector<PullOp> > *pulls)
 {
   int fromosd = -1;
   map<hobject_t,set<int> >::iterator q = missing_loc.find(soid);
@@ -5184,7 +5187,9 @@ int ReplicatedPG::pull(
        dout(10) << " missing but already pulling head " << head << dendl;
        return PULL_NONE;
       } else {
-       int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority);
+       int r = prepare_pull(
+         head, pg_log.get_missing().missing.find(head)->second.need, priority,
+         pulls);
        if (r != PULL_NONE)
          return PULL_OTHER;
        return PULL_NONE;
@@ -5196,7 +5201,9 @@ int ReplicatedPG::pull(
        dout(10) << " missing but already pulling snapdir " << head << dendl;
        return PULL_NONE;
       } else {
-       int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority);
+       int r = prepare_pull(
+         head, pg_log.get_missing().missing.find(head)->second.need, priority,
+         pulls);
        if (r != PULL_NONE)
          return PULL_OTHER;
        return PULL_NONE;
@@ -5220,21 +5227,24 @@ int ReplicatedPG::pull(
     recovery_info.size = ((uint64_t)-1);
   }
 
+  (*pulls)[fromosd].push_back(PullOp());
+  PullOp &op = (*pulls)[fromosd].back();
+  op.soid = soid;
+
+  op.recovery_info = recovery_info;
+  op.recovery_info.soid = soid;
+  op.recovery_info.version = v;
+  op.recovery_progress.data_complete = false;
+  op.recovery_progress.omap_complete = false;
+  op.recovery_progress.data_recovered_to = 0;
+  op.recovery_progress.first = true;
 
-  recovery_info.soid = soid;
-  recovery_info.version = v;
-  ObjectRecoveryProgress progress;
-  progress.data_complete = false;
-  progress.omap_complete = false;
-  progress.data_recovered_to = 0;
-  progress.first = true;
   assert(!pulling.count(soid));
   pull_from_peer[fromosd].insert(soid);
   PullInfo &pi = pulling[soid];
-  pi.recovery_info = recovery_info;
-  pi.recovery_progress = progress;
+  pi.recovery_info = op.recovery_info;
+  pi.recovery_progress = op.recovery_progress;
   pi.priority = priority;
-  send_pull(priority, fromosd, recovery_info, progress);
 
   start_recovery_op(soid);
   return PULL_YES;
@@ -5713,6 +5723,23 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
   }
 }
 
+void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
+{
+  for (map<int, vector<PullOp> >::iterator i = pulls.begin();
+       i != pulls.end();
+       ++i) {
+    for (vector<PullOp>::iterator j = i->second.begin();
+        j != i->second.end();
+        ++j) {
+      send_pull(
+       prio,
+       i->first,
+       j->recovery_info,
+       j->recovery_progress);
+    }
+  }
+}
+
 int ReplicatedPG::send_push(int prio, int peer,
                            const ObjectRecoveryInfo &recovery_info,
                            const ObjectRecoveryProgress &progress,
@@ -6798,6 +6825,7 @@ int ReplicatedPG::recover_primary(int max)
   int started = 0;
   int skipped = 0;
 
+  map<int, vector<PullOp> > pulls;
   map<version_t, hobject_t>::const_iterator p =
     missing.rmissing.lower_bound(pg_log.get_log().last_requested);
   while (p != missing.rmissing.end()) {
@@ -6913,7 +6941,8 @@ int ReplicatedPG::recover_primary(int max)
       } else if (unfound) {
        ++skipped;
       } else {
-       int r = pull(soid, need, g_conf->osd_recovery_op_priority);
+       int r = prepare_pull(
+         soid, need, g_conf->osd_recovery_op_priority, &pulls);
        switch (r) {
        case PULL_YES:
          ++started;
@@ -6927,7 +6956,7 @@ int ReplicatedPG::recover_primary(int max)
          assert(0);
        }
        if (started >= max)
-         return started;
+         break;
       }
     }
     
@@ -6936,6 +6965,7 @@ int ReplicatedPG::recover_primary(int max)
       pg_log.set_last_requested(v);
   }
 
+  send_pulls(g_conf->osd_recovery_op_priority, pulls);
   return started;
 }
 
index 6121227c9b884b9b0da5dcfaf1761e202a1c5ced..dd2ba7f3113668e52ed3af241e36cada63c02555 100644 (file)
@@ -723,9 +723,15 @@ protected:
 
   // Cancels/resets pulls from peer
   void check_recovery_sources(const OSDMapRef map);
-  int pull(
+
+  void send_pulls(
+    int priority,
+    map<int, vector<PullOp> > &pulls);
+  int prepare_pull(
     const hobject_t& oid, eversion_t v,
-    int priority);
+    int priority,
+    map<int, vector<PullOp> > *pulls
+    );
 
   // low level ops