git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: send compound messages to enlightened peers
author Samuel Just <sam.just@inktank.com>
Wed, 19 Jun 2013 20:26:50 +0000 (13:26 -0700)
committer Samuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:32 +0000 (16:43 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/config_opts.h
src/include/ceph_features.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 15e65e3bf98a50cfab17c34728a8998c352198e3..97c7ba6050b8f43c567de70f0fcd6934e6a561eb 100644 (file)
@@ -426,6 +426,8 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
 OPTION(osd_recovery_max_active, OPT_INT, 5)
 OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20)  // max size of push chunk
 OPTION(osd_push_per_object_cost, OPT_U64, 1000)  // push cost per object
+OPTION(osd_max_push_cost, OPT_U64, 8<<20)  // max size of push message
+OPTION(osd_max_push_objects, OPT_U64, 10)  // max objects in single push op
 OPTION(osd_recovery_forget_lost_objects, OPT_BOOL, false)   // off for now
 OPTION(osd_max_scrubs, OPT_INT, 1)
 OPTION(osd_scrub_load_threshold, OPT_FLOAT, 0.5)
index b5a01c5647df4d558be1ddeaee7bc463fa64eda4..579dcb3c02304957466f7a2b3c8c950f9c1082ff 100644 (file)
@@ -37,6 +37,7 @@
 #define CEPH_FEATURE_OSDHASHPSPOOL  (1ULL<<30)
 #define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31)
 #define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32)
+#define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<33)
 
 /*
  * Features supported.  Should be everything above.
@@ -74,7 +75,8 @@
         CEPH_FEATURE_MDSENC |                  \
         CEPH_FEATURE_OSDHASHPSPOOL |       \
         CEPH_FEATURE_MON_SINGLE_PAXOS |    \
-   CEPH_FEATURE_OSD_SNAPMAPPER)
+        CEPH_FEATURE_OSD_SNAPMAPPER |  \
+        CEPH_FEATURE_OSD_PACKED_RECOVERY)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
 
index a297f5df17118b25774a7fe993be30467fbcdabb..f150ccc92ff97dea4e954426d2a404cf595e17ab 100644 (file)
@@ -5472,7 +5472,7 @@ void ReplicatedPG::prep_push(
   pi.recovery_progress = new_progress;
 }
 
-int ReplicatedPG::send_pull(int prio, int peer,
+int ReplicatedPG::send_pull_legacy(int prio, int peer,
                            const ObjectRecoveryInfo &recovery_info,
                            ObjectRecoveryProgress progress)
 {
@@ -5816,10 +5816,42 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
   for (map<int, vector<PushOp> >::iterator i = pushes.begin();
        i != pushes.end();
        ++i) {
-    for (vector<PushOp>::iterator j = i->second.begin();
-        j != i->second.end();
-        ++j) {
-      send_push_op(prio, i->first, *j);
+    ConnectionRef con = osd->get_con_osd_cluster(
+      i->first,
+      get_osdmap()->get_epoch());
+    if (!con)
+      continue;
+    if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+      for (vector<PushOp>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       dout(20) << __func__ << ": sending push (legacy) " << *j
+                << " to osd." << i->first << dendl;
+       send_push_op_legacy(prio, i->first, *j);
+      }
+    } else {
+      vector<PushOp>::iterator j = i->second.begin();
+      while (j != i->second.end()) {
+       uint64_t cost = 0;
+       uint64_t pushes = 0;
+       MOSDPGPush *msg = new MOSDPGPush();
+       msg->pgid = info.pgid;
+       msg->map_epoch = get_osdmap()->get_epoch();
+       msg->set_priority(prio);
+       for (;
+            (j != i->second.end() &&
+             cost < g_conf->osd_max_push_cost &&
+             pushes < g_conf->osd_max_push_objects) ;
+            ++j) {
+         dout(20) << __func__ << ": sending push " << *j
+                  << " to osd." << i->first << dendl;
+         cost += j->cost(g_ceph_context);
+         pushes += 1;
+         msg->pushes.push_back(*j);
+       }
+       msg->compute_cost(g_ceph_context);
+       osd->send_message_osd_cluster(msg, con);
+      }
     }
   }
 }
@@ -5829,14 +5861,33 @@ void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
   for (map<int, vector<PullOp> >::iterator i = pulls.begin();
        i != pulls.end();
        ++i) {
-    for (vector<PullOp>::iterator j = i->second.begin();
-        j != i->second.end();
-        ++j) {
-      send_pull(
-       prio,
-       i->first,
-       j->recovery_info,
-       j->recovery_progress);
+    ConnectionRef con = osd->get_con_osd_cluster(
+      i->first,
+      get_osdmap()->get_epoch());
+    if (!con)
+      continue;
+    if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+      for (vector<PullOp>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       dout(20) << __func__ << ": sending pull (legacy) " << *j
+                << " to osd." << i->first << dendl;
+       send_pull_legacy(
+         prio,
+         i->first,
+         j->recovery_info,
+         j->recovery_progress);
+      }
+    } else {
+      dout(20) << __func__ << ": sending pulls " << i->second
+              << " to osd." << i->first << dendl;
+      MOSDPGPull *msg = new MOSDPGPull();
+      msg->set_priority(prio);
+      msg->pgid = info.pgid;
+      msg->map_epoch = get_osdmap()->get_epoch();
+      msg->pulls.swap(i->second);
+      msg->compute_cost(g_ceph_context);
+      osd->send_message_osd_cluster(msg, con);
     }
   }
 }
@@ -5850,7 +5901,7 @@ int ReplicatedPG::send_push(int prio, int peer,
   int r = build_push_op(recovery_info, progress, out_progress, &op);
   if (r < 0)
     return r;
-  return send_push_op(prio, peer, op);
+  return send_push_op_legacy(prio, peer, op);
 }
 
 int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
@@ -5961,7 +6012,7 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
   return 0;
 }
 
-int ReplicatedPG::send_push_op(int prio, int peer, PushOp &pop)
+int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop)
 {
   tid_t tid = osd->get_tid();
   osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
@@ -6008,7 +6059,7 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
   PushOp pop;
   bool more = handle_push_reply(peer, rop, &pop);
   if (more)
-    send_push_op(pushing[soid][peer].priority, peer, pop);
+    send_push_op_legacy(pushing[soid][peer].priority, peer, pop);
 }
 
 bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
@@ -6120,7 +6171,7 @@ void ReplicatedPG::sub_op_pull(OpRequestRef op)
 
   PushOp reply;
   handle_pull(m->get_source().num(), pop, &reply);
-  send_push_op(
+  send_push_op_legacy(
     m->get_priority(),
     m->get_source().num(),
     reply);
@@ -6312,7 +6363,7 @@ void ReplicatedPG::sub_op_push(OpRequestRef op)
     PullOp resp;
     bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
     if (more) {
-      send_pull(
+      send_pull_legacy(
        m->get_priority(),
        m->get_source().num(),
        resp.recovery_info,
index 2ec2ebc40f8c32e10f14fd15c5c6df0d1ac3c8db..7dcf9db3f911259a6837eb58503ea5ce15c2fbb7 100644 (file)
@@ -571,10 +571,10 @@ protected:
                    const ObjectRecoveryProgress &progress,
                    ObjectRecoveryProgress *out_progress,
                    PushOp *out_op);
-  int send_push_op(int priority, int peer,
+  int send_push_op_legacy(int priority, int peer,
                   PushOp &pop);
     
-  int send_pull(int priority, int peer,
+  int send_pull_legacy(int priority, int peer,
                const ObjectRecoveryInfo& recovery_info,
                ObjectRecoveryProgress progress);
   void submit_push_data(ObjectRecoveryInfo &recovery_info,