break;
}
}
- recover_object_replicas(soid, v, g_conf->osd_client_op_priority);
+ map<int, vector<PushOp> > pushes;
+ prep_object_replica_pushes(soid, v, g_conf->osd_client_op_priority, &pushes);
+ send_pushes(g_conf->osd_client_op_priority, pushes);
}
waiting_for_degraded_object[soid].push_back(op);
op->mark_delayed("waiting for degraded object");
* intelligently push an object to a replica. make use of existing
* clones/heads and dup data ranges where possible.
*/
-void ReplicatedPG::push_to_replica(
- ObjectContext *obc, const hobject_t& soid, int peer,
- int prio)
+void ReplicatedPG::prep_push_to_replica(
+ ObjectContext *obc, const hobject_t& soid, int peer,
+ int prio,
+ PushOp *pop)
{
const object_info_t& oi = obc->obs.oi;
uint64_t size = obc->obs.oi.size;
- dout(10) << "push_to_replica " << soid << " v" << oi.version << " size " << size << " to osd." << peer << dendl;
+ dout(10) << __func__ << soid << " v" << oi.version
+ << " size " << size << " to osd." << peer << dendl;
map<hobject_t, interval_set<uint64_t> > clone_subsets;
interval_set<uint64_t> data_subset;
// we need the head (and current SnapSet) locally to do that.
if (pg_log.get_missing().is_missing(head)) {
dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
- return push_start(prio, obc, soid, peer);
+ return prep_push(prio, obc, soid, peer, pop);
}
hobject_t snapdir = head;
snapdir.snap = CEPH_SNAPDIR;
if (pg_log.get_missing().is_missing(snapdir)) {
dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
- return push_start(prio, obc, soid, peer);
+ return prep_push(prio, obc, soid, peer, pop);
}
SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
put_snapset_context(ssc);
}
- push_start(prio, obc, soid, peer, oi.version, data_subset, clone_subsets);
+ prep_push(prio, obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
}
-void ReplicatedPG::push_start(int prio,
- ObjectContext *obc,
- const hobject_t& soid, int peer)
+void ReplicatedPG::prep_push(int prio,
+ ObjectContext *obc,
+ const hobject_t& soid, int peer,
+ PushOp *pop)
{
interval_set<uint64_t> data_subset;
if (obc->obs.oi.size)
data_subset.insert(0, obc->obs.oi.size);
map<hobject_t, interval_set<uint64_t> > clone_subsets;
- push_start(prio, obc, soid, peer,
- obc->obs.oi.version, data_subset, clone_subsets);
+ prep_push(prio, obc, soid, peer,
+ obc->obs.oi.version, data_subset, clone_subsets,
+ pop);
}
-void ReplicatedPG::push_start(
+void ReplicatedPG::prep_push(
int prio,
ObjectContext *obc,
const hobject_t& soid, int peer,
eversion_t version,
interval_set<uint64_t> &data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+ PushOp *pop)
{
peer_missing[peer].revise_have(soid, eversion_t());
// take note.
pi.priority = prio;
ObjectRecoveryProgress new_progress;
- send_push(pi.priority,
- peer, pi.recovery_info,
- pi.recovery_progress, &new_progress);
+ build_push_op(pi.recovery_info,
+ pi.recovery_progress,
+ &new_progress,
+ pop);
pi.recovery_progress = new_progress;
}
info.last_complete));
}
+void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
+{
+ for (map<int, vector<PushOp> >::iterator i = pushes.begin();
+ i != pushes.end();
+ ++i) {
+ for (vector<PushOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ send_push_op(prio, i->first, *j);
+ }
+ }
+}
+
int ReplicatedPG::send_push(int prio, int peer,
const ObjectRecoveryInfo &recovery_info,
const ObjectRecoveryProgress &progress,
return started;
}
-int ReplicatedPG::recover_object_replicas(
- const hobject_t& soid, eversion_t v, int prio)
+int ReplicatedPG::prep_object_replica_pushes(
+ const hobject_t& soid, eversion_t v, int prio,
+ map<int, vector<PushOp> > *pushes)
{
- dout(10) << "recover_object_replicas " << soid << dendl;
+ dout(10) << __func__ << ": on " << soid << dendl;
// NOTE: we know we will get a valid oloc off of disk here.
ObjectContext *obc = get_object_context(soid, OLOC_BLANK, false);
start_recovery_op(soid);
started = true;
}
- push_to_replica(obc, soid, peer, prio);
+ (*pushes)[peer].push_back(PushOp());
+ prep_push_to_replica(obc, soid, peer, prio,
+ &((*pushes)[peer].back())
+ );
}
}
dout(10) << __func__ << "(" << max << ")" << dendl;
int started = 0;
+ map<int, vector<PushOp> > pushes;
+
// this is FAR from an optimal recovery order. pretty lame, really.
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
map<hobject_t,pg_missing_t::item>::const_iterator r = m.missing.find(soid);
- started += recover_object_replicas(soid, r->second.need,
- g_conf->osd_recovery_op_priority);
+ started += prep_object_replica_pushes(soid, r->second.need,
+ g_conf->osd_recovery_op_priority,
+ &pushes);
}
}
+ send_pushes(g_conf->osd_recovery_op_priority, pushes);
+
return started;
}
++i) {
send_remove_op(i->first, i->second, backfill_target);
}
+
+ map<int, vector<PushOp> > pushes;
for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
i != to_push.end();
++i) {
- push_backfill_object(i->first, i->second.first, i->second.second, backfill_target);
+ prep_backfill_object_push(
+ i->first, i->second.first, i->second.second, backfill_target, &pushes);
}
+ send_pushes(g_conf->osd_recovery_op_priority, pushes);
release_waiting_for_backfill_pos();
dout(5) << "backfill_pos is " << backfill_pos << " and pinfo.last_backfill is "
return ops;
}
-void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, eversion_t have, int peer)
+void ReplicatedPG::prep_backfill_object_push(
+ hobject_t oid, eversion_t v, eversion_t have, int peer,
+ map<int, vector<PushOp> > *pushes)
{
dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
start_recovery_op(oid);
ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
obc->ondisk_read_lock();
- push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority);
+ (*pushes)[peer].push_back(PushOp());
+ prep_push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority,
+ &((*pushes)[peer].back()));
obc->ondisk_read_unlock();
put_object_context(obc);
}
void handle_push(
int from, PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
+ void send_pushes(int prio, map<int, vector<PushOp> > &pushes);
int send_push(int priority, int peer,
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress &progress,
// Reverse mapping from osd peer to objects beging pulled from that peer
map<int, set<hobject_t> > pull_from_peer;
- int recover_object_replicas(const hobject_t& soid, eversion_t v,
- int priority);
+ int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
+ int priority,
+ map<int, vector<PushOp> > *pushes);
void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head,
pg_missing_t& missing,
const hobject_t &last_backfill,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
- void push_to_replica(
+ void prep_push_to_replica(
ObjectContext *obc,
const hobject_t& oid,
int dest,
- int priority);
- void push_start(int priority,
- ObjectContext *obc,
- const hobject_t& oid, int dest);
- void push_start(int priority,
- ObjectContext *obc,
- const hobject_t& soid, int peer,
- eversion_t version,
- interval_set<uint64_t> &data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+ int priority,
+ PushOp *push_op);
+ void prep_push(int priority,
+ ObjectContext *obc,
+ const hobject_t& oid, int dest,
+ PushOp *op);
+ void prep_push(int priority,
+ ObjectContext *obc,
+ const hobject_t& soid, int peer,
+ eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+ PushOp *op);
void send_push_op_blank(const hobject_t& soid, int peer);
void finish_degraded_object(const hobject_t& oid);
*/
void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
- void push_backfill_object(hobject_t oid, eversion_t v, eversion_t have, int peer);
+ void prep_backfill_object_push(
+ hobject_t oid, eversion_t v, eversion_t have, int peer,
+ map<int, vector<PushOp> > *pushes);
void send_remove_op(const hobject_t& oid, eversion_t v, int peer);