crimson/osd: maintain PG::want_acting
authorKefu Chai <kchai@redhat.com>
Sun, 3 Mar 2019 16:13:09 +0000 (00:13 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 22 Mar 2019 05:21:32 +0000 (13:21 +0800)
* add PG::choose_acting(), which picks the acting set we want and
  populates PG::want_acting when a pg_temp change is needed
* add PG::proc_replica_info() and PG::proc_replica_log() for folding
  peers' pg_info_t into PG::peer_info
* add PG::find_best_info() and PG::calc_acting() as helpers for
  selecting the authoritative log shard and the wanted acting set
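
A minimal sketch of the intended call site (hypothetical; wiring
choose_acting() into crimson's peering flow is still a todo in this
change):

    // hypothetical caller somewhere in the peering code
    void on_peering_progress(PG& pg)
    {
      auto [verdict, auth_log_shard] = pg.choose_acting();
      switch (verdict) {
      case PG::choose_acting_t::should_change:
        // want_acting is now populated; request a pg_temp change (todo)
        break;
      case PG::choose_acting_t::pg_incomplete:
        // no authoritative log could be found; the PG stays incomplete
        break;
      case PG::choose_acting_t::dont_change:
        // acting is already what we want; acting_recovery_backfill
        // has been filled in from it
        break;
      }
    }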

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index c76a986ff5d12b01c2e4060eb080ce3339924f87..8b59d3e16e263ade1f64b289ba8c9627112bdf9e 100644 (file)
@@ -1,5 +1,15 @@
 #include "pg.h"
 
+#include <functional>
+
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm/max_element.hpp>
+#include <boost/range/numeric.hpp>
+
+#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGLog.h"
 #include "osd/OSDMap.h"
 
@@ -210,6 +220,296 @@ void PG::update_need_up_thru(const OSDMap* o)
   }
 }
 
+std::vector<int>
+PG::calc_acting(pg_shard_t auth_shard,
+                const std::vector<int>& acting,
+                const std::map<pg_shard_t, pg_info_t>& all_info) const
+{
+  // select primary
+  auto auth_log_shard = all_info.find(auth_shard);
+  auto primary = all_info.find(up_primary);
+  if (up.empty() ||
+      primary->second.is_incomplete() ||
+      primary->second.last_update < auth_log_shard->second.log_tail) {
+    ceph_assert(!auth_log_shard->second.is_incomplete());
+    logger().info("up[0] needs backfill, osd.{} selected as primary instead",
+                  auth_shard);
+    primary = auth_log_shard;
+  }
+  auto& [primary_shard_id, primary_info] = *primary;
+  logger().info("primary is osd.{} with {}",
+                primary_shard_id.osd, primary_info);
+
+  vector<int> want{primary_shard_id.osd};
+  // We include auth_log_shard->second.log_tail because in GetLog,
+  // we will request logs back to the min last_update over our
+  // acting_backfill set, which will result in our log being extended
+  // as far backwards as necessary to pick up any peers which can
+  // be log recovered by auth_log_shard's log
+  auto oldest_auth_log_entry =
+    std::min(primary_info.log_tail, auth_log_shard->second.log_tail);
+  // select replicas that have log contiguity with primary.
+  // prefer up, then acting, then any peer_info osds
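+  // map an osd id to a pg_shard_t; always NO_SHARD, as this code only
+  // deals with replicated (non-EC) pools so far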
+  auto get_shard = [](int osd) {
+    return pg_shard_t{osd, shard_id_t::NO_SHARD}; };
+  auto get_info = [&](int osd) -> const pg_info_t& {
+    return all_info.find(get_shard(osd))->second; };
+  auto is_good = [&, oldest_auth_log_entry](int osd) {
+    auto& info = get_info(osd);
+    return (!info.is_incomplete() &&
+            info.last_update >= oldest_auth_log_entry);
+  };
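+  // is_enough ignores its argument: it doubles as a plain check,
+  // is_enough(0), and as a range predicate that is re-evaluated as
+  // "want" grows, so the filtered copies below stop accepting osds
+  // once the pool size is reached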
+  auto is_enough = [size=pool.get_size(), &want](int) {
+    return want.size() >= size;
+  };
+  std::vector<std::reference_wrapper<const vector<int>>> covered;
+  auto has_covered = [primary=primary_shard_id.osd, &covered](int osd) {
+    if (osd == primary)
+      return true;
+    for (auto& c : covered) {
+      if (std::find(c.get().begin(), c.get().end(), osd) != c.get().end()) {
+        return true;
+      }
+    }
+    return false;
+  };
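+  // phase 1: pick good osds from "up", in order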
+  boost::copy((up |
+               boost::adaptors::filtered(std::not_fn(is_enough)) |
+               boost::adaptors::filtered(std::not_fn(has_covered)) |
+               boost::adaptors::filtered(is_good)),
+              std::back_inserter(want));
+  if (is_enough(0))
+    return want;
+  covered.push_back(std::cref(up));
+  // phase 2: select from acting. the newer the last_update, the better,
+  // so sort candidates by last_update in descending order.
+  using cands_sorted_by_eversion_t = std::map<eversion_t,
+                                              pg_shard_t,
+                                              std::greater<eversion_t>>;
+  auto shard_to_osd = [](const pg_shard_t& shard) { return shard.osd; };
+  {
+    // This no longer has backfill OSDs, as they are covered above.
+    auto cands = boost::accumulate(
+      (acting |
+       boost::adaptors::filtered(std::not_fn(is_enough)) |
+       boost::adaptors::filtered(std::not_fn(has_covered)) |
+       boost::adaptors::filtered(is_good)),
+      cands_sorted_by_eversion_t{},
+      [&](cands_sorted_by_eversion_t& cands, int osd) {
+        cands.emplace(get_info(osd).last_update, get_shard(osd));
+        return std::move(cands);
+      });
+    boost::copy(cands |
+                boost::adaptors::map_values |
+                boost::adaptors::transformed(shard_to_osd),
+                std::back_inserter(want));
+    if (is_enough(0)) {
+      return want;
+    }
+    covered.push_back(std::cref(acting));
+  }
+  // phase 3: search the remaining strays (any osd in peer_info) for more
+  // suitable peers
+  {
+    auto pi_to_osd = [](const peer_info_t::value_type& pi) {
+      return pi.first.osd; };
+    auto cands = boost::accumulate(
+      (all_info |
+       boost::adaptors::transformed(pi_to_osd) |
+       boost::adaptors::filtered(std::not_fn(is_enough)) |
+       boost::adaptors::filtered(std::not_fn(has_covered)) |
+       boost::adaptors::filtered(is_good)),
+      cands_sorted_by_eversion_t{},
+      [&](cands_sorted_by_eversion_t& cands, int osd) {
+        cands.emplace(get_info(osd).last_update, get_shard(osd));
+        return std::move(cands);
+      });
+    boost::copy(cands |
+                boost::adaptors::map_values |
+                boost::adaptors::transformed(shard_to_osd),
+                std::back_inserter(want));
+  }
+  return want;
+}
+
+bool PG::proc_replica_info(pg_shard_t from,
+                           const pg_info_t& pg_info,
+                           epoch_t send_epoch)
+{
+  if (auto found = peer_info.find(from);
+      found != peer_info.end() &&
+      found->second.last_update == pg_info.last_update) {
+    logger().info("got info {} from osd.{}, identical to ours",
+                  info, from);
+    return false;
+  } else if (!osdmap->has_been_up_since(from.osd, send_epoch)) {
+    logger().info("got info {} from down osd.{}. discarding",
+                  info, from);
+    return false;
+  } else {
+    logger().info("got info {} from osd.{}", info, from);
+    peer_info.emplace(from, pg_info);
+    return true;
+  }
+}
+
+void PG::proc_replica_log(pg_shard_t from,
+                          const pg_info_t& pg_info,
+                          const pg_log_t& pg_log,
+                          const pg_missing_t& pg_missing)
+{
+  logger().info("{} for osd.{}: {} {} {}",
+                __func__, from, pg_info, pg_log, pg_missing);
+  peer_info.insert_or_assign(from, pg_info);
+}
+
+// Returns the shard holding the best info in infos, preferring, in order:
+//  1) a newer last_update
+//  2) a longer tail, if it brings another info into contiguity
+//  3) the current primary
+pg_shard_t
+PG::find_best_info(const PG::peer_info_t& infos) const
+{
+  // See doc/dev/osd_internals/last_epoch_started.rst before attempting
+  // to make changes to this process.  Also, make sure to update it
+  // when you find bugs!
+  auto min_last_update_acceptable = eversion_t::max();
+  epoch_t max_les = 0;
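+  // find the highest last_epoch_started across all infos: an info's
+  // history.last_epoch_started always counts, but its own
+  // last_epoch_started only if the info is complete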
+  for (auto& [shard, info] : infos) {
+    if (max_les < info.history.last_epoch_started) {
+      max_les = info.history.last_epoch_started;
+    }
+    if (!info.is_incomplete() &&
+        max_les < info.last_epoch_started) {
+      max_les = info.last_epoch_started;
+    }
+  }
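+  // the oldest last_update among peers that made it to max_les: an
+  // authoritative log must reach back at least this far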
+  for (auto& [shard, info] : infos) {
+    if (max_les <= info.last_epoch_started &&
+        min_last_update_acceptable > info.last_update) {
+      min_last_update_acceptable = info.last_update;
+    }
+  }
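+  // no info has actually started max_les, so no authoritative log
+  // can be chosen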
+  if (min_last_update_acceptable == eversion_t::max()) {
+    return pg_shard_t{};
+  }
+  // find osd with newest last_update (oldest for ec_pool).
+  // if there are multiples, prefer
+  //  - a longer tail, if it brings another peer into log contiguity
+  //  - the current primary
+  struct is_good {
+    // the predicate is stored inside boost's filter_iterator, which
+    // has to stay copy-assignable, so use a hand-rolled function
+    // object instead of a lambda
+    eversion_t min_last_update_acceptable;
+    epoch_t max_les;
+    const PG* thiz;
+    is_good(eversion_t min_lua, epoch_t max_les, const PG* thiz)
+      : min_last_update_acceptable{min_lua}, max_les{max_les}, thiz{thiz} {}
+    is_good(const is_good& rhs) = default;
+    is_good& operator=(const is_good& rhs) = default;
+    bool operator()(const PG::peer_info_t::value_type& pi) const {
+      auto& [shard, info] = pi;
+      if (!thiz->is_up(shard) && !thiz->is_acting(shard)) {
+        // Only consider peers that are up or acting
+        return false;
+      } else if (info.last_update < min_last_update_acceptable) {
+        // Only consider peers with last_update >= min_last_update_acceptable
+        return false;
+      } else if (info.last_epoch_started < max_les) {
+        // Disqualify anyone with a too old last_epoch_started
+        return false;
+      } else if (info.is_incomplete()) {
+        // Disqualify anyone who is incomplete (not fully backfilled)
+        return false;
+      } else {
+        return true;
+      }
+    }
+  };
+  auto better = [require_rollback=pool.require_rollback(), this]
+    (const PG::peer_info_t::value_type& lhs,
+     const PG::peer_info_t::value_type& rhs) {
+    if (require_rollback) {
+      // prefer older last_update for ec_pool
+      if (lhs.second.last_update > rhs.second.last_update) {
+        return true;
+      } else if (lhs.second.last_update < rhs.second.last_update) {
+        return false;
+      }
+    } else {
+      // prefer newer last_update for replica pool
+      if (lhs.second.last_update > rhs.second.last_update) {
+        return false;
+      } else if (lhs.second.last_update < rhs.second.last_update) {
+        return true;
+      }
+    }
+    // Prefer longer tail
+    if (lhs.second.log_tail > rhs.second.log_tail) {
+      return true;
+    } else if (lhs.second.log_tail < rhs.second.log_tail) {
+      return false;
+    }
+    // prefer complete to missing
+    if (lhs.second.has_missing() && !rhs.second.has_missing()) {
+      return true;
+    } else if (!lhs.second.has_missing() && rhs.second.has_missing()) {
+      return false;
+    }
+    // prefer current primary (usually the caller), all things being equal
+    if (rhs.first == whoami) {
+      return true;
+    } else if (lhs.first == whoami) {
+      return false;
+    }
+    return false;
+  };
+  auto good_infos =
+    (infos | boost::adaptors::filtered(is_good{min_last_update_acceptable,
+                                               max_les, this}));
+  if (good_infos.empty()) {
+    return pg_shard_t{};
+  } else {
+    return boost::max_element(good_infos, better)->first;
+  }
+}
+
+std::pair<PG::choose_acting_t, pg_shard_t> PG::choose_acting()
+{
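+  // consider our own info along with everything we have heard from peers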
+  auto all_info = peer_info;
+  all_info.emplace(whoami, info);
+
+  auto auth_log_shard = find_best_info(all_info);
+  if (auth_log_shard.is_undefined()) {
+    if (up != acting) {
+      logger().info("{} no suitable info found (incomplete backfills?), "
+                    "reverting to up", __func__);
+      want_acting = up;
+      // todo: reset pg_temp
+      return {choose_acting_t::should_change, auth_log_shard};
+    } else {
+      logger().info("{} failed ", __func__);
+      ceph_assert(want_acting.empty());
+      return {choose_acting_t::pg_incomplete, auth_log_shard};
+    }
+  }
+
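+  // we have an authoritative log shard; derive the acting set we want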
+  auto want = calc_acting(auth_log_shard, acting, all_info);
+  if (want != acting) {
+    logger().info("{} want {} != acting {}, requesting pg_temp change",
+                  __func__, want, acting);
+    want_acting = std::move(want);
+    // todo: update pg temp
+    return {choose_acting_t::should_change, auth_log_shard};
+  } else {
+    logger().info("{} want={}", __func__, want);
+    want_acting.clear();
+    acting_recovery_backfill.clear();
+    std::transform(want.begin(), want.end(),
+      std::inserter(acting_recovery_backfill, acting_recovery_backfill.end()),
+      [](int osd) { return pg_shard_t{osd, shard_id_t::NO_SHARD}; });
+    return {choose_acting_t::dont_change, auth_log_shard};
+  }
+}
+
 seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
 {
   // todo
index d2fb3535dd4e5e6dde46f9e66df9e77c77d74d67..a630c4417d8bf6d64f12365ba051de4a1e7b8ac2 100644 (file)
@@ -55,6 +55,26 @@ public:
   epoch_t get_need_up_thru() const;
   void update_need_up_thru(const OSDMap* o = nullptr);
 
+  bool proc_replica_info(pg_shard_t from,
+                         const pg_info_t& pg_info,
+                         epoch_t send_epoch);
+  void proc_replica_log(pg_shard_t from,
+                        const pg_info_t& pg_info,
+                        const pg_log_t& pg_log,
+                        const pg_missing_t& pg_missing);
+
+  using peer_info_t = std::map<pg_shard_t, pg_info_t>;
+  pg_shard_t find_best_info(const PG::peer_info_t& infos) const;
+  enum class choose_acting_t {
+    dont_change,   //< acting set is already the one we want
+    should_change, //< want_acting is set; a pg_temp change is needed
+    pg_incomplete, //< no authoritative log found
+  };
+  std::vector<int>
+  calc_acting(pg_shard_t auth_shard,
+              const std::vector<int>& acting,
+              const std::map<pg_shard_t, pg_info_t>& all_info) const;
+  std::pair<choose_acting_t, pg_shard_t> choose_acting();
   seastar::future<> read_state(ceph::os::CyanStore* store);
 
   // peering/recovery
@@ -75,6 +95,10 @@ private:
   epoch_t last_peering_reset = 0;
   epoch_t need_up_thru = 0;
 
+
+  // projected, i.e. updated _before_ replicas ack
+  peer_info_t peer_info; //< info from peers (stray or prior)
+
   //< pg state
   pg_info_t info;
   //< last written info, for fast info persistence
@@ -85,6 +109,8 @@ private:
   pg_shard_t primary, up_primary;
   std::vector<int> acting, up;
   pg_shard_set_t actingset, upset;
+  pg_shard_set_t acting_recovery_backfill;
+  std::vector<int> want_acting;
 
   cached_map_t osdmap;
   ceph::net::Messenger& msgr;