From: Samuel Just Date: Wed, 12 Jun 2013 22:10:59 +0000 (-0700) Subject: OSD: convert handle_push to use PushOp X-Git-Tag: v0.67-rc1~138^2~1^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eec86b8d3c4be7e3100da9705f53b04fa63cae1f;p=ceph.git OSD: convert handle_push to use PushOp Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d10f88ffed9d..237b80f119a2 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5655,59 +5655,41 @@ struct C_OnPushCommit : public Context { } }; -void ReplicatedPG::handle_push(OpRequestRef op) +void ReplicatedPG::handle_push( + int from, PushOp &pop, PushReplyOp *response, + ObjectStore::Transaction *t) { - MOSDSubOp *m = static_cast(op->request); dout(10) << "handle_push " - << m->recovery_info - << m->recovery_progress + << pop.recovery_info + << pop.after_progress << dendl; bufferlist data; - m->claim_data(data); - bool first = m->current_progress.first; - bool complete = m->recovery_progress.data_complete && - m->recovery_progress.omap_complete; - ObjectStore::Transaction *t = new ObjectStore::Transaction; + data.claim(pop.data); + bool first = pop.before_progress.first; + bool complete = pop.after_progress.data_complete && + pop.after_progress.omap_complete; // keep track of active pushes for scrub ++active_pushes; - MOSDSubOpReply *reply = new MOSDSubOpReply( - m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); - - Context *oncomplete = new C_OSD_CompletedPushedObjectReplica( - osd, reply, m->get_connection()); - Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t); - Context *onreadable_sync = 0; - submit_push_data(m->recovery_info, + response->soid = pop.recovery_info.soid; + t->register_on_applied( + new C_OSD_AppliedRecoveredObjectReplica(this)); + submit_push_data(pop.recovery_info, first, complete, - m->data_included, + pop.data_included, data, - m->omap_header, - m->attrset, - m->omap_entries, + pop.omap_header, + pop.attrset, + pop.omap_entries, t); - t->register_on_commit(new C_OnPushCommit(this, op)); - int r = osd->store-> - queue_transaction( - osr.get(), t, - onreadable, - new C_OSD_CommittedPushedObject( - this, - get_osdmap()->get_epoch(), - info.last_complete), - onreadable_sync, - oncomplete, - OpRequestRef() - ); - assert(r == 0); - - osd->logger->inc(l_osd_push_in); - osd->logger->inc(l_osd_push_inb, m->ops[0].indata.length()); - + t->register_on_commit( + new C_OSD_CommittedPushedObject( + this, + get_osdmap()->get_epoch(), + info.last_complete)); } int ReplicatedPG::send_push(int prio, int peer, @@ -6052,7 +6034,7 @@ void ReplicatedPG::_applied_recovered_object(ObjectContext *obc) unlock(); } -void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t) +void ReplicatedPG::_applied_recovered_object_replica() { lock(); dout(10) << "_applied_recovered_object_replica" << dendl; @@ -6068,7 +6050,6 @@ void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t } unlock(); - delete t; } void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) @@ -6145,35 +6126,41 @@ void ReplicatedPG::sub_op_push(OpRequestRef op) op->mark_started(); MOSDSubOp *m = static_cast(op->request); - if (is_primary()) { - PushOp pop; - pop.soid = m->recovery_info.soid; - pop.version = m->version; - m->claim_data(pop.data); - pop.data_included.swap(m->data_included); - pop.omap_header.swap(m->omap_header); - pop.omap_entries.swap(m->omap_entries); - pop.attrset.swap(m->attrset); - pop.recovery_info = m->recovery_info; - pop.before_progress = m->current_progress; - pop.after_progress = m->recovery_progress; + PushOp pop; + pop.soid = m->recovery_info.soid; + pop.version = m->version; + m->claim_data(pop.data); + pop.data_included.swap(m->data_included); + pop.omap_header.swap(m->omap_header); + pop.omap_entries.swap(m->omap_entries); + pop.attrset.swap(m->attrset); + pop.recovery_info = m->recovery_info; + pop.before_progress = m->current_progress; + pop.after_progress = m->recovery_progress; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + if (is_primary()) { PullOp resp; - ObjectStore::Transaction *t = new ObjectStore::Transaction; - t->register_on_applied(new ObjectStore::C_DeleteTransaction(t)); bool more = handle_pull_response(m->get_source().num(), pop, &resp, t); - t->register_on_commit(new C_OnPushCommit(this, op)); - osd->store->queue_transaction(osr.get(), t); if (more) { send_pull( - m->get_source().num(), m->get_priority(), + m->get_source().num(), resp.recovery_info, resp.recovery_progress); } } else { - handle_push(op); + PushReplyOp resp; + MOSDSubOpReply *reply = new MOSDSubOpReply( + m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + reply->set_priority(m->get_priority()); + assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); + handle_push(m->get_source().num(), pop, &resp, t); + t->register_on_complete(new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection())); } + t->register_on_commit(new C_OnPushCommit(this, op)); + osd->store->queue_transaction(osr.get(), t); return; } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 4e749e02d55f..a20ebad9cff1 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -559,7 +559,9 @@ protected: bool handle_pull_response( int from, PushOp &op, PullOp *response, ObjectStore::Transaction *t); - void handle_push(OpRequestRef op); + void handle_push( + int from, PushOp &op, PushReplyOp *response, + ObjectStore::Transaction *t); int send_push(int priority, int peer, const ObjectRecoveryInfo& recovery_info, const ObjectRecoveryProgress &progress, @@ -830,11 +832,11 @@ protected: pg->_committed_pushed_object(epoch, last_complete); } }; - struct C_OSD_CompletedPushedObjectReplica : public Context { + struct C_OSD_SendMessageOnConn: public Context { OSDService *osd; Message *reply; ConnectionRef conn; - C_OSD_CompletedPushedObjectReplica ( + C_OSD_SendMessageOnConn( OSDService *osd, Message *reply, ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {} @@ -861,11 +863,10 @@ protected: friend class C_OSD_CompletedPull; struct C_OSD_AppliedRecoveredObjectReplica : public Context { ReplicatedPGRef pg; - ObjectStore::Transaction *t; - C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p, ObjectStore::Transaction *tt) : - pg(p), t(tt) {} + C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p) : + pg(p) {} void finish(int r) { - pg->_applied_recovered_object_replica(t); + pg->_applied_recovered_object_replica(); } }; @@ -877,7 +878,7 @@ protected: void sub_op_modify_reply(OpRequestRef op); void _applied_recovered_object(ObjectContext *obc); - void _applied_recovered_object_replica(ObjectStore::Transaction *t); + void _applied_recovered_object_replica(); void _committed_pushed_object(epoch_t epoch, eversion_t lc); void recover_got(hobject_t oid, eversion_t v); void sub_op_push(OpRequestRef op);