assert(g != missing.missing.end());
const eversion_t &v(g->second.need);
- map<hobject_t, pull_info_t>::const_iterator p = pulling.find(soid);
+ map<hobject_t, PullInfo>::const_iterator p = pulling.find(soid);
if (p != pulling.end()) {
dout(7) << "missing " << soid << " v " << v << ", already pulling." << dendl;
}
<< " from osd." << fromosd
<< dendl;
- map<hobject_t, interval_set<uint64_t> > clone_subsets;
- interval_set<uint64_t> data_subset;
+ ObjectRecoveryInfo recovery_info;
bool need_size = false;
// is this a snapped object? if so, consult the snapset.. we may not need the entire object!
SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
dout(10) << " snapset " << ssc->snapset << dendl;
calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
- data_subset, clone_subsets);
+ recovery_info.copy_subset,
+ recovery_info.clone_subset);
put_snapset_context(ssc);
// FIXME: this may overestimate if we are pulling multiple clones in parallel...
- dout(10) << " pulling " << data_subset << ", will clone " << clone_subsets
- << dendl;
+ dout(10) << " pulling " << recovery_info << dendl;
} else {
// pulling head or unversioned object.
// always pull the whole thing.
need_size = true;
- data_subset.insert(0, (uint64_t)-1);
+ recovery_info.copy_subset.insert(0, (uint64_t)-1);
+ recovery_info.size = ((uint64_t)-1);
}
- // only pull so much at a time
- interval_set<uint64_t> pullsub;
- pullsub.span_of(data_subset, 0, g_conf->osd_recovery_max_chunk);
- // take note
- assert(pulling.count(soid) == 0);
+ recovery_info.soid = soid;
+ recovery_info.version = v;
+ ObjectRecoveryProgress progress;
+ progress.data_complete = false;
+ progress.data_recovered_to = 0;
+ progress.first = true;
+ assert(!pulling.count(soid));
pull_from_peer[fromosd].insert(soid);
- pull_info_t& p = pulling[soid];
- p.version = v;
- p.from = fromosd;
- p.data_subset = data_subset;
- p.data_subset_pulling = pullsub;
- p.need_size = need_size;
-
- send_pull_op(soid, v, true, p.data_subset_pulling, fromosd);
-
+ PullInfo &pi = pulling[soid];
+ pi.recovery_info = recovery_info;
+ pi.recovery_progress = progress;
+ send_pull(fromosd, recovery_info, progress);
+
start_recovery_op(soid);
return PULL_YES;
}
-void ReplicatedPG::send_pull_op(const hobject_t& soid, eversion_t v, bool first,
- const interval_set<uint64_t>& data_subset, int fromosd)
-{
- // send op
- tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
-
- dout(10) << "send_pull_op " << soid << " " << v
- << " first=" << first
- << " data " << data_subset << " from osd." << fromosd
- << " tid " << tid << dendl;
-
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK,
- get_osdmap()->get_epoch(), tid, v);
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PULL;
- subop->data_subset = data_subset;
- subop->first = first;
-
- // do not include clone_subsets in pull request; we will recalculate this
- // when the object is pushed back.
- //subop->clone_subsets.swap(clone_subsets);
-
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(fromosd));
-
- osd->logger->inc(l_osd_pull);
-}
-
void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
{
tid_t tid = osd->get_tid();
map<hobject_t, interval_set<uint64_t> > clone_subsets;
if (size)
clone_subsets[head].insert(0, size);
- push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+ push_start(obc, soid, peer, oi.version, data_subset, clone_subsets);
return;
}
// we need the head (and current SnapSet) locally to do that.
if (missing.is_missing(head)) {
dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
- return push_start(soid, peer);
+ return push_start(obc, soid, peer);
}
hobject_t snapdir = head;
snapdir.snap = CEPH_SNAPDIR;
if (missing.is_missing(snapdir)) {
dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
- return push_start(soid, peer);
+ return push_start(obc, soid, peer);
}
SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
put_snapset_context(ssc);
}
- push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+ push_start(obc, soid, peer, oi.version, data_subset, clone_subsets);
}
-void ReplicatedPG::push_start(const hobject_t& soid, int peer)
+// Push a whole object (no clone-overlap optimization) to a replica.
+// Size/version now come from the cached ObjectContext (obc->obs.oi)
+// instead of the stat()/getattr() the removed code below performed.
+void ReplicatedPG::push_start(ObjectContext *obc,
+                              const hobject_t& soid, int peer)
{
-  struct stat st;
-  int r = osd->store->stat(coll, soid, &st);
-  assert(r == 0);
-  uint64_t size = st.st_size;
-
-  bufferlist bl;
-  r = osd->store->getattr(coll, soid, OI_ATTR, bl);
-  object_info_t oi(bl);
-
interval_set<uint64_t> data_subset;
+  // full object: [0, size)
+  data_subset.insert(0, obc->obs.oi.size);
map<hobject_t, interval_set<uint64_t> > clone_subsets;
-  data_subset.insert(0, size);
-  push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+  push_start(obc, soid, peer, obc->obs.oi.version, data_subset, clone_subsets);
}
-void ReplicatedPG::push_start(const hobject_t& soid, int peer,
-                              uint64_t size, eversion_t version,
-                              interval_set<uint64_t> &data_subset,
-                              map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+// Begin pushing soid to peer: record a PushInfo (recovery_info describes
+// WHAT to push, recovery_progress tracks HOW FAR we've gotten) and send
+// the first chunk via send_push().
+void ReplicatedPG::push_start(
+  ObjectContext *obc,
+  const hobject_t& soid, int peer,
+  eversion_t version,
+  interval_set<uint64_t> &data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
{
// take note.
-  push_info_t *pi = &pushing[soid][peer];
-  pi->size = size;
-  pi->version = version;
-  pi->data_subset = data_subset;
-  pi->clone_subsets = clone_subsets;
+  PushInfo &pi = pushing[soid][peer];
+  pi.recovery_info.size = obc->obs.oi.size;
+  pi.recovery_info.copy_subset = data_subset;
+  pi.recovery_info.clone_subset = clone_subsets;
+  pi.recovery_info.soid = soid;
+  pi.recovery_info.oi = obc->obs.oi;
+  pi.recovery_info.version = version;
+  pi.recovery_progress.first = true;
+  pi.recovery_progress.data_recovered_to = 0;
+  // NOTE(review): 0 assigned where a bool is presumably expected (the
+  // other fields use true/false) -- harmless, but confirm the field type.
+  pi.recovery_progress.data_complete = 0;
+
+  // send_push advances the progress cursor; keep the post-send state.
+  ObjectRecoveryProgress new_progress;
+  send_push(peer, pi.recovery_info, pi.recovery_progress, &new_progress);
+  pi.recovery_progress = new_progress;
+}
+
+// Request (more of) an object from peer with a CEPH_OSD_OP_PULL subop.
+// Unlike the old send_pull_op(), no data range is chosen here: the whole
+// recovery_info/progress pair is shipped and the pusher decides chunking.
+// Always returns 0.
+int ReplicatedPG::send_pull(int peer,
+                            ObjectRecoveryInfo recovery_info,
+                            ObjectRecoveryProgress progress)
+{
+  // send op
+  tid_t tid = osd->get_tid();
+  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+
+  dout(10) << "send_pull_op " << recovery_info.soid << " "
+           << recovery_info.version
+           << " first=" << progress.first
+           << " data " << recovery_info.copy_subset
+           << " from osd." << peer
+           << " tid " << tid << dendl;
+
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
+                                   false, CEPH_OSD_FLAG_ACK,
+                                   get_osdmap()->get_epoch(), tid,
+                                   recovery_info.version);
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
+  subop->recovery_info = recovery_info;
+  subop->recovery_progress = progress;
-  pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf->osd_recovery_max_chunk);
-  bool complete = pi->data_subset_pushing == pi->data_subset;
+  osd->cluster_messenger->send_message(subop,
+                                       get_osdmap()->get_cluster_inst(peer));
-  dout(10) << "push_start " << soid << " size " << size << " data " << data_subset
-           << " cloning " << clone_subsets << dendl;
-  send_push_op(soid, version, peer, size, true, complete, pi->data_subset_pulling, pi->clone_subsets);
+  osd->logger->inc(l_osd_pull);
+  return 0;
}
+// Stage one chunk of pushed data into the TEMP collection via transaction t.
+// first: clear out any stale temp object and recreate it before writing.
+// intervals_included / data_included: the extents covered by this chunk and
+// their bytes, concatenated in extent order (off tracks the flat offset).
+// attrs are (re)applied on every chunk.
+void ReplicatedPG::submit_push_data(
+  const ObjectRecoveryInfo &recovery_info,
+  bool first,
+  const interval_set<uint64_t> &intervals_included,
+  bufferlist data_included,
+  map<string, bufferptr> &attrs,
+  ObjectStore::Transaction *t)
+{
+  if (first) {
+    t->remove(coll_t::TEMP_COLL, recovery_info.soid);
+    t->touch(coll_t::TEMP_COLL, recovery_info.soid);
+  }
+  uint64_t off = 0;
+  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
+       p != intervals_included.end();
+       ++p) {
+    bufferlist bit;
+    bit.substr_of(data_included, off, p.get_len());
+    t->write(coll_t::TEMP_COLL, recovery_info.soid,
+             p.get_start(), p.get_len(), bit);
+    off += p.get_len();
+  }
-/*
- * push - send object to a peer
- */
+  t->setattrs(coll_t::TEMP_COLL, recovery_info.soid,
+              attrs);
+}
-int ReplicatedPG::send_push_op(const hobject_t& soid, eversion_t version, int peer,
-                               uint64_t size, bool first, bool complete,
-                               interval_set<uint64_t> &data_subset,
-                               map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+// Finalize a fully-received object: move it out of the TEMP collection into
+// the PG collection, clone the shared extents from neighbouring clones,
+// link snap collections, reconcile a pending LOST_REVERT log entry, and
+// record the recovery (recover_got + write_info).
+// recovery_info is non-const because the LOST_REVERT path rewrites its
+// version and object_info.
+void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
+                                        ObjectStore::Transaction *t)
{
-  // read data+attrs
-  bufferlist bl;
-  map<string,bufferptr> attrset;
+  // drop any stale copy (and its snap hardlinks) before the collection move
+  remove_object_with_snap_hardlinks(*t, recovery_info.soid);
+  t->collection_add(coll, coll_t::TEMP_COLL, recovery_info.soid);
+  t->collection_remove(coll_t::TEMP_COLL, recovery_info.soid);
+  // fill in the ranges we did not transfer by cloning them from local clones
+  for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
+         recovery_info.clone_subset.begin();
+       p != recovery_info.clone_subset.end();
+       ++p) {
+    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
+         q != p->second.end();
+         ++q) {
+      dout(15) << " clone_range " << p->first << " "
+               << q.get_start() << "~" << q.get_len() << dendl;
+      t->clone_range(coll, p->first, recovery_info.soid,
+                     q.get_start(), q.get_len(), q.get_start());
+    }
+  }
+
+  // clones get linked into the collections of their first and last snaps
+  if (recovery_info.soid.snap < CEPH_NOSNAP) {
+    if (recovery_info.oi.snaps.size()) {
+      coll_t lc = make_snap_collection(*t,
+                                       recovery_info.oi.snaps[0]);
+      t->collection_add(lc, coll, recovery_info.soid);
+      if (recovery_info.oi.snaps.size() > 1) {
+        coll_t hc = make_snap_collection(
+          *t,
+          recovery_info.oi.snaps[recovery_info.oi.snaps.size()-1]);
+        t->collection_add(hc, coll, recovery_info.soid);
+      }
+    }
+  }
+
+  // if the log wants a newer version via LOST_REVERT of exactly this
+  // version, the recovered object satisfies it; bump version/attr to match
+  if (missing.is_missing(recovery_info.soid) &&
+      missing.missing[recovery_info.soid].need > recovery_info.version) {
+    assert(is_primary());
+    pg_log_entry_t *latest = log.objects[recovery_info.soid];
+    if (latest->op == pg_log_entry_t::LOST_REVERT &&
+        latest->prior_version == recovery_info.version) {
+      dout(10) << " got old revert version " << recovery_info.version
+               << " for " << *latest << dendl;
+      recovery_info.version = latest->version;
+      // update the attr to the revert event version
+      recovery_info.oi.prior_version = recovery_info.oi.version;
+      recovery_info.oi.version = latest->version;
+      bufferlist bl;
+      ::encode(recovery_info.oi, bl);
+      t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+    }
+  }
+  recover_got(recovery_info.soid, recovery_info.version);
+
+  // update pg
+  write_info(*t);
+}
+
+// Recompute copy/clone subsets for a clone against the LOCAL SnapSet
+// (we never trust the pusher's clone math).  Heads, snapdirs and snap 0
+// are returned unchanged.
+ObjectRecoveryInfo ReplicatedPG::recalc_subsets(ObjectRecoveryInfo recovery_info)
+{
+  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
+    return recovery_info;
+
+  SnapSetContext *ssc = get_snapset_context(recovery_info.soid.oid,
+                                            recovery_info.soid.get_key(),
+                                            recovery_info.soid.hash,
+                                            false);
+  ObjectRecoveryInfo new_info = recovery_info;
+  new_info.copy_subset.clear();
+  new_info.clone_subset.clear();
+  assert(ssc);
+  calc_clone_subsets(ssc->snapset, new_info.soid, missing, info.last_backfill,
+                     new_info.copy_subset, new_info.clone_subset);
+  put_snapset_context(ssc);
+  return new_info;
+}
+
+// Primary-side handler for a chunk arriving in response to our pull.
+// Stages the usable data into the temp collection; on the final chunk,
+// finalizes the object, publishes the ObjectContext, and requeues waiters;
+// otherwise requests the next chunk.
+void ReplicatedPG::handle_pull_response(OpRequest *op)
+{
+  MOSDSubOp *m = (MOSDSubOp *)op->request;
+  bufferlist data;
+  m->claim_data(data);
+  interval_set<uint64_t> data_included = m->data_included;
+  dout(10) << "handle_push "
+           << m->recovery_info
+           << m->recovery_progress
+           << " data.size() is " << data.length()
+           << " data_included: " << data_included
+           << dendl;
+  if (m->version == eversion_t()) {
+    // replica doesn't have it!
+    _failed_push(op);
+    return;
+  }
+
+  hobject_t &hoid = m->recovery_info.soid;
+  assert((data_included.empty() && data.length() == 0) ||
+         (!data_included.empty() && data.length() > 0));
+
+
+  // stale response (pull already cancelled/finished); nothing to do
+  if (!pulling.count(hoid)) {
+    return;
+  }
+
+  PullInfo &pi = pulling[hoid];
+  // if we pulled "whole object" blindly, adopt the real size the pusher saw
+  if (pi.recovery_info.size == (uint64_t(-1))) {
+    pi.recovery_info.size = m->recovery_info.size;
+    pi.recovery_info.copy_subset.intersection_of(
+      m->recovery_info.copy_subset);
+  }
+
+  pi.recovery_info = recalc_subsets(pi.recovery_info);
+
+  // discard any bytes outside what we actually need
+  interval_set<uint64_t> usable_intervals;
+  bufferlist usable_data;
+  trim_pushed_data(pi.recovery_info.copy_subset,
+                   data_included,
+                   data,
+                   &usable_intervals,
+                   &usable_data);
+  data_included = usable_intervals;
+  data.claim(usable_data);
+
+  bool first = pi.recovery_progress.first;
+  pi.recovery_progress = m->recovery_progress;
+
+  dout(10) << "new recovery_info: "
+           << pi.recovery_info
+           << ", new progress " << pi.recovery_progress
+           << dendl;
+
+  // the first chunk carries the attrs; decode oi (mandatory) and ss
+  // (mandatory for head/snapdir objects)
+  if (first) {
+    bufferlist oibl;
+    if (m->attrset.count(OI_ATTR)) {
+      oibl.push_back(m->attrset[OI_ATTR]);
+      ::decode(pi.recovery_info.oi, oibl);
+    } else {
+      assert(0);
+    }
+    bufferlist ssbl;
+    if (m->attrset.count(SS_ATTR)) {
+      ssbl.push_back(m->attrset[SS_ATTR]);
+      ::decode(pi.recovery_info.ss, ssbl);
+    } else {
+      assert(pi.recovery_info.soid.snap != CEPH_NOSNAP &&
+             pi.recovery_info.soid.snap != CEPH_SNAPDIR);
+    }
+  }
+
+  bool complete = pi.recovery_progress.data_recovered_to >=
+    (pi.recovery_info.copy_subset.empty() ?
+     0 : pi.recovery_info.copy_subset.range_end());
+
+  // pusher hit end-of-object (data_complete) but we have not yet received
+  // everything we asked for: the source is short -- fail the pull.
+  // (Condition was previously `complete && !data_complete`, which fired in
+  // the harmless case and let the real short-read loop forever.)
+  if (!complete && pi.recovery_progress.data_complete) {
+    dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted"
+            << dendl;
+    _failed_push(op);
+    return;
+  }
+
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  Context *onreadable = 0;
+  Context *onreadable_sync = 0;
+  submit_push_data(pi.recovery_info, first,
+                   data_included, data, m->attrset,
+                   t);
+
+  if (complete) {
+    submit_push_complete(pi.recovery_info, t);
+
+    // publish the recovered object's context before the txn applies
+    ObjectContext *obc = get_object_context(hoid,
+                                            pi.recovery_info.oi.oloc,
+                                            true);
+    obc->ondisk_write_lock();
+    obc->obs.exists = true;
+    obc->obs.oi = pi.recovery_info.oi;
+
+    if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) {
+      obc->ssc->snapset = pi.recovery_info.ss;
+    }
+
+    onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
+    onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
+  } else {
+    onreadable = new ObjectStore::C_DeleteTransaction(t);
+  }
+
+  int r = osd->store->
+    queue_transaction(&osr, t,
+                      onreadable,
+                      new C_OSD_CommittedPushedObject(this, op,
+                                                      info.history.same_interval_since,
+                                                      info.last_complete),
+                      onreadable_sync);
+  assert(r == 0);
+
+  if (complete) {
+    // close out the pull and requeue any ops blocked on this object
+    finish_recovery_op(hoid);
+    pulling.erase(hoid);
+    pull_from_peer[m->get_source().num()].erase(hoid);
+    update_stats();
+    if (waiting_for_missing_object.count(hoid)) {
+      dout(20) << " kicking waiters on " << hoid << dendl;
+      osd->requeue_ops(this, waiting_for_missing_object[hoid]);
+      waiting_for_missing_object.erase(hoid);
+      if (missing.missing.size() == 0) {
+        osd->requeue_ops(this, waiting_for_all_missing);
+        waiting_for_all_missing.clear();
+      }
+    }
+  } else {
+    send_pull(m->get_source().num(), pi.recovery_info, pi.recovery_progress);
+  }
+}
+
+// Replica-side handler for an incoming push chunk: stage the data, run
+// the finalize step on the last chunk, queue the transaction, and ack.
+// `first` comes from current_progress (the state the pusher STARTED this
+// chunk from); `complete` from recovery_progress (the state AFTER it).
+void ReplicatedPG::handle_push(OpRequest *op)
+{
+  MOSDSubOp *m = (MOSDSubOp *)op->request;
+  dout(10) << "handle_push "
+           << m->recovery_info
+           << m->recovery_progress
+           << dendl;
+  bufferlist data;
+  m->claim_data(data);
+  bool first = m->current_progress.first;
+  bool complete = m->recovery_progress.data_complete;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  Context *onreadable = new ObjectStore::C_DeleteTransaction(t);
+  Context *onreadable_sync = 0;
+  submit_push_data(m->recovery_info,
+                   first,
+                   m->data_included,
+                   data,
+                   m->attrset,
+                   t);
+  if (complete)
+    submit_push_complete(m->recovery_info,
+                         t);
+
+  int r = osd->store->
+    queue_transaction(&osr, t,
+                      onreadable,
+                      new C_OSD_CommittedPushedObject(
+                        this, op,
+                        info.history.same_interval_since,
+                        info.last_complete),
+                      onreadable_sync);
+  assert(r == 0);
+
+  // ack the pusher immediately; commit is signalled separately
+  MOSDSubOpReply *reply = new MOSDSubOpReply(
+    m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+  osd->cluster_messenger->send_message(reply, m->get_connection());
+}
+
+// Send the next chunk of recovery_info.soid to peer as a CEPH_OSD_OP_PUSH
+// subop.  Chunk = span_of(copy_subset, progress.data_recovered_to,
+// osd_recovery_max_chunk).  On success *out_progress (if non-NULL) gets the
+// advanced progress; returns -1 (freeing the subop) if the local copy's
+// version no longer matches recovery_info.version.
+int ReplicatedPG::send_push(int peer,
+                            ObjectRecoveryInfo recovery_info,
+                            ObjectRecoveryProgress progress,
+                            ObjectRecoveryProgress *out_progress)
+{
+  ObjectRecoveryProgress new_progress = progress;
+
+  tid_t tid = osd->get_tid();
+  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
+                                   false, 0, get_osdmap()->get_epoch(),
+                                   tid, recovery_info.version);
+
+  dout(7) << "send_push_op " << recovery_info.soid
+          << " v " << recovery_info.version
+          << " size " << recovery_info.size
+          << " to osd." << peer
+          << " recovery_info: " << recovery_info
+          << dendl;
+
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+  // first chunk carries the attrs; also verify the local copy is still
+  // the version the pull/push was planned against
+  if (progress.first) {
+    osd->store->getattrs(coll, recovery_info.soid, subop->attrset);
+
+    // Debug
+    bufferlist bv;
+    bv.push_back(subop->attrset[OI_ATTR]);
+    object_info_t oi(bv);
-  for (interval_set<uint64_t>::iterator p = data_subset.begin();
-       p != data_subset.end();
+    if (oi.version != recovery_info.version) {
+      osd->clog.error() << info.pgid << " push "
+                        << recovery_info.soid << " v "
+                        << recovery_info.version << " to osd." << peer
+                        << " failed because local copy is "
+                        << oi.version << "\n";
+      subop->put();
+      return -1;
+    }
+
+    new_progress.first = false;
+  }
+
+
+  subop->data_included.span_of(recovery_info.copy_subset,
+                               progress.data_recovered_to,
+                               g_conf->osd_recovery_max_chunk);
+
+  for (interval_set<uint64_t>::iterator p = subop->data_included.begin();
+       p != subop->data_included.end();
++p) {
bufferlist bit;
-    osd->store->read(coll,
-                     soid, p.get_start(), p.get_len(), bit);
+    osd->store->read(coll, recovery_info.soid,
+                     p.get_start(), p.get_len(), bit);
if (p.get_len() != bit.length()) {
dout(10) << " extent " << p.get_start() << "~" << p.get_len()
-               << " is actually " << p.get_start() << "~" << bit.length() << dendl;
+               << " is actually " << p.get_start() << "~" << bit.length()
+               << dendl;
p.set_len(bit.length());
+      // short read == we hit the object's end: nothing further to send
+      new_progress.data_complete = true;
}
-    bl.claim_append(bit);
+    subop->ops[0].indata.claim_append(bit);
}
-  osd->store->getattrs(coll, soid, attrset);
+  if (!subop->data_included.empty())
+    new_progress.data_recovered_to = subop->data_included.range_end();
-  bufferlist bv;
-  bv.push_back(attrset[OI_ATTR]);
-  object_info_t oi(bv);
+  // covered the whole copy_subset (or there was nothing to copy): done
+  if (recovery_info.copy_subset.empty() ||
+      new_progress.data_recovered_to >= recovery_info.copy_subset.range_end())
+    new_progress.data_complete = true;
-  if (oi.version != version) {
-    osd->clog.error() << info.pgid << " push " << soid << " v " << version << " to osd." << peer
-                      << " failed because local copy is " << oi.version << "\n";
-    return -1;
-  }
-
-  // ok
-  dout(7) << "send_push_op " << soid << " v " << oi.version
-          << " size " << size
-          << " subset " << data_subset
-          << " data " << bl.length()
-          << " to osd." << peer
-          << dendl;
osd->logger->inc(l_osd_push);
-  osd->logger->inc(l_osd_push_outb, bl.length());
+  osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length());
// send
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
-  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
-                                   get_osdmap()->get_epoch(), tid, oi.version);
-  subop->oloc = oi.oloc;
-  subop->ops = vector<OSDOp>(1);
-  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-  //subop->ops[0].op.extent.offset = 0;
-  //subop->ops[0].op.extent.length = size;
-  subop->ops[0].indata = bl;
-  subop->data_subset = data_subset;
-  subop->clone_subsets = clone_subsets;
-  subop->attrset.swap(attrset);
-  subop->old_size = size;
-  subop->first = first;
-  subop->complete = complete;
+  subop->recovery_info = recovery_info;
+  subop->recovery_progress = new_progress;
+  subop->current_progress = progress;
osd->cluster_messenger->
send_message(subop, get_osdmap()->get_cluster_inst(peer));
+  if (out_progress)
+    *out_progress = new_progress;
return 0;
}
dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
<< dendl;
} else {
- push_info_t *pi = &pushing[soid][peer];
-
- bool complete = false;
- if (pi->data_subset.empty() ||
- pi->data_subset.range_end() == pi->data_subset_pushing.range_end())
- complete = true;
-
- if (!complete) {
- // push more
- uint64_t from = pi->data_subset_pushing.range_end();
- pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf->osd_recovery_max_chunk);
- dout(10) << " pushing more, " << pi->data_subset_pushing << " of " << pi->data_subset << dendl;
- complete = pi->data_subset.range_end() == pi->data_subset_pushing.range_end();
- send_push_op(soid, pi->version, peer, pi->size, false, complete,
- pi->data_subset_pushing, pi->clone_subsets);
+ PushInfo *pi = &pushing[soid][peer];
+
+ if (!pi->recovery_progress.data_complete) {
+ dout(10) << " pushing more from, "
+ << pi->recovery_progress.data_recovered_to
+ << " of " << pi->recovery_info.copy_subset << dendl;
+ ObjectRecoveryProgress new_progress;
+ send_push(
+ peer, pi->recovery_info, pi->recovery_progress, &new_progress);
+ pi->recovery_progress = new_progress;
} else {
// done!
if (peer == backfill_target && backfills_in_flight.count(soid))
backfills_in_flight.erase(soid);
else
- peer_missing[peer].got(soid, pi->version);
+ peer_missing[peer].got(soid, pi->recovery_info.version);
pushing[soid].erase(peer);
pi = NULL;
<< " but got " << cpp_strerror(-r) << "\n";
send_push_op_blank(soid, m->get_source().num());
} else {
- uint64_t size = st.st_size;
-
- bool complete = false;
- if (!m->data_subset.empty() && m->data_subset.range_end() >= size)
- complete = true;
-
- // complete==true implies we are definitely complete.
- // complete==false means nothing. we don't know because the primary may
- // not be pulling the entire object.
+ ObjectRecoveryInfo recovery_info = m->recovery_info;
+ ObjectRecoveryProgress progress = m->recovery_progress;
+ if (progress.first && recovery_info.size == ((uint64_t)-1)) {
+ // Adjust size and copy_subset
+ recovery_info.size = st.st_size;
+ recovery_info.copy_subset.clear();
+ if (st.st_size)
+ recovery_info.copy_subset.insert(0, st.st_size);
+ assert(recovery_info.clone_subset.empty());
+ }
- r = send_push_op(soid, m->version, m->get_source().num(), size, m->first, complete,
- m->data_subset, m->clone_subsets);
+ r = send_push(m->get_source().num(), recovery_info, progress);
if (r < 0)
send_push_op_blank(soid, m->get_source().num());
}
log_subop_stats(op, 0, l_osd_sop_pull_lat);
-
op->put();
}
}
}
-/** op_push
- * NOTE: called from opqueue.
- */
-void ReplicatedPG::sub_op_push(OpRequest *op)
+void ReplicatedPG::trim_pushed_data(
+ const interval_set<uint64_t> ©_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable)
{
- MOSDSubOp *m = (MOSDSubOp*)op->request;
- assert(m->get_header().type == MSG_OSD_SUBOP);
-
- const hobject_t& soid = m->poid;
- eversion_t v = m->version;
- OSDOp& push = m->ops[0];
-
- dout(7) << "op_push "
- << soid
- << " v " << v
- << " " << m->oloc
- << " len " << push.op.extent.length
- << " data_subset " << m->data_subset
- << " clone_subsets " << m->clone_subsets
- << " data len " << m->get_data().length()
- << dendl;
-
- if (v == eversion_t()) {
- // replica doesn't have it!
- _failed_push(op);
+ if (intervals_received.subset_of(copy_subset)) {
+ *intervals_usable = intervals_received;
+ *data_usable = data_received;
return;
}
- op->mark_started();
-
- interval_set<uint64_t> data_subset;
- map<hobject_t, interval_set<uint64_t> > clone_subsets;
-
- bufferlist data;
- m->claim_data(data);
-
- // we need these later, and they get clobbered by t.setattrs()
- bufferlist oibl;
- if (m->attrset.count(OI_ATTR))
- oibl.push_back(m->attrset[OI_ATTR]);
- bufferlist ssbl;
- if (m->attrset.count(SS_ATTR))
- ssbl.push_back(m->attrset[SS_ATTR]);
-
- // determine data/clone subsets
- data_subset = m->data_subset;
- if (data_subset.empty() && push.op.extent.length && push.op.extent.length == data.length())
- data_subset.insert(0, push.op.extent.length);
- clone_subsets = m->clone_subsets;
-
- pull_info_t *pi = 0;
- bool first = m->first;
- bool complete = m->complete;
-
- // op->complete == true means we reached the end of the object (file size)
- // op->complete == false means nothing; we may not have asked for the whole thing.
-
- if (is_primary()) {
- if (pulling.count(soid) == 0) {
- dout(10) << " not pulling, ignoring" << dendl;
- op->put();
- return;
- }
- pi = &pulling[soid];
-
- // did we learn object size?
- if (pi->need_size) {
- dout(10) << " learned object size is " << m->old_size << dendl;
- pi->data_subset.erase(m->old_size, (uint64_t)-1 - m->old_size);
- pi->need_size = false;
- }
-
- if (soid.snap && soid.snap < CEPH_NOSNAP) {
- // clone. make sure we have enough data.
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
- assert(ssc);
-
- clone_subsets.clear(); // forget what pusher said; recalculate cloning.
-
- interval_set<uint64_t> data_needed;
- calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
- data_needed, clone_subsets);
- pi->data_subset = data_needed;
- put_snapset_context(ssc);
-
- interval_set<uint64_t> overlap;
- overlap.intersection_of(data_subset, data_needed);
-
- dout(10) << "sub_op_push need " << data_needed << ", got " << data_subset
- << ", overlap " << overlap << dendl;
-
- // did we get more data than we need?
- if (!data_subset.subset_of(data_needed)) {
- interval_set<uint64_t> extra = data_subset;
- interval_set<uint64_t> usable;
- usable.intersection_of(extra, data_needed);
- extra.subtract(usable);
- dout(10) << " we got some extra: " << extra << dendl;
-
- bufferlist result;
- int off = 0;
- for (interval_set<uint64_t>::const_iterator p = usable.begin();
- p != usable.end();
- ++p) {
- interval_set<uint64_t> x;
- x.insert(p.get_start(), p.get_len());
- x.intersection_of(data_needed);
- dout(20) << " data_subset object extent " << p.get_start() << "~" << p.get_len() << " need " << x << dendl;
- if (!x.empty()) {
- uint64_t first = x.begin().get_start();
- uint64_t len = x.begin().get_len();
- bufferlist sub;
- int boff = off + (first - p.get_start());
- dout(20) << " keeping buffer extent " << boff << "~" << len << dendl;
- sub.substr_of(data, boff, len);
- result.claim_append(sub);
- }
- off += p.get_len();
- }
- data.claim(result);
- data_subset.intersection_of(data_needed);
- dout(20) << " new data len is " << data.length() << dendl;
- }
-
- // did we get everything we wanted?
- if (pi->data_subset.empty()) {
- complete = true;
- } else if (data_subset.empty()) {
- complete = false;
- } else {
- complete = pi->data_subset.range_end() == data_subset.range_end();
- }
-
- if (m->complete && !complete) {
- dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl;
- _failed_push(op);
- return;
- }
-
- } else {
- // head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning)
- assert(m->clone_subsets.empty());
- }
- }
- dout(15) << " data_subset " << data_subset
- << " clone_subsets " << clone_subsets
- << " first=" << first << " complete=" << complete
- << dendl;
-
- coll_t target;
- if (first && complete)
- target = coll;
- else
- target = coll_t::TEMP_COLL;
-
- // write object and add it to the PG
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- Context *onreadable = 0;
- Context *onreadable_sync = 0;
+ intervals_usable->intersection_of(copy_subset,
+ intervals_received);
- if (first && complete && soid.snap != CEPH_NOSNAP)
- remove_object_with_snap_hardlinks(*t, soid);
- else if (first)
- t->remove(target, soid); // in case old version exists
-
- // write data
- uint64_t boff = 0;
- for (interval_set<uint64_t>::const_iterator p = data_subset.begin();
- p != data_subset.end();
+ uint64_t off = 0;
+ for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
+ p != intervals_received.end();
++p) {
- bufferlist bit;
- bit.substr_of(data, boff, p.get_len());
- dout(15) << " write " << p.get_start() << "~" << p.get_len() << dendl;
- t->write(target, soid, p.get_start(), p.get_len(), bit);
- boff += p.get_len();
- }
-
- if (complete) {
- // Clear out old snapdir contents
- if (!first) {
- if (soid.snap != CEPH_NOSNAP) {
- remove_object_with_snap_hardlinks(*t, soid);
- } else {
- t->remove(coll, soid);
- }
- t->collection_add(coll, target, soid);
- t->collection_remove(target, soid);
- }
-
- // clone bits
- for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = clone_subsets.begin();
- p != clone_subsets.end();
- ++p)
- {
- for (interval_set<uint64_t>::const_iterator q = p->second.begin();
- q != p->second.end();
- ++q)
- {
- dout(15) << " clone_range " << p->first << " "
- << q.get_start() << "~" << q.get_len() << dendl;
- t->clone_range(coll, p->first, soid,
- q.get_start(), q.get_len(), q.get_start());
- }
- }
-
- if (data_subset.empty())
- t->touch(coll, soid);
-
- t->setattrs(coll, soid, m->attrset);
- if (soid.snap && soid.snap < CEPH_NOSNAP &&
- m->attrset.count(OI_ATTR)) {
- bufferlist bl;
- bl.push_back(m->attrset[OI_ATTR]);
- object_info_t oi(bl);
- if (oi.snaps.size()) {
- coll_t lc = make_snap_collection(*t, oi.snaps[0]);
- t->collection_add(lc, coll, soid);
- if (oi.snaps.size() > 1) {
- coll_t hc = make_snap_collection(*t, oi.snaps[oi.snaps.size()-1]);
- t->collection_add(hc, coll, soid);
- }
- }
- }
-
- bool revert = false;
- if (missing.is_missing(soid) && missing.missing[soid].need > v) {
- pg_log_entry_t *latest = log.objects[soid];
- if (latest->op == pg_log_entry_t::LOST_REVERT &&
- latest->prior_version == v) {
- dout(10) << " got old revert version " << v << " for " << *latest << dendl;
- revert = true;
- v = latest->version;
- }
+ interval_set<uint64_t> x;
+ x.insert(p.get_start(), p.get_len());
+ x.intersection_of(copy_subset);
+ for (interval_set<uint64_t>::const_iterator q = x.begin();
+ q != x.end();
+ ++q) {
+ bufferlist sub;
+ uint64_t data_off = off + (q.get_start() - p.get_start());
+ sub.substr_of(data_received, data_off, q.get_len());
+ data_usable->claim_append(sub);
}
-
- recover_got(soid, v);
-
- // update pg
- write_info(*t);
-
- // track ObjectContext
- if (is_primary()) {
- dout(10) << " setting up obc for " << soid << dendl;
- ObjectContext *obc = get_object_context(soid, m->oloc, true);
- assert(obc->registered);
- obc->ondisk_write_lock();
-
- obc->obs.exists = true;
- obc->obs.oi.decode(oibl);
-
- if (revert) {
- // update the attr to the revert event version
- obc->obs.oi.prior_version = obc->obs.oi.version;
- obc->obs.oi.version = v;
- bufferlist bl;
- ::encode(obc->obs.oi, bl);
- t->setattr(coll, soid, OI_ATTR, bl);
- }
-
- // suck in snapset context?
- SnapSetContext *ssc = obc->ssc;
- if (ssbl.length()) {
- bufferlist::iterator sp = ssbl.begin();
- ssc->snapset.decode(sp);
- }
-
- onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
- onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
- } else {
- onreadable = new ObjectStore::C_DeleteTransaction(t);
- }
-
- } else {
- onreadable = new ObjectStore::C_DeleteTransaction(t);
+ off += p.get_len();
}
+}
- // apply to disk!
- int r = osd->store->queue_transaction(&osr, t,
- onreadable,
- new C_OSD_CommittedPushedObject(this, op,
- info.history.same_interval_since,
- info.last_complete),
- onreadable_sync);
- assert(r == 0);
-
+/** op_push
+ * NOTE: called from opqueue.
+ */
+// Now just a dispatcher: a primary receiving PUSH data is consuming a
+// pull response; a replica is receiving a recovery push.  All the inline
+// logic removed below moved into handle_pull_response()/handle_push().
+void ReplicatedPG::sub_op_push(OpRequest *op)
+{
+  op->mark_started();
if (is_primary()) {
-    assert(pi);
-
-    if (complete) {
-      // close out pull op
-      pulling.erase(soid);
-      pull_from_peer[pi->from].erase(soid);
-      finish_recovery_op(soid);
-
-      update_stats();
-    } else {
-      // pull more
-      pi->data_subset_pulling.span_of(pi->data_subset, data_subset.empty() ? 0 : data_subset.range_end(),
-                                      g_conf->osd_recovery_max_chunk);
-      dout(10) << " pulling more, " << pi->data_subset_pulling << " of " << pi->data_subset << dendl;
-      send_pull_op(soid, v, false, pi->data_subset_pulling, pi->from);
-    }
-
-
-    /*
-    if (is_active()) {
-      // are others missing this too? (only if we're active.. skip
-      // this part if we're still repeering, it'll just confuse us)
-      for (unsigned i=1; i<acting.size(); i++) {
-        int peer = acting[i];
-        assert(peer_missing.count(peer));
-        if (peer_missing[peer].is_missing(soid)) {
-          push_to_replica(soid, peer); // ok, push it, and they (will) have it now.
-          start_recovery_op(soid);
-        }
-      }
-    }
-    */
-
+    handle_pull_response(op);
} else {
-    // ack if i'm a replica and being pushed to.
-    MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-    assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
-    osd->cluster_messenger->send_message(reply, m->get_connection());
-  }
-
-  if (complete) {
-    // kick waiters
-    if (waiting_for_missing_object.count(soid)) {
-      dout(20) << " kicking waiters on " << soid << dendl;
-      osd->requeue_ops(this, waiting_for_missing_object[soid]);
-      waiting_for_missing_object.erase(soid);
-      if (missing.missing.size() == 0) {
-        osd->requeue_ops(this, waiting_for_all_missing);
-        waiting_for_all_missing.clear();
-      }
-    } else {
-      dout(20) << " no waiters on " << soid << dendl;
-      /*for (hash_map<hobject_t,list<class Message*> >::iterator p = waiting_for_missing_object.begin();
-           p != waiting_for_missing_object.end();
-           p++)
-        dout(20) << "   " << p->first << dendl;
-      */
-    }
+    handle_push(op);
}
-
-  op->put(); // at the end... soid is a ref to op->soid!
+  op->put();
+  return;
}
void ReplicatedPG::_failed_push(OpRequest *op)