From: Samuel Just Date: Wed, 12 Jun 2013 20:28:15 +0000 (-0700) Subject: ReplicatedPG: pass a PushOp into handle_pull_response X-Git-Tag: v0.67-rc1~138^2~1^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a4984328be4e5707ec94196eb5babf8e14989448;p=ceph.git ReplicatedPG: pass a PushOp into handle_pull_response This is the first step toward packaging multiple pushes/pulls into a single message. Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index f0f959fa16be..d10f88ffed9d 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5507,37 +5507,38 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove return new_info; } -void ReplicatedPG::handle_pull_response(OpRequestRef op) +bool ReplicatedPG::handle_pull_response( + int from, PushOp &pop, PullOp *response, + ObjectStore::Transaction *t) { - MOSDSubOp *m = static_cast(op->request); + interval_set data_included = pop.data_included; bufferlist data; - m->claim_data(data); - interval_set data_included = m->data_included; + data.claim(pop.data); dout(10) << "handle_pull_response " - << m->recovery_info - << m->recovery_progress + << pop.recovery_info + << pop.after_progress << " data.size() is " << data.length() << " data_included: " << data_included << dendl; - if (m->version == eversion_t()) { + if (pop.version == eversion_t()) { // replica doesn't have it! - _failed_push(op); - return; + _failed_push(from, pop.soid); + return false; } - hobject_t &hoid = m->recovery_info.soid; + hobject_t &hoid = pop.soid; assert((data_included.empty() && data.length() == 0) || (!data_included.empty() && data.length() > 0)); if (!pulling.count(hoid)) { - return; + return false; } PullInfo &pi = pulling[hoid]; if (pi.recovery_info.size == (uint64_t(-1))) { - pi.recovery_info.size = m->recovery_info.size; + pi.recovery_info.size = pop.recovery_info.size; pi.recovery_info.copy_subset.intersection_of( - m->recovery_info.copy_subset); + pop.recovery_info.copy_subset); } pi.recovery_info = recalc_subsets(pi.recovery_info); @@ -5555,7 +5556,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) info.stats.stats.sum.num_bytes_recovered += data.length(); bool first = pi.recovery_progress.first; - pi.recovery_progress = m->recovery_progress; + pi.recovery_progress = pop.after_progress; dout(10) << "new recovery_info " << pi.recovery_info << ", new progress " << pi.recovery_progress @@ -5563,15 +5564,15 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) if (first) { bufferlist oibl; - if (m->attrset.count(OI_ATTR)) { - oibl.push_back(m->attrset[OI_ATTR]); + if (pop.attrset.count(OI_ATTR)) { + oibl.push_back(pop.attrset[OI_ATTR]); ::decode(pi.recovery_info.oi, oibl); } else { assert(0); } bufferlist ssbl; - if (m->attrset.count(SS_ATTR)) { - ssbl.push_back(m->attrset[SS_ATTR]); + if (pop.attrset.count(SS_ATTR)) { + ssbl.push_back(pop.attrset[SS_ATTR]); ::decode(pi.recovery_info.ss, ssbl); } else { assert(pi.recovery_info.soid.snap != CEPH_NOSNAP && @@ -5581,19 +5582,15 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) bool complete = pi.is_complete(); - ObjectStore::Transaction *t = new ObjectStore::Transaction; - Context *onreadable = 0; - Context *onreadable_sync = 0; - Context *oncomplete = 0; submit_push_data(pi.recovery_info, first, complete, data_included, data, - m->omap_header, - m->attrset, - m->omap_entries, + pop.omap_header, + pop.attrset, + pop.omap_entries, t); - info.stats.stats.sum.num_keys_recovered += m->omap_entries.size(); + info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size(); if (complete) { info.stats.stats.sum.num_objects_recovered++; @@ -5614,30 +5611,21 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) // keep track of active pushes for scrub ++active_pushes; - onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc); - onreadable_sync = new C_OSD_OndiskWriteUnlock(obc); - oncomplete = new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()); - } else { - onreadable = new ObjectStore::C_DeleteTransaction(t); + t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc)); + t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc)); + t->register_on_complete( + new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch())); } - int r = osd->store-> - queue_transaction( - osr.get(), t, - onreadable, - new C_OSD_CommittedPushedObject( - this, op, - get_osdmap()->get_epoch(), - info.last_complete), - onreadable_sync, - oncomplete, - TrackedOpRef() - ); - assert(r == 0); + t->register_on_commit( + new C_OSD_CommittedPushedObject( + this, + get_osdmap()->get_epoch(), + info.last_complete)); if (complete) { pulling.erase(hoid); - pull_from_peer[m->get_source().num()].erase(hoid); + pull_from_peer[from].erase(hoid); publish_stats_to_osd(); if (waiting_for_missing_object.count(hoid)) { dout(20) << " kicking waiters on " << hoid << dendl; @@ -5648,11 +5636,12 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) waiting_for_all_missing.clear(); } } + return false; } else { - send_pull(pi.priority, - m->get_source().num(), - pi.recovery_info, - pi.recovery_progress); + response->soid = pop.soid; + response->recovery_info = pi.recovery_info; + response->recovery_progress = pi.recovery_progress; + return true; } } @@ -6045,7 +6034,7 @@ void ReplicatedPG::_committed_pushed_object( unlock(); } -void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc) +void ReplicatedPG::_applied_recovered_object(ObjectContext *obc) { lock(); dout(10) << "_applied_recovered_object " << *obc << dendl; @@ -6061,7 +6050,6 @@ void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, Object } unlock(); - delete t; } void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t) @@ -6155,21 +6143,42 @@ void ReplicatedPG::trim_pushed_data( void ReplicatedPG::sub_op_push(OpRequestRef op) { op->mark_started(); + MOSDSubOp *m = static_cast(op->request); if (is_primary()) { - handle_pull_response(op); + PushOp pop; + pop.soid = m->recovery_info.soid; + pop.version = m->version; + m->claim_data(pop.data); + pop.data_included.swap(m->data_included); + pop.omap_header.swap(m->omap_header); + pop.omap_entries.swap(m->omap_entries); + pop.attrset.swap(m->attrset); + pop.recovery_info = m->recovery_info; + pop.before_progress = m->current_progress; + pop.after_progress = m->recovery_progress; + + PullOp resp; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + t->register_on_applied(new ObjectStore::C_DeleteTransaction(t)); + bool more = handle_pull_response(m->get_source().num(), pop, &resp, t); + t->register_on_commit(new C_OnPushCommit(this, op)); + osd->store->queue_transaction(osr.get(), t); + if (more) { + send_pull( + m->get_source().num(), + m->get_priority(), + resp.recovery_info, + resp.recovery_progress); + } } else { handle_push(op); } return; } -void ReplicatedPG::_failed_push(OpRequestRef op) +void ReplicatedPG::_failed_push(int from, const hobject_t &soid) { - MOSDSubOp *m = static_cast(op->request); - assert(m->get_header().type == MSG_OSD_SUBOP); - const hobject_t& soid = m->poid; - int from = m->get_source().num(); map >::iterator p = missing_loc.find(soid); if (p != missing_loc.end()) { dout(0) << "_failed_push " << soid << " from osd." << from @@ -6842,6 +6851,7 @@ int ReplicatedPG::recover_primary(int max) obc->obs.oi.version = latest->version; ObjectStore::Transaction *t = new ObjectStore::Transaction; + t->register_on_applied(new ObjectStore::C_DeleteTransaction(t)); bufferlist b2; obc->obs.oi.encode(b2); t->setattr(coll, soid, OI_ATTR, b2); @@ -6851,9 +6861,9 @@ int ReplicatedPG::recover_primary(int max) ++active_pushes; osd->store->queue_transaction(osr.get(), t, - new C_OSD_AppliedRecoveredObject(this, t, obc), + new C_OSD_AppliedRecoveredObject(this, obc), new C_OSD_CommittedPushedObject( - this, OpRequestRef(), + this, get_osdmap()->get_epoch(), info.last_complete), new C_OSD_OndiskWriteUnlock(obc)); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index bc44dee0cff3..4e749e02d55f 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -556,7 +556,9 @@ protected: bufferlist data_received, interval_set *intervals_usable, bufferlist *data_usable); - void handle_pull_response(OpRequestRef op); + bool handle_pull_response( + int from, PushOp &op, PullOp *response, + ObjectStore::Transaction *t); void handle_push(OpRequestRef op); int send_push(int priority, int peer, const ObjectRecoveryInfo& recovery_info, @@ -809,12 +811,11 @@ protected: }; struct C_OSD_AppliedRecoveredObject : public Context { ReplicatedPGRef pg; - ObjectStore::Transaction *t; ObjectContext *obc; - C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectStore::Transaction *tt, ObjectContext *o) : - pg(p), t(tt), obc(o) {} + C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContext *o) : + pg(p), obc(o) {} void finish(int r) { - pg->_applied_recovered_object(t, obc); + pg->_applied_recovered_object(obc); } }; struct C_OSD_CommittedPushedObject : public Context { @@ -875,12 +876,12 @@ protected: void sub_op_modify_commit(RepModify *rm); void sub_op_modify_reply(OpRequestRef op); - void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc); + void _applied_recovered_object(ObjectContext *obc); void _applied_recovered_object_replica(ObjectStore::Transaction *t); void _committed_pushed_object(epoch_t epoch, eversion_t lc); void recover_got(hobject_t oid, eversion_t v); void sub_op_push(OpRequestRef op); - void _failed_push(OpRequestRef op); + void _failed_push(int from, const hobject_t &soid); void sub_op_push_reply(OpRequestRef op); void sub_op_pull(OpRequestRef op);