]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd/: move proc_replica_info and some callees into PeeringState
authorSamuel Just <sjust@redhat.com>
Fri, 12 Apr 2019 18:08:53 +0000 (11:08 -0700)
committersjust@redhat.com <sjust@redhat.com>
Wed, 1 May 2019 18:22:12 +0000 (11:22 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h

index 66e3f26f5fe7594528947a0a794612ce406b9ce9..cfab9a0d179a1b2fc17c622c6043a0f6c01f7488 100644 (file)
@@ -366,44 +366,6 @@ void PG::proc_replica_log(
   peer_missing[from].claim(omissing);
 }
 
-bool PG::proc_replica_info(
-  pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
-{
-  map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
-  if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
-    dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
-    return false;
-  }
-
-  if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
-    dout(10) << " got info " << oinfo << " from down osd." << from
-            << " discarding" << dendl;
-    return false;
-  }
-
-  dout(10) << " got osd." << from << " " << oinfo << dendl;
-  ceph_assert(is_primary());
-  peer_info[from] = oinfo;
-  might_have_unfound.insert(from);
-
-  update_history(oinfo.history);
-  
-  // stray?
-  if (!is_up(from) && !is_acting(from)) {
-    dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
-    stray_set.insert(from);
-    if (is_clean()) {
-      purge_strays();
-    }
-  }
-
-  // was this a new info?  if so, update peers!
-  if (p == peer_info.end())
-    update_heartbeat_peers();
-
-  return true;
-}
-
 void PG::remove_snap_mapped_object(
   ObjectStore::Transaction &t, const hobject_t &soid)
 {
@@ -860,30 +822,6 @@ bool PG::adjust_need_up_thru(const OSDMapRef osdmap)
   return false;
 }
 
-void PG::remove_down_peer_info(const OSDMapRef osdmap)
-{
-  // Remove any downed osds from peer_info
-  bool removed = false;
-  map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
-  while (p != peer_info.end()) {
-    if (!osdmap->is_up(p->first.osd)) {
-      dout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
-      peer_missing.erase(p->first);
-      peer_log_requested.erase(p->first);
-      peer_missing_requested.erase(p->first);
-      peer_purged.erase(p->first); // so we can re-purge if necessary
-      peer_info.erase(p++);
-      removed = true;
-    } else
-      ++p;
-  }
-
-  // if we removed anyone, update peers (which include peer_info)
-  if (removed)
-    update_heartbeat_peers();
-  check_recovery_sources(osdmap);
-}
-
 /*
  * Returns true unless there is a non-lost OSD in might_have_unfound.
  */
@@ -3020,7 +2958,7 @@ void PG::purge_strays()
 
   // if we removed anyone, update peers (which include peer_info)
   if (removed)
-    update_heartbeat_peers();
+    recovery_state.update_heartbeat_peers();
 
   stray_set.clear();
 
@@ -3047,27 +2985,8 @@ void PG::clear_probe_targets()
   probe_targets.clear();
 }
 
-void PG::update_heartbeat_peers()
+void PG::update_heartbeat_peers(set<int> new_peers)
 {
-  ceph_assert(is_locked());
-
-  if (!is_primary())
-    return;
-
-  set<int> new_peers;
-  for (unsigned i=0; i<acting.size(); i++) {
-    if (acting[i] != CRUSH_ITEM_NONE)
-      new_peers.insert(acting[i]);
-  }
-  for (unsigned i=0; i<up.size(); i++) {
-    if (up[i] != CRUSH_ITEM_NONE)
-      new_peers.insert(up[i]);
-  }
-  for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
-    p != peer_info.end();
-    ++p)
-    new_peers.insert(p->first.osd);
-
   bool need_update = false;
   heartbeat_peer_lock.Lock();
   if (new_peers == heartbeat_peers) {
index 688f49fd4f5331aac4662aabf614db678f9cf9bd..592e0da9d43bd488bd4f064a519134a5795c3cef 100644 (file)
@@ -1265,8 +1265,6 @@ protected:
   void check_past_interval_bounds() const;
   PastIntervals::PriorSet build_prior();
 
-  void remove_down_peer_info(const OSDMapRef osdmap);
-
   bool adjust_need_up_thru(const OSDMapRef osdmap);
 
   bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
@@ -1299,9 +1297,6 @@ protected:
                        pg_missing_t& omissing, pg_shard_t from);
   void proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
                       pg_missing_t& omissing, pg_shard_t from);
-  bool proc_replica_info(
-    pg_shard_t from, const pg_info_t &info, epoch_t send_epoch);
-
   struct PGLogEntryHandler : public PGLog::LogEntryHandler {
     PG *pg;
     ObjectStore::Transaction *t;
@@ -1433,7 +1428,7 @@ protected:
 
   void purge_strays();
 
-  void update_heartbeat_peers();
+  void update_heartbeat_peers(set<int> peers) override;
 
   Context *finish_sync_event;
 
index 3eb88eeba820eda273e944553e5b799fea4701ab..d4483f0d0b34ed493dda325c455529aaf9c27808 100644 (file)
@@ -131,6 +131,91 @@ PeeringState::PeeringState(
   machine.initiate();
 }
 
+bool PeeringState::proc_replica_info(
+  pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
+{
+  map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
+  if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
+    dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
+    return false;
+  }
+
+  if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
+    dout(10) << " got info " << oinfo << " from down osd." << from
+            << " discarding" << dendl;
+    return false;
+  }
+
+  dout(10) << " got osd." << from << " " << oinfo << dendl;
+  ceph_assert(is_primary());
+  peer_info[from] = oinfo;
+  might_have_unfound.insert(from);
+
+  pg->update_history(oinfo.history);
+
+  // stray?
+  if (!is_up(from) && !is_acting(from)) {
+    dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
+    stray_set.insert(from);
+    if (is_clean()) {
+      pg->purge_strays();
+    }
+  }
+
+  // was this a new info?  if so, update peers!
+  if (p == peer_info.end())
+    update_heartbeat_peers();
+
+  return true;
+}
+
+
+void PeeringState::remove_down_peer_info(const OSDMapRef &osdmap)
+{
+  // Remove any downed osds from peer_info
+  bool removed = false;
+  map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
+  while (p != peer_info.end()) {
+    if (!osdmap->is_up(p->first.osd)) {
+      psdout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
+      peer_missing.erase(p->first);
+      peer_log_requested.erase(p->first);
+      peer_missing_requested.erase(p->first);
+      peer_purged.erase(p->first);
+      peer_info.erase(p++);
+      removed = true;
+    } else
+      ++p;
+  }
+
+  // if we removed anyone, update peers (which include peer_info)
+  if (removed)
+    update_heartbeat_peers();
+  pg->check_recovery_sources(osdmap);
+}
+
+void PeeringState::update_heartbeat_peers()
+{
+  if (!is_primary())
+    return;
+
+  set<int> new_peers;
+  for (unsigned i=0; i<acting.size(); i++) {
+    if (acting[i] != CRUSH_ITEM_NONE)
+      new_peers.insert(acting[i]);
+  }
+  for (unsigned i=0; i<up.size(); i++) {
+    if (up[i] != CRUSH_ITEM_NONE)
+      new_peers.insert(up[i]);
+  }
+  for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
+       p != peer_info.end();
+       ++p) {
+    new_peers.insert(p->first.osd);
+  }
+  pl->update_heartbeat_peers(std::move(new_peers));
+}
+
 void PeeringState::write_if_dirty(ObjectStore::Transaction& t)
 {
   pl->prepare_write(
@@ -217,8 +302,9 @@ PeeringState::Initial::Initial(my_context ctx)
 
 boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
-  pg->proc_replica_info(
+  ps->proc_replica_info(
     notify.from, notify.notify.info, notify.notify.epoch_sent);
   pg->set_last_peering_reset();
   return transit< Primary >();
@@ -266,6 +352,7 @@ PeeringState::Started::react(const IntervalFlush&)
 
 boost::statechart::result PeeringState::Started::react(const AdvMap& advmap)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   psdout(10) << "Started advmap" << dendl;
   pg->check_full_transition(advmap.lastmap, advmap.osdmap);
@@ -281,7 +368,7 @@ boost::statechart::result PeeringState::Started::react(const AdvMap& advmap)
     post_event(advmap);
     return transit< Reset >();
   }
-  pg->remove_down_peer_info(advmap.osdmap);
+  ps->remove_down_peer_info(advmap.osdmap);
   return discard_event();
 }
 
@@ -324,6 +411,7 @@ PeeringState::Reset::react(const IntervalFlush&)
 
 boost::statechart::result PeeringState::Reset::react(const AdvMap& advmap)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   psdout(10) << "Reset advmap" << dendl;
 
@@ -344,13 +432,14 @@ boost::statechart::result PeeringState::Reset::react(const AdvMap& advmap)
       advmap.newacting, advmap.acting_primary,
       context< PeeringMachine >().get_cur_transaction());
   }
-  pg->remove_down_peer_info(advmap.osdmap);
+  ps->remove_down_peer_info(advmap.osdmap);
   pg->check_past_interval_bounds();
   return discard_event();
 }
 
 boost::statechart::result PeeringState::Reset::react(const ActMap&)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   if (pg->should_send_notify() && pg->get_primary().osd >= 0) {
     context< PeeringMachine >().send_notify(
@@ -363,7 +452,7 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&)
       pg->past_intervals);
   }
 
-  pg->update_heartbeat_peers();
+  ps->update_heartbeat_peers();
   pg->take_waiters();
 
   return transit< Started >();
@@ -442,9 +531,9 @@ PeeringState::Primary::Primary(my_context ctx)
 
 boost::statechart::result PeeringState::Primary::react(const MNotifyRec& notevt)
 {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
   psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl;
-  pg->proc_replica_info(
+  ps->proc_replica_info(
     notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
   return discard_event();
 }
@@ -1719,6 +1808,7 @@ boost::statechart::result PeeringState::Active::react(const ActMap&)
 
 boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
   ceph_assert(pg->is_primary());
   if (pg->peer_info.count(notevt.from)) {
@@ -1733,7 +1823,7 @@ boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt)
     psdout(10) << "Active: got notify from " << notevt.from
                       << ", calling proc_replica_info and discover_all_missing"
                       << dendl;
-    pg->proc_replica_info(
+    ps->proc_replica_info(
       notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
     if (pg->have_unfound() || (pg->is_degraded() && pg->might_have_unfound.count(notevt.from))) {
       pg->discover_all_missing(*context< PeeringMachine >().get_query_map());
@@ -2289,6 +2379,7 @@ void PeeringState::GetInfo::get_infos()
 
 boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt)
 {
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
 
   set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
@@ -2298,7 +2389,7 @@ boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt
   }
 
   epoch_t old_start = pg->info.history.last_epoch_started;
-  if (pg->proc_replica_info(
+  if (ps->proc_replica_info(
        infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)) {
     // we got something new ...
     PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;
@@ -2655,9 +2746,9 @@ boost::statechart::result PeeringState::Incomplete::react(const AdvMap &advmap)
 }
 
 boost::statechart::result PeeringState::Incomplete::react(const MNotifyRec& notevt) {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
   psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl;
-  if (pg->proc_replica_info(
+  if (ps->proc_replica_info(
     notevt.from, notevt.notify.info, notevt.notify.epoch_sent)) {
     // We got something new, try again!
     return transit< GetLog >();
index 57a389ce8e8f298c0843a2706d680f6923a64874..0eb46d49b5b7d327a8116021f537ff5c91b8233f 100644 (file)
@@ -11,6 +11,7 @@
 #include <boost/statechart/transition.hpp>
 #include <boost/statechart/event_base.hpp>
 #include <string>
+#include <atomic>
 
 #include "PGLog.h"
 #include "PGStateUtils.h"
@@ -62,6 +63,7 @@ public:
       bool need_write_epoch,
       ObjectStore::Transaction &t) = 0;
     virtual void update_store_with_options(const pool_opts_t &opts) = 0;
+    virtual void update_heartbeat_peers(set<int> peers) = 0;
 
     virtual void on_pool_change() = 0;
     virtual void on_role_change() = 0;
@@ -260,8 +262,8 @@ public:
   /* States */
   struct Initial;
   class PeeringMachine : public boost::statechart::state_machine< PeeringMachine, Initial > {
-    PeeringState *state;
   public:
+    PeeringState *state;
     PGStateHistory *state_history;
     CephContext *cct;
     spg_t spgid;
@@ -1137,7 +1139,7 @@ public:
   void update_heartbeat_peers();
   bool proc_replica_info(
     pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch);
-  void remove_down_peer_info(const OSDMapRef osdmap);
+  void remove_down_peer_info(const OSDMapRef &osdmap);
 
 public:
   PeeringState(