return new_info;
}
-void ReplicatedPG::handle_pull_response(OpRequestRef op)
+bool ReplicatedPG::handle_pull_response(
+ int from, PushOp &pop, PullOp *response,
+ ObjectStore::Transaction *t)
{
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
+ interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
- m->claim_data(data);
- interval_set<uint64_t> data_included = m->data_included;
+ data.claim(pop.data);
dout(10) << "handle_pull_response "
- << m->recovery_info
- << m->recovery_progress
+ << pop.recovery_info
+ << pop.after_progress
<< " data.size() is " << data.length()
<< " data_included: " << data_included
<< dendl;
- if (m->version == eversion_t()) {
+ if (pop.version == eversion_t()) {
// replica doesn't have it!
- _failed_push(op);
- return;
+ _failed_push(from, pop.soid);
+ return false;
}
- hobject_t &hoid = m->recovery_info.soid;
+ hobject_t &hoid = pop.soid;
assert((data_included.empty() && data.length() == 0) ||
(!data_included.empty() && data.length() > 0));
if (!pulling.count(hoid)) {
- return;
+ return false;
}
PullInfo &pi = pulling[hoid];
if (pi.recovery_info.size == (uint64_t(-1))) {
- pi.recovery_info.size = m->recovery_info.size;
+ pi.recovery_info.size = pop.recovery_info.size;
pi.recovery_info.copy_subset.intersection_of(
- m->recovery_info.copy_subset);
+ pop.recovery_info.copy_subset);
}
pi.recovery_info = recalc_subsets(pi.recovery_info);
info.stats.stats.sum.num_bytes_recovered += data.length();
bool first = pi.recovery_progress.first;
- pi.recovery_progress = m->recovery_progress;
+ pi.recovery_progress = pop.after_progress;
dout(10) << "new recovery_info " << pi.recovery_info
<< ", new progress " << pi.recovery_progress
if (first) {
bufferlist oibl;
- if (m->attrset.count(OI_ATTR)) {
- oibl.push_back(m->attrset[OI_ATTR]);
+ if (pop.attrset.count(OI_ATTR)) {
+ oibl.push_back(pop.attrset[OI_ATTR]);
::decode(pi.recovery_info.oi, oibl);
} else {
assert(0);
}
bufferlist ssbl;
- if (m->attrset.count(SS_ATTR)) {
- ssbl.push_back(m->attrset[SS_ATTR]);
+ if (pop.attrset.count(SS_ATTR)) {
+ ssbl.push_back(pop.attrset[SS_ATTR]);
::decode(pi.recovery_info.ss, ssbl);
} else {
assert(pi.recovery_info.soid.snap != CEPH_NOSNAP &&
bool complete = pi.is_complete();
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- Context *onreadable = 0;
- Context *onreadable_sync = 0;
- Context *oncomplete = 0;
submit_push_data(pi.recovery_info, first,
complete,
data_included, data,
- m->omap_header,
- m->attrset,
- m->omap_entries,
+ pop.omap_header,
+ pop.attrset,
+ pop.omap_entries,
t);
- info.stats.stats.sum.num_keys_recovered += m->omap_entries.size();
+ info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
if (complete) {
info.stats.stats.sum.num_objects_recovered++;
// keep track of active pushes for scrub
++active_pushes;
- onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
- onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
- oncomplete = new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch());
- } else {
- onreadable = new ObjectStore::C_DeleteTransaction(t);
+ t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+ t->register_on_complete(
+ new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
}
- int r = osd->store->
- queue_transaction(
- osr.get(), t,
- onreadable,
- new C_OSD_CommittedPushedObject(
- this, op,
- get_osdmap()->get_epoch(),
- info.last_complete),
- onreadable_sync,
- oncomplete,
- TrackedOpRef()
- );
- assert(r == 0);
+ t->register_on_commit(
+ new C_OSD_CommittedPushedObject(
+ this,
+ get_osdmap()->get_epoch(),
+ info.last_complete));
if (complete) {
pulling.erase(hoid);
- pull_from_peer[m->get_source().num()].erase(hoid);
+ pull_from_peer[from].erase(hoid);
publish_stats_to_osd();
if (waiting_for_missing_object.count(hoid)) {
dout(20) << " kicking waiters on " << hoid << dendl;
waiting_for_all_missing.clear();
}
}
+ return false;
} else {
- send_pull(pi.priority,
- m->get_source().num(),
- pi.recovery_info,
- pi.recovery_progress);
+ response->soid = pop.soid;
+ response->recovery_info = pi.recovery_info;
+ response->recovery_progress = pi.recovery_progress;
+ return true;
}
}
unlock();
}
-void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc)
+void ReplicatedPG::_applied_recovered_object(ObjectContext *obc)
{
lock();
dout(10) << "_applied_recovered_object " << *obc << dendl;
}
unlock();
- delete t;
}
void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
void ReplicatedPG::sub_op_push(OpRequestRef op)
{
op->mark_started();
+ MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
if (is_primary()) {
- handle_pull_response(op);
+ PushOp pop;
+ pop.soid = m->recovery_info.soid;
+ pop.version = m->version;
+ m->claim_data(pop.data);
+ pop.data_included.swap(m->data_included);
+ pop.omap_header.swap(m->omap_header);
+ pop.omap_entries.swap(m->omap_entries);
+ pop.attrset.swap(m->attrset);
+ pop.recovery_info = m->recovery_info;
+ pop.before_progress = m->current_progress;
+ pop.after_progress = m->recovery_progress;
+
+ PullOp resp;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
+ bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
+ t->register_on_commit(new C_OnPushCommit(this, op));
+ osd->store->queue_transaction(osr.get(), t);
+ if (more) {
+ send_pull(
+ m->get_source().num(),
+ m->get_priority(),
+ resp.recovery_info,
+ resp.recovery_progress);
+ }
} else {
handle_push(op);
}
return;
}
-void ReplicatedPG::_failed_push(OpRequestRef op)
+void ReplicatedPG::_failed_push(int from, const hobject_t &soid)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
- assert(m->get_header().type == MSG_OSD_SUBOP);
- const hobject_t& soid = m->poid;
- int from = m->get_source().num();
map<hobject_t,set<int> >::iterator p = missing_loc.find(soid);
if (p != missing_loc.end()) {
dout(0) << "_failed_push " << soid << " from osd." << from
obc->obs.oi.version = latest->version;
ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
bufferlist b2;
obc->obs.oi.encode(b2);
t->setattr(coll, soid, OI_ATTR, b2);
++active_pushes;
osd->store->queue_transaction(osr.get(), t,
- new C_OSD_AppliedRecoveredObject(this, t, obc),
+ new C_OSD_AppliedRecoveredObject(this, obc),
new C_OSD_CommittedPushedObject(
- this, OpRequestRef(),
+ this,
get_osdmap()->get_epoch(),
info.last_complete),
new C_OSD_OndiskWriteUnlock(obc));
bufferlist data_received,
interval_set<uint64_t> *intervals_usable,
bufferlist *data_usable);
- void handle_pull_response(OpRequestRef op);
+ bool handle_pull_response(
+ int from, PushOp &op, PullOp *response,
+ ObjectStore::Transaction *t);
void handle_push(OpRequestRef op);
int send_push(int priority, int peer,
const ObjectRecoveryInfo& recovery_info,
};
struct C_OSD_AppliedRecoveredObject : public Context {
ReplicatedPGRef pg;
- ObjectStore::Transaction *t;
ObjectContext *obc;
- C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectStore::Transaction *tt, ObjectContext *o) :
- pg(p), t(tt), obc(o) {}
+ C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContext *o) :
+ pg(p), obc(o) {}
void finish(int r) {
- pg->_applied_recovered_object(t, obc);
+ pg->_applied_recovered_object(obc);
}
};
struct C_OSD_CommittedPushedObject : public Context {
void sub_op_modify_commit(RepModify *rm);
void sub_op_modify_reply(OpRequestRef op);
- void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
+ void _applied_recovered_object(ObjectContext *obc);
void _applied_recovered_object_replica(ObjectStore::Transaction *t);
void _committed_pushed_object(epoch_t epoch, eversion_t lc);
void recover_got(hobject_t oid, eversion_t v);
void sub_op_push(OpRequestRef op);
- void _failed_push(OpRequestRef op);
+ void _failed_push(int from, const hobject_t &soid);
void sub_op_push_reply(OpRequestRef op);
void sub_op_pull(OpRequestRef op);