From f9b08c848963d9cbd7688cede4c850bb4c7dcc7a Mon Sep 17 00:00:00 2001
From: Kefu Chai
Date: Mon, 4 Mar 2019 00:13:09 +0800
Subject: [PATCH] crimson/osd: maintain PG::want_acting

* add PG::choose_acting()
* add PG::proc_replica_info()

Signed-off-by: Kefu Chai
---
 src/crimson/osd/pg.cc | 300 ++++++++++++++++++++++++++++++++++++++++++
 src/crimson/osd/pg.h  |  26 ++++
 2 files changed, 326 insertions(+)

diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index c76a986ff5d..8b59d3e16e2 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -1,5 +1,15 @@
 #include "pg.h"
 
+#include <functional>
+
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm/max_element.hpp>
+#include <boost/range/numeric.hpp>
+
+#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGLog.h"
 #include "osd/OSDMap.h"
 
@@ -210,6 +220,296 @@ void PG::update_need_up_thru(const OSDMap* o)
   }
 }
 
+std::vector<int>
+PG::calc_acting(pg_shard_t auth_shard,
+                const vector<int>& acting,
+                const map<pg_shard_t, pg_info_t>& all_info) const
+{
+  // select primary
+  auto auth_log_shard = all_info.find(auth_shard);
+  auto primary = all_info.find(up_primary);
+  if (up.empty() ||
+      primary->second.is_incomplete() ||
+      primary->second.last_update < auth_log_shard->second.log_tail) {
+    ceph_assert(!auth_log_shard->second.is_incomplete());
+    logger().info("up[0] needs backfill, osd.{} selected as primary instead",
+                  auth_shard);
+    primary = auth_log_shard;
+  }
+  auto& [primary_shard_id, primary_info] = *primary;
+  logger().info("primary is osd.{} with {}",
+                primary_shard_id.osd, primary_info);
+
+  vector<int> want{primary_shard_id.osd};
+  // We include auth_log_shard->second.log_tail because in GetLog,
+  // we will request logs back to the min last_update over our
+  // acting_backfill set, which will result in our log being extended
+  // as far backwards as necessary to pick up any peers which can
+  // be log recovered by auth_log_shard's log
+  auto oldest_auth_log_entry =
+    std::min(primary_info.log_tail, auth_log_shard->second.log_tail);
+  // select replicas that have log contiguity with primary.
+  // prefer up, then acting, then any peer_info osds
+  auto get_shard = [](int osd) {
+    return pg_shard_t{osd, shard_id_t::NO_SHARD}; };
+  auto get_info = [&](int osd) -> const pg_info_t& {
+    return all_info.find(get_shard(osd))->second; };
+  auto is_good = [&, oldest_auth_log_entry](int osd) {
+    auto& info = get_info(osd);
+    return (!info.is_incomplete() &&
+            info.last_update >= oldest_auth_log_entry);
+  };
+  auto is_enough = [size=pool.get_size(), &want](int) {
+    return want.size() >= size;
+  };
+  std::vector<std::reference_wrapper<const std::vector<int>>> covered;
+  auto has_covered = [primary=primary_shard_id.osd, &covered](int osd) {
+    if (osd == primary)
+      return true;
+    for (auto& c : covered) {
+      if (std::find(c.get().begin(), c.get().end(), osd) != c.get().end()) {
+        return true;
+      }
+    }
+    return false;
+  };
+  boost::copy((up |
+               boost::adaptors::filtered(std::not_fn(is_enough)) |
+               boost::adaptors::filtered(std::not_fn(has_covered)) |
+               boost::adaptors::filtered(is_good)),
+              std::back_inserter(want));
+  if (is_enough(0))
+    return want;
+  covered.push_back(std::cref(up));
+  // let's select from acting. the newer the "last_update" is, the better.
+  // sort by last_update, in descending order.
+  using cands_sorted_by_eversion_t =
+    std::map<eversion_t, pg_shard_t, std::greater<eversion_t>>;
+  auto shard_to_osd = [](const pg_shard_t& shard) { return shard.osd; };
+  {
+    // This no longer has backfill OSDs, as they are covered above.
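+    // The accumulate() below collects the acting OSDs that are still needed
+    // (want has not yet reached pool size), are not already covered by `up`,
+    // and have a usable, contiguous log, keyed by last_update in descending
+    // order; the copy() that follows appends them to `want` newest-first.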
+    auto cands = boost::accumulate(
+      (acting |
+       boost::adaptors::filtered(std::not_fn(is_enough)) |
+       boost::adaptors::filtered(std::not_fn(has_covered)) |
+       boost::adaptors::filtered(is_good)),
+      cands_sorted_by_eversion_t{},
+      [&](cands_sorted_by_eversion_t& cands, int osd) {
+        cands.emplace(get_info(osd).last_update, get_shard(osd));
+        return std::move(cands);
+      });
+    boost::copy(cands |
+                boost::adaptors::map_values |
+                boost::adaptors::transformed(shard_to_osd),
+                std::back_inserter(want));
+    if (is_enough(0)) {
+      return want;
+    }
+    covered.push_back(std::cref(acting));
+  }
+  // continue to search strays for more suitable peers
+  {
+    auto pi_to_osd = [](const peer_info_t::value_type& pi) {
+      return pi.first.osd; };
+    auto cands = boost::accumulate(
+      (all_info |
+       boost::adaptors::transformed(pi_to_osd) |
+       boost::adaptors::filtered(std::not_fn(is_enough)) |
+       boost::adaptors::filtered(std::not_fn(has_covered)) |
+       boost::adaptors::filtered(is_good)),
+      cands_sorted_by_eversion_t{},
+      [&](cands_sorted_by_eversion_t& cands, int osd) {
+        cands.emplace(get_info(osd).last_update, get_shard(osd));
+        return cands;
+      });
+    boost::copy(cands |
+                boost::adaptors::map_values |
+                boost::adaptors::transformed(shard_to_osd),
+                std::back_inserter(want));
+  }
+  return want;
+}
+
+bool PG::proc_replica_info(pg_shard_t from,
+                           const pg_info_t& pg_info,
+                           epoch_t send_epoch)
+{
+
+  if (auto found = peer_info.find(from);
+      found != peer_info.end() &&
+      found->second.last_update == pg_info.last_update) {
+    logger().info("got info {} from osd.{}, identical to ours",
+                  pg_info, from);
+    return false;
+  } else if (!osdmap->has_been_up_since(from.osd, send_epoch)) {
+    logger().info("got info {} from down osd.{}. discarding",
+                  pg_info, from);
+    return false;
+  } else {
+    logger().info("got info {} from osd.{}", pg_info, from);
+    peer_info.emplace(from, pg_info);
+    return true;
+  }
+}
+
+void PG::proc_replica_log(pg_shard_t from,
+                          const pg_info_t& pg_info,
+                          const pg_log_t& pg_log,
+                          const pg_missing_t& pg_missing)
+{
+
+  logger().info("{} for osd.{}: {} {} {}", __func__, from, pg_info, pg_log, pg_missing);
+  peer_info.insert_or_assign(from, pg_info);
+}
+
+// Returns the shard holding the best info in infos, ranked by:
+//  1) Prefer newer last_update
+//  2) Prefer longer tail if it brings another info into contiguity
+//  3) Prefer current primary
+pg_shard_t
+PG::find_best_info(const PG::peer_info_t& infos) const
+{
+  // See doc/dev/osd_internals/last_epoch_started.rst before attempting
+  // to make changes to this process.  Also, make sure to update it
+  // when you find bugs!
+  auto min_last_update_acceptable = eversion_t::max();
+  epoch_t max_les = 0;
+  for (auto& [shard, info] : infos) {
+    if (max_les < info.history.last_epoch_started) {
+      max_les = info.history.last_epoch_started;
+    }
+    if (!info.is_incomplete() &&
+        max_les < info.last_epoch_started) {
+      max_les = info.last_epoch_started;
+    }
+  }
+  for (auto& [shard, info] : infos) {
+    if (max_les <= info.last_epoch_started &&
+        min_last_update_acceptable > info.last_update) {
+      min_last_update_acceptable = info.last_update;
+    }
+  }
+  if (min_last_update_acceptable == eversion_t::max()) {
+    return pg_shard_t{};
+  }
+  // find osd with newest last_update (oldest for ec_pool).
+  // if there are multiples, prefer
+  //  - a longer tail, if it brings another peer into log contiguity
+  //  - the current primary
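+  // is_good (below) filters out peers that cannot serve as the authoritative
+  // log source; boost::max_element() then picks the winner using `better`,
+  // which returns true when lhs is a worse candidate than rhs, so the
+  // "maximum" element under this ordering is the preferred shard.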
+  struct is_good {
+    // boost::adaptors::filtered() and boost::max_element() copy and assign
+    // the predicate, so make it copyable and assignable (a lambda is not)
+    eversion_t min_last_update_acceptable;
+    epoch_t max_les;
+    const PG* thiz;
+    is_good(eversion_t min_lua, epoch_t max_les, const PG* thiz)
+      : min_last_update_acceptable{min_lua}, max_les{max_les}, thiz{thiz} {}
+    is_good(const is_good& rhs) = default;
+    is_good& operator=(const is_good& rhs) = default;
+    bool operator()(const PG::peer_info_t::value_type& pi) const {
+      auto& [shard, info] = pi;
+      if (!thiz->is_up(shard) && !thiz->is_acting(shard)) {
+        return false;
+      // Only consider peers with last_update >= min_last_update_acceptable
+      } else if (info.last_update < min_last_update_acceptable) {
+        return false;
+      // Disqualify anyone with a too old last_epoch_started
+      } else if (info.last_epoch_started < max_les) {
+        return false;
+      // Disqualify anyone who is incomplete (not fully backfilled)
+      } else if (info.is_incomplete()) {
+        return false;
+      } else {
+        return true;
+      }
+    }
+  };
+  auto better = [require_rollback=pool.require_rollback(), this]
+    (const PG::peer_info_t::value_type& lhs,
+     const PG::peer_info_t::value_type& rhs) {
+    if (require_rollback) {
+      // prefer older last_update for ec_pool
+      if (lhs.second.last_update > rhs.second.last_update) {
+        return true;
+      } else if (lhs.second.last_update < rhs.second.last_update) {
+        return false;
+      }
+    } else {
+      // prefer newer last_update for replicated pool
+      if (lhs.second.last_update > rhs.second.last_update) {
+        return false;
+      } else if (lhs.second.last_update < rhs.second.last_update) {
+        return true;
+      }
+    }
+    // prefer longer tail
+    if (lhs.second.log_tail > rhs.second.log_tail) {
+      return true;
+    } else if (lhs.second.log_tail < rhs.second.log_tail) {
+      return false;
+    }
+    // prefer complete to missing
+    if (lhs.second.has_missing() && !rhs.second.has_missing()) {
+      return true;
+    } else if (!lhs.second.has_missing() && rhs.second.has_missing()) {
+      return false;
+    }
+    // prefer current primary (usually the caller), all things being equal
+    if (rhs.first == whoami) {
+      return true;
+    } else if (lhs.first == whoami) {
+      return false;
+    }
+    return false;
+  };
+  auto good_infos =
+    (infos | boost::adaptors::filtered(is_good{min_last_update_acceptable,
+                                               max_les, this}));
+  if (good_infos.empty()) {
+    return pg_shard_t{};
+  } else {
+    return boost::max_element(good_infos, better)->first;
+  }
+}
+
+std::pair<PG::choose_acting_t, pg_shard_t> PG::choose_acting()
+{
+  auto all_info = peer_info;
+  all_info.emplace(whoami, info);
+
+  auto auth_log_shard = find_best_info(all_info);
+  if (auth_log_shard.is_undefined()) {
+    if (up != acting) {
+      logger().info("{} no suitable info found (incomplete backfills?), "
+                    "reverting to up", __func__);
+      want_acting = up;
+      // todo: reset pg_temp
+      return {choose_acting_t::should_change, auth_log_shard};
+    } else {
+      logger().info("{} failed", __func__);
+      ceph_assert(want_acting.empty());
+      return {choose_acting_t::pg_incomplete, auth_log_shard};
+    }
+  }
+
+  auto want = calc_acting(auth_log_shard, acting, all_info);
+  if (want != acting) {
+    logger().info("{} want {} != acting {}, requesting pg_temp change",
+                  __func__, want, acting);
+    want_acting = std::move(want);
+    // todo: update pg_temp
+    return {choose_acting_t::should_change, auth_log_shard};
+  } else {
+    logger().info("{} want={}", __func__, want);
+    want_acting.clear();
+    acting_recovery_backfill.clear();
+    std::transform(want.begin(), want.end(),
+                   std::inserter(acting_recovery_backfill, acting_recovery_backfill.end()),
+                   [](int osd) { return pg_shard_t{osd, shard_id_t::NO_SHARD}; });
+    return {choose_acting_t::dont_change, auth_log_shard};
+  }
+}
+
 seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
 {
   // todo
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index d2fb3535dd4..a630c4417d8 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -55,6 +55,26 @@ public:
   epoch_t get_need_up_thru() const;
   void update_need_up_thru(const OSDMap* o = nullptr);
+  bool proc_replica_info(pg_shard_t from,
+                         const pg_info_t& pg_info,
+                         epoch_t send_epoch);
+  void proc_replica_log(pg_shard_t from,
+                        const pg_info_t& pg_info,
+                        const pg_log_t& pg_log,
+                        const pg_missing_t& pg_missing);
+
+  using peer_info_t = std::map<pg_shard_t, pg_info_t>;
+  pg_shard_t find_best_info(const PG::peer_info_t& infos) const;
+  enum class choose_acting_t {
+    dont_change,
+    should_change,
+    pg_incomplete,
+  };
+  std::vector<int>
+  calc_acting(pg_shard_t auth_shard,
+              const vector<int>& acting,
+              const map<pg_shard_t, pg_info_t>& all_info) const;
+  std::pair<choose_acting_t, pg_shard_t> choose_acting();
 
   seastar::future<> read_state(ceph::os::CyanStore* store);
 
   // peering/recovery
@@ -75,6 +95,10 @@ private:
 
   epoch_t last_peering_reset = 0;
   epoch_t need_up_thru = 0;
+
+  // peer_info    -- projected (updates _before_ replicas ack)
+  peer_info_t peer_info;    //< info from peers (stray or prior)
+
   //< pg state
   pg_info_t info;    //< last written info, for fast info persistence
 
@@ -85,6 +109,8 @@
   pg_shard_t primary, up_primary;
   std::vector<int> acting, up;
   pg_shard_set_t actingset, upset;
+  pg_shard_set_t acting_recovery_backfill;
+  std::vector<int> want_acting;
 
   cached_map_t osdmap;
   ceph::net::Messenger& msgr;
-- 
2.39.5