]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: ack push only after transaction has completed
authorSamuel Just <sam.just@inktank.com>
Wed, 23 Jan 2013 19:35:47 +0000 (11:35 -0800)
committerSamuel Just <sam.just@inktank.com>
Sat, 26 Jan 2013 01:23:12 +0000 (17:23 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
(cherry picked from commit 20278c4f77b890d5b2b95d2ccbeb4fbe106667ac)

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index ecae8094a3391795c10af2dd0eb4b4290bb71f62..c39be05813aa82ea54741e854e37ee7b846ab9e9 100644 (file)
@@ -5461,6 +5461,12 @@ void ReplicatedPG::handle_push(OpRequestRef op)
   // keep track of active pushes for scrub
   ++active_pushes;
 
+  MOSDSubOpReply *reply = new MOSDSubOpReply(
+    m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+
+  Context *oncomplete = new C_OSD_CompletedPushedObjectReplica(
+    osd, reply, m->get_connection());
   Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
   Context *onreadable_sync = 0;
   submit_push_data(m->recovery_info,
@@ -5476,22 +5482,22 @@ void ReplicatedPG::handle_push(OpRequestRef op)
                         t);
 
   int r = osd->store->
-    queue_transaction(osr.get(), t,
-                     onreadable,
-                     new C_OSD_CommittedPushedObject(
-                       this, op,
-                       info.history.same_interval_since,
-                       info.last_complete),
-                     onreadable_sync);
+    queue_transaction(
+      osr.get(), t,
+      onreadable,
+      new C_OSD_CommittedPushedObject(
+       this, op,
+       info.history.same_interval_since,
+       info.last_complete),
+      onreadable_sync,
+      oncomplete,
+      OpRequestRef()
+      );
   assert(r == 0);
 
   osd->logger->inc(l_osd_push_in);
   osd->logger->inc(l_osd_push_inb, m->ops[0].indata.length());
 
-  MOSDSubOpReply *reply = new MOSDSubOpReply(
-    m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-  assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
-  osd->send_message_osd_cluster(reply, m->get_connection());
 }
 
 int ReplicatedPG::send_push(int prio, int peer,
index b86305eacf812c9860df3102ca56b3b79c662b05..03f04458f2a83111176a70b58c340f86cef2a57c 100644 (file)
@@ -895,6 +895,18 @@ protected:
       pg->put();
     }
   };
+  struct C_OSD_CompletedPushedObjectReplica : public Context {
+    OSDService *osd;
+    Message *reply;
+    ConnectionRef conn;
+    C_OSD_CompletedPushedObjectReplica (
+      OSDService *osd,
+      Message *reply,
+      ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {}
+    void finish(int) {
+      osd->send_message_osd_cluster(reply, conn.get());
+    }
+  };
   struct C_OSD_AppliedRecoveredObjectReplica : public Context {
     boost::intrusive_ptr<ReplicatedPG> pg;
     ObjectStore::Transaction *t;