snapdir.snap = CEPH_SNAPDIR;
if (pg_log.get_missing().is_missing(snapdir)) {
dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
- return push_start(prio, obc, soid, peer);
+ return prep_push(prio, obc, soid, peer, pop);
}
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
+ SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
assert(ssc);
dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
return started;
}
- int ReplicatedPG::recover_object_replicas(
- const hobject_t& soid, eversion_t v, int prio)
+ int ReplicatedPG::prep_object_replica_pushes(
+ const hobject_t& soid, eversion_t v, int prio,
+ map<int, vector<PushOp> > *pushes)
{
- dout(10) << "recover_object_replicas " << soid << dendl;
+ dout(10) << __func__ << ": on " << soid << dendl;
// NOTE: we know we will get a valid oloc off of disk here.
- ObjectContext *obc = get_object_context(soid, OLOC_BLANK, false);
+ ObjectContext *obc = get_object_context(soid, false);
if (!obc) {
pg_log.missing_add(soid, v, eversion_t());
bool uhoh = true;
if (!pushing.count(oid))
start_recovery_op(oid);
- ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
+ ObjectContext *obc = get_object_context(oid, false);
obc->ondisk_read_lock();
- push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority);
+ (*pushes)[peer].push_back(PushOp());
+ prep_push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority,
+ &((*pushes)[peer].back()));
obc->ondisk_read_unlock();
put_object_context(obc);
}