From 34570c9427d6c1b909ed9b1f6fcbf73370ef828e Mon Sep 17 00:00:00 2001
From: Kefu Chai
Date: Mon, 4 Mar 2019 10:58:12 +0800
Subject: [PATCH] crimson/osd: add more peering facilities

* add PG::should_restart_peering() for telling whether we should restart
  peering upon seeing a new mapping
* add PG::start_peering_interval() for starting a new peering interval
* add PG::activate() for activating a PG
* add PG::on_activated() to be called once a PG is activated
* add PG::send_to_osd() for sending a message to a given OSD

Signed-off-by: Kefu Chai
---
 src/crimson/osd/pg.cc | 258 ++++++++++++++++++++++++++++++++++++++++++
 src/crimson/osd/pg.h  |  18 +++
 2 files changed, 276 insertions(+)

diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 4a059d7193b..02da7e473d1 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -543,6 +543,226 @@ void PG::clear_primary_state()
   peer_activated.clear();
 }
 
+bool PG::should_restart_peering(int new_up_primary,
+                                int new_acting_primary,
+                                const std::vector<int>& new_up,
+                                const std::vector<int>& new_acting,
+                                cached_map_t last_map,
+                                cached_map_t osd_map) const
+{
+  auto pgid = info.pgid.pgid;
+  auto pool = last_map->get_pg_pool(pgid.pool());
+  if (!pool) {
+    return false;
+  }
+  auto new_pool = osd_map->get_pg_pool(pgid.pool());
+  if (!new_pool) {
+    return true;
+  }
+  if (PastIntervals::is_new_interval(
+        primary.osd,
+        new_acting_primary,
+        acting,
+        new_acting,
+        up_primary.osd,
+        new_up_primary,
+        up,
+        new_up,
+        pool->size,
+        new_pool->size,
+        pool->min_size,
+        new_pool->min_size,
+        pool->get_pg_num(),
+        new_pool->get_pg_num(),
+        pool->get_pg_num_pending(),
+        new_pool->get_pg_num_pending(),
+        last_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
+        osd_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
+        last_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
+        osd_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
+        pgid)) {
+    logger().info("new interval new_up {} new_acting {}",
+                  new_up, new_acting);
+    return true;
+  }
+  if (!last_map->is_up(whoami.osd) && osd_map->is_up(whoami.osd)) {
+    logger().info(" osd transitioned from down -> up");
+    return true;
+  }
+  return false;
+}
+
+template<typename T>
+bool compare_n_set(T& v, const T& new_v)
+{
+  if (v != new_v) {
+    v = new_v;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void PG::start_peering_interval(int new_up_primary,
+                                int new_acting_primary,
+                                const std::vector<int>& new_up,
+                                const std::vector<int>& new_acting,
+                                cached_map_t last_map)
+{
+  // todo
+  update_last_peering_reset();
+
+  auto old_acting_primary = primary;
+  auto old_acting = std::move(acting);
+  auto old_up_primary = up_primary;
+  auto old_up = std::move(up);
+
+  update_primary_state(new_up, new_up_primary,
+                       new_acting, new_acting_primary);
+  if (compare_n_set(info.stats.up, up) +
+      compare_n_set(info.stats.up_primary, up_primary.osd) +
+      compare_n_set(info.stats.acting, acting) +
+      compare_n_set(info.stats.acting_primary, primary.osd)) {
+    info.stats.mapping_epoch = osdmap->get_epoch();
+  }
+  if (old_up_primary != up_primary || old_up != up) {
+    info.history.same_up_since = osdmap->get_epoch();
+  }
+  // this comparison includes primary rank via pg_shard_t
+  if (old_acting_primary != get_primary()) {
+    info.history.same_primary_since = osdmap->get_epoch();
+  }
+  // todo: always start a new interval
+  info.history.same_interval_since = osdmap->get_epoch();
+  // This will now be remapped during a backfill in cases
+  // that it would not have been before.
+  if (up != acting) {
+    set_state(PG_STATE_REMAPPED);
+  } else {
+    clear_state(PG_STATE_REMAPPED);
+  }
+  // deactivate.
+  clear_state(PG_STATE_ACTIVE);
+  clear_state(PG_STATE_PEERED);
+  clear_state(PG_STATE_DOWN);
+
+  acting_recovery_backfill.clear();
+  // should we tell the primary we are here?
+  should_notify_primary = !is_primary();
+}
+
+void PG::activate(epoch_t activation_epoch)
+{
+  clear_state(PG_STATE_DOWN);
+
+  if (is_primary()) {
+    // only update primary last_epoch_started if we will go active
+    if (acting.size() >= pool.min_size) {
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  } else if (is_acting(whoami)) {
+    // update last_epoch_started on acting replica to whatever the primary sent
+    // unless it's smaller (could happen if we are going peered rather than
+    // active, see doc/dev/osd_internals/last_epoch_started.rst)
+    if (info.last_epoch_started < activation_epoch) {
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  }
+  if (is_primary()) {
+    // start up replicas
+    seastar::do_for_each(
+      acting_recovery_backfill.begin(),
+      acting_recovery_backfill.end(),
+      [this](pg_shard_t peer) { return activate_peer(peer); });
+    set_state(PG_STATE_ACTIVATING);
+  } else {
+    // todo: write/commit pg log, activate myself, and then tell primary
+    on_activated();
+    pg_notify_t notify{get_primary().shard,
+                       whoami.shard,
+                       get_osdmap_epoch(),
+                       get_osdmap_epoch(),
+                       info};
+    auto m = make_message<MOSDPGInfo>(
+      get_osdmap_epoch(),
+      MOSDPGInfo::pg_list_t{make_pair(std::move(notify), PastIntervals{})});
+    send_to_osd(get_primary().osd, std::move(m), get_osdmap_epoch());
+  }
+  // todo:
+  info.last_complete = info.last_update;
+  update_need_up_thru();
+}
+
+void PG::on_activated()
+{
+  if (acting.size() >= pool.min_size) {
+    set_state(PG_STATE_ACTIVE);
+  } else {
+    set_state(PG_STATE_PEERED);
+  }
+}
+
+seastar::future<> PG::activate_peer(pg_shard_t peer)
+{
+  if (peer == whoami) {
+    // todo: write/commit pg log
+    peer_activated.insert(whoami);
+    return seastar::now();
+  }
+  auto& pi = peer_info[peer];
+  MOSDPGLog* m = nullptr;
+  if (pi.last_update == info.last_update) {
+    // empty log
+    logger().info("activate peer osd.{} is up to date, "
+                  "but sending pg_log anyway", peer);
+    m = new MOSDPGLog{peer.shard,
+                      whoami.shard,
+                      get_osdmap_epoch(),
+                      get_info(),
+                      get_last_peering_reset()};
+  } else if (pi.last_backfill.is_min()) {
+    logger().info("starting backfill to osd.{} from ({},{}] {} to {}", peer,
+                  pi.log_tail, pi.last_update,
+                  pi.last_backfill, info.last_update);
+    // backfill
+    pi.last_update = info.last_update;
+    pi.last_complete = info.last_update;
+    pi.last_epoch_started = info.last_epoch_started;
+    pi.last_interval_started = info.last_interval_started;
+    pi.history = info.history;
+    pi.hit_set = info.hit_set;
+    pi.stats.stats.clear();
+    pi.purged_snaps = info.purged_snaps;
+    m = new MOSDPGLog{peer.shard,
+                      whoami.shard,
+                      get_osdmap_epoch(),
+                      pi,
+                      get_last_peering_reset()};
+  } else {
+    // catch up
+    logger().info("send missing log to osd.{}", peer);
+    m = new MOSDPGLog{peer.shard,
+                      whoami.shard,
+                      get_osdmap_epoch(),
+                      get_info(),
+                      get_last_peering_reset()};
+    // todo. send pg_log
+    pi.last_update = info.last_update;
+  }
+  return send_to_osd(peer.osd, Ref<MOSDPGLog>{m, false}, get_osdmap_epoch());
+}
+
+void PG::maybe_mark_clean()
+{
+  if (actingset.size() == osdmap->get_pg_size(pgid.pgid)) {
+    set_state(PG_STATE_CLEAN);
+    info.history.last_epoch_clean = get_osdmap_epoch();
+    info.history.last_interval_clean = info.history.same_interval_since;
+  }
+}
+
 seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
 {
   // todo
@@ -578,3 +798,41 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
   pg.print(os);
   return os;
 }
+
+seastar::future<> PG::send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch)
+{
+  if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
+    return seastar::now();
+  } else {
+    return msgr.connect(osdmap->get_cluster_addrs(peer).legacy_addr(),
+                        CEPH_ENTITY_TYPE_OSD)
+      .then([m, this] (auto xconn) {
+        return (*xconn)->send(m);
+      });
+  }
+}
+
+seastar::future<> PG::share_pg_info()
+{
+  return seastar::do_for_each(
+    acting_recovery_backfill.begin(),
+    acting_recovery_backfill.end(),
+    [this](pg_shard_t peer) {
+      if (peer == whoami) return seastar::now();
+      if (auto pi = peer_info.find(peer); pi != peer_info.end()) {
+        pi->second.last_epoch_started = info.last_epoch_started;
+        pi->second.last_interval_started = info.last_interval_started;
+        pi->second.history.merge(info.history);
+      }
+      pg_notify_t notify{peer.shard,
+                         whoami.shard,
+                         get_osdmap_epoch(),
+                         get_osdmap_epoch(),
+                         info};
+      auto m = make_message<MOSDPGInfo>(
+        get_osdmap_epoch(),
+        MOSDPGInfo::pg_list_t{make_pair(std::move(notify),
+                                        past_intervals)});
+      return send_to_osd(peer.osd, m, get_osdmap_epoch());
+    });
+}
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 1d0c760cc5f..5ab4807abeb 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -83,13 +83,31 @@ public:
   bool is_last_activated_peer(pg_shard_t peer);
   void clear_primary_state();
 
+  bool should_restart_peering(int new_up_primary,
+                              int new_acting_primary,
+                              const std::vector<int>& new_up,
+                              const std::vector<int>& new_acting,
+                              cached_map_t last_map,
+                              cached_map_t osd_map) const;
+  void start_peering_interval(int new_up_primary,
+                              int new_acting_primary,
+                              const std::vector<int>& new_up,
+                              const std::vector<int>& new_acting,
+                              cached_map_t last_map);
+  void activate(epoch_t activation_epoch);
+  void on_activated();
+  void maybe_mark_clean();
+
   seastar::future<> do_peering_event(std::unique_ptr<PGPeeringEvent> evt);
   seastar::future<> handle_advance_map(cached_map_t next_map);
   seastar::future<> handle_activate_map();
+  seastar::future<> share_pg_info();
   void print(ostream& os) const;
 
 private:
+  seastar::future<> activate_peer(pg_shard_t peer);
+  seastar::future<> send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch);
   void update_primary_state(const std::vector<int>& new_up,
                             int new_up_primary,
                             const std::vector<int>& new_acting,
-- 
2.39.5
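Note (not part of the patch): a minimal sketch of how a caller might wire these
facilities together when a new map arrives. The body of handle_advance_map()
below is assumed for illustration only; should_restart_peering(),
start_peering_interval(), and the osdmap/pgid members come from this patch,
while pg_to_up_acting_osds() is the stock OSDMap query.

// Illustrative sketch, not this patch's implementation of handle_advance_map().
// It shows the intended order of operations: query the incoming map, decide
// whether the interval changed, and if so start a new peering interval before
// adopting the map.
seastar::future<> PG::handle_advance_map(cached_map_t next_map)
{
  std::vector<int> new_up, new_acting;
  int new_up_primary, new_acting_primary;
  // ask the incoming map who should serve this PG now
  next_map->pg_to_up_acting_osds(pgid.pgid,
                                 &new_up, &new_up_primary,
                                 &new_acting, &new_acting_primary);
  if (should_restart_peering(new_up_primary, new_acting_primary,
                             new_up, new_acting,
                             osdmap, next_map)) {
    // the mapping changed enough to close the current interval
    start_peering_interval(new_up_primary, new_acting_primary,
                           new_up, new_acting, osdmap);
  }
  osdmap = std::move(next_map);
  return seastar::now();
}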