git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedBackend: wire in start_pushes
author Samuel Just <sam.just@inktank.com>
Mon, 9 Sep 2013 22:41:10 +0000 (15:41 -0700)
committer Samuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:27 +0000 (11:24 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc

index 6193f2e0e780d689d2e9d9498c2670623306ceef..59ce9bbcceb77612f2a2a3dd61c1daff0e890c3d 100644 (file)
@@ -62,7 +62,11 @@ void ReplicatedBackend::recover_object(
   } else {
     assert(obc);
     assert(head);
-    // TODOSAM: handle recovering replicas
+    int started = start_pushes(
+      hoid,
+      obc,
+      h);
+    assert(started > 0);
   }
 }
 
index e52c65fcbd09aac2196087236c221e5a014949b5..ae33b3f5588e0048660fd071d763e899ff18e1d2 100644 (file)
 #include "PGBackend.h"
 #include "osd_types.h"
 
+struct C_ReplicatedBackend_OnPullComplete;
 class ReplicatedBackend : public PGBackend {
   struct RPGHandle : public PGBackend::RecoveryHandle {
     map<int, vector<PushOp> > pushes;
     map<int, vector<PullOp> > pulls;
   };
+  friend struct C_ReplicatedBackend_OnPullComplete;
 private:
   bool temp_created;
   const coll_t temp_coll;
@@ -42,9 +44,12 @@ public:
   ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
 
   /// @see PGBackend::open_recovery_op
-  PGBackend::RecoveryHandle *open_recovery_op() {
+  RPGHandle *_open_recovery_op() {
     return new RPGHandle();
   }
+  PGBackend::RecoveryHandle *open_recovery_op() {
+    return _open_recovery_op();
+  }
 
   /// @see PGBackend::run_recovery_op
   void run_recovery_op(
@@ -223,8 +228,10 @@ private:
 
   bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
   void handle_pull(int peer, PullOp &op, PushOp *reply);
-  bool handle_pull_response(int from, PushOp &op, PullOp *response,
-                           ObjectStore::Transaction *t);
+  bool handle_pull_response(
+    int from, PushOp &op, PullOp *response,
+    list<pair<hobject_t, ObjectContextRef> > *to_continue,
+    ObjectStore::Transaction *t);
   void handle_push(int from, PushOp &op, PushReplyOp *response,
                   ObjectStore::Transaction *t);
 
index ba42805638cfd0514d29ea40e91724d2e4e159d5..f5c816c52c4d5a8ca80816a5964caa65f13df185 100644 (file)
@@ -1604,6 +1604,28 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
   get_parent()->queue_transaction(t);
 }
 
+struct C_ReplicatedBackend_OnPullComplete : Context {
+  ReplicatedBackend *bc;
+  list<pair<hobject_t, ObjectContextRef> > to_continue;
+  int priority;
+  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+    : bc(bc), priority(priority) {}
+
+  void finish(int) {
+    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+    for (list<pair<hobject_t, ObjectContextRef> >::iterator i =
+          to_continue.begin();
+        i != to_continue.end();
+        ++i) {
+      if (!bc->start_pushes(i->first, i->second, h)) {
+       bc->get_parent()->on_global_recover(
+         i->first);
+      }
+    }
+    bc->run_recovery_op(h, priority);
+  }
+};
+
 void ReplicatedBackend::_do_pull_response(OpRequestRef op)
 {
   MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
@@ -1612,13 +1634,23 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
 
   vector<PullOp> replies(1);
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  list<pair<hobject_t, ObjectContextRef> > to_continue;
   for (vector<PushOp>::iterator i = m->pushes.begin();
        i != m->pushes.end();
        ++i) {
-    bool more = handle_pull_response(from, *i, &(replies.back()), t);
+    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
     if (more)
       replies.push_back(PullOp());
   }
+  if (!to_continue.empty()) {
+    C_ReplicatedBackend_OnPullComplete *c =
+      new C_ReplicatedBackend_OnPullComplete(
+       this,
+       m->get_priority());
+    c->to_continue.swap(to_continue);
+    t->register_on_complete(
+      get_parent()->bless_context(c));
+  }
   replies.erase(replies.end() - 1);
 
   if (replies.size()) {
@@ -6286,7 +6318,9 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
 
 bool ReplicatedBackend::handle_pull_response(
   int from, PushOp &pop, PullOp *response,
-  ObjectStore::Transaction *t)
+  list<pair<hobject_t, ObjectContextRef> > *to_continue,
+  ObjectStore::Transaction *t
+  )
 {
   interval_set<uint64_t> data_included = pop.data_included;
   bufferlist data;
@@ -6359,11 +6393,14 @@ bool ReplicatedBackend::handle_pull_response(
   pi.stat.num_keys_recovered += pop.omap_entries.size();
 
   if (complete) {
-    pulling.erase(hoid);
-    pull_from_peer[from].erase(hoid);
+    to_continue->push_back(make_pair(hoid, pi.obc));
     pi.stat.num_objects_recovered++;
     get_parent()->on_local_recover(
       hoid, pi.stat, pi.recovery_info, pi.obc, t);
+    pulling.erase(hoid);
+    pull_from_peer[from].erase(hoid);
+    if (pull_from_peer[from].empty())
+      pull_from_peer.erase(from);
     return false;
   } else {
     response->soid = pop.soid;
@@ -6947,14 +6984,27 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
 
   if (is_primary()) {
     PullOp resp;
-    bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
+    RPGHandle *h = _open_recovery_op();
+    list<pair<hobject_t, ObjectContextRef> > to_continue;
+    bool more = handle_pull_response(
+      m->get_source().num(), pop, &resp,
+      &to_continue, t);
     if (more) {
       send_pull_legacy(
        m->get_priority(),
        m->get_source().num(),
        resp.recovery_info,
        resp.recovery_progress);
-    }
+    } else {
+      C_ReplicatedBackend_OnPullComplete *c =
+       new C_ReplicatedBackend_OnPullComplete(
+         this,
+         op->request->get_priority());
+      c->to_continue.swap(to_continue);
+      t->register_on_complete(
+       get_parent()->bless_context(c));
+    }
+    run_recovery_op(h, op->request->get_priority());
   } else {
     PushReplyOp resp;
     MOSDSubOpReply *reply = new MOSDSubOpReply(