From: Samuel Just Date: Sat, 3 Dec 2016 00:33:00 +0000 (-0800) Subject: ReplicatedPG: allow repops to apply with commit X-Git-Tag: v11.1.0~58^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7c97cd21cba983017a861944d091850e97a9bf0b;p=ceph.git ReplicatedPG: allow repops to apply with commit Up to now, the repop machinery depended on all repops commiting and applying in order. For MOSDPGUpdateLogMissing operations, however, we don't really want to send two messages in the ECBackend case. Instead, just allow those repops to skip the applied stage and be completed in order once the repops ahead of them finish. Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index c1cbe8ce420..689567e2da7 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -8402,6 +8402,7 @@ void ReplicatedPG::repop_all_applied(RepGather *repop) { dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all applied " << dendl; + assert(!repop->applies_with_commit); repop->all_applied = true; if (!repop->rep_aborted) { eval_repop(repop); @@ -8424,6 +8425,10 @@ void ReplicatedPG::repop_all_committed(RepGather *repop) dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all committed " << dendl; repop->all_committed = true; + if (repop->applies_with_commit) { + assert(!repop->all_applied); + repop->all_applied = true; + } if (!repop->rep_aborted) { if (repop->v != eversion_t()) { @@ -8509,11 +8514,13 @@ void ReplicatedPG::eval_repop(RepGather *repop) assert(waiting_for_ack.begin()->first == repop->v); waiting_for_ack.erase(repop->v); } - } // applied? if (repop->all_applied) { + if (repop->applies_with_commit) { + assert(repop->on_applied.empty()); + } dout(10) << " applied: " << *repop << " " << dendl; for (auto p = repop->on_applied.begin(); p != repop->on_applied.end(); @@ -8529,22 +8536,28 @@ void ReplicatedPG::eval_repop(RepGather *repop) publish_stats_to_osd(); calc_min_last_complete_ondisk(); - for (auto p = repop->on_success.begin(); - p != repop->on_success.end(); - repop->on_success.erase(p++)) { - (*p)(); - } - dout(10) << " removing " << *repop << dendl; assert(!repop_queue.empty()); dout(20) << " q front is " << *repop_queue.front() << dendl; if (repop_queue.front() != repop) { - dout(0) << " removing " << *repop << dendl; - dout(0) << " q front is " << *repop_queue.front() << dendl; - assert(repop_queue.front() == repop); + if (!repop->applies_with_commit) { + dout(0) << " removing " << *repop << dendl; + dout(0) << " q front is " << *repop_queue.front() << dendl; + assert(repop_queue.front() == repop); + } + } else { + RepGather *to_remove = nullptr; + while (!repop_queue.empty() && + (to_remove = repop_queue.front())->rep_done) { + repop_queue.pop_front(); + for (auto p = to_remove->on_success.begin(); + p != to_remove->on_success.end(); + to_remove->on_success.erase(p++)) { + (*p)(); + } + remove_repop(to_remove); + } } - repop_queue.pop_front(); - remove_repop(repop); } } @@ -8623,7 +8636,8 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop( else dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl; - RepGather *repop = new RepGather(ctx, rep_tid, info.last_complete); + RepGather *repop = new RepGather( + ctx, rep_tid, info.last_complete, false); repop->start = ceph_clock_now(cct); @@ -8647,7 +8661,8 @@ boost::intrusive_ptr ReplicatedPG::new_repop( std::move(op), std::move(on_complete), osd->get_tid(), - info.last_complete); + info.last_complete, + true); repop->v = version; repop->start = ceph_clock_now(cct); @@ -8790,7 +8805,6 @@ void ReplicatedPG::submit_log_entries( assert(it2 != it->second.waiting_on.end()); it->second.waiting_on.erase(it2); if (it->second.waiting_on.empty()) { - pg->repop_all_applied(it->second.repop.get()); pg->repop_all_committed(it->second.repop.get()); pg->log_entry_update_waiting_on.erase(it); } @@ -8798,7 +8812,7 @@ void ReplicatedPG::submit_log_entries( pg->unlock(); } }; - t.register_on_complete( + t.register_on_commit( new OnComplete{this, rep_tid, get_osdmap()->get_epoch()}); } else { if (on_complete) { @@ -9798,12 +9812,19 @@ void ReplicatedPG::do_update_log_missing(OpRequestRef &op) unlock(); }); - /* Hack to work around the fact that ReplicatedBackend sends - * ack+commit if commit happens first */ - if (pool.info.ec_pool()) { - t.register_on_complete(complete); - } else { + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) { t.register_on_commit(complete); + } else { + /* Hack to work around the fact that ReplicatedBackend sends + * ack+commit if commit happens first + * + * This behavior is no longer necessary, but we preserve it so old + * primaries can keep their repops in order */ + if (pool.info.ec_pool()) { + t.register_on_complete(complete); + } else { + t.register_on_commit(complete); + } } t.register_on_applied( new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update}); @@ -9834,7 +9855,6 @@ void ReplicatedPG::do_update_log_missing_reply(OpRequestRef &op) } if (it->second.waiting_on.empty()) { - repop_all_applied(it->second.repop.get()); repop_all_committed(it->second.repop.get()); log_entry_update_waiting_on.erase(it); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index d56d4e8a912..c621e2d2b48 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -652,6 +652,7 @@ public: bool all_applied; bool all_committed; + const bool applies_with_commit; utime_t start; @@ -664,8 +665,10 @@ public: list> on_success; list> on_finish; - RepGather(OpContext *c, ceph_tid_t rt, - eversion_t lc) : + RepGather( + OpContext *c, ceph_tid_t rt, + eversion_t lc, + bool applies_with_commit) : hoid(c->obc->obs.oi.soid), op(c->op), queue_item(this), @@ -673,6 +676,7 @@ public: rep_tid(rt), rep_aborted(false), rep_done(false), all_applied(false), all_committed(false), + applies_with_commit(applies_with_commit), pg_local_last_complete(lc), lock_manager(std::move(c->lock_manager)), on_applied(std::move(c->on_applied)), @@ -685,13 +689,15 @@ public: OpRequestRef &&o, boost::optional > &&on_complete, ceph_tid_t rt, - eversion_t lc) : + eversion_t lc, + bool applies_with_commit) : op(o), queue_item(this), nref(1), rep_tid(rt), rep_aborted(false), rep_done(false), all_applied(false), all_committed(false), + applies_with_commit(applies_with_commit), pg_local_last_complete(lc), lock_manager(std::move(manager)) { if (on_complete) {