crimson/osd: cleanup and drop OSD::ShardDispatcher (54138/head)
author     Yingxin Cheng <yingxin.cheng@intel.com>
           Mon, 7 Aug 2023 08:58:12 +0000 (16:58 +0800)
committer  Matan Breizman <mbreizma@redhat.com>
           Sun, 22 Oct 2023 08:35:06 +0000 (08:35 +0000)
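
Drop the per-shard ShardDispatcher service: its message handlers move
back onto OSD and assert PRIMARY_CORE directly; ms_dispatch() becomes a
thin wrapper around do_ms_dispatch(), which returns std::nullopt for
unrecognized message types instead of tracking a dispatched flag;
cross-core forwarding uses seastar::smp::submit_to() in place of
container().invoke_on(); and PGShardManager becomes a plain OSD member
that takes PGShardMapping as a seastar::sharded<> service, accessed
through get_pg_to_shard_mapping().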
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit e3d910a16c4b6239c7cf523c5510bc76fadcb832)

src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h

diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 60caf3a164c62e60303b90f3c3a027cf34af53cc..cfe4f54ab2e5e9aa49769f45f9d28fb651998dac 100644
@@ -95,6 +95,9 @@ OSD::OSD(int id, uint32_t nonce,
     monc{new crimson::mon::Client{*public_msgr, *this}},
     mgrc{new crimson::mgr::Client{*public_msgr, *this}},
     store{store},
+    pg_shard_manager{osd_singleton_state,
+                     shard_services,
+                     pg_to_shard_mappings},
     // do this in background -- continuation rearms timer when complete
     tick_timer{[this] {
       std::ignore = update_heartbeat_peers(
@@ -160,11 +163,11 @@ CompatSet get_osd_initial_compat_set()
 
 seastar::future<> OSD::open_meta_coll()
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.get_sharded_store().open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-    get_pg_shard_manager().init_meta_coll(ch, store.get_sharded_store());
+    pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
     return seastar::now();
   });
 }
@@ -376,10 +379,6 @@ seastar::future<> OSD::start()
         osd_singleton_state.local().recoverystate_perf,
         std::ref(store),
         std::ref(osd_states));
-    }).then([this] {
-      return shard_dispatchers.start(
-        std::ref(*this),
-        std::ref(pg_to_shard_mappings));
     });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
@@ -395,24 +394,24 @@ seastar::future<> OSD::start()
   }).then([this] {
     return open_meta_coll();
   }).then([this] {
-    return get_pg_shard_manager().get_meta_coll().load_superblock(
+    return pg_shard_manager.get_meta_coll().load_superblock(
     ).handle_error(
       crimson::ct_error::assert_all("open_meta_coll error")
     );
   }).then([this](OSDSuperblock&& sb) {
     superblock = std::move(sb);
-    get_pg_shard_manager().set_superblock(superblock);
-    return get_pg_shard_manager().get_local_map(superblock.current_epoch);
+    pg_shard_manager.set_superblock(superblock);
+    return pg_shard_manager.get_local_map(superblock.current_epoch);
   }).then([this](OSDMapService::local_cached_map_t&& map) {
     osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
-    return get_pg_shard_manager().update_map(std::move(map));
+    return pg_shard_manager.update_map(std::move(map));
   }).then([this] {
     return shard_services.invoke_on_all([this](auto &local_service) {
       local_service.local_state.osdmap_gate.got_map(osdmap->get_epoch());
     });
   }).then([this] {
     bind_epoch = osdmap->get_epoch();
-    return get_pg_shard_manager().load_pgs(store);
+    return pg_shard_manager.load_pgs(store);
   }).then([this] {
     uint64_t osd_required =
       CEPH_FEATURE_UID |
@@ -486,7 +485,7 @@ seastar::future<> OSD::start()
 
 seastar::future<> OSD::start_boot()
 {
-  get_pg_shard_manager().set_preboot();
+  pg_shard_manager.set_preboot();
   return monc->get_version("osdmap").then([this](auto&& ret) {
     auto [newest, oldest] = ret;
     return _preboot(oldest, newest);
@@ -528,7 +527,7 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
 
 seastar::future<> OSD::_send_boot()
 {
-  get_pg_shard_manager().set_booting();
+  pg_shard_manager.set_booting();
 
   entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
   entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
@@ -608,12 +607,12 @@ seastar::future<> OSD::_add_me_to_crush()
   });
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_command(
+seastar::future<> OSD::handle_command(
   crimson::net::ConnectionRef conn,
   Ref<MCommand> m)
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return osd.asok->handle_command(conn, std::move(m));
+  return asok->handle_command(conn, std::move(m));
 }
 
 /*
@@ -633,7 +632,7 @@ seastar::future<> OSD::start_asok_admin()
     asok->register_command(make_asok_hook<SendBeaconHook>(*this));
     asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
     asok->register_command(
-      make_asok_hook<DumpPGStateHistory>(std::as_const(get_pg_shard_manager())));
+      make_asok_hook<DumpPGStateHistory>(std::as_const(pg_shard_manager)));
     asok->register_command(make_asok_hook<DumpMetricsHook>());
     asok->register_command(make_asok_hook<DumpPerfCountersHook>());
     asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
@@ -644,7 +643,7 @@ seastar::future<> OSD::start_asok_admin()
     // ops commands
     asok->register_command(
       make_asok_hook<DumpInFlightOpsHook>(
-       std::as_const(get_pg_shard_manager())));
+       std::as_const(pg_shard_manager)));
     asok->register_command(
       make_asok_hook<DumpHistoricOpsHook>(
        std::as_const(get_shard_services().get_registry())));
@@ -663,7 +662,7 @@ seastar::future<> OSD::stop()
   tick_timer.cancel();
   // see also OSD::shutdown()
   return prepare_to_stop().then([this] {
-    return get_pg_shard_manager().set_stopping();
+    return pg_shard_manager.set_stopping();
   }).then([this] {
     logger().debug("prepared to stop");
     public_msgr->stop();
@@ -672,20 +671,17 @@ seastar::future<> OSD::stop()
     return asok->stop().then([this] {
       return heartbeat->stop();
     }).then([this] {
-      return get_pg_shard_manager().stop_registries();
+      return pg_shard_manager.stop_registries();
     }).then([this] {
       return store.umount();
     }).then([this] {
       return store.stop();
     }).then([this] {
-      return get_pg_shard_manager().stop_pgs();
+      return pg_shard_manager.stop_pgs();
     }).then([this] {
       return monc->stop();
     }).then([this] {
       return mgrc->stop();
-    }).then([this] {
-      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-      return shard_dispatchers.stop();
     }).then([this] {
       return shard_services.stop();
     }).then([this] {
@@ -711,12 +707,12 @@ void OSD::dump_status(Formatter* f) const
   f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
   f->dump_stream("osd_fsid") << superblock.osd_fsid;
   f->dump_unsigned("whoami", superblock.whoami);
-  f->dump_string("state", get_pg_shard_manager().get_osd_state_string());
+  f->dump_string("state", pg_shard_manager.get_osd_state_string());
   f->dump_unsigned("oldest_map", superblock.oldest_map);
   f->dump_unsigned("cluster_osdmap_trim_lower_bound",
                    superblock.cluster_osdmap_trim_lower_bound);
   f->dump_unsigned("newest_map", superblock.newest_map);
-  f->dump_unsigned("num_pgs", get_pg_shard_manager().get_num_pgs());
+  f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
 }
 
 void OSD::print(std::ostream& out) const
@@ -725,70 +721,30 @@ void OSD::print(std::ostream& out) const
       << superblock.osd_fsid << " [" << superblock.oldest_map
       << "," << superblock.newest_map << "] "
       << "tlb:" << superblock.cluster_osdmap_trim_lower_bound
-      << " pgs:" << get_pg_shard_manager().get_num_pgs()
-      << "}";
-}
-
-void OSD::ShardDispatcher::print(std::ostream& out) const
-{
-  out << "{osd." << osd.superblock.whoami << " "
-      << osd.superblock.osd_fsid << " [" << osd.superblock.oldest_map
-      << "," << osd.superblock.newest_map << "] "
-      << " pgs:" << get_pg_shard_manager().get_num_pgs()
+      << " pgs:" << pg_shard_manager.get_num_pgs()
       << "}";
 }
 
 std::optional<seastar::future<>>
 OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
-  if (get_pg_shard_manager().is_stopping()) {
+  if (pg_shard_manager.is_stopping()) {
     return seastar::now();
   }
-  bool dispatched = true;
-  gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
-                                                m=std::move(m), &dispatched]() mutable {
-    switch (m->get_type()) {
-      case CEPH_MSG_OSD_MAP:
-      case CEPH_MSG_OSD_OP:
-      case MSG_OSD_PG_CREATE2:
-      case MSG_COMMAND:
-      case MSG_OSD_MARK_ME_DOWN:
-      case MSG_OSD_PG_PULL:
-      case MSG_OSD_PG_PUSH:
-      case MSG_OSD_PG_PUSH_REPLY:
-      case MSG_OSD_PG_RECOVERY_DELETE:
-      case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
-      case MSG_OSD_PG_SCAN:
-      case MSG_OSD_PG_BACKFILL:
-      case MSG_OSD_PG_BACKFILL_REMOVE:
-      case MSG_OSD_PG_LEASE:
-      case MSG_OSD_PG_LEASE_ACK:
-      case MSG_OSD_PG_NOTIFY2:
-      case MSG_OSD_PG_INFO2:
-      case MSG_OSD_PG_QUERY2:
-      case MSG_OSD_BACKFILL_RESERVE:
-      case MSG_OSD_RECOVERY_RESERVE:
-      case MSG_OSD_PG_LOG:
-      case MSG_OSD_REPOP:
-      case MSG_OSD_REPOPREPLY:
-      case MSG_OSD_SCRUB2:
-      case MSG_OSD_PG_UPDATE_LOG_MISSING:
-      case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
-      {
-        return shard_dispatchers.local().ms_dispatch(conn, std::move(m));
-      }
-      default:
-      {
-          dispatched = false;
-          return seastar::now();
-      }
-    }
+  auto maybe_ret = do_ms_dispatch(conn, std::move(m));
+  if (!maybe_ret.has_value()) {
+    return std::nullopt;
+  }
+
+  gate.dispatch_in_background(
+      __func__, *this, [ret=std::move(maybe_ret.value())]() mutable {
+    return std::move(ret);
   });
-  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
+  return seastar::now();
 }
 
-seastar::future<>
-OSD::ShardDispatcher::ms_dispatch(
+std::optional<seastar::future<>>
+OSD::do_ms_dispatch(
    crimson::net::ConnectionRef conn,
    MessageRef m)
 {
@@ -800,11 +756,12 @@ OSD::ShardDispatcher::ms_dispatch(
       // FIXME: order is not guaranteed in this path
       return conn.get_foreign(
       ).then([this, m=std::move(m)](auto f_conn) {
-        return container().invoke_on(PRIMARY_CORE,
-            [f_conn=std::move(f_conn), m=std::move(m)]
-            (auto& local_dispatcher) mutable {
+        return seastar::smp::submit_to(PRIMARY_CORE,
+            [f_conn=std::move(f_conn), m=std::move(m), this]() mutable {
           auto conn = make_local_shared_foreign(std::move(f_conn));
-          return local_dispatcher.ms_dispatch(conn, std::move(m));
+          auto ret = do_ms_dispatch(conn, std::move(m));
+          assert(ret.has_value());
+          return std::move(ret.value());
         });
       });
     }
@@ -868,7 +825,7 @@ OSD::ShardDispatcher::ms_dispatch(
     return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
       MOSDPGUpdateLogMissingReply>(m));
   default:
-    return seastar::now();
+    return std::nullopt;
   }
 }
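
In the new scheme do_ms_dispatch() classifies the message exactly once:
std::nullopt means the type was not recognized (so ms_dispatch() can
report the message as undispatched to the chain), while a returned
future is accepted work that the caller pushes into the background
gate. A reduced sketch of that optional-future contract, with a
hypothetical try_handle() standing in for the crimson types:

#include <seastar/core/future.hh>
#include <optional>

// std::nullopt -> not my message, let another dispatcher try;
// a future     -> this handler has taken ownership of the work.
std::optional<seastar::future<>> try_handle(int msg_type) {
  switch (msg_type) {
  case 42:  // stands in for a handled type such as CEPH_MSG_OSD_OP
    return seastar::make_ready_future<>();
  default:
    return std::nullopt;
  }
}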
 
@@ -929,7 +886,7 @@ seastar::future<MessageURef> OSD::get_stats() const
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
-  return get_pg_shard_manager().get_pg_stats(
+  return pg_shard_manager.get_pg_stats(
   ).then([m=std::move(m)](auto &&stats) mutable {
     m->pg_stat = std::move(stats);
     return seastar::make_ready_future<MessageURef>(std::move(m));
@@ -943,21 +900,7 @@ uint64_t OSD::send_pg_stats()
   return osd_stat.seq;
 }
 
-bool OSD::ShardDispatcher::require_mon_peer(
-  crimson::net::Connection *conn,
-  Ref<Message> m)
-{
-  if (!conn->peer_is_mon()) {
-    logger().info("{} received from non-mon {}, {}",
-                 __func__,
-                 conn->get_peer_addr(),
-                 *m);
-    return false;
-  }
-  return true;
-}
-
-seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
 {
   /* Ensure that only one MOSDMap is processed at a time.  Allowing concurrent
   * processing may eventually be worthwhile, but such an implementation would
@@ -969,17 +912,17 @@ seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref<MOSDMap> m)
   * See https://tracker.ceph.com/issues/59165
   */
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return osd.handle_osd_map_lock.lock().then([this, m] {
+  return handle_osd_map_lock.lock().then([this, m] {
     return _handle_osd_map(m);
   }).finally([this] {
-    return osd.handle_osd_map_lock.unlock();
+    return handle_osd_map_lock.unlock();
   });
 }
 
-seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
 {
   logger().info("handle_osd_map {}", *m);
-  if (m->fsid != osd.superblock.cluster_fsid) {
+  if (m->fsid != superblock.cluster_fsid) {
     logger().warn("fsid mismatched");
     return seastar::now();
   }
@@ -991,16 +934,16 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
   const auto first = m->get_first();
   const auto last = m->get_last();
   logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
-                first, last, osd.superblock.newest_map,
+                first, last, superblock.newest_map,
                 m->cluster_osdmap_trim_lower_bound, m->newest_map);
   // make sure there is something new, here, before we bother flushing
   // the queues and such
-  if (last <= osd.superblock.newest_map) {
+  if (last <= superblock.newest_map) {
     return seastar::now();
   }
   // missing some?
   bool skip_maps = false;
-  epoch_t start = osd.superblock.newest_map + 1;
+  epoch_t start = superblock.newest_map + 1;
   if (first > start) {
     logger().info("handle_osd_map message skips epochs {}..{}",
                   start, first - 1);
@@ -1023,22 +966,22 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
                           [=, this](auto& t) {
     return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] {
       // even if this map isn't from a mon, we may have satisfied our subscription
-      osd.monc->sub_got("osdmap", last);
-      if (!osd.superblock.oldest_map || skip_maps) {
-        osd.superblock.oldest_map = first;
+      monc->sub_got("osdmap", last);
+      if (!superblock.oldest_map || skip_maps) {
+        superblock.oldest_map = first;
       }
-      osd.superblock.newest_map = last;
-      osd.superblock.current_epoch = last;
+      superblock.newest_map = last;
+      superblock.current_epoch = last;
 
       // note in the superblock that we were clean thru the prior epoch
-      if (osd.boot_epoch && osd.boot_epoch >= osd.superblock.mounted) {
-        osd.superblock.mounted = osd.boot_epoch;
-        osd.superblock.clean_thru = last;
+      if (boot_epoch && boot_epoch >= superblock.mounted) {
+        superblock.mounted = boot_epoch;
+        superblock.clean_thru = last;
       }
-      pg_shard_manager.get_meta_coll().store_superblock(t, osd.superblock);
-      pg_shard_manager.set_superblock(osd.superblock);
+      pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
+      pg_shard_manager.set_superblock(superblock);
       logger().debug("OSD::handle_osd_map: do_transaction...");
-      return osd.store.get_sharded_store().do_transaction(
+      return store.get_sharded_store().do_transaction(
        pg_shard_manager.get_meta_coll().collection(),
        std::move(t));
     });
@@ -1048,13 +991,13 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
   });
 }
 
-seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
+seastar::future<> OSD::committed_osd_maps(
   version_t first,
   version_t last,
   Ref<MOSDMap> m)
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  logger().info("osd.{}: committed_osd_maps({}, {})", osd.whoami, first, last);
+  logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
   // advance through the new maps
   return seastar::do_for_each(boost::make_counting_iterator(first),
                               boost::make_counting_iterator(last + 1),
@@ -1062,17 +1005,17 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
     return pg_shard_manager.get_local_map(
       cur
     ).then([this](OSDMapService::local_cached_map_t&& o) {
-      osd.osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+      osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
       return pg_shard_manager.update_map(std::move(o));
     }).then([this] {
       if (get_shard_services().get_up_epoch() == 0 &&
-         osd.osdmap->is_up(osd.whoami) &&
-         osd.osdmap->get_addrs(osd.whoami) == osd.public_msgr->get_myaddrs()) {
+         osdmap->is_up(whoami) &&
+         osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
        return pg_shard_manager.set_up_epoch(
-         osd.osdmap->get_epoch()
+         osdmap->get_epoch()
        ).then([this] {
-         if (!osd.boot_epoch) {
-           osd.boot_epoch = osd.osdmap->get_epoch();
+         if (!boot_epoch) {
+           boot_epoch = osdmap->get_epoch();
          }
        });
       } else {
@@ -1081,26 +1024,26 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
     });
   }).then([m, this] {
     auto fut = seastar::now();
-    if (osd.osdmap->is_up(osd.whoami)) {
-      const auto up_from = osd.osdmap->get_up_from(osd.whoami);
+    if (osdmap->is_up(whoami)) {
+      const auto up_from = osdmap->get_up_from(whoami);
       logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
-                    osd.whoami, osd.osdmap->get_epoch(), up_from, osd.bind_epoch,
+                    whoami, osdmap->get_epoch(), up_from, bind_epoch,
                    pg_shard_manager.get_osd_state_string());
-      if (osd.bind_epoch < up_from &&
-          osd.osdmap->get_addrs(osd.whoami) == osd.public_msgr->get_myaddrs() &&
+      if (bind_epoch < up_from &&
+          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
           pg_shard_manager.is_booting()) {
-        logger().info("osd.{}: activating...", osd.whoami);
+        logger().info("osd.{}: activating...", whoami);
         fut = pg_shard_manager.set_active().then([this] {
-          osd.beacon_timer.arm_periodic(
+          beacon_timer.arm_periodic(
             std::chrono::seconds(local_conf()->osd_beacon_report_interval));
          // timer continuation rearms when complete
-          osd.tick_timer.arm(
+          tick_timer.arm(
             std::chrono::seconds(TICK_INTERVAL));
         });
       }
     } else {
       if (pg_shard_manager.is_prestop()) {
-       osd.got_stop_ack();
+       got_stop_ack();
        return seastar::now();
       }
     }
@@ -1108,34 +1051,34 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
       return check_osdmap_features().then([this] {
         // yay!
         logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
-                      " to {} epoch to pgs", osd.whoami, osd.osdmap->get_epoch());
-        return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch());
+                      " to {} epoch to pgs", whoami, osdmap->get_epoch());
+        return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
       });
     });
   }).then([m, this] {
     if (pg_shard_manager.is_active()) {
-      logger().info("osd.{}: now active", osd.whoami);
-      if (!osd.osdmap->exists(osd.whoami) ||
-         osd.osdmap->is_stop(osd.whoami)) {
-        return osd.shutdown();
+      logger().info("osd.{}: now active", whoami);
+      if (!osdmap->exists(whoami) ||
+         osdmap->is_stop(whoami)) {
+        return shutdown();
       }
-      if (osd.should_restart()) {
-        return osd.restart();
+      if (should_restart()) {
+        return restart();
       } else {
         return seastar::now();
       }
     } else if (pg_shard_manager.is_preboot()) {
-      logger().info("osd.{}: now preboot", osd.whoami);
+      logger().info("osd.{}: now preboot", whoami);
 
       if (m->get_source().is_mon()) {
-        return osd._preboot(
+        return _preboot(
           m->cluster_osdmap_trim_lower_bound, m->newest_map);
       } else {
-        logger().info("osd.{}: start_boot", osd.whoami);
-        return osd.start_boot();
+        logger().info("osd.{}: start_boot", whoami);
+        return start_boot();
       }
     } else {
-      logger().info("osd.{}: now {}", osd.whoami,
+      logger().info("osd.{}: now {}", whoami,
                    pg_shard_manager.get_osd_state_string());
       // XXX
       return seastar::now();
@@ -1143,7 +1086,7 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
   });
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_osd_op(
+seastar::future<> OSD::handle_osd_op(
   crimson::net::ConnectionRef conn,
   Ref<MOSDOp> m)
 {
@@ -1153,7 +1096,7 @@ seastar::future<> OSD::ShardDispatcher::handle_osd_op(
     std::move(m)).second;
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_pg_create(
+seastar::future<> OSD::handle_pg_create(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPGCreate2> m)
 {
@@ -1190,7 +1133,7 @@ seastar::future<> OSD::ShardDispatcher::handle_pg_create(
   });
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_update_log_missing(
+seastar::future<> OSD::handle_update_log_missing(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPGUpdateLogMissing> m)
 {
@@ -1200,7 +1143,7 @@ seastar::future<> OSD::ShardDispatcher::handle_update_log_missing(
     std::move(m)).second;
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply(
+seastar::future<> OSD::handle_update_log_missing_reply(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPGUpdateLogMissingReply> m)
 {
@@ -1210,7 +1153,7 @@ seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply(
     std::move(m)).second;
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_rep_op(
+seastar::future<> OSD::handle_rep_op(
   crimson::net::ConnectionRef conn,
   Ref<MOSDRepOp> m)
 {
@@ -1220,7 +1163,7 @@ seastar::future<> OSD::ShardDispatcher::handle_rep_op(
     std::move(m)).second;
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply(
+seastar::future<> OSD::handle_rep_op_reply(
   crimson::net::ConnectionRef conn,
   Ref<MOSDRepOpReply> m)
 {
@@ -1238,11 +1181,11 @@ seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply(
     });
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_scrub(
+seastar::future<> OSD::handle_scrub(
   crimson::net::ConnectionRef conn,
   Ref<MOSDScrub2> m)
 {
-  if (m->fsid != osd.superblock.cluster_fsid) {
+  if (m->fsid != superblock.cluster_fsid) {
     logger().warn("fsid mismatched");
     return seastar::now();
   }
@@ -1259,18 +1202,18 @@ seastar::future<> OSD::ShardDispatcher::handle_scrub(
   });
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_mark_me_down(
+seastar::future<> OSD::handle_mark_me_down(
   crimson::net::ConnectionRef conn,
   Ref<MOSDMarkMeDown> m)
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   if (pg_shard_manager.is_prestop()) {
-    osd.got_stop_ack();
+    got_stop_ack();
   }
   return seastar::now();
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_recovery_subreq(
+seastar::future<> OSD::handle_recovery_subreq(
   crimson::net::ConnectionRef conn,
   Ref<MOSDFastDispatchOp> m)
 {
@@ -1305,7 +1248,7 @@ seastar::future<> OSD::restart()
 {
   beacon_timer.cancel();
   tick_timer.cancel();
-  return get_pg_shard_manager().set_up_epoch(
+  return pg_shard_manager.set_up_epoch(
     0
   ).then([this] {
     bind_epoch = osdmap->get_epoch();
@@ -1324,7 +1267,7 @@ seastar::future<> OSD::shutdown()
 
 seastar::future<> OSD::send_beacon()
 {
-  if (!get_pg_shard_manager().is_active()) {
+  if (!pg_shard_manager.is_active()) {
     return seastar::now();
   }
   // FIXME: min lec should be calculated from pg_stat
@@ -1339,11 +1282,11 @@ seastar::future<> OSD::send_beacon()
 
 seastar::future<> OSD::update_heartbeat_peers()
 {
-  if (!get_pg_shard_manager().is_active()) {
+  if (!pg_shard_manager.is_active()) {
     return seastar::now();;
   }
 
-  get_pg_shard_manager().for_each_pgid([this](auto &pgid) {
+  pg_shard_manager.for_each_pgid([this](auto &pgid) {
     vector<int> up, acting;
     osdmap->pg_to_up_acting_osds(pgid.pgid,
                                  &up, nullptr,
@@ -1360,7 +1303,7 @@ seastar::future<> OSD::update_heartbeat_peers()
   return seastar::now();
 }
 
-seastar::future<> OSD::ShardDispatcher::handle_peering_op(
+seastar::future<> OSD::handle_peering_op(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPeeringOp> m)
 {
@@ -1375,17 +1318,18 @@ seastar::future<> OSD::ShardDispatcher::handle_peering_op(
     std::move(*evt)).second;
 }
 
-seastar::future<> OSD::ShardDispatcher::check_osdmap_features()
+seastar::future<> OSD::check_osdmap_features()
 {
-  return osd.store.write_meta(
+  assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return store.write_meta(
       "require_osd_release",
-      stringify((int)osd.osdmap->require_osd_release));
+      stringify((int)osdmap->require_osd_release));
 }
 
 seastar::future<> OSD::prepare_to_stop()
 {
   if (osdmap && osdmap->is_up(whoami)) {
-    get_pg_shard_manager().set_prestop();
+    pg_shard_manager.set_prestop();
     const auto timeout =
       std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::duration<double>(
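
The hunks above replace ShardDispatcher's container().invoke_on(PRIMARY_CORE,
...) with plain seastar::smp::submit_to(), which needs no
peering_sharded_service scaffolding. A minimal standalone sketch of that
forwarding pattern, not the crimson code: toy handler, with PRIMARY_CORE
assumed here to be shard 0.

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>
#include <cassert>
#include <iostream>

static constexpr seastar::shard_id PRIMARY_CORE = 0;

// Toy handler that must run on the primary shard, like OSD::do_ms_dispatch.
seastar::future<> handle_on_primary(int msg_type) {
  assert(seastar::this_shard_id() == PRIMARY_CORE);
  std::cout << "handled type " << msg_type << " on shard "
            << seastar::this_shard_id() << "\n";
  return seastar::make_ready_future<>();
}

// Any shard can forward to the primary shard: submit_to() queues the
// lambda on the target reactor and returns a future that resolves here.
seastar::future<> dispatch(int msg_type) {
  if (seastar::this_shard_id() == PRIMARY_CORE) {
    return handle_on_primary(msg_type);
  }
  return seastar::smp::submit_to(PRIMARY_CORE, [msg_type] {
    return handle_on_primary(msg_type);
  });
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    // Fan a request out from every shard; all of them land on shard 0.
    return seastar::smp::invoke_on_all([] { return dispatch(42); });
  });
}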
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 49216909c900f3f1ded9c2f1e028f293c94c486d..10ff60d47017fa0ac3d5969387f2e44582905393 100644
@@ -62,76 +62,6 @@ class PG;
 class OSD final : public crimson::net::Dispatcher,
                  private crimson::common::AuthHandler,
                  private crimson::mgr::WithStats {
-public:
-  class ShardDispatcher
-    : public seastar::peering_sharded_service<ShardDispatcher> {
-  friend class OSD;
-  public:
-    ShardDispatcher(
-      OSD& osd,
-      PGShardMapping& pg_to_shard_mapping)
-    : pg_shard_manager(osd.osd_singleton_state,
-                       osd.shard_services, pg_to_shard_mapping),
-      osd(osd) {}
-    ~ShardDispatcher() = default;
-
-    // Dispatcher methods
-    seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
-
-  private:
-    bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
-
-    seastar::future<> handle_osd_map(Ref<MOSDMap> m);
-    seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
-    seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
-                                       Ref<MOSDPGCreate2> m);
-    seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
-                                    Ref<MOSDOp> m);
-    seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
-                                    Ref<MOSDRepOp> m);
-    seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDRepOpReply> m);
-    seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
-                                        Ref<MOSDPeeringOp> m);
-    seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
-                                             Ref<MOSDFastDispatchOp> m);
-    seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
-                                   Ref<MOSDScrub2> m);
-    seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDMarkMeDown> m);
-
-    seastar::future<> committed_osd_maps(version_t first,
-                                         version_t last,
-                                         Ref<MOSDMap> m);
-
-    seastar::future<> check_osdmap_features();
-
-    seastar::future<> handle_command(crimson::net::ConnectionRef conn,
-                                     Ref<MCommand> m);
-    seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
-                                                Ref<MOSDPGUpdateLogMissing> m);
-    seastar::future<> handle_update_log_missing_reply(
-      crimson::net::ConnectionRef conn,
-      Ref<MOSDPGUpdateLogMissingReply> m);
-
-  public:
-    void print(std::ostream&) const;
-
-    auto &get_pg_shard_manager() {
-      return pg_shard_manager;
-    }
-    auto &get_pg_shard_manager() const {
-      return pg_shard_manager;
-    }
-    ShardServices &get_shard_services() {
-      return pg_shard_manager.get_shard_services();
-    }
-
-  private:
-    crimson::osd::PGShardManager pg_shard_manager;
-    OSD& osd;
-  };
-
   const int whoami;
   const uint32_t nonce;
   seastar::abort_source& abort_source;
@@ -170,6 +100,8 @@ public:
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
 
+  std::optional<seastar::future<>> do_ms_dispatch(crimson::net::ConnectionRef, MessageRef);
+
   // mgr::WithStats methods
   // pg statistics including osd ones
   osd_stat_t osd_stat;
@@ -185,7 +117,8 @@ public:
   seastar::sharded<OSDSingletonState> osd_singleton_state;
   seastar::sharded<OSDState> osd_states;
   seastar::sharded<ShardServices> shard_services;
-  seastar::sharded<ShardDispatcher> shard_dispatchers;
+
+  crimson::osd::PGShardManager pg_shard_manager;
 
   std::unique_ptr<Heartbeat> heartbeat;
   seastar::timer<seastar::lowres_clock> tick_timer;
@@ -203,6 +136,10 @@ public:
       crimson::net::MessengerRef hb_back_msgr);
   ~OSD() final;
 
+  auto &get_pg_shard_manager() {
+    return pg_shard_manager;
+  }
+
   seastar::future<> open_meta_coll();
   static seastar::future<OSDMeta> open_or_create_meta_coll(
     crimson::os::FuturizedStore &store
@@ -224,18 +161,9 @@ public:
   uint64_t send_pg_stats();
 
   auto &get_shard_services() {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     return shard_services.local();
   }
 
-  auto &get_pg_shard_manager() {
-    return shard_dispatchers.local().get_pg_shard_manager();
-  }
-
-  auto &get_pg_shard_manager() const {
-    return shard_dispatchers.local().get_pg_shard_manager();
-  }
-
 private:
   static seastar::future<> _write_superblock(
     crimson::os::FuturizedStore &store,
@@ -256,6 +184,39 @@ private:
   void write_superblock(ceph::os::Transaction& t);
   seastar::future<> read_superblock();
 
+  seastar::future<> handle_osd_map(Ref<MOSDMap> m);
+  seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
+  seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
+                                     Ref<MOSDPGCreate2> m);
+  seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
+                                  Ref<MOSDOp> m);
+  seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
+                                  Ref<MOSDRepOp> m);
+  seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
+                                        Ref<MOSDRepOpReply> m);
+  seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
+                                      Ref<MOSDPeeringOp> m);
+  seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
+                                           Ref<MOSDFastDispatchOp> m);
+  seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
+                                 Ref<MOSDScrub2> m);
+  seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
+                                        Ref<MOSDMarkMeDown> m);
+
+  seastar::future<> committed_osd_maps(version_t first,
+                                       version_t last,
+                                       Ref<MOSDMap> m);
+
+  seastar::future<> check_osdmap_features();
+
+  seastar::future<> handle_command(crimson::net::ConnectionRef conn,
+                                   Ref<MCommand> m);
+  seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+                                              Ref<MOSDPGUpdateLogMissing> m);
+  seastar::future<> handle_update_log_missing_reply(
+    crimson::net::ConnectionRef conn,
+    Ref<MOSDPGUpdateLogMissingReply> m);
+
 private:
   crimson::common::Gated gate;
 
@@ -283,15 +244,8 @@ inline std::ostream& operator<<(std::ostream& out, const OSD& osd) {
   return out;
 }
 
-inline std::ostream& operator<<(std::ostream& out,
-                                const OSD::ShardDispatcher& shard_dispatcher) {
-  shard_dispatcher.print(out);
-  return out;
-}
-
 }
 
 #if FMT_VERSION >= 90000
 template <> struct fmt::formatter<crimson::osd::OSD> : fmt::ostream_formatter {};
-template <> struct fmt::formatter<crimson::osd::OSD::ShardDispatcher> : fmt::ostream_formatter {};
 #endif
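
For context: unlike the plain pg_shard_manager member that replaces it, a
seastar::sharded<> service such as the removed shard_dispatchers must be
explicitly started on every shard and stopped again, which is the
boilerplate deleted from OSD::start() and OSD::stop(). A minimal
standalone sketch of that lifecycle, with a toy dispatcher type rather
than the crimson code:

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <iostream>

// Toy per-shard service standing in for the removed OSD::ShardDispatcher.
class dispatcher {
public:
  // sharded<T>::stop() requires a stop() member returning a future.
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
  void ping() const {
    std::cout << "dispatcher alive on shard "
              << seastar::this_shard_id() << "\n";
  }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  seastar::sharded<dispatcher> dispatchers;
  return app.run(argc, argv, [&dispatchers] {
    // One instance per shard is constructed by start()...
    return dispatchers.start().then([&dispatchers] {
      return dispatchers.invoke_on_all([](dispatcher& d) { d.ping(); });
    }).then([&dispatchers] {
      // ...and every instance must be torn down before exit.
      return dispatchers.stop();
    });
  });
}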
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index 472376fbfb46033bf259be80d5eec0d820241dab..6061c856be26348435c5c74e878b378434ab8d32 100644
@@ -23,7 +23,7 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
         auto[coll, shard_core] = coll_core;
        spg_t pgid;
        if (coll.is_pg(&pgid)) {
-          return pg_to_shard_mapping.maybe_create_pg(
+          return get_pg_to_shard_mapping().maybe_create_pg(
             pgid, shard_core
           ).then([this, pgid] (auto core) {
             return this->template with_remote_shard_state(
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index 949d3aedfde6f82a74503ad1000c4edb8843cdbe..2f3a3015d1cd67117e6888081c805873c4cf1243 100644
@@ -26,7 +26,7 @@ namespace crimson::osd {
 class PGShardManager {
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   seastar::sharded<ShardServices> &shard_services;
-  PGShardMapping &pg_to_shard_mapping;
+  seastar::sharded<PGShardMapping> &pg_to_shard_mapping;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
@@ -50,7 +50,7 @@ public:
   PGShardManager(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
     seastar::sharded<ShardServices> &shard_services,
-    PGShardMapping &pg_to_shard_mapping)
+    seastar::sharded<PGShardMapping> &pg_to_shard_mapping)
   : osd_singleton_state(osd_singleton_state),
     shard_services(shard_services),
     pg_to_shard_mapping(pg_to_shard_mapping) {}
@@ -71,6 +71,8 @@ public:
   }
   auto &get_local_state() { return get_shard_services().local_state; }
   auto &get_local_state() const { return get_shard_services().local_state; }
+  auto &get_pg_to_shard_mapping() { return pg_to_shard_mapping.local(); }
+  auto &get_pg_to_shard_mapping() const { return pg_to_shard_mapping.local(); }
 
   seastar::future<> update_map(local_cached_map_t &&map) {
     get_osd_singleton_state().update_map(
@@ -187,7 +189,7 @@ public:
     logger.debug("{}: can_create", *op);
 
     get_local_state().registry.remove_from_registry(*op);
-    return pg_to_shard_mapping.maybe_create_pg(
+    return get_pg_to_shard_mapping().maybe_create_pg(
       op->get_pgid()
     ).then([this, op = std::move(op)](auto core) mutable {
       return this->template with_remote_shard_state_and_op<T>(
@@ -231,7 +233,7 @@ public:
     logger.debug("{}: !can_create", *op);
 
     get_local_state().registry.remove_from_registry(*op);
-    return pg_to_shard_mapping.maybe_create_pg(
+    return get_pg_to_shard_mapping().maybe_create_pg(
       op->get_pgid()
     ).then([this, op = std::move(op)](auto core) mutable {
       return this->template with_remote_shard_state_and_op<T>(
@@ -307,19 +309,19 @@ public:
    */
   template <typename F>
   void for_each_pgid(F &&f) const {
-    return pg_to_shard_mapping.for_each_pgid(
+    return get_pg_to_shard_mapping().for_each_pgid(
       std::forward<F>(f));
   }
 
   auto get_num_pgs() const {
-    return pg_to_shard_mapping.get_num_pgs();
+    return get_pg_to_shard_mapping().get_num_pgs();
   }
 
   seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
 
   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    core_id_t core = pg_to_shard_mapping.get_pg_mapping(pgid);
+    core_id_t core = get_pg_to_shard_mapping().get_pg_mapping(pgid);
     return with_remote_shard_state(
       core,
       [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
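
PGShardManager itself stays a plain, per-shard object; it now receives
the mapping as seastar::sharded<PGShardMapping>& and resolves the current
shard's replica on each use through get_pg_to_shard_mapping(). A reduced
sketch of that shape, with hypothetical pg_mapping/manager types standing
in for PGShardMapping/PGShardManager:

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <iostream>

// Toy stand-in for PGShardMapping: a pg -> core table, one copy per shard.
class pg_mapping {
public:
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
  unsigned get_pg_mapping(unsigned pgid) const {
    return pgid % seastar::smp::count;
  }
};

// Toy stand-in for PGShardManager: holds a reference to the sharded
// container and always dereferences the replica of the calling shard.
class manager {
  seastar::sharded<pg_mapping>& mapping;
public:
  explicit manager(seastar::sharded<pg_mapping>& m) : mapping(m) {}
  auto& get_pg_to_shard_mapping() { return mapping.local(); }
  unsigned core_for(unsigned pgid) {
    return get_pg_to_shard_mapping().get_pg_mapping(pgid);
  }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  seastar::sharded<pg_mapping> mapping;
  return app.run(argc, argv, [&mapping] {
    return mapping.start().then([&mapping] {
      manager mgr(mapping);  // copyable per shard, no start()/stop() of its own
      std::cout << "pg 7 -> core " << mgr.core_for(7) << "\n";
      return seastar::make_ready_future<>();
    }).then([&mapping] { return mapping.stop(); });
  });
}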