}
else {
dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
- pull(soid, v, g_conf->osd_client_op_priority);
+ map<int, vector<PullOp> > pulls;
+ prepare_pull(soid, v, g_conf->osd_client_op_priority, &pulls);
+ send_pulls(g_conf->osd_client_op_priority, pulls);
}
waiting_for_missing_object[soid].push_back(op);
op->mark_delayed("waiting for missing object");
*/
enum { PULL_NONE, PULL_OTHER, PULL_YES };
-int ReplicatedPG::pull(
+int ReplicatedPG::prepare_pull(
const hobject_t& soid, eversion_t v,
- int priority)
+ int priority,
+ map<int, vector<PullOp> > *pulls)
{
int fromosd = -1;
map<hobject_t,set<int> >::iterator q = missing_loc.find(soid);
dout(10) << " missing but already pulling head " << head << dendl;
return PULL_NONE;
} else {
- int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority);
+ int r = prepare_pull(
+ head, pg_log.get_missing().missing.find(head)->second.need, priority,
+ pulls);
if (r != PULL_NONE)
return PULL_OTHER;
return PULL_NONE;
dout(10) << " missing but already pulling snapdir " << head << dendl;
return PULL_NONE;
} else {
- int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority);
+ int r = prepare_pull(
+ head, pg_log.get_missing().missing.find(head)->second.need, priority,
+ pulls);
if (r != PULL_NONE)
return PULL_OTHER;
return PULL_NONE;
recovery_info.size = ((uint64_t)-1);
}
+ (*pulls)[fromosd].push_back(PullOp());
+ PullOp &op = (*pulls)[fromosd].back();
+ op.soid = soid;
+
+ op.recovery_info = recovery_info;
+ op.recovery_info.soid = soid;
+ op.recovery_info.version = v;
+ op.recovery_progress.data_complete = false;
+ op.recovery_progress.omap_complete = false;
+ op.recovery_progress.data_recovered_to = 0;
+ op.recovery_progress.first = true;
- recovery_info.soid = soid;
- recovery_info.version = v;
- ObjectRecoveryProgress progress;
- progress.data_complete = false;
- progress.omap_complete = false;
- progress.data_recovered_to = 0;
- progress.first = true;
assert(!pulling.count(soid));
pull_from_peer[fromosd].insert(soid);
PullInfo &pi = pulling[soid];
- pi.recovery_info = recovery_info;
- pi.recovery_progress = progress;
+ pi.recovery_info = op.recovery_info;
+ pi.recovery_progress = op.recovery_progress;
pi.priority = priority;
- send_pull(priority, fromosd, recovery_info, progress);
start_recovery_op(soid);
return PULL_YES;
}
}
+void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
+{
+ for (map<int, vector<PullOp> >::iterator i = pulls.begin();
+ i != pulls.end();
+ ++i) {
+ for (vector<PullOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ send_pull(
+ prio,
+ i->first,
+ j->recovery_info,
+ j->recovery_progress);
+ }
+ }
+}
+
int ReplicatedPG::send_push(int prio, int peer,
const ObjectRecoveryInfo &recovery_info,
const ObjectRecoveryProgress &progress,
int started = 0;
int skipped = 0;
+ map<int, vector<PullOp> > pulls;
map<version_t, hobject_t>::const_iterator p =
missing.rmissing.lower_bound(pg_log.get_log().last_requested);
while (p != missing.rmissing.end()) {
} else if (unfound) {
++skipped;
} else {
- int r = pull(soid, need, g_conf->osd_recovery_op_priority);
+ int r = prepare_pull(
+ soid, need, g_conf->osd_recovery_op_priority, &pulls);
switch (r) {
case PULL_YES:
++started;
assert(0);
}
if (started >= max)
- return started;
+ break;
}
}
pg_log.set_last_requested(v);
}
+ send_pulls(g_conf->osd_recovery_op_priority, pulls);
return started;
}