git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd/: move flush state into PeeringState
authorSamuel Just <sjust@redhat.com>
Wed, 27 Mar 2019 01:24:21 +0000 (18:24 -0700)
committersjust@redhat.com <sjust@redhat.com>
Wed, 1 May 2019 18:22:13 +0000 (11:22 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h
src/osd/PrimaryLogPG.cc

index 1c9ee2c5beb64824f118d81f89f03cb70cdf0670..8dc4171120c147e648efce1f4c12001aedd0f22a 100644 (file)
@@ -208,7 +208,6 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   last_complete_ondisk(recovery_state.last_complete_ondisk),
   last_update_applied(recovery_state.last_update_applied),
   last_rollback_info_trimmed_to_applied(recovery_state.last_rollback_info_trimmed_to_applied),
-  flushes_in_progress(recovery_state.flushes_in_progress),
   stray_set(recovery_state.stray_set),
   peer_info(recovery_state.peer_info),
   peer_bytes(recovery_state.peer_bytes),
@@ -1916,7 +1915,7 @@ void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
     osd->send_message_osd_cluster(get_primary().osd, m, get_osdmap_epoch());
 
     // waiters
-    if (flushes_in_progress == 0) {
+    if (!recovery_state.needs_flush()) {
       requeue_ops(waiting_for_peered);
     } else if (!waiting_for_peered.empty()) {
       dout(10) << __func__ << " flushes in progress, moving "
@@ -5800,51 +5799,39 @@ bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
   return false;
 }
 
-void PG::set_last_peering_reset()
-{
-  dout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
-  if (last_peering_reset != get_osdmap_epoch()) {
-    last_peering_reset = get_osdmap_epoch();
-    reset_interval_flush();
-  }
-}
-
 struct FlushState {
   PGRef pg;
   epoch_t epoch;
   FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
   ~FlushState() {
     pg->lock();
-    if (!pg->pg_has_reset_since(epoch))
-      pg->on_flushed();
+    if (!pg->pg_has_reset_since(epoch)) {
+      pg->recovery_state.complete_flush();
+    }
     pg->unlock();
   }
 };
 typedef std::shared_ptr<FlushState> FlushStateRef;
 
-void PG::start_flush(ObjectStore::Transaction *t)
+void PG::start_flush_on_transaction(ObjectStore::Transaction *t)
 {
   // flush in progress ops
   FlushStateRef flush_trigger (std::make_shared<FlushState>(
                                this, get_osdmap_epoch()));
-  flushes_in_progress++;
   t->register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
   t->register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
 }
 
-void PG::reset_interval_flush()
+bool PG::try_flush_or_schedule_async()
 {
-  dout(10) << "Clearing blocked outgoing recovery messages" << dendl;
-  recovery_state.clear_blocked_outgoing();
   
  Context *c = new QueuePeeringEvt<PeeringState::IntervalFlush>(
     this, get_osdmap_epoch(), PeeringState::IntervalFlush());
   if (!ch->flush_commit(c)) {
-    dout(10) << "Beginning to block outgoing recovery messages" << dendl;
-    recovery_state.begin_block_outgoing();
+    return false;
   } else {
-    dout(10) << "Not blocking outgoing recovery messages" << dendl;
     delete c;
+    return true;
   }
 }
 
@@ -5857,7 +5844,7 @@ void PG::start_peering_interval(
 {
   const OSDMapRef osdmap = get_osdmap();
 
-  set_last_peering_reset();
+  recovery_state.set_last_peering_reset();
 
   vector<int> oldacting, oldup;
   int oldrole = get_role();
index b1c4c16416284d3de74768e6e3bd7a1cd4ef3c7d..e16a2bc959acbe8e4e7957e4a77527e140c15749 100644 (file)
@@ -172,6 +172,7 @@ class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
   friend class NamedState;
   friend class PeeringState;
 
+protected:
   PeeringState recovery_state;
 public:
   using PeeringCtx = PeeringState::PeeringCtx;
@@ -203,7 +204,6 @@ protected:
   eversion_t &last_complete_ondisk;
   eversion_t &last_update_applied;
   eversion_t &last_rollback_info_trimmed_to_applied;
-  unsigned &flushes_in_progress;
   set<pg_shard_t> &stray_set;
   map<pg_shard_t, pg_info_t> &peer_info;
   map<pg_shard_t, int64_t> &peer_bytes;
@@ -1658,7 +1658,7 @@ protected:
     boost::optional<eversion_t> trim_to,
     boost::optional<eversion_t> roll_forward_to);
 
-  void reset_interval_flush();
+  bool try_flush_or_schedule_async() override;
   void start_peering_interval(
     const OSDMapRef lastmap,
     const vector<int>& newup, int up_primary,
@@ -1666,8 +1666,8 @@ protected:
     ObjectStore::Transaction *t);
   void on_new_interval();
   virtual void _on_new_interval() = 0;
-  void start_flush(ObjectStore::Transaction *t);
-  void set_last_peering_reset();
+  void start_flush_on_transaction(
+    ObjectStore::Transaction *t) override;
 
   void update_history(const pg_history_t& history) {
     recovery_state.update_history(history);
index 694aebae1599c0103c9437afa1cc503d2bc5a1c5..89010e2b6fdc39edee5a62561412d1c9b33165cc 100644 (file)
@@ -372,6 +372,28 @@ void PeeringState::activate_map(PeeringCtx *rctx)
   write_if_dirty(*rctx->transaction);
 }
 
+void PeeringState::set_last_peering_reset()
+{
+  psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
+  if (last_peering_reset != get_osdmap_epoch()) {
+    psdout(10) << "Clearing blocked outgoing recovery messages" << dendl;
+    clear_blocked_outgoing();
+    if (!pl->try_flush_or_schedule_async()) {
+      psdout(10) << "Beginning to block outgoing recovery messages" << dendl;
+      begin_block_outgoing();
+    } else {
+      psdout(10) << "Not blocking outgoing recovery messages" << dendl;
+    }
+  }
+}
+
+void PeeringState::complete_flush()
+{
+  ceph_assert(flushes_in_progress > 0);
+  flushes_in_progress--;
+  if (flushes_in_progress == 0) {
+    pl->on_flushed();
+  }
+}
 
 /*------------ Peering State Machine----------------*/
 #undef dout_prefix
@@ -401,10 +423,9 @@ PeeringState::Initial::Initial(my_context ctx)
 boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify)
 {
   PeeringState *ps = context< PeeringMachine >().state;
-  PG *pg = context< PeeringMachine >().pg;
   ps->proc_replica_info(
     notify.from, notify.notify.info, notify.notify.epoch_sent);
-  pg->set_last_peering_reset();
+  ps->set_last_peering_reset();
   return transit< Primary >();
 }
 
@@ -444,7 +465,7 @@ boost::statechart::result
 PeeringState::Started::react(const IntervalFlush&)
 {
   psdout(10) << "Ending blocked outgoing recovery messages" << dendl;
-  context< PeeringMachine >().pg->recovery_state.end_block_outgoing();
+  context< PeeringMachine >().state->end_block_outgoing();
   return discard_event();
 }
 
@@ -493,17 +514,17 @@ PeeringState::Reset::Reset(my_context ctx)
     NamedState(context< PeeringMachine >().state_history, "Reset")
 {
   context< PeeringMachine >().log_enter(state_name);
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
 
-  pg->flushes_in_progress = 0;
-  pg->set_last_peering_reset();
+  ps->flushes_in_progress = 0;
+  ps->set_last_peering_reset();
 }
 
 boost::statechart::result
 PeeringState::Reset::react(const IntervalFlush&)
 {
   psdout(10) << "Ending blocked outgoing recovery messages" << dendl;
-  context< PeeringMachine >().pg->recovery_state.end_block_outgoing();
+  context< PeeringMachine >().state->end_block_outgoing();
   return discard_event();
 }
 
@@ -1699,13 +1720,14 @@ PeeringState::Active::Active(my_context ctx)
 {
   context< PeeringMachine >().log_enter(state_name);
 
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
 
   ceph_assert(!pg->backfill_reserving);
   ceph_assert(!pg->backfill_reserved);
   ceph_assert(pg->is_primary());
   psdout(10) << "In Active, about to call activate" << dendl;
-  pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+  ps->start_flush(context< PeeringMachine >().get_cur_transaction());
   pg->activate(*context< PeeringMachine >().get_cur_transaction(),
               pg->get_osdmap_epoch(),
               *context< PeeringMachine >().get_query_map(),
@@ -2048,6 +2070,7 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q)
 
 boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   pg_t pgid = context< PeeringMachine >().spgid.pgid;
 
@@ -2092,7 +2115,7 @@ boost::statechart::result PeeringState::Active::react(const AllReplicasActivated
   pg->check_local();
 
   // waiters
-  if (pg->flushes_in_progress == 0) {
+  if (ps->flushes_in_progress == 0) {
     pg->requeue_ops(pg->waiting_for_peered);
   } else if (!pg->waiting_for_peered.empty()) {
     psdout(10) << __func__ << " flushes in progress, moving "
@@ -2136,8 +2159,8 @@ PeeringState::ReplicaActive::ReplicaActive(my_context ctx)
 {
   context< PeeringMachine >().log_enter(state_name);
 
-  PG *pg = context< PeeringMachine >().pg;
-  pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->start_flush(context< PeeringMachine >().get_cur_transaction());
 }
 
 
@@ -2232,6 +2255,7 @@ PeeringState::Stray::Stray(my_context ctx)
 {
   context< PeeringMachine >().log_enter(state_name);
 
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   ceph_assert(!pg->is_peered());
   ceph_assert(!pg->is_peering());
@@ -2241,7 +2265,7 @@ PeeringState::Stray::Stray(my_context ctx)
     ldout(pg->cct,10) << __func__ << " pool is deleted" << dendl;
     post_event(DeleteStart());
   } else {
-    pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+    ps->start_flush(context< PeeringMachine >().get_cur_transaction());
   }
 }
 
@@ -2658,6 +2682,7 @@ boost::statechart::result PeeringState::GetLog::react(const MLogRec& logevt)
 
 boost::statechart::result PeeringState::GetLog::react(const GotLog&)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   psdout(10) << "leaving GetLog" << dendl;
   if (msg) {
@@ -2666,7 +2691,7 @@ boost::statechart::result PeeringState::GetLog::react(const GotLog&)
                        msg->info, msg->log, msg->missing,
                        auth_log_shard);
   }
-  pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+  ps->start_flush(context< PeeringMachine >().get_cur_transaction());
   return transit< GetMissing >();
 }
 
index b32f6914b7b9971c934ee3296eeb815d7c44dc68..a9674471a41a4f4617cbb87af84312c3801cb23f 100644 (file)
@@ -71,12 +71,17 @@ public:
 
     virtual void send_cluster_message(int osd, Message *m, epoch_t epoch) = 0;
 
+    // Flush state
+    virtual bool try_flush_or_schedule_async() = 0;
+    virtual void start_flush_on_transaction(
+      ObjectStore::Transaction *t) = 0;
+    virtual void on_flushed() = 0;
+
     virtual void check_recovery_sources(const OSDMapRef& newmap) = 0;
     virtual void on_pool_change() = 0;
     virtual void on_role_change() = 0;
     virtual void on_change(ObjectStore::Transaction *t) = 0;
     virtual void on_activate() = 0;
-    virtual void on_flushed() = 0;
     virtual void check_blacklisted_watchers() = 0;
     virtual ~PeeringListener() {}
   };
@@ -1152,6 +1157,7 @@ public:
   void purge_strays();
   void update_history(const pg_history_t& new_history);
   void check_recovery_sources(const OSDMapRef& map);
+  void set_last_peering_reset();
 
 public:
   PeeringState(
@@ -1317,4 +1323,15 @@ public:
   bool is_repair() const { return state_test(PG_STATE_REPAIR); }
   bool is_empty() const { return info.last_update == eversion_t(0,0); }
 
+  // Flush control interface
+private:
+  void start_flush(ObjectStore::Transaction *t) {
+    flushes_in_progress++;
+    pl->start_flush_on_transaction(t);
+  }
+public:
+  bool needs_flush() const {
+    return flushes_in_progress > 0;
+  }
+  void complete_flush();
 };
index 3ff2ae824cfe8bdda1702ed1c6e5cb732517ff87..9136127b7998d18a8012235c16bcdb29a9e32329 100644 (file)
@@ -1835,16 +1835,14 @@ void PrimaryLogPG::do_request(
     }
   }
 
-  if (flushes_in_progress > 0) {
-    dout(20) << flushes_in_progress
-            << " flushes_in_progress pending "
-            << "waiting for flush on " << op << dendl;
+  if (recovery_state.needs_flush()) {
+    dout(20) << "waiting for flush on " << op << dendl;
     waiting_for_flush.push_back(op);
     op->mark_delayed("waiting for flush");
     return;
   }
 
-  ceph_assert(is_peered() && flushes_in_progress == 0);
+  ceph_assert(is_peered() && !recovery_state.needs_flush());
   if (pgbackend->handle_message(op))
     return;
 
@@ -12047,11 +12045,7 @@ void PrimaryLogPG::apply_and_flush_repops(bool requeue)
 
 void PrimaryLogPG::on_flushed()
 {
-  ceph_assert(flushes_in_progress > 0);
-  flushes_in_progress--;
-  if (flushes_in_progress == 0) {
-    requeue_ops(waiting_for_flush);
-  }
+  requeue_ops(waiting_for_flush);
   if (!is_peered() || !is_primary()) {
     pair<hobject_t, ObjectContextRef> i;
     while (object_contexts.get_next(i.first, &i)) {