OPTION(osd_recovery_max_active, OPT_INT, 5)
OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk
OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object
+OPTION(osd_max_push_cost, OPT_U64, 8<<20) // soft cap on summed PushOp cost per MOSDPGPush message (checked before each op is added, so the last op may overshoot)
+OPTION(osd_max_push_objects, OPT_U64, 10) // max number of PushOps batched into a single MOSDPGPush message
OPTION(osd_recovery_forget_lost_objects, OPT_BOOL, false) // off for now
OPTION(osd_max_scrubs, OPT_INT, 1)
OPTION(osd_scrub_load_threshold, OPT_FLOAT, 0.5)
pi.recovery_progress = new_progress;
}
-int ReplicatedPG::send_pull(int prio, int peer,
+int ReplicatedPG::send_pull_legacy(int prio, int peer,
const ObjectRecoveryInfo &recovery_info,
ObjectRecoveryProgress progress)
{
for (map<int, vector<PushOp> >::iterator i = pushes.begin();
i != pushes.end();
++i) {
- for (vector<PushOp>::iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- send_push_op(prio, i->first, *j);
+  // Dispatch the queued PushOps for peer osd.<i->first>.  Peers whose
+  // connection lacks CEPH_FEATURE_OSD_PACKED_RECOVERY get one legacy push
+  // per op; all others get the ops batched into MOSDPGPush messages.
+  ConnectionRef con = osd->get_con_osd_cluster(
+    i->first,
+    get_osdmap()->get_epoch());
+  if (!con)
+    continue;  // no connection for this peer: nothing is sent to it here
+  if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+    for (vector<PushOp>::iterator j = i->second.begin();
+         j != i->second.end();
+         ++j) {
+      dout(20) << __func__ << ": sending push (legacy) " << *j
+               << " to osd." << i->first << dendl;
+      send_push_op_legacy(prio, i->first, *j);
+    }
+  } else {
+    vector<PushOp>::iterator j = i->second.begin();
+    while (j != i->second.end()) {
+      // Running totals for the message being assembled.  Named so that
+      // they do not shadow the `pushes` map this loop iterates over.
+      uint64_t msg_cost = 0;
+      uint64_t msg_ops = 0;
+      MOSDPGPush *msg = new MOSDPGPush();
+      msg->pgid = info.pgid;
+      msg->map_epoch = get_osdmap()->get_epoch();
+      msg->set_priority(prio);
+      // Always pack at least one op per message: if either limit is
+      // configured as 0, a check-before-add loop would never advance j
+      // and the enclosing while would send empty messages forever.
+      // `j` is known to be valid here because of the while condition.
+      do {
+        dout(20) << __func__ << ": sending push " << *j
+                 << " to osd." << i->first << dendl;
+        msg_cost += j->cost(g_ceph_context);
+        msg_ops += 1;
+        msg->pushes.push_back(*j);
+        ++j;
+      } while (j != i->second.end() &&
+               msg_cost < g_conf->osd_max_push_cost &&
+               msg_ops < g_conf->osd_max_push_objects);
+      msg->compute_cost(g_ceph_context);
+      osd->send_message_osd_cluster(msg, con);
+    }
}
}
}
for (map<int, vector<PullOp> >::iterator i = pulls.begin();
i != pulls.end();
++i) {
- for (vector<PullOp>::iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- send_pull(
- prio,
- i->first,
- j->recovery_info,
- j->recovery_progress);
+ ConnectionRef con = osd->get_con_osd_cluster( // connection to peer osd.<i->first>, looked up with the current osdmap epoch
+ i->first,
+ get_osdmap()->get_epoch());
+ if (!con) // no connection for this peer: its queued pulls are not sent by this loop
+ continue;
+ if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { // peer lacks the packed-recovery feature bit: send each PullOp on its own
+ for (vector<PullOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ dout(20) << __func__ << ": sending pull (legacy) " << *j
+ << " to osd." << i->first << dendl;
+ send_pull_legacy(
+ prio,
+ i->first,
+ j->recovery_info,
+ j->recovery_progress);
+ }
+ } else { // peer has the packed-recovery feature bit: all of its PullOps go out in one MOSDPGPull
+ dout(20) << __func__ << ": sending pulls " << i->second
+ << " to osd." << i->first << dendl;
+ MOSDPGPull *msg = new MOSDPGPull();
+ msg->set_priority(prio);
+ msg->pgid = info.pgid;
+ msg->map_epoch = get_osdmap()->get_epoch();
+ msg->pulls.swap(i->second); // exchanges contents instead of copying; i->second now holds what msg->pulls held (presumably empty for a new message -- confirm)
+ msg->compute_cost(g_ceph_context);
+ osd->send_message_osd_cluster(msg, con); // unlike the push path, no per-message cost/count cap is applied here
}
}
}
int r = build_push_op(recovery_info, progress, out_progress, &op);
if (r < 0)
return r;
- return send_push_op(prio, peer, op);
+ return send_push_op_legacy(prio, peer, op);
}
int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
return 0;
}
-int ReplicatedPG::send_push_op(int prio, int peer, PushOp &pop)
+int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop)
{
tid_t tid = osd->get_tid();
osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
PushOp pop;
bool more = handle_push_reply(peer, rop, &pop);
if (more)
- send_push_op(pushing[soid][peer].priority, peer, pop);
+ send_push_op_legacy(pushing[soid][peer].priority, peer, pop);
}
bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
PushOp reply;
handle_pull(m->get_source().num(), pop, &reply);
- send_push_op(
+ send_push_op_legacy(
m->get_priority(),
m->get_source().num(),
reply);
PullOp resp;
bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
if (more) {
- send_pull(
+ send_pull_legacy(
m->get_priority(),
m->get_source().num(),
resp.recovery_info,