]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: convert handle_push to use PushOp
authorSamuel Just <sam.just@inktank.com>
Wed, 12 Jun 2013 22:10:59 +0000 (15:10 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:31 +0000 (16:43 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index d10f88ffed9d215ea37afb1c4c5e911e9a930d67..237b80f119a2844f0b2e173a64d125c673e40247 100644 (file)
@@ -5655,59 +5655,41 @@ struct C_OnPushCommit : public Context {
   }
 };
 
-void ReplicatedPG::handle_push(OpRequestRef op)
+void ReplicatedPG::handle_push(
+  int from, PushOp &pop, PushReplyOp *response,
+  ObjectStore::Transaction *t)
 {
-  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
   dout(10) << "handle_push "
-          << m->recovery_info
-          << m->recovery_progress
+          << pop.recovery_info
+          << pop.after_progress
           << dendl;
   bufferlist data;
-  m->claim_data(data);
-  bool first = m->current_progress.first;
-  bool complete = m->recovery_progress.data_complete &&
-    m->recovery_progress.omap_complete;
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  data.claim(pop.data);
+  bool first = pop.before_progress.first;
+  bool complete = pop.after_progress.data_complete &&
+    pop.after_progress.omap_complete;
 
   // keep track of active pushes for scrub
   ++active_pushes;
 
-  MOSDSubOpReply *reply = new MOSDSubOpReply(
-    m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-  assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
-
-  Context *oncomplete = new C_OSD_CompletedPushedObjectReplica(
-    osd, reply, m->get_connection());
-  Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
-  Context *onreadable_sync = 0;
-  submit_push_data(m->recovery_info,
+  response->soid = pop.recovery_info.soid;
+  t->register_on_applied(
+    new C_OSD_AppliedRecoveredObjectReplica(this));
+  submit_push_data(pop.recovery_info,
                   first,
                   complete,
-                  m->data_included,
+                  pop.data_included,
                   data,
-                  m->omap_header,
-                  m->attrset,
-                  m->omap_entries,
+                  pop.omap_header,
+                  pop.attrset,
+                  pop.omap_entries,
                   t);
 
-  t->register_on_commit(new C_OnPushCommit(this, op));
-  int r = osd->store->
-    queue_transaction(
-      osr.get(), t,
-      onreadable,
-      new C_OSD_CommittedPushedObject(
-       this,
-       get_osdmap()->get_epoch(),
-       info.last_complete),
-      onreadable_sync,
-      oncomplete,
-      OpRequestRef()
-      );
-  assert(r == 0);
-
-  osd->logger->inc(l_osd_push_in);
-  osd->logger->inc(l_osd_push_inb, m->ops[0].indata.length());
-
+  t->register_on_commit(
+    new C_OSD_CommittedPushedObject(
+      this,
+      get_osdmap()->get_epoch(),
+      info.last_complete));
 }
 
 int ReplicatedPG::send_push(int prio, int peer,
@@ -6052,7 +6034,7 @@ void ReplicatedPG::_applied_recovered_object(ObjectContext *obc)
   unlock();
 }
 
-void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
+void ReplicatedPG::_applied_recovered_object_replica()
 {
   lock();
   dout(10) << "_applied_recovered_object_replica" << dendl;
@@ -6068,7 +6050,6 @@ void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t
   }
 
   unlock();
-  delete t;
 }
 
 void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
@@ -6145,35 +6126,41 @@ void ReplicatedPG::sub_op_push(OpRequestRef op)
   op->mark_started();
   MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
 
-  if (is_primary()) {
-    PushOp pop;
-    pop.soid = m->recovery_info.soid;
-    pop.version = m->version;
-    m->claim_data(pop.data);
-    pop.data_included.swap(m->data_included);
-    pop.omap_header.swap(m->omap_header);
-    pop.omap_entries.swap(m->omap_entries);
-    pop.attrset.swap(m->attrset);
-    pop.recovery_info = m->recovery_info;
-    pop.before_progress = m->current_progress;
-    pop.after_progress = m->recovery_progress;
+  PushOp pop;
+  pop.soid = m->recovery_info.soid;
+  pop.version = m->version;
+  m->claim_data(pop.data);
+  pop.data_included.swap(m->data_included);
+  pop.omap_header.swap(m->omap_header);
+  pop.omap_entries.swap(m->omap_entries);
+  pop.attrset.swap(m->attrset);
+  pop.recovery_info = m->recovery_info;
+  pop.before_progress = m->current_progress;
+  pop.after_progress = m->recovery_progress;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
 
+  if (is_primary()) {
     PullOp resp;
-    ObjectStore::Transaction *t = new ObjectStore::Transaction;
-    t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
     bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
-    t->register_on_commit(new C_OnPushCommit(this, op));
-    osd->store->queue_transaction(osr.get(), t);
     if (more) {
       send_pull(
-       m->get_source().num(),
        m->get_priority(),
+       m->get_source().num(),
        resp.recovery_info,
        resp.recovery_progress);
     }
   } else {
-    handle_push(op);
+    PushReplyOp resp;
+    MOSDSubOpReply *reply = new MOSDSubOpReply(
+      m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+    reply->set_priority(m->get_priority());
+    assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+    handle_push(m->get_source().num(), pop, &resp, t);
+    t->register_on_complete(new C_OSD_SendMessageOnConn(
+                            osd, reply, m->get_connection()));
   }
+  t->register_on_commit(new C_OnPushCommit(this, op));
+  osd->store->queue_transaction(osr.get(), t);
   return;
 }
 
index 4e749e02d55f2508e825811fd8175169e8bae018..a20ebad9cff10ca1fc21c74b05f18351227c3d7f 100644 (file)
@@ -559,7 +559,9 @@ protected:
   bool handle_pull_response(
     int from, PushOp &op, PullOp *response,
     ObjectStore::Transaction *t);
-  void handle_push(OpRequestRef op);
+  void handle_push(
+    int from, PushOp &op, PushReplyOp *response,
+    ObjectStore::Transaction *t);
   int send_push(int priority, int peer,
                const ObjectRecoveryInfo& recovery_info,
                const ObjectRecoveryProgress &progress,
@@ -830,11 +832,11 @@ protected:
       pg->_committed_pushed_object(epoch, last_complete);
     }
   };
-  struct C_OSD_CompletedPushedObjectReplica : public Context {
+  struct C_OSD_SendMessageOnConn: public Context {
     OSDService *osd;
     Message *reply;
     ConnectionRef conn;
-    C_OSD_CompletedPushedObjectReplica (
+    C_OSD_SendMessageOnConn(
       OSDService *osd,
       Message *reply,
       ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {}
@@ -861,11 +863,10 @@ protected:
   friend class C_OSD_CompletedPull;
   struct C_OSD_AppliedRecoveredObjectReplica : public Context {
     ReplicatedPGRef pg;
-    ObjectStore::Transaction *t;
-    C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p, ObjectStore::Transaction *tt) :
-      pg(p), t(tt) {}
+    C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p) :
+      pg(p) {}
     void finish(int r) {
-      pg->_applied_recovered_object_replica(t);
+      pg->_applied_recovered_object_replica();
     }
   };
 
@@ -877,7 +878,7 @@ protected:
 
   void sub_op_modify_reply(OpRequestRef op);
   void _applied_recovered_object(ObjectContext *obc);
-  void _applied_recovered_object_replica(ObjectStore::Transaction *t);
+  void _applied_recovered_object_replica();
   void _committed_pushed_object(epoch_t epoch, eversion_t lc);
   void recover_got(hobject_t oid, eversion_t v);
   void sub_op_push(OpRequestRef op);