From fbf06885fb55b60c0d9f1bcddd7de5285c674010 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 26 Mar 2019 18:24:21 -0700 Subject: [PATCH] osd/: move flush state into PeeringState Signed-off-by: Samuel Just --- src/osd/PG.cc | 31 ++++++++----------------- src/osd/PG.h | 8 +++---- src/osd/PeeringState.cc | 51 ++++++++++++++++++++++++++++++----------- src/osd/PeeringState.h | 19 ++++++++++++++- src/osd/PrimaryLogPG.cc | 14 ++++------- 5 files changed, 73 insertions(+), 50 deletions(-) diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 1c9ee2c5beb..8dc4171120c 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -208,7 +208,6 @@ PG::PG(OSDService *o, OSDMapRef curmap, last_complete_ondisk(recovery_state.last_complete_ondisk), last_update_applied(recovery_state.last_update_applied), last_rollback_info_trimmed_to_applied(recovery_state.last_rollback_info_trimmed_to_applied), - flushes_in_progress(recovery_state.flushes_in_progress), stray_set(recovery_state.stray_set), peer_info(recovery_state.peer_info), peer_bytes(recovery_state.peer_bytes), @@ -1916,7 +1915,7 @@ void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch) osd->send_message_osd_cluster(get_primary().osd, m, get_osdmap_epoch()); // waiters - if (flushes_in_progress == 0) { + if (!recovery_state.needs_flush()) { /* needs_flush() returns bool; no flush outstanding, so waiting_for_peered is requeued right away */ requeue_ops(waiting_for_peered); } else if (!waiting_for_peered.empty()) { dout(10) << __func__ << " flushes in progress, moving " @@ -5800,51 +5799,39 @@ bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch) return false; } -void PG::set_last_peering_reset() -{ - dout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl; - if (last_peering_reset != get_osdmap_epoch()) { - last_peering_reset = get_osdmap_epoch(); - reset_interval_flush(); - } -} - struct FlushState { PGRef pg; epoch_t epoch; FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {} ~FlushState() { pg->lock(); - if (!pg->pg_has_reset_since(epoch)) - pg->on_flushed(); + if 
(!pg->pg_has_reset_since(epoch)) { + pg->recovery_state.complete_flush(); /* only reached when the PG has not reset since the epoch captured by this FlushState */ + } pg->unlock(); } }; typedef std::shared_ptr FlushStateRef; -void PG::start_flush(ObjectStore::Transaction *t) +void PG::start_flush_on_transaction(ObjectStore::Transaction *t) /* attaches one shared FlushState to both the on_applied and on_commit callbacks of t; the counter itself is no longer touched here */ { // flush in progress ops FlushStateRef flush_trigger (std::make_shared( this, get_osdmap_epoch())); - flushes_in_progress++; t->register_on_applied(new ContainerContext(flush_trigger)); t->register_on_commit(new ContainerContext(flush_trigger)); } -void PG::reset_interval_flush() +bool PG::try_flush_or_schedule_async() /* returns what ch->flush_commit(c) returned; the IntervalFlush context is deleted here only when that result is true */ { - dout(10) << "Clearing blocked outgoing recovery messages" << dendl; - recovery_state.clear_blocked_outgoing(); Context *c = new QueuePeeringEvt( this, get_osdmap_epoch(), PeeringState::IntervalFlush()); if (!ch->flush_commit(c)) { - dout(10) << "Beginning to block outgoing recovery messages" << dendl; - recovery_state.begin_block_outgoing(); + return false; /* c is not deleted on this path -- presumably flush_commit() keeps it and queues the event later; confirm against ObjectStore */ } else { - dout(10) << "Not blocking outgoing recovery messages" << dendl; delete c; + return true; } } @@ -5857,7 +5844,7 @@ void PG::start_peering_interval( { const OSDMapRef osdmap = get_osdmap(); - set_last_peering_reset(); + recovery_state.set_last_peering_reset(); vector oldacting, oldup; int oldrole = get_role(); diff --git a/src/osd/PG.h b/src/osd/PG.h index b1c4c164162..e16a2bc959a 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -172,6 +172,7 @@ class PG : public DoutPrefixProvider, public PeeringState::PeeringListener { friend class NamedState; friend class PeeringState; +protected: PeeringState recovery_state; public: using PeeringCtx = PeeringState::PeeringCtx; @@ -203,7 +204,6 @@ protected: eversion_t &last_complete_ondisk; eversion_t &last_update_applied; eversion_t &last_rollback_info_trimmed_to_applied; - unsigned &flushes_in_progress; set &stray_set; map &peer_info; map &peer_bytes; @@ -1658,7 +1658,7 @@ protected: boost::optional trim_to, boost::optional roll_forward_to); - void reset_interval_flush(); + bool 
try_flush_or_schedule_async() override; void start_peering_interval( const OSDMapRef lastmap, const vector& newup, int up_primary, @@ -1666,8 +1666,8 @@ protected: ObjectStore::Transaction *t); void on_new_interval(); virtual void _on_new_interval() = 0; - void start_flush(ObjectStore::Transaction *t); - void set_last_peering_reset(); + void start_flush_on_transaction( + ObjectStore::Transaction *t) override; void update_history(const pg_history_t& history) { recovery_state.update_history(history); diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 694aebae159..89010e2b6fd 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -372,6 +372,30 @@ void PeeringState::activate_map(PeeringCtx *rctx) write_if_dirty(*rctx->transaction); } +void PeeringState::set_last_peering_reset() /* On the first call in a given osdmap epoch: records that epoch, clears blocked outgoing recovery messages, and begins blocking again if the listener could not flush synchronously. */ +{ + psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl; + if (last_peering_reset != get_osdmap_epoch()) { + last_peering_reset = get_osdmap_epoch(); /* record the epoch, as the removed PG::set_last_peering_reset() did, so the guard above is false for later calls in the same epoch */ + psdout(10) << "Clearing blocked outgoing recovery messages" << dendl; + clear_blocked_outgoing(); + if (!pl->try_flush_or_schedule_async()) { + psdout(10) << "Beginning to block outgoing recovery messages" << dendl; + begin_block_outgoing(); + } else { + psdout(10) << "Not blocking outgoing recovery messages" << dendl; + } + } +} + +void PeeringState::complete_flush() /* Retires one flush counted by start_flush(); calls pl->on_flushed() once the count reaches zero. */ +{ + ceph_assert(flushes_in_progress > 0); /* the counter is unsigned, so decrementing at zero would wrap; the removed PrimaryLogPG::on_flushed() code asserted the same */ + flushes_in_progress--; + if (flushes_in_progress == 0) { + pl->on_flushed(); + } +} /*------------ Peering State Machine----------------*/ #undef dout_prefix @@ -401,10 +423,9 @@ PeeringState::Initial::Initial(my_context ctx) boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify) { PeeringState *ps = context< PeeringMachine >().state; - PG *pg = context< PeeringMachine >().pg; ps->proc_replica_info( notify.from, notify.notify.info, notify.notify.epoch_sent); - pg->set_last_peering_reset(); + ps->set_last_peering_reset(); return transit< Primary >(); } @@ -444,7 +465,7 @@ boost::statechart::result PeeringState::Started::react(const 
IntervalFlush&) { psdout(10) << "Ending blocked outgoing recovery messages" << dendl; - context< PeeringMachine >().pg->recovery_state.end_block_outgoing(); + context< PeeringMachine >().state->end_block_outgoing(); return discard_event(); } @@ -493,17 +514,17 @@ PeeringState::Reset::Reset(my_context ctx) NamedState(context< PeeringMachine >().state_history, "Reset") { context< PeeringMachine >().log_enter(state_name); - PG *pg = context< PeeringMachine >().pg; + PeeringState *ps = context< PeeringMachine >().state; - pg->flushes_in_progress = 0; - pg->set_last_peering_reset(); + ps->flushes_in_progress = 0; /* discard the count of flushes started before entering Reset */ + ps->set_last_peering_reset(); } boost::statechart::result PeeringState::Reset::react(const IntervalFlush&) { psdout(10) << "Ending blocked outgoing recovery messages" << dendl; - context< PeeringMachine >().pg->recovery_state.end_block_outgoing(); + context< PeeringMachine >().state->end_block_outgoing(); return discard_event(); } @@ -1699,13 +1720,14 @@ PeeringState::Active::Active(my_context ctx) { context< PeeringMachine >().log_enter(state_name); + PeeringState *ps = context< PeeringMachine >().state; PG *pg = context< PeeringMachine >().pg; ceph_assert(!pg->backfill_reserving); ceph_assert(!pg->backfill_reserved); ceph_assert(pg->is_primary()); psdout(10) << "In Active, about to call activate" << dendl; - pg->start_flush(context< PeeringMachine >().get_cur_transaction()); + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); /* counts the flush in PeeringState before pg->activate() runs on the same transaction */ pg->activate(*context< PeeringMachine >().get_cur_transaction(), pg->get_osdmap_epoch(), *context< PeeringMachine >().get_query_map(), @@ -2048,6 +2070,7 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q) boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt) { + PeeringState *ps = context< PeeringMachine >().state; PG *pg = context< PeeringMachine >().pg; pg_t pgid = context< PeeringMachine >().spgid.pgid; @@ -2092,7 +2115,7 @@ boost::statechart::result 
PeeringState::Active::react(const AllReplicasActivated pg->check_local(); // waiters - if (pg->flushes_in_progress == 0) { + if (ps->flushes_in_progress == 0) { /* same condition as !ps->needs_flush() */ pg->requeue_ops(pg->waiting_for_peered); } else if (!pg->waiting_for_peered.empty()) { psdout(10) << __func__ << " flushes in progress, moving " @@ -2136,8 +2159,8 @@ PeeringState::ReplicaActive::ReplicaActive(my_context ctx) { context< PeeringMachine >().log_enter(state_name); - PG *pg = context< PeeringMachine >().pg; - pg->start_flush(context< PeeringMachine >().get_cur_transaction()); + PeeringState *ps = context< PeeringMachine >().state; + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); /* start_flush() increments flushes_in_progress, then calls the listener's start_flush_on_transaction() */ } @@ -2232,6 +2255,7 @@ PeeringState::Stray::Stray(my_context ctx) { context< PeeringMachine >().log_enter(state_name); + PeeringState *ps = context< PeeringMachine >().state; PG *pg = context< PeeringMachine >().pg; ceph_assert(!pg->is_peered()); ceph_assert(!pg->is_peering()); @@ -2241,7 +2265,7 @@ PeeringState::Stray::Stray(my_context ctx) ldout(pg->cct,10) << __func__ << " pool is deleted" << dendl; post_event(DeleteStart()); } else { - pg->start_flush(context< PeeringMachine >().get_cur_transaction()); + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); } } @@ -2658,6 +2682,7 @@ boost::statechart::result PeeringState::GetLog::react(const MLogRec& logevt) boost::statechart::result PeeringState::GetLog::react(const GotLog&) { + PeeringState *ps = context< PeeringMachine >().state; PG *pg = context< PeeringMachine >().pg; psdout(10) << "leaving GetLog" << dendl; if (msg) { @@ -2666,7 +2691,7 @@ boost::statechart::result PeeringState::GetLog::react(const GotLog&) msg->info, msg->log, msg->missing, auth_log_shard); } - pg->start_flush(context< PeeringMachine >().get_cur_transaction()); + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); return transit< GetMissing >(); } diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index b32f6914b7b..a9674471a41 
100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -71,12 +71,17 @@ public: virtual void send_cluster_message(int osd, Message *m, epoch_t epoch) = 0; + // Flush state + virtual bool try_flush_or_schedule_async() = 0; /* a false result makes set_last_peering_reset() begin blocking outgoing recovery messages */ + virtual void start_flush_on_transaction( + ObjectStore::Transaction *t) = 0; /* called by start_flush() after it increments flushes_in_progress */ + virtual void on_flushed() = 0; /* called by complete_flush() when flushes_in_progress reaches zero */ + virtual void check_recovery_sources(const OSDMapRef& newmap) = 0; virtual void on_pool_change() = 0; virtual void on_role_change() = 0; virtual void on_change(ObjectStore::Transaction *t) = 0; virtual void on_activate() = 0; - virtual void on_flushed() = 0; virtual void check_blacklisted_watchers() = 0; virtual ~PeeringListener() {} }; @@ -1152,6 +1157,7 @@ public: void purge_strays(); void update_history(const pg_history_t& new_history); void check_recovery_sources(const OSDMapRef& map); + void set_last_peering_reset(); public: PeeringState( @@ -1317,4 +1323,15 @@ public: bool is_repair() const { return state_test(PG_STATE_REPAIR); } bool is_empty() const { return info.last_update == eversion_t(0,0); } + // Flush control interface +private: + void start_flush(ObjectStore::Transaction *t) { /* private: in this patch it is called only from the peering state classes */ + flushes_in_progress++; + pl->start_flush_on_transaction(t); + } +public: + bool needs_flush() const { /* true while flushes_in_progress is non-zero */ + return flushes_in_progress > 0; + } + void complete_flush(); /* decrements flushes_in_progress; defined in PeeringState.cc */ }; diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 3ff2ae824cf..9136127b799 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1835,16 +1835,14 @@ void PrimaryLogPG::do_request( } } - if (flushes_in_progress > 0) { - dout(20) << flushes_in_progress - << " flushes_in_progress pending " - << "waiting for flush on " << op << dendl; + if (recovery_state.needs_flush()) { + dout(20) << "waiting for flush on " << op << dendl; /* the pending-flush count is no longer logged: only the boolean is exposed */ waiting_for_flush.push_back(op); op->mark_delayed("waiting for flush"); return; } - ceph_assert(is_peered() && flushes_in_progress == 0); + ceph_assert(is_peered() && !recovery_state.needs_flush()); if 
(pgbackend->handle_message(op)) return; @@ -12047,11 +12045,7 @@ void PrimaryLogPG::apply_and_flush_repops(bool requeue) void PrimaryLogPG::on_flushed() { - ceph_assert(flushes_in_progress > 0); - flushes_in_progress--; - if (flushes_in_progress == 0) { - requeue_ops(waiting_for_flush); - } + requeue_ops(waiting_for_flush); /* unconditional here: PeeringState::complete_flush() calls on_flushed() only after the counter reaches zero */ if (!is_peered() || !is_primary()) { pair i; while (object_contexts.get_next(i.first, &i)) { -- 2.39.5