]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: pass a PushOp into handle_pull_response
authorSamuel Just <sam.just@inktank.com>
Wed, 12 Jun 2013 20:28:15 +0000 (13:28 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:30 +0000 (16:43 -0700)
This is the first step toward packaging multiple
pushes/pulls into a single message.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f0f959fa16be93f3369bed22209c51090659560d..d10f88ffed9d215ea37afb1c4c5e911e9a930d67 100644 (file)
@@ -5507,37 +5507,38 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
   return new_info;
 }
 
-void ReplicatedPG::handle_pull_response(OpRequestRef op)
+bool ReplicatedPG::handle_pull_response(
+  int from, PushOp &pop, PullOp *response,
+  ObjectStore::Transaction *t)
 {
-  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
+  interval_set<uint64_t> data_included = pop.data_included;
   bufferlist data;
-  m->claim_data(data);
-  interval_set<uint64_t> data_included = m->data_included;
+  data.claim(pop.data);
   dout(10) << "handle_pull_response "
-          << m->recovery_info
-          << m->recovery_progress
+          << pop.recovery_info
+          << pop.after_progress
           << " data.size() is " << data.length()
           << " data_included: " << data_included
           << dendl;
-  if (m->version == eversion_t()) {
+  if (pop.version == eversion_t()) {
     // replica doesn't have it!
-    _failed_push(op);
-    return;
+    _failed_push(from, pop.soid);
+    return false;
   }
 
-  hobject_t &hoid = m->recovery_info.soid;
+  hobject_t &hoid = pop.soid;
   assert((data_included.empty() && data.length() == 0) ||
         (!data_included.empty() && data.length() > 0));
 
   if (!pulling.count(hoid)) {
-    return;
+    return false;
   }
 
   PullInfo &pi = pulling[hoid];
   if (pi.recovery_info.size == (uint64_t(-1))) {
-    pi.recovery_info.size = m->recovery_info.size;
+    pi.recovery_info.size = pop.recovery_info.size;
     pi.recovery_info.copy_subset.intersection_of(
-      m->recovery_info.copy_subset);
+      pop.recovery_info.copy_subset);
   }
 
   pi.recovery_info = recalc_subsets(pi.recovery_info);
@@ -5555,7 +5556,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
   info.stats.stats.sum.num_bytes_recovered += data.length();
 
   bool first = pi.recovery_progress.first;
-  pi.recovery_progress = m->recovery_progress;
+  pi.recovery_progress = pop.after_progress;
 
   dout(10) << "new recovery_info " << pi.recovery_info
           << ", new progress " << pi.recovery_progress
@@ -5563,15 +5564,15 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
 
   if (first) {
     bufferlist oibl;
-    if (m->attrset.count(OI_ATTR)) {
-      oibl.push_back(m->attrset[OI_ATTR]);
+    if (pop.attrset.count(OI_ATTR)) {
+      oibl.push_back(pop.attrset[OI_ATTR]);
       ::decode(pi.recovery_info.oi, oibl);
     } else {
       assert(0);
     }
     bufferlist ssbl;
-    if (m->attrset.count(SS_ATTR)) {
-      ssbl.push_back(m->attrset[SS_ATTR]);
+    if (pop.attrset.count(SS_ATTR)) {
+      ssbl.push_back(pop.attrset[SS_ATTR]);
       ::decode(pi.recovery_info.ss, ssbl);
     } else {
       assert(pi.recovery_info.soid.snap != CEPH_NOSNAP &&
@@ -5581,19 +5582,15 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
 
   bool complete = pi.is_complete();
 
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  Context *onreadable = 0;
-  Context *onreadable_sync = 0;
-  Context *oncomplete = 0;
   submit_push_data(pi.recovery_info, first,
                   complete,
                   data_included, data,
-                  m->omap_header,
-                  m->attrset,
-                  m->omap_entries,
+                  pop.omap_header,
+                  pop.attrset,
+                  pop.omap_entries,
                   t);
 
-  info.stats.stats.sum.num_keys_recovered += m->omap_entries.size();
+  info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
 
   if (complete) {
     info.stats.stats.sum.num_objects_recovered++;
@@ -5614,30 +5611,21 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
     // keep track of active pushes for scrub
     ++active_pushes;
 
-    onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
-    onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
-    oncomplete = new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch());
-  } else {
-    onreadable = new ObjectStore::C_DeleteTransaction(t);
+    t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+    t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+    t->register_on_complete(
+      new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
   }
 
-  int r = osd->store->
-    queue_transaction(
-      osr.get(), t,
-      onreadable,
-      new C_OSD_CommittedPushedObject(
-       this, op,
-       get_osdmap()->get_epoch(),
-       info.last_complete),
-      onreadable_sync,
-      oncomplete,
-      TrackedOpRef()
-      );
-  assert(r == 0);
+  t->register_on_commit(
+    new C_OSD_CommittedPushedObject(
+      this,
+      get_osdmap()->get_epoch(),
+      info.last_complete));
 
   if (complete) {
     pulling.erase(hoid);
-    pull_from_peer[m->get_source().num()].erase(hoid);
+    pull_from_peer[from].erase(hoid);
     publish_stats_to_osd();
     if (waiting_for_missing_object.count(hoid)) {
       dout(20) << " kicking waiters on " << hoid << dendl;
@@ -5648,11 +5636,12 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
        waiting_for_all_missing.clear();
       }
     }
+    return false;
   } else {
-    send_pull(pi.priority,
-             m->get_source().num(),
-             pi.recovery_info,
-             pi.recovery_progress);
+    response->soid = pop.soid;
+    response->recovery_info = pi.recovery_info;
+    response->recovery_progress = pi.recovery_progress;
+    return true;
   }
 }
 
@@ -6045,7 +6034,7 @@ void ReplicatedPG::_committed_pushed_object(
   unlock();
 }
 
-void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc)
+void ReplicatedPG::_applied_recovered_object(ObjectContext *obc)
 {
   lock();
   dout(10) << "_applied_recovered_object " << *obc << dendl;
@@ -6061,7 +6050,6 @@ void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, Object
   }
 
   unlock();
-  delete t;
 }
 
 void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
@@ -6155,21 +6143,42 @@ void ReplicatedPG::trim_pushed_data(
 void ReplicatedPG::sub_op_push(OpRequestRef op)
 {
   op->mark_started();
+  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
 
   if (is_primary()) {
-    handle_pull_response(op);
+    PushOp pop;
+    pop.soid = m->recovery_info.soid;
+    pop.version = m->version;
+    m->claim_data(pop.data);
+    pop.data_included.swap(m->data_included);
+    pop.omap_header.swap(m->omap_header);
+    pop.omap_entries.swap(m->omap_entries);
+    pop.attrset.swap(m->attrset);
+    pop.recovery_info = m->recovery_info;
+    pop.before_progress = m->current_progress;
+    pop.after_progress = m->recovery_progress;
+
+    PullOp resp;
+    ObjectStore::Transaction *t = new ObjectStore::Transaction;
+    t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
+    bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
+    t->register_on_commit(new C_OnPushCommit(this, op));
+    osd->store->queue_transaction(osr.get(), t);
+    if (more) {
+      send_pull(
+       m->get_source().num(),
+       m->get_priority(),
+       resp.recovery_info,
+       resp.recovery_progress);
+    }
   } else {
     handle_push(op);
   }
   return;
 }
 
-void ReplicatedPG::_failed_push(OpRequestRef op)
+void ReplicatedPG::_failed_push(int from, const hobject_t &soid)
 {
-  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
-  assert(m->get_header().type == MSG_OSD_SUBOP);
-  const hobject_t& soid = m->poid;
-  int from = m->get_source().num();
   map<hobject_t,set<int> >::iterator p = missing_loc.find(soid);
   if (p != missing_loc.end()) {
     dout(0) << "_failed_push " << soid << " from osd." << from
@@ -6842,6 +6851,7 @@ int ReplicatedPG::recover_primary(int max)
              obc->obs.oi.version = latest->version;
 
              ObjectStore::Transaction *t = new ObjectStore::Transaction;
+             t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
              bufferlist b2;
              obc->obs.oi.encode(b2);
              t->setattr(coll, soid, OI_ATTR, b2);
@@ -6851,9 +6861,9 @@ int ReplicatedPG::recover_primary(int max)
              ++active_pushes;
 
              osd->store->queue_transaction(osr.get(), t,
-                                           new C_OSD_AppliedRecoveredObject(this, t, obc),
+                                           new C_OSD_AppliedRecoveredObject(this, obc),
                                            new C_OSD_CommittedPushedObject(
-                                             this, OpRequestRef(),
+                                             this,
                                              get_osdmap()->get_epoch(),
                                              info.last_complete),
                                            new C_OSD_OndiskWriteUnlock(obc));
index bc44dee0cff36e893cf54faa2b1ef9de5734d9ed..4e749e02d55f2508e825811fd8175169e8bae018 100644 (file)
@@ -556,7 +556,9 @@ protected:
                               bufferlist data_received,
                               interval_set<uint64_t> *intervals_usable,
                               bufferlist *data_usable);
-  void handle_pull_response(OpRequestRef op);
+  bool handle_pull_response(
+    int from, PushOp &op, PullOp *response,
+    ObjectStore::Transaction *t);
   void handle_push(OpRequestRef op);
   int send_push(int priority, int peer,
                const ObjectRecoveryInfo& recovery_info,
@@ -809,12 +811,11 @@ protected:
   };
   struct C_OSD_AppliedRecoveredObject : public Context {
     ReplicatedPGRef pg;
-    ObjectStore::Transaction *t;
     ObjectContext *obc;
-    C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectStore::Transaction *tt, ObjectContext *o) :
-      pg(p), t(tt), obc(o) {}
+    C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContext *o) :
+      pg(p), obc(o) {}
     void finish(int r) {
-      pg->_applied_recovered_object(t, obc);
+      pg->_applied_recovered_object(obc);
     }
   };
   struct C_OSD_CommittedPushedObject : public Context {
@@ -875,12 +876,12 @@ protected:
   void sub_op_modify_commit(RepModify *rm);
 
   void sub_op_modify_reply(OpRequestRef op);
-  void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
+  void _applied_recovered_object(ObjectContext *obc);
   void _applied_recovered_object_replica(ObjectStore::Transaction *t);
   void _committed_pushed_object(epoch_t epoch, eversion_t lc);
   void recover_got(hobject_t oid, eversion_t v);
   void sub_op_push(OpRequestRef op);
-  void _failed_push(OpRequestRef op);
+  void _failed_push(int from, const hobject_t &soid);
   void sub_op_push_reply(OpRequestRef op);
   void sub_op_pull(OpRequestRef op);