git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: allow repops to apply with commit
author Samuel Just <sjust@redhat.com>
Sat, 3 Dec 2016 00:33:00 +0000 (16:33 -0800)
committer Samuel Just <sjust@redhat.com>
Sun, 4 Dec 2016 00:05:48 +0000 (16:05 -0800)
Up to now, the repop machinery depended on all repops committing and
applying in order.  For MOSDPGUpdateLogMissing operations, however,
we don't really want to send two messages in the ECBackend case.
Instead, just allow those repops to skip the applied stage and be
completed in order once the repops ahead of them finish.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index c1cbe8ce42039e2244032c3e177c7a9433757949..689567e2da759ec0e6388e5e9c0aee0e3c38957b 100644 (file)
@@ -8402,6 +8402,7 @@ void ReplicatedPG::repop_all_applied(RepGather *repop)
 {
   dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all applied "
           << dendl;
+  assert(!repop->applies_with_commit);
   repop->all_applied = true;
   if (!repop->rep_aborted) {
     eval_repop(repop);
@@ -8424,6 +8425,10 @@ void ReplicatedPG::repop_all_committed(RepGather *repop)
   dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all committed "
           << dendl;
   repop->all_committed = true;
+  if (repop->applies_with_commit) {
+    assert(!repop->all_applied);
+    repop->all_applied = true;
+  }
 
   if (!repop->rep_aborted) {
     if (repop->v != eversion_t()) {
@@ -8509,11 +8514,13 @@ void ReplicatedPG::eval_repop(RepGather *repop)
       assert(waiting_for_ack.begin()->first == repop->v);
       waiting_for_ack.erase(repop->v);
     }
-
   }
 
   // applied?
   if (repop->all_applied) {
+    if (repop->applies_with_commit) {
+      assert(repop->on_applied.empty());
+    }
     dout(10) << " applied: " << *repop << " " << dendl;
     for (auto p = repop->on_applied.begin();
         p != repop->on_applied.end();
@@ -8529,22 +8536,28 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     publish_stats_to_osd();
     calc_min_last_complete_ondisk();
 
-    for (auto p = repop->on_success.begin();
-        p != repop->on_success.end();
-        repop->on_success.erase(p++)) {
-      (*p)();
-    }
-
     dout(10) << " removing " << *repop << dendl;
     assert(!repop_queue.empty());
     dout(20) << "   q front is " << *repop_queue.front() << dendl; 
     if (repop_queue.front() != repop) {
-      dout(0) << " removing " << *repop << dendl;
-      dout(0) << "   q front is " << *repop_queue.front() << dendl; 
-      assert(repop_queue.front() == repop);
+      if (!repop->applies_with_commit) {
+       dout(0) << " removing " << *repop << dendl;
+       dout(0) << "   q front is " << *repop_queue.front() << dendl;
+       assert(repop_queue.front() == repop);
+      }
+    } else {
+      RepGather *to_remove = nullptr;
+      while (!repop_queue.empty() &&
+            (to_remove = repop_queue.front())->rep_done) {
+       repop_queue.pop_front();
+       for (auto p = to_remove->on_success.begin();
+            p != to_remove->on_success.end();
+            to_remove->on_success.erase(p++)) {
+         (*p)();
+       }
+       remove_repop(to_remove);
+      }
     }
-    repop_queue.pop_front();
-    remove_repop(repop);
   }
 }
 
@@ -8623,7 +8636,8 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(
   else
     dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
 
-  RepGather *repop = new RepGather(ctx, rep_tid, info.last_complete);
+  RepGather *repop = new RepGather(
+    ctx, rep_tid, info.last_complete, false);
 
   repop->start = ceph_clock_now(cct);
 
@@ -8647,7 +8661,8 @@ boost::intrusive_ptr<ReplicatedPG::RepGather> ReplicatedPG::new_repop(
     std::move(op),
     std::move(on_complete),
     osd->get_tid(),
-    info.last_complete);
+    info.last_complete,
+    true);
   repop->v = version;
 
   repop->start = ceph_clock_now(cct);
@@ -8790,7 +8805,6 @@ void ReplicatedPG::submit_log_entries(
              assert(it2 != it->second.waiting_on.end());
              it->second.waiting_on.erase(it2);
              if (it->second.waiting_on.empty()) {
-               pg->repop_all_applied(it->second.repop.get());
                pg->repop_all_committed(it->second.repop.get());
                pg->log_entry_update_waiting_on.erase(it);
              }
@@ -8798,7 +8812,7 @@ void ReplicatedPG::submit_log_entries(
            pg->unlock();
          }
        };
-       t.register_on_complete(
+       t.register_on_commit(
          new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
       } else {
        if (on_complete) {
@@ -9798,12 +9812,19 @@ void ReplicatedPG::do_update_log_missing(OpRequestRef &op)
       unlock();
     });
 
-  /* Hack to work around the fact that ReplicatedBackend sends
-   * ack+commit if commit happens first */
-  if (pool.info.ec_pool()) {
-    t.register_on_complete(complete);
-  } else {
+  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
     t.register_on_commit(complete);
+  } else {
+    /* Hack to work around the fact that ReplicatedBackend sends
+     * ack+commit if commit happens first
+     *
+     * This behavior is no longer necessary, but we preserve it so old
+     * primaries can keep their repops in order */
+    if (pool.info.ec_pool()) {
+      t.register_on_complete(complete);
+    } else {
+      t.register_on_commit(complete);
+    }
   }
   t.register_on_applied(
     new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update});
@@ -9834,7 +9855,6 @@ void ReplicatedPG::do_update_log_missing_reply(OpRequestRef &op)
     }
 
     if (it->second.waiting_on.empty()) {
-      repop_all_applied(it->second.repop.get());
       repop_all_committed(it->second.repop.get());
       log_entry_update_waiting_on.erase(it);
     }
index d56d4e8a9125cc66308edc69e8d09417ff6a436c..c621e2d2b485cf6d92d61ca39be4e62f2a21b760 100644 (file)
@@ -652,6 +652,7 @@ public:
 
     bool all_applied;
     bool all_committed;
+    const bool applies_with_commit;
     
     utime_t   start;
     
@@ -664,8 +665,10 @@ public:
     list<std::function<void()>> on_success;
     list<std::function<void()>> on_finish;
     
-    RepGather(OpContext *c, ceph_tid_t rt,
-             eversion_t lc) :
+    RepGather(
+      OpContext *c, ceph_tid_t rt,
+      eversion_t lc,
+      bool applies_with_commit) :
       hoid(c->obc->obs.oi.soid),
       op(c->op),
       queue_item(this),
@@ -673,6 +676,7 @@ public:
       rep_tid(rt), 
       rep_aborted(false), rep_done(false),
       all_applied(false), all_committed(false),
+      applies_with_commit(applies_with_commit),
       pg_local_last_complete(lc),
       lock_manager(std::move(c->lock_manager)),
       on_applied(std::move(c->on_applied)),
@@ -685,13 +689,15 @@ public:
       OpRequestRef &&o,
       boost::optional<std::function<void(void)> > &&on_complete,
       ceph_tid_t rt,
-      eversion_t lc) :
+      eversion_t lc,
+      bool applies_with_commit) :
       op(o),
       queue_item(this),
       nref(1),
       rep_tid(rt),
       rep_aborted(false), rep_done(false),
       all_applied(false), all_committed(false),
+      applies_with_commit(applies_with_commit),
       pg_local_last_complete(lc),
       lock_manager(std::move(manager)) {
       if (on_complete) {