From da34c77b93e3f880c01329711ab8eca7776b1830 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 23 Jan 2013 11:35:47 -0800 Subject: [PATCH] ReplicatedPG: ack push only after transaction has completed Signed-off-by: Samuel Just (cherry picked from commit 20278c4f77b890d5b2b95d2ccbeb4fbe106667ac) --- src/osd/ReplicatedPG.cc | 28 +++++++++++++++++----------- src/osd/ReplicatedPG.h | 12 ++++++++++++ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ecae8094a3391..c39be05813aa8 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5461,6 +5461,12 @@ void ReplicatedPG::handle_push(OpRequestRef op) // keep track of active pushes for scrub ++active_pushes; + MOSDSubOpReply *reply = new MOSDSubOpReply( + m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); + + Context *oncomplete = new C_OSD_CompletedPushedObjectReplica( + osd, reply, m->get_connection()); Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t); Context *onreadable_sync = 0; submit_push_data(m->recovery_info, @@ -5476,22 +5482,22 @@ void ReplicatedPG::handle_push(OpRequestRef op) t); int r = osd->store-> - queue_transaction(osr.get(), t, - onreadable, - new C_OSD_CommittedPushedObject( - this, op, - info.history.same_interval_since, - info.last_complete), - onreadable_sync); + queue_transaction( + osr.get(), t, + onreadable, + new C_OSD_CommittedPushedObject( + this, op, + info.history.same_interval_since, + info.last_complete), + onreadable_sync, + oncomplete, + OpRequestRef() + ); assert(r == 0); osd->logger->inc(l_osd_push_in); osd->logger->inc(l_osd_push_inb, m->ops[0].indata.length()); - MOSDSubOpReply *reply = new MOSDSubOpReply( - m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); - osd->send_message_osd_cluster(reply, m->get_connection()); } int ReplicatedPG::send_push(int prio, int peer, diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index b86305eacf812..03f04458f2a83 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -895,6 +895,18 @@ protected: pg->put(); } }; + struct C_OSD_CompletedPushedObjectReplica : public Context { + OSDService *osd; + Message *reply; + ConnectionRef conn; + C_OSD_CompletedPushedObjectReplica ( + OSDService *osd, + Message *reply, + ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {} + void finish(int) { + osd->send_message_osd_cluster(reply, conn.get()); + } + }; struct C_OSD_AppliedRecoveredObjectReplica : public Context { boost::intrusive_ptr pg; ObjectStore::Transaction *t; -- 2.39.5