From: Samuel Just Date: Wed, 19 Jun 2013 20:26:50 +0000 (-0700) Subject: ReplicatedPG: send compound messages to enlightened peers X-Git-Tag: v0.67-rc1~138^2~1^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ad65de40ffad5cecb959f710e9bceb8accc2ae3b;p=ceph.git ReplicatedPG: send compound messages to enlightened peers Signed-off-by: Samuel Just --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 15e65e3bf98..97c7ba6050b 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -426,6 +426,8 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0) OPTION(osd_recovery_max_active, OPT_INT, 5) OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object +OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message +OPTION(osd_max_push_objects, OPT_U64, 10) // max objects in single push op OPTION(osd_recovery_forget_lost_objects, OPT_BOOL, false) // off for now OPTION(osd_max_scrubs, OPT_INT, 1) OPTION(osd_scrub_load_threshold, OPT_FLOAT, 0.5) diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index b5a01c5647d..579dcb3c023 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -37,6 +37,7 @@ #define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30) #define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31) #define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32) +#define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<33) /* * Features supported. Should be everything above. 
@@ -74,7 +75,8 @@ CEPH_FEATURE_MDSENC | \ CEPH_FEATURE_OSDHASHPSPOOL | \ CEPH_FEATURE_MON_SINGLE_PAXOS | \ - CEPH_FEATURE_OSD_SNAPMAPPER) + CEPH_FEATURE_OSD_SNAPMAPPER | \ + CEPH_FEATURE_OSD_PACKED_RECOVERY) #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index a297f5df171..f150ccc92ff 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5472,7 +5472,7 @@ void ReplicatedPG::prep_push( pi.recovery_progress = new_progress; } -int ReplicatedPG::send_pull(int prio, int peer, +int ReplicatedPG::send_pull_legacy(int prio, int peer, const ObjectRecoveryInfo &recovery_info, ObjectRecoveryProgress progress) { @@ -5816,10 +5816,42 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes) for (map<int, vector<PushOp> >::iterator i = pushes.begin(); i != pushes.end(); ++i) { - for (vector<PushOp>::iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - send_push_op(prio, i->first, *j); + ConnectionRef con = osd->get_con_osd_cluster( + i->first, + get_osdmap()->get_epoch()); + if (!con) + continue; + if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { + for (vector<PushOp>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + dout(20) << __func__ << ": sending push (legacy) " << *j + << " to osd." << i->first << dendl; + send_push_op_legacy(prio, i->first, *j); + } + } else { + vector<PushOp>::iterator j = i->second.begin(); + while (j != i->second.end()) { + uint64_t cost = 0; + uint64_t pushes = 0; + MOSDPGPush *msg = new MOSDPGPush(); + msg->pgid = info.pgid; + msg->map_epoch = get_osdmap()->get_epoch(); + msg->set_priority(prio); + for (; + (j != i->second.end() && + cost < g_conf->osd_max_push_cost && + pushes < g_conf->osd_max_push_objects) ; + ++j) { + dout(20) << __func__ << ": sending push " << *j + << " to osd." 
<< i->first << dendl; + cost += j->cost(g_ceph_context); + pushes += 1; + msg->pushes.push_back(*j); + } + msg->compute_cost(g_ceph_context); + osd->send_message_osd_cluster(msg, con); + } } } } @@ -5829,14 +5861,33 @@ void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls) for (map<int, vector<PullOp> >::iterator i = pulls.begin(); i != pulls.end(); ++i) { - for (vector<PullOp>::iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - send_pull( - prio, - i->first, - j->recovery_info, - j->recovery_progress); + ConnectionRef con = osd->get_con_osd_cluster( + i->first, + get_osdmap()->get_epoch()); + if (!con) + continue; + if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { + for (vector<PullOp>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + dout(20) << __func__ << ": sending pull (legacy) " << *j + << " to osd." << i->first << dendl; + send_pull_legacy( + prio, + i->first, + j->recovery_info, + j->recovery_progress); + } + } else { + dout(20) << __func__ << ": sending pulls " << i->second + << " to osd." 
<< i->first << dendl; + MOSDPGPull *msg = new MOSDPGPull(); + msg->set_priority(prio); + msg->pgid = info.pgid; + msg->map_epoch = get_osdmap()->get_epoch(); + msg->pulls.swap(i->second); + msg->compute_cost(g_ceph_context); + osd->send_message_osd_cluster(msg, con); } } } @@ -5850,7 +5901,7 @@ int ReplicatedPG::send_push(int prio, int peer, int r = build_push_op(recovery_info, progress, out_progress, &op); if (r < 0) return r; - return send_push_op(prio, peer, op); + return send_push_op_legacy(prio, peer, op); } int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, @@ -5961,7 +6012,7 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, return 0; } -int ReplicatedPG::send_push_op(int prio, int peer, PushOp &pop) +int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop) { tid_t tid = osd->get_tid(); osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); @@ -6008,7 +6059,7 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op) PushOp pop; bool more = handle_push_reply(peer, rop, &pop); if (more) - send_push_op(pushing[soid][peer].priority, peer, pop); + send_push_op_legacy(pushing[soid][peer].priority, peer, pop); } bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) @@ -6120,7 +6171,7 @@ void ReplicatedPG::sub_op_pull(OpRequestRef op) PushOp reply; handle_pull(m->get_source().num(), pop, &reply); - send_push_op( + send_push_op_legacy( m->get_priority(), m->get_source().num(), reply); @@ -6312,7 +6363,7 @@ void ReplicatedPG::sub_op_push(OpRequestRef op) PullOp resp; bool more = handle_pull_response(m->get_source().num(), pop, &resp, t); if (more) { - send_pull( + send_pull_legacy( m->get_priority(), m->get_source().num(), resp.recovery_info, diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 2ec2ebc40f8..7dcf9db3f91 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -571,10 +571,10 @@ protected: const ObjectRecoveryProgress &progress, 
ObjectRecoveryProgress *out_progress, PushOp *out_op); - int send_push_op(int priority, int peer, + int send_push_op_legacy(int priority, int peer, PushOp &pop); - int send_pull(int priority, int peer, + int send_pull_legacy(int priority, int peer, const ObjectRecoveryInfo& recovery_info, ObjectRecoveryProgress progress); void submit_push_data(ObjectRecoveryInfo &recovery_info,