* intelligently push an object to a replica. make use of existing
* clones/heads and dup data ranges where possible.
*/
-void ReplicatedPG::push_to_replica(const sobject_t& soid, int peer)
+void ReplicatedPG::push_to_replica(ObjectContext *obc, const sobject_t& soid, int peer)
{
- dout(10) << "push_to_replica " << soid << " osd" << peer << dendl;
+ const object_info_t& oi = obc->obs.oi;
+ uint64_t size = obc->obs.oi.size;
+
+ dout(10) << "push_to_replica " << soid << " v" << oi.version << " size " << size << " to osd" << peer << dendl;
- // get size
- struct stat st;
- int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
- assert(r == 0);
-
map<sobject_t, interval_set<uint64_t> > clone_subsets;
interval_set<uint64_t> data_subset;
-
- bufferlist bv;
- r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bv);
- assert(r >= 0);
- object_info_t oi(bv);
// are we doing a clone on the replica?
if (soid.snap && soid.snap < CEPH_NOSNAP) {
<< ", pushing " << soid << " attrs as a clone op" << dendl;
interval_set<uint64_t> data_subset;
map<sobject_t, interval_set<uint64_t> > clone_subsets;
- if (st.st_size)
- clone_subsets[head].insert(0, st.st_size);
- push(soid, peer, data_subset, clone_subsets);
+ if (size)
+ clone_subsets[head].insert(0, size);
+ push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
return;
}
// we need the head (and current SnapSet) to do that.
if (missing.is_missing(head)) {
dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
- return push(soid, peer);
+ return push_start(soid, peer);
}
sobject_t snapdir = head;
snapdir.snap = CEPH_SNAPDIR;
if (missing.is_missing(snapdir)) {
dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
- return push(snapdir, peer);
+ return push_start(snapdir, peer);
}
SnapSetContext *ssc = get_snapset_context(soid.oid, false);
put_snapset_context(ssc);
}
- dout(10) << "push_to_replica " << soid << " pushing " << data_subset
+ push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+}
+
+/*
+ * push_start - begin pushing an entire on-disk object to a peer.
+ *
+ * Stats the object for its size, reads its OI_ATTR attribute to decode the
+ * object_info_t (for the version), builds a data_subset covering the whole
+ * object with no clone subsets, and hands off to the full push_start()
+ * overload.
+ */
+void ReplicatedPG::push_start(const sobject_t& soid, int peer)
+{
+ struct stat st;
+ int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
+ assert(r == 0);
+ uint64_t size = st.st_size;
+
+ bufferlist bl;
+ r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bl);
+ // do not decode an object_info_t out of bl if the attr read failed
+ assert(r >= 0);
+ object_info_t oi(bl);
+
+ interval_set<uint64_t> data_subset;
+ map<sobject_t, interval_set<uint64_t> > clone_subsets;
+ // a zero-length object has no extent to push; same guard push_to_replica
+ // applies before inserting [0, size) into an interval_set
+ if (size)
+   data_subset.insert(0, size);
+
+ push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+}
+
+void ReplicatedPG::push_start(const sobject_t& soid, int peer,
+ uint64_t size, eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<sobject_t, interval_set<uint64_t> >& clone_subsets)
+{
+ // take note: record what is being pushed (version + subsets) for this object/peer pair.
+ push_info_t *pi = &pushing[soid][peer];  // map operator[]: creates the entry if it does not exist yet
+ pi->version = version;
+ pi->data_subset = data_subset;  // copy; the caller's set is left untouched
+ pi->clone_subsets = clone_subsets;  // copy, as above
+ //pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk);
+
+ dout(10) << "push_start " << soid << " size " << size << " data " << data_subset
<< " cloning " << clone_subsets << dendl;
- push(soid, peer, data_subset, clone_subsets);
+ send_push_op(soid, peer, size, pi->data_subset, pi->clone_subsets);  // passes the recorded copies; send_push_op swap()s both into the outgoing subop
}
+
/*
* push - send object to a peer
*/
-void ReplicatedPG::push(const sobject_t& soid, int peer)
-{
- interval_set<uint64_t> subset;
- map<sobject_t, interval_set<uint64_t> > clone_subsets;
- push(soid, peer, subset, clone_subsets);
-}
-void ReplicatedPG::push(const sobject_t& soid, int peer,
- interval_set<uint64_t> &data_subset,
- map<sobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedPG::send_push_op(const sobject_t& soid, int peer,
+ uint64_t size,
+ interval_set<uint64_t> &data_subset,
+ map<sobject_t, interval_set<uint64_t> >& clone_subsets)
{
// read data+attrs
bufferlist bl;
map<string,bufferptr> attrset;
- uint64_t size;
-
- if (data_subset.size() || clone_subsets.size()) {
- struct stat st;
- int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
- assert(r == 0);
- size = st.st_size;
-
- for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
- p != data_subset.m.end();
- p++) {
- bufferlist bit;
- osd->store->read(coll_t::build_pg_coll(info.pgid), soid, p->first, p->second, bit);
- if (p->second != bit.length()) {
- dout(10) << " extent " << p->first << "~" << p->second
- << " is actually " << p->first << "~" << bit.length() << dendl;
- p->second = bit.length();
- }
- bl.claim_append(bit);
+
+ for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
+ p != data_subset.m.end();
+ p++) {
+ bufferlist bit;
+ osd->store->read(coll_t::build_pg_coll(info.pgid), soid, p->first, p->second, bit);
+ if (p->second != bit.length()) {
+ dout(10) << " extent " << p->first << "~" << p->second
+ << " is actually " << p->first << "~" << bit.length() << dendl;
+ p->second = bit.length();
}
- } else {
- osd->store->read(coll_t::build_pg_coll(info.pgid), soid, 0, 0, bl);
- size = bl.length();
+ bl.claim_append(bit);
}
osd->store->getattrs(coll_t::build_pg_coll(info.pgid), soid, attrset);
object_info_t oi(bv);
// ok
- dout(7) << "push " << soid << " v " << oi.version
- << " size " << size
+ dout(7) << "send_push_op " << soid << " v " << oi.version
+ << " size " << size
<< " subset " << data_subset
<< " data " << bl.length()
<< " to osd" << peer
osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
- subop->ops[0].op.extent.offset = 0;
- subop->ops[0].op.extent.length = size;
+ //subop->ops[0].op.extent.offset = 0;
+ //subop->ops[0].op.extent.length = size;
subop->ops[0].data = bl;
subop->data_subset.swap(data_subset);
subop->clone_subsets.swap(clone_subsets);
subop->attrset.swap(attrset);
subop->old_size = size;
osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
-
- if (is_primary()) {
- pushing[soid].insert(peer);
- peer_missing[peer].got(soid, oi.version);
- }
}
void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
<< dendl;
} else if (pushing[soid].count(peer) == 0) {
dout(10) << "huh, i wasn't pushing " << soid << " to osd" << peer
- << ", only " << pushing[soid]
<< dendl;
} else {
- pushing[soid].erase(peer);
+ push_info_t *pi = &pushing[soid][peer];
- if (peer_missing.count(peer) == 0 ||
- peer_missing[peer].num_missing() == 0)
+ peer_missing[peer].got(soid, pi->version);
+ if (peer_missing[peer].num_missing() == 0)
uptodate_set.insert(peer);
+ pushing[soid].erase(peer);
+ pi = NULL;
+
update_stats();
if (pushing[soid].empty()) {
+ pushing.erase(soid);
dout(10) << "pushed " << soid << " to all replicas" << dendl;
finish_recovery_op(soid);
if (waiting_for_degraded_object.count(soid)) {
}
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
- << pushing[soid] << dendl;
+ << pushing[soid].size() << " others" << dendl;
}
}
reply->put();
assert(!is_primary()); // we should be a replica or stray.
- push(soid, op->get_source().num(), op->data_subset, op->clone_subsets);
+ struct stat st;
+ int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
+ assert(r == 0);
+ uint64_t size = st.st_size;
+
+ send_push_op(soid, op->get_source().num(), size, op->data_subset, op->clone_subsets);
op->put();
}
dout(10) << "recover_object_replicas " << soid << dendl;
- ObjectContext *obc = lookup_object_context(soid);
- if (obc) {
- dout(10) << " ondisk_read_lock for " << soid << dendl;
- obc->ondisk_read_lock();
- }
+ ObjectContext *obc = get_object_context(soid);
+ dout(10) << " ondisk_read_lock for " << soid << dendl;
+ obc->ondisk_read_lock();
start_recovery_op(soid);
started++;
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
if (peer_missing.count(peer) &&
- peer_missing[peer].is_missing(soid))
- push_to_replica(soid, peer);
+ peer_missing[peer].is_missing(soid)) {
+ push_to_replica(obc, soid, peer);
+ }
}
- if (obc) {
- dout(10) << " ondisk_read_unlock on " << soid << dendl;
- obc->ondisk_read_unlock();
- put_object_context(obc);
- }
+ dout(10) << " ondisk_read_unlock on " << soid << dendl;
+ obc->ondisk_read_unlock();
+ put_object_context(obc);
return started;
}
map<sobject_t, pull_info_t> pulling;
// push
- map<sobject_t, set<int> > pushing;
+ struct push_info_t {  // state recorded per (object, peer) when a push is started
+ eversion_t version;  // object version being pushed; handed to peer_missing[peer].got() when the push reply arrives
+ interval_set<uint64_t> data_subset;  // data extents recorded by push_start for this push
+ map<sobject_t, interval_set<uint64_t> > clone_subsets;  // per-source-object extents recorded by push_start for cloning
+ };
+ map<sobject_t, map<int, push_info_t> > pushing;
int recover_object_replicas(const sobject_t& soid);
void calc_head_subsets(SnapSet& snapset, const sobject_t& head,
void calc_clone_subsets(SnapSet& snapset, const sobject_t& poid, Missing& missing,
interval_set<uint64_t>& data_subset,
map<sobject_t, interval_set<uint64_t> >& clone_subsets);
- void push_to_replica(const sobject_t& oid, int dest);
- void push(const sobject_t& oid, int dest);
- void push(const sobject_t& oid, int dest, interval_set<uint64_t>& data_subset,
- map<sobject_t, interval_set<uint64_t> >& clone_subsets);
+ void push_to_replica(ObjectContext *obc, const sobject_t& oid, int dest);
+ void push_start(const sobject_t& oid, int dest);
+ void push_start(const sobject_t& soid, int peer,
+ uint64_t size, eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<sobject_t, interval_set<uint64_t> >& clone_subsets);
+ void send_push_op(const sobject_t& oid, int dest,
+ uint64_t size,
+ interval_set<uint64_t>& data_subset,
+ map<sobject_t, interval_set<uint64_t> >& clone_subsets);
+
bool pull(const sobject_t& oid);
void send_pull_op(const sobject_t& soid, eversion_t v, const interval_set<uint64_t>& data_subset, int fromosd);