crimson/osd: add more peering facilities
author    Kefu Chai <kchai@redhat.com>  Mon, 4 Mar 2019 02:58:12 +0000 (10:58 +0800)
committer Kefu Chai <kchai@redhat.com>  Fri, 22 Mar 2019 05:24:15 +0000 (13:24 +0800)
* add PG::should_restart_peering() for telling whether we should restart
  peering upon seeing a new mapping (a usage sketch follows this list)
* add PG::start_peering_interval() for starting a new peering interval
* add PG::activate() for activating a PG
* add PG::on_activated() to be called once a PG is activated
* add PG::send_to_osd() for sending a message to a given OSD
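
A minimal sketch of how a map-advance handler could wire the new
facilities together. This is illustrative only: handle_advance_map()
remains a stub in this commit, and the use of std::exchange and
OSDMap::pg_to_up_acting_osds() below is an assumption about the
eventual call site, not part of this change.

    seastar::future<> PG::handle_advance_map(cached_map_t next_map)
    {
      std::vector<int> new_up, new_acting;
      int new_up_primary, new_acting_primary;
      // ask the incoming map who should serve this pg now
      next_map->pg_to_up_acting_osds(info.pgid.pgid,
                                     &new_up, &new_up_primary,
                                     &new_acting, &new_acting_primary);
      // install the new map, keeping the old one for interval comparison
      auto last_map = std::exchange(osdmap, std::move(next_map));
      if (should_restart_peering(new_up_primary, new_acting_primary,
                                 new_up, new_acting, last_map, osdmap)) {
        start_peering_interval(new_up_primary, new_acting_primary,
                               new_up, new_acting, last_map);
      }
      return seastar::now();
    }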

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 4a059d7193b72838e6273e78c141e9a91dc191d0..02da7e473d107e59b15ae5a9944de1f9e68244ed 100644
@@ -543,6 +543,226 @@ void PG::clear_primary_state()
   peer_activated.clear();
 }
 
+bool PG::should_restart_peering(int new_up_primary,
+                                int new_acting_primary,
+                                const std::vector<int>& new_up,
+                                const std::vector<int>& new_acting,
+                                cached_map_t last_map,
+                                cached_map_t osd_map) const
+{
+  auto pgid = info.pgid.pgid;
+  auto pool = last_map->get_pg_pool(pgid.pool());
+  if (!pool) {
+    return false;
+  }
+  auto new_pool = osd_map->get_pg_pool(pgid.pool());
+  if (!new_pool) {
+    return true;
+  }
+  if (PastIntervals::is_new_interval(
+        primary.osd,
+        new_acting_primary,
+        acting,
+        new_acting,
+        up_primary.osd,
+        new_up_primary,
+        up,
+        new_up,
+        pool->size,
+        new_pool->size,
+        pool->min_size,
+        new_pool->min_size,
+        pool->get_pg_num(),
+        new_pool->get_pg_num(),
+        pool->get_pg_num_pending(),
+        new_pool->get_pg_num_pending(),
+        last_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
+        osd_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
+        last_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
+        osd_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
+        pgid)) {
+    logger().info("new interval new_up {} new_acting {}",
+                  new_up, new_acting);
+    return true;
+  }
+  if (!last_map->is_up(whoami.osd) && osd_map->is_up(whoami.osd)) {
+    logger().info("osd transitioned from down -> up");
+    return true;
+  }
+  return false;
+}
+
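+// set v to new_v; return true iff the stored value actually changed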
+template<class T>
+bool compare_n_set(T& v, const T& new_v)
+{
+  if (v != new_v) {
+    v = new_v;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void PG::start_peering_interval(int new_up_primary,
+                                int new_acting_primary,
+                                const std::vector<int>& new_up,
+                                const std::vector<int>& new_acting,
+                                cached_map_t last_map)
+{
+  // todo
+  update_last_peering_reset();
+
+  auto old_acting_primary = primary;
+  auto old_acting = std::move(acting);
+  auto old_up_primary = up_primary;
+  auto old_up = std::move(up);
+
+  update_primary_state(new_up, new_up_primary,
+                       new_acting, new_acting_primary);
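+  // summed with '+' rather than '||' so no compare_n_set() call is short-circuited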
+  if (compare_n_set(info.stats.up, up) +
+      compare_n_set(info.stats.up_primary, up_primary.osd) +
+      compare_n_set(info.stats.acting, acting) +
+      compare_n_set(info.stats.acting_primary, primary.osd)) {
+    info.stats.mapping_epoch = osdmap->get_epoch();
+  }
+  if (old_up_primary != up_primary || old_up != up) {
+    info.history.same_up_since = osdmap->get_epoch();
+  }
+  // this comparison includes primary rank via pg_shard_t
+  if (old_acting_primary != get_primary()) {
+    info.history.same_primary_since = osdmap->get_epoch();
+  }
+  // todo: always start a new interval
+  info.history.same_interval_since = osdmap->get_epoch();
+  // This will now be remapped during a backfill in cases where it would
+  // not have been before.
+  if (up != acting) {
+    set_state(PG_STATE_REMAPPED);
+  } else {
+    clear_state(PG_STATE_REMAPPED);
+  }
+  // deactivate.
+  clear_state(PG_STATE_ACTIVE);
+  clear_state(PG_STATE_PEERED);
+  clear_state(PG_STATE_DOWN);
+
+  acting_recovery_backfill.clear();
+  // should we tell the primary we are here?
+  should_notify_primary = !is_primary();
+}
+
+void PG::activate(epoch_t activation_epoch)
+{
+  clear_state(PG_STATE_DOWN);
+
+  if (is_primary()) {
+    // only update primary last_epoch_started if we will go active
+    if (acting.size() >= pool.min_size) {
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  } else if (is_acting(whoami)) {
+    // update last_epoch_started on acting replica to whatever the primary sent
+    // unless it's smaller (could happen if we are going peered rather than
+    // active, see doc/dev/osd_internals/last_epoch_started.rst)
+    if (info.last_epoch_started < activation_epoch) {
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  }
+  if (is_primary()) {
+    // start up replicas
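+    // the future returned by do_for_each() is not awaited here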
+    seastar::do_for_each(
+      acting_recovery_backfill.begin(),
+      acting_recovery_backfill.end(),
+      [this](pg_shard_t peer) { return activate_peer(peer); });
+    set_state(PG_STATE_ACTIVATING);
+  } else {
+    // todo: write/commit pg log, activate myself, and then tell primary
+    on_activated();
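+    // pg_notify_t{to_shard, from_shard, query_epoch, epoch_sent, info}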
+    pg_notify_t notify{get_primary().shard,
+                       whoami.shard,
+                       get_osdmap_epoch(),
+                       get_osdmap_epoch(),
+                       info};
+    auto m = make_message<MOSDPGInfo>(
+      get_osdmap_epoch(),
+      MOSDPGInfo::pg_list_t{make_pair(std::move(notify), PastIntervals{})});
+    send_to_osd(get_primary().osd, std::move(m), get_osdmap_epoch());
+  }
+  // todo:
+  info.last_complete = info.last_update;
+  update_need_up_thru();
+}
+
+void PG::on_activated()
+{
+  if (acting.size() >= pool.min_size) {
+    set_state(PG_STATE_ACTIVE);
+  } else {
+    set_state(PG_STATE_PEERED);
+  }
+}
+
+seastar::future<> PG::activate_peer(pg_shard_t peer)
+{
+  if (peer == whoami) {
+    // todo: write/commit pg log
+    peer_activated.insert(whoami);
+    return seastar::now();
+  }
+  auto& pi = peer_info[peer];
+  MOSDPGLog* m = nullptr;
+  if (pi.last_update == info.last_update) {
+    // empty log
+    logger().info("activate peer osd.{} is up to date, "
+                  "but sending pg_log anyway", peer);
+    m = new MOSDPGLog{peer.shard,
+                      whoami.shard,
+                      get_osdmap_epoch(),
+                      get_info(),
+                      get_last_peering_reset()};
+  } else if (pi.last_backfill.is_min()) {
+    logger().info("starting backfill to osd.{} from ({},{}] {} to {}", peer,
+                  pi.log_tail, pi.last_update,
+                  pi.last_backfill, info.last_update);
+    // backfill
+    pi.last_update = info.last_update;
+    pi.last_complete = info.last_update;
+    pi.last_epoch_started = info.last_epoch_started;
+    pi.last_interval_started = info.last_interval_started;
+    pi.history = info.history;
+    pi.hit_set = info.hit_set;
+    pi.stats.stats.clear();
+    pi.purged_snaps = info.purged_snaps;
+    m = new MOSDPGLog{peer.shard,
+                      whoami.shard,
+                      get_osdmap_epoch(),
+                      pi,
+                      get_last_peering_reset()};
+  } else {
+    // catch up
+    logger().info("send missing log to osd.{}", peer);
+    m = new MOSDPGLog{peer.shard,
+                      whoami.shard,
+                      get_osdmap_epoch(),
+                      get_info(),
+                      get_last_peering_reset()};
+    // todo: send pg_log
+    pi.last_update = info.last_update;
+  }
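+  // Ref{m, false} adopts m without taking an extra reference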
+  return send_to_osd(peer.osd, Ref<Message>{m, false}, get_osdmap_epoch());
+}
+
+void PG::maybe_mark_clean()
+{
+  if (actingset.size() == osdmap->get_pg_size(pgid.pgid)) {
+    set_state(PG_STATE_CLEAN);
+    info.history.last_epoch_clean = get_osdmap_epoch();
+    info.history.last_interval_clean = info.history.same_interval_since;
+  }
+}
+
 seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
 {
   // todo
@@ -578,3 +798,41 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
   pg.print(os);
   return os;
 }
+
+seastar::future<> PG::send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch)
+{
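+  // drop the message if the peer is down or came (back) up after from_epoch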
+  if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
+    return seastar::now();
+  } else {
+    return msgr.connect(osdmap->get_cluster_addrs(peer).legacy_addr(),
+                        CEPH_ENTITY_TYPE_OSD)
+     .then([m, this] (auto xconn) {
+       return (*xconn)->send(m);
+     });
+  }
+}
+
+seastar::future<> PG::share_pg_info()
+{
+  return seastar::do_for_each(
+    acting_recovery_backfill.begin(),
+    acting_recovery_backfill.end(),
+    [this](pg_shard_t peer) {
+      if (peer == whoami) return seastar::now();
+      if (auto pi = peer_info.find(peer); pi != peer_info.end()) {
+        pi->second.last_epoch_started = info.last_epoch_started;
+        pi->second.last_interval_started = info.last_interval_started;
+        pi->second.history.merge(info.history);
+      }
+      pg_notify_t notify{peer.shard,
+                         whoami.shard,
+                         get_osdmap_epoch(),
+                         get_osdmap_epoch(),
+                         info};
+      auto m = make_message<MOSDPGInfo>(
+        get_osdmap_epoch(),
+        MOSDPGInfo::pg_list_t{make_pair(std::move(notify),
+                                        past_intervals)});
+      return send_to_osd(peer.osd, m, get_osdmap_epoch());
+    });
+}
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 1d0c760cc5f3c694728a690af6c566795fc59414..5ab4807abeb270b91f074ed090bcda4cd555e1f1 100644
@@ -83,13 +83,31 @@ public:
   bool is_last_activated_peer(pg_shard_t peer);
   void clear_primary_state();
 
+  bool should_restart_peering(int new_up_primary,
+                             int new_acting_primary,
+                             const std::vector<int>& new_up,
+                             const std::vector<int>& new_acting,
+                             cached_map_t last_map,
+                             cached_map_t osd_map) const;
+  void start_peering_interval(int new_up_primary,
+                             int new_acting_primary,
+                             const std::vector<int>& new_up,
+                             const std::vector<int>& new_acting,
+                             cached_map_t last_map);
+  void activate(epoch_t activation_epoch);
+  void on_activated();
+  void maybe_mark_clean();
+
   seastar::future<> do_peering_event(std::unique_ptr<PGPeeringEvent> evt);
   seastar::future<> handle_advance_map(cached_map_t next_map);
   seastar::future<> handle_activate_map();
+  seastar::future<> share_pg_info();
 
   void print(ostream& os) const;
 
 private:
+  seastar::future<> activate_peer(pg_shard_t peer);
+  seastar::future<> send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch);
   void update_primary_state(const std::vector<int>& new_up,
                            int new_up_primary,
                            const std::vector<int>& new_acting,