peer_activated.clear();
}
+bool PG::should_restart_peering(int new_up_primary,
+ int new_acting_primary,
+ const std::vector<int>& new_up,
+ const std::vector<int>& new_acting,
+ cached_map_t last_map,
+ cached_map_t osd_map) const
+{
+ auto pgid = info.pgid.pgid;
+ auto pool = last_map->get_pg_pool(pgid.pool());
+ if (!pool) {
+ return false;
+ }
+ auto new_pool = osd_map->get_pg_pool(pgid.pool());
+ if (!new_pool) {
+ return true;
+ }
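+  // delegate to PastIntervals::is_new_interval(): any change in the up/acting
+  // sets or their primaries, the pool's size/min_size/pg_num, or the relevant
+  // osdmap flags means the old interval has ended and peering must restart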
+ if (PastIntervals::is_new_interval(
+ primary.osd,
+ new_acting_primary,
+ acting,
+ new_acting,
+ up_primary.osd,
+ new_up_primary,
+ up,
+ new_up,
+ pool->size,
+ new_pool->size,
+ pool->min_size,
+ new_pool->min_size,
+ pool->get_pg_num(),
+ new_pool->get_pg_num(),
+ pool->get_pg_num_pending(),
+ new_pool->get_pg_num_pending(),
+ last_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
+ osd_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
+ last_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
+ osd_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
+ pgid)) {
+ logger().info("new interval new_up {} new_acting {}",
+ new_up, new_acting);
+ return true;
+ }
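+  // even if the mapping is unchanged, restart peering when this osd
+  // transitions from down back to up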
+ if (!last_map->is_up(whoami.osd) && osd_map->is_up(whoami.osd)) {
+ logger().info(" osd transitioned from down -> up");
+ return true;
+ }
+ return false;
+}
+
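+// assign new_v to v and report whether the value actually changed;
+// a helper for updating pg stats fields while tracking modifications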
+template<class T>
+bool compare_n_set(T& v, const T& new_v)
+{
+ if (v != new_v) {
+ v = new_v;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void PG::start_peering_interval(int new_up_primary,
+ int new_acting_primary,
+ const std::vector<int>& new_up,
+ const std::vector<int>& new_acting,
+ cached_map_t last_map)
+{
+ // todo
+ update_last_peering_reset();
+
+ auto old_acting_primary = primary;
+ auto old_acting = std::move(acting);
+ auto old_up_primary = up_primary;
+ auto old_up = std::move(up);
+
+ update_primary_state(new_up, new_up_primary,
+ new_acting, new_acting_primary);
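+  // note: operator+ is used instead of || so that every compare_n_set()
+  // call runs (no short-circuiting); any nonzero sum means something changed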
+ if (compare_n_set(info.stats.up, up) +
+ compare_n_set(info.stats.up_primary, up_primary.osd) +
+ compare_n_set(info.stats.acting, acting) +
+ compare_n_set(info.stats.acting_primary, primary.osd)) {
+ info.stats.mapping_epoch = osdmap->get_epoch();
+ }
+ if (old_up_primary != up_primary || old_up != up) {
+ info.history.same_up_since = osdmap->get_epoch();
+ }
+ // this comparison includes primary rank via pg_shard_t
+ if (old_acting_primary != get_primary()) {
+ info.history.same_primary_since = osdmap->get_epoch();
+ }
+ // todo: always start a new interval
+ info.history.same_interval_since = osdmap->get_epoch();
+ // This will now be remapped during a backfill in cases
+ // that it would not have been before.
+ if (up != acting) {
+ set_state(PG_STATE_REMAPPED);
+ } else {
+ clear_state(PG_STATE_REMAPPED);
+ }
+ // deactivate.
+ clear_state(PG_STATE_ACTIVE);
+ clear_state(PG_STATE_PEERED);
+ clear_state(PG_STATE_DOWN);
+
+ acting_recovery_backfill.clear();
+ // should we tell the primary we are here?
+ should_notify_primary = !is_primary();
+}
+
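+// move the PG toward active: the primary bumps last_epoch_started and pushes
+// pg_log/info to each peer, while a replica applies the primary's epoch and
+// acks back with an MOSDPGInfo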
+void PG::activate(epoch_t activation_epoch)
+{
+ clear_state(PG_STATE_DOWN);
+
+ if (is_primary()) {
+ // only update primary last_epoch_started if we will go active
+ if (acting.size() >= pool.min_size) {
+ info.last_epoch_started = activation_epoch;
+ info.last_interval_started = info.history.same_interval_since;
+ }
+ } else if (is_acting(whoami)) {
+ // update last_epoch_started on acting replica to whatever the primary sent
+ // unless it's smaller (could happen if we are going peered rather than
+ // active, see doc/dev/osd_internals/last_epoch_started.rst)
+ if (info.last_epoch_started < activation_epoch) {
+ info.last_epoch_started = activation_epoch;
+ info.last_interval_started = info.history.same_interval_since;
+ }
+ }
+ if (is_primary()) {
+ // start up replicas
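+    // fixme: the future returned by do_for_each is discarded here, so peer
+    // activation runs in the background; it is not yet chained into peering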
+ seastar::do_for_each(
+ acting_recovery_backfill.begin(),
+ acting_recovery_backfill.end(),
+ [this](pg_shard_t peer) { return activate_peer(peer); });
+ set_state(PG_STATE_ACTIVATING);
+ } else {
+ // todo: write/commit pg log, activate myself, and then tell primary
+ on_activated();
+ pg_notify_t notify{get_primary().shard,
+ whoami.shard,
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ info};
+ auto m = make_message<MOSDPGInfo>(
+ get_osdmap_epoch(),
+ MOSDPGInfo::pg_list_t{make_pair(std::move(notify), PastIntervals{})});
+ send_to_osd(get_primary().osd, std::move(m), get_osdmap_epoch());
+ }
+ // todo:
+ info.last_complete = info.last_update;
+ update_need_up_thru();
+}
+
+void PG::on_activated()
+{
+ if (acting.size() >= pool.min_size) {
+ set_state(PG_STATE_ACTIVE);
+ } else {
+ set_state(PG_STATE_PEERED);
+ }
+}
+
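+// send an MOSDPGLog to a peer being activated: an empty log if it is already
+// up to date, a fresh info when it needs backfill, or the log entries it is
+// missing to catch up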
+seastar::future<> PG::activate_peer(pg_shard_t peer)
+{
+ if (peer == whoami) {
+ // todo: write/commit pg log
+ peer_activated.insert(whoami);
+ return seastar::now();
+ }
+ auto& pi = peer_info[peer];
+ MOSDPGLog* m = nullptr;
+ if (pi.last_update == info.last_update) {
+ // empty log
+ logger().info("activate peer osd.{} is up to date, "
+ "but sending pg_log anyway", peer);
+ m = new MOSDPGLog{peer.shard,
+ whoami.shard,
+ get_osdmap_epoch(),
+ get_info(),
+ get_last_peering_reset()};
+ } else if (pi.last_backfill.is_min()) {
+ logger().info("starting backfill to osd.{} from ({},{}] {} to {}", peer,
+ pi.log_tail, pi.last_update,
+ pi.last_backfill, info.last_update);
+ // backfill
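+    // backfill copies objects directly rather than replaying log history,
+    // so prime the peer's info to match ours up front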
+ pi.last_update = info.last_update;
+ pi.last_complete = info.last_update;
+ pi.last_epoch_started = info.last_epoch_started;
+ pi.last_interval_started = info.last_interval_started;
+ pi.history = info.history;
+ pi.hit_set = info.hit_set;
+ pi.stats.stats.clear();
+ pi.purged_snaps = info.purged_snaps;
+ m = new MOSDPGLog{peer.shard,
+ whoami.shard,
+ get_osdmap_epoch(),
+ pi,
+ get_last_peering_reset()};
+ } else {
+ // catch up
+ logger().info("send missing log to osd.{}", peer);
+ m = new MOSDPGLog{peer.shard,
+ whoami.shard,
+ get_osdmap_epoch(),
+ get_info(),
+ get_last_peering_reset()};
+ // todo. send pg_log
+ pi.last_update = info.last_update;
+ }
+ return send_to_osd(peer.osd, Ref<Message>{m, false}, get_osdmap_epoch());
+}
+
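+// mark the PG clean once the acting set has grown back to the pool's
+// configured replica count, i.e. every expected osd holds a full copy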
+void PG::maybe_mark_clean()
+{
+ if (actingset.size() == osdmap->get_pg_size(pgid.pgid)) {
+ set_state(PG_STATE_CLEAN);
+ info.history.last_epoch_clean = get_osdmap_epoch();
+ info.history.last_interval_clean = info.history.same_interval_since;
+ }
+}
+
seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
{
// todo
pg.print(os);
return os;
}
+
+seastar::future<> PG::send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch)
+{
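+  // drop the message if the peer is down, or if it (re)booted after
+  // from_epoch: a message prepared against an older map is no longer relevant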
+ if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
+ return seastar::now();
+ } else {
+ return msgr.connect(osdmap->get_cluster_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD)
+ .then([m, this] (auto xconn) {
+ return (*xconn)->send(m);
+ });
+ }
+}
+
+seastar::future<> PG::share_pg_info()
+{
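+  // push our latest info and history to every peer in the
+  // acting_recovery_backfill set, updating our cached peer_info on the way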
+ return seastar::do_for_each(
+ acting_recovery_backfill.begin(),
+ acting_recovery_backfill.end(),
+ [this](pg_shard_t peer) {
+      if (peer == whoami) {
+        return seastar::now();
+      }
+ if (auto pi = peer_info.find(peer); pi != peer_info.end()) {
+ pi->second.last_epoch_started = info.last_epoch_started;
+ pi->second.last_interval_started = info.last_interval_started;
+ pi->second.history.merge(info.history);
+ }
+ pg_notify_t notify{peer.shard,
+ whoami.shard,
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ info};
+ auto m = make_message<MOSDPGInfo>(
+ get_osdmap_epoch(),
+ MOSDPGInfo::pg_list_t{make_pair(std::move(notify),
+ past_intervals)});
+ return send_to_osd(peer.osd, m, get_osdmap_epoch());
+ });
+}