crimson/osd: make osd sharded
author    chunmei <chunmei.liu@intel.com>
          Fri, 23 Jun 2023 03:18:15 +0000 (03:18 +0000)
committer chunmei <chunmei.liu@intel.com>
          Fri, 23 Jun 2023 03:18:15 +0000 (03:18 +0000)
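
Move ownership of the sharded osd_singleton_state and shard_services
out of PGShardManager and into OSD; PGShardManager now just holds
references to them. Introduce OSD::ShardDispatcher, a sharded service
that owns a PGShardManager and carries the message handlers
(handle_osd_map, handle_osd_op, handle_peering_op, ...), so that
OSD::ms_dispatch only classifies the message and forwards it to the
dispatcher on the primary core via invoke_on. PGShardManager::start()
and stop() are dropped; OSD::start() and OSD::stop() now start and
stop osd_singleton_state, shard_services and shard_dispatchers
directly.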
Signed-off-by: chunmei <chunmei.liu@intel.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.h

diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index b9f96ade818ee80d542d7f12a5ff6218ffa43e67..bf3a9e2d7c4118a5a082e2ba15709c6c3c1f018c 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -162,7 +162,8 @@ seastar::future<> OSD::open_meta_coll()
   return store.get_sharded_store().open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
-    pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    get_pg_shard_manager().init_meta_coll(ch, store.get_sharded_store());
     return seastar::now();
   });
 }
@@ -354,11 +355,26 @@ seastar::future<> OSD::start()
   logger().info("start");
 
   startup_time = ceph::mono_clock::now();
-
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.start().then([this] {
-    return pg_shard_manager.start(
-    whoami, *cluster_msgr,
-    *public_msgr, *monc, *mgrc, store);
+    return osd_singleton_state.start_single(
+      whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+      std::ref(*monc), std::ref(*mgrc)
+    ).then([this] {
+      ceph::mono_time startup_time = ceph::mono_clock::now();
+      return shard_services.start(
+        std::ref(osd_singleton_state),
+        whoami,
+        startup_time,
+        osd_singleton_state.local().perf,
+        osd_singleton_state.local().recoverystate_perf,
+        std::ref(store));
+    }).then([this] {
+      return shard_dispatchers.start(
+        std::ref(*this),
+        whoami,
+        std::ref(store));
+    });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
        whoami, get_shard_services(),
@@ -373,21 +389,21 @@ seastar::future<> OSD::start()
   }).then([this] {
     return open_meta_coll();
   }).then([this] {
-    return pg_shard_manager.get_meta_coll().load_superblock(
+    return get_pg_shard_manager().get_meta_coll().load_superblock(
     ).handle_error(
       crimson::ct_error::assert_all("open_meta_coll error")
     );
   }).then([this](OSDSuperblock&& sb) {
     superblock = std::move(sb);
-    pg_shard_manager.set_superblock(superblock);
-    return pg_shard_manager.get_local_map(superblock.current_epoch);
+    get_pg_shard_manager().set_superblock(superblock);
+    return get_pg_shard_manager().get_local_map(superblock.current_epoch);
   }).then([this](OSDMapService::local_cached_map_t&& map) {
     osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
-    return pg_shard_manager.update_map(std::move(map));
+    return get_pg_shard_manager().update_map(std::move(map));
   }).then([this] {
-    pg_shard_manager.got_map(osdmap->get_epoch());
+    get_pg_shard_manager().got_map(osdmap->get_epoch());
     bind_epoch = osdmap->get_epoch();
-    return pg_shard_manager.load_pgs(store);
+    return get_pg_shard_manager().load_pgs(store);
   }).then([this] {
 
     uint64_t osd_required =
@@ -462,7 +478,7 @@ seastar::future<> OSD::start()
 
 seastar::future<> OSD::start_boot()
 {
-  pg_shard_manager.set_preboot();
+  get_pg_shard_manager().set_preboot();
   return monc->get_version("osdmap").then([this](auto&& ret) {
     auto [newest, oldest] = ret;
     return _preboot(oldest, newest);
@@ -504,7 +520,7 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
 
 seastar::future<> OSD::_send_boot()
 {
-  pg_shard_manager.set_booting();
+  get_pg_shard_manager().set_booting();
 
   entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
   entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
@@ -584,10 +600,12 @@ seastar::future<> OSD::_add_me_to_crush()
   });
 }
 
-seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
-                                     Ref<MCommand> m)
+seastar::future<> OSD::ShardDispatcher::handle_command(
+  crimson::net::ConnectionRef conn,
+  Ref<MCommand> m)
 {
-  return asok->handle_command(conn, std::move(m));
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return osd.asok->handle_command(conn, std::move(m));
 }
 
 /*
@@ -607,7 +625,7 @@ seastar::future<> OSD::start_asok_admin()
     asok->register_command(make_asok_hook<SendBeaconHook>(*this));
     asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
     asok->register_command(
-      make_asok_hook<DumpPGStateHistory>(std::as_const(pg_shard_manager)));
+      make_asok_hook<DumpPGStateHistory>(std::as_const(get_pg_shard_manager())));
     asok->register_command(make_asok_hook<DumpMetricsHook>());
     asok->register_command(make_asok_hook<DumpPerfCountersHook>());
     asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
@@ -618,7 +636,7 @@ seastar::future<> OSD::start_asok_admin()
     // ops commands
     asok->register_command(
       make_asok_hook<DumpInFlightOpsHook>(
-       std::as_const(pg_shard_manager)));
+       std::as_const(get_pg_shard_manager())));
     asok->register_command(
       make_asok_hook<DumpHistoricOpsHook>(
        std::as_const(get_shard_services().get_registry())));
@@ -637,7 +655,7 @@ seastar::future<> OSD::stop()
   tick_timer.cancel();
   // see also OSD::shutdown()
   return prepare_to_stop().then([this] {
-    pg_shard_manager.set_stopping();
+    get_pg_shard_manager().set_stopping();
     logger().debug("prepared to stop");
     public_msgr->stop();
     cluster_msgr->stop();
@@ -645,19 +663,24 @@ seastar::future<> OSD::stop()
     return asok->stop().then([this] {
       return heartbeat->stop();
     }).then([this] {
-      return pg_shard_manager.stop_registries();
+      return get_pg_shard_manager().stop_registries();
     }).then([this] {
       return store.umount();
     }).then([this] {
       return store.stop();
     }).then([this] {
-      return pg_shard_manager.stop_pgs();
+      return get_pg_shard_manager().stop_pgs();
     }).then([this] {
       return monc->stop();
     }).then([this] {
       return mgrc->stop();
     }).then([this] {
-      return pg_shard_manager.stop();
+      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+      return shard_dispatchers.stop();
+    }).then([this] {
+      return shard_services.stop();
+    }).then([this] {
+      return osd_singleton_state.stop();
     }).then([fut=std::move(gate_close_fut)]() mutable {
       return std::move(fut);
     }).then([this] {
@@ -675,12 +698,12 @@ void OSD::dump_status(Formatter* f) const
   f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
   f->dump_stream("osd_fsid") << superblock.osd_fsid;
   f->dump_unsigned("whoami", superblock.whoami);
-  f->dump_string("state", pg_shard_manager.get_osd_state_string());
+  f->dump_string("state", get_pg_shard_manager().get_osd_state_string());
   f->dump_unsigned("oldest_map", superblock.oldest_map);
   f->dump_unsigned("cluster_osdmap_trim_lower_bound",
                    superblock.cluster_osdmap_trim_lower_bound);
   f->dump_unsigned("newest_map", superblock.newest_map);
-  f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
+  f->dump_unsigned("num_pgs", get_pg_shard_manager().get_num_pgs());
 }
 
 void OSD::print(std::ostream& out) const
@@ -689,87 +712,139 @@ void OSD::print(std::ostream& out) const
       << superblock.osd_fsid << " [" << superblock.oldest_map
       << "," << superblock.newest_map << "] "
       << "tlb:" << superblock.cluster_osdmap_trim_lower_bound
-      << " pgs:" << pg_shard_manager.get_num_pgs()
+      << " pgs:" << get_pg_shard_manager().get_num_pgs()
+      << "}";
+}
+
+void OSD::ShardDispatcher::print(std::ostream& out) const
+{
+  out << "{osd." << osd.superblock.whoami << " "
+      << osd.superblock.osd_fsid << " [" << osd.superblock.oldest_map
+      << "," << osd.superblock.newest_map << "] "
+      << " pgs:" << get_pg_shard_manager().get_num_pgs()
       << "}";
 }
 
 std::optional<seastar::future<>>
 OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
-  if (pg_shard_manager.is_stopping()) {
-    return {};
-  }
-  // XXX: we're assuming the `switch` part is executed immediately, and thus
-  // we won't smash the stack. Taking into account how `seastar::with_gate`
-  // is currently implemented, this seems to be the case (Summer 2022).
+  assert(seastar::this_shard_id() == PRIMARY_CORE);
   bool dispatched = true;
   gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
                                                 m=std::move(m), &dispatched] {
     switch (m->get_type()) {
-    case CEPH_MSG_OSD_MAP:
-      return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
-    case CEPH_MSG_OSD_OP:
-      return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
-    case MSG_OSD_PG_CREATE2:
-      return handle_pg_create(
-       conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
-      return seastar::now();
-    case MSG_COMMAND:
-      return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
-    case MSG_OSD_MARK_ME_DOWN:
-      return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
-    case MSG_OSD_PG_PULL:
-      [[fallthrough]];
-    case MSG_OSD_PG_PUSH:
-      [[fallthrough]];
-    case MSG_OSD_PG_PUSH_REPLY:
-      [[fallthrough]];
-    case MSG_OSD_PG_RECOVERY_DELETE:
-      [[fallthrough]];
-    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
-      [[fallthrough]];
-    case MSG_OSD_PG_SCAN:
-      [[fallthrough]];
-    case MSG_OSD_PG_BACKFILL:
-      [[fallthrough]];
-    case MSG_OSD_PG_BACKFILL_REMOVE:
-      return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
-    case MSG_OSD_PG_LEASE:
-      [[fallthrough]];
-    case MSG_OSD_PG_LEASE_ACK:
-      [[fallthrough]];
-    case MSG_OSD_PG_NOTIFY2:
-      [[fallthrough]];
-    case MSG_OSD_PG_INFO2:
-      [[fallthrough]];
-    case MSG_OSD_PG_QUERY2:
-      [[fallthrough]];
-    case MSG_OSD_BACKFILL_RESERVE:
-      [[fallthrough]];
-    case MSG_OSD_RECOVERY_RESERVE:
-      [[fallthrough]];
-    case MSG_OSD_PG_LOG:
-      return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
-    case MSG_OSD_REPOP:
-      return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
-    case MSG_OSD_REPOPREPLY:
-      return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
-    case MSG_OSD_SCRUB2:
-      return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
-    case MSG_OSD_PG_UPDATE_LOG_MISSING:
-      return handle_update_log_missing(conn, boost::static_pointer_cast<
-        MOSDPGUpdateLogMissing>(m));
-    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
-      return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
-        MOSDPGUpdateLogMissingReply>(m));
-    default:
-      dispatched = false;
-      return seastar::now();
+      case CEPH_MSG_OSD_MAP:
+      case CEPH_MSG_OSD_OP:
+      case MSG_OSD_PG_CREATE2:
+      case MSG_COMMAND:
+      case MSG_OSD_MARK_ME_DOWN:
+      case MSG_OSD_PG_PULL:
+      case MSG_OSD_PG_PUSH:
+      case MSG_OSD_PG_PUSH_REPLY:
+      case MSG_OSD_PG_RECOVERY_DELETE:
+      case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+      case MSG_OSD_PG_SCAN:
+      case MSG_OSD_PG_BACKFILL:
+      case MSG_OSD_PG_BACKFILL_REMOVE:
+      case MSG_OSD_PG_LEASE:
+      case MSG_OSD_PG_LEASE_ACK:
+      case MSG_OSD_PG_NOTIFY2:
+      case MSG_OSD_PG_INFO2:
+      case MSG_OSD_PG_QUERY2:
+      case MSG_OSD_BACKFILL_RESERVE:
+      case MSG_OSD_RECOVERY_RESERVE:
+      case MSG_OSD_PG_LOG:
+      case MSG_OSD_REPOP:
+      case MSG_OSD_REPOPREPLY:
+      case MSG_OSD_SCRUB2:
+      case MSG_OSD_PG_UPDATE_LOG_MISSING:
+      case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+      {
+        return shard_dispatchers.invoke_on(PRIMARY_CORE,
+          [conn, m = std::move(m)]
+          (auto &local_dispatcher) mutable -> seastar::future<> {
+            return local_dispatcher.ms_dispatch(std::move(conn), m);
+          });
+      }
+      default:
+      {
+        dispatched = false;
+        return seastar::now();
+      }
     }
   });
   return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
+seastar::future<>
+OSD::ShardDispatcher::ms_dispatch(
+  crimson::net::ConnectionRef conn,
+  MessageRef m)
+{
+  if (pg_shard_manager.is_stopping()) {
+    return seastar::now();
+  }
+  switch (m->get_type()) {
+  case CEPH_MSG_OSD_MAP:
+    return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
+  case CEPH_MSG_OSD_OP:
+    return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+  case MSG_OSD_PG_CREATE2:
+    return handle_pg_create(
+      conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
+  case MSG_COMMAND:
+    return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+  case MSG_OSD_MARK_ME_DOWN:
+    return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+  case MSG_OSD_PG_PULL:
+    [[fallthrough]];
+  case MSG_OSD_PG_PUSH:
+    [[fallthrough]];
+  case MSG_OSD_PG_PUSH_REPLY:
+    [[fallthrough]];
+  case MSG_OSD_PG_RECOVERY_DELETE:
+    [[fallthrough]];
+  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    [[fallthrough]];
+  case MSG_OSD_PG_SCAN:
+    [[fallthrough]];
+  case MSG_OSD_PG_BACKFILL:
+    [[fallthrough]];
+  case MSG_OSD_PG_BACKFILL_REMOVE:
+    return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+  case MSG_OSD_PG_LEASE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LEASE_ACK:
+    [[fallthrough]];
+  case MSG_OSD_PG_NOTIFY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_INFO2:
+    [[fallthrough]];
+  case MSG_OSD_PG_QUERY2:
+    [[fallthrough]];
+  case MSG_OSD_BACKFILL_RESERVE:
+    [[fallthrough]];
+  case MSG_OSD_RECOVERY_RESERVE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LOG:
+    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+  case MSG_OSD_REPOP:
+    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+  case MSG_OSD_REPOPREPLY:
+    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+  case MSG_OSD_SCRUB2:
+    return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+  case MSG_OSD_PG_UPDATE_LOG_MISSING:
+    return handle_update_log_missing(conn, boost::static_pointer_cast<
+      MOSDPGUpdateLogMissing>(m));
+  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+      MOSDPGUpdateLogMissingReply>(m));
+  default:
+    return seastar::now();
+  }
+}
+
 void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
   // TODO: cleanup the session attached to this connection
@@ -827,7 +903,7 @@ seastar::future<MessageURef> OSD::get_stats() const
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
-  return pg_shard_manager.get_pg_stats(
+  return get_pg_shard_manager().get_pg_stats(
   ).then([m=std::move(m)](auto &&stats) mutable {
     m->pg_stat = std::move(stats);
     return seastar::make_ready_future<MessageURef>(std::move(m));
@@ -841,7 +917,9 @@ uint64_t OSD::send_pg_stats()
   return osd_stat.seq;
 }
 
-bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
+bool OSD::ShardDispatcher::require_mon_peer(
+  crimson::net::Connection *conn,
+  Ref<Message> m)
 {
   if (!conn->peer_is_mon()) {
     logger().info("{} received from non-mon {}, {}",
@@ -853,7 +931,7 @@ bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
   return true;
 }
 
-seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref<MOSDMap> m)
 {
   /* Ensure that only one MOSDMap is processed at a time.  Allowing concurrent
   * processing may eventually be worthwhile, but such an implementation would
@@ -864,17 +942,18 @@ seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
   * simpler invariant for now.
   * See https://tracker.ceph.com/issues/59165
   */
-  return handle_osd_map_lock.lock().then([this, m] {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return osd.handle_osd_map_lock.lock().then([this, m] {
     return _handle_osd_map(m);
   }).finally([this] {
-    return handle_osd_map_lock.unlock();
+    return osd.handle_osd_map_lock.unlock();
   });
 }
 
-seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
 {
   logger().info("handle_osd_map {}", *m);
-  if (m->fsid != superblock.cluster_fsid) {
+  if (m->fsid != osd.superblock.cluster_fsid) {
     logger().warn("fsid mismatched");
     return seastar::now();
   }
@@ -886,16 +965,16 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   const auto first = m->get_first();
   const auto last = m->get_last();
   logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
-                first, last, superblock.newest_map,
+                first, last, osd.superblock.newest_map,
                 m->cluster_osdmap_trim_lower_bound, m->newest_map);
   // make sure there is something new, here, before we bother flushing
   // the queues and such
-  if (last <= superblock.newest_map) {
+  if (last <= osd.superblock.newest_map) {
     return seastar::now();
   }
   // missing some?
   bool skip_maps = false;
-  epoch_t start = superblock.newest_map + 1;
+  epoch_t start = osd.superblock.newest_map + 1;
   if (first > start) {
     logger().info("handle_osd_map message skips epochs {}..{}",
                   start, first - 1);
@@ -918,20 +997,20 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
                           [=, this](auto& t) {
     return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] {
       // even if this map isn't from a mon, we may have satisfied our subscription
-      monc->sub_got("osdmap", last);
-      if (!superblock.oldest_map || skip_maps) {
-        superblock.oldest_map = first;
+      osd.monc->sub_got("osdmap", last);
+      if (!osd.superblock.oldest_map || skip_maps) {
+        osd.superblock.oldest_map = first;
       }
-      superblock.newest_map = last;
-      superblock.current_epoch = last;
+      osd.superblock.newest_map = last;
+      osd.superblock.current_epoch = last;
 
       // note in the superblock that we were clean thru the prior epoch
-      if (boot_epoch && boot_epoch >= superblock.mounted) {
-        superblock.mounted = boot_epoch;
-        superblock.clean_thru = last;
+      if (osd.boot_epoch && osd.boot_epoch >= osd.superblock.mounted) {
+        osd.superblock.mounted = osd.boot_epoch;
+        osd.superblock.clean_thru = last;
       }
-      pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
-      pg_shard_manager.set_superblock(superblock);
+      pg_shard_manager.get_meta_coll().store_superblock(t, osd.superblock);
+      pg_shard_manager.set_superblock(osd.superblock);
       logger().debug("OSD::handle_osd_map: do_transaction...");
       return store.get_sharded_store().do_transaction(
        pg_shard_manager.get_meta_coll().collection(),
@@ -943,10 +1022,12 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   });
 }
 
-seastar::future<> OSD::committed_osd_maps(version_t first,
-                                          version_t last,
-                                          Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
+  version_t first,
+  version_t last,
+  Ref<MOSDMap> m)
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
   // advance through the new maps
   return seastar::do_for_each(boost::make_counting_iterator(first),
@@ -955,17 +1036,17 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     return pg_shard_manager.get_local_map(
       cur
     ).then([this](OSDMapService::local_cached_map_t&& o) {
-      osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+      osd.osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
       return pg_shard_manager.update_map(std::move(o));
     }).then([this] {
       if (get_shard_services().get_up_epoch() == 0 &&
-         osdmap->is_up(whoami) &&
-         osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+         osd.osdmap->is_up(whoami) &&
+         osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs()) {
        return pg_shard_manager.set_up_epoch(
-         osdmap->get_epoch()
+         osd.osdmap->get_epoch()
        ).then([this] {
-         if (!boot_epoch) {
-           boot_epoch = osdmap->get_epoch();
+         if (!osd.boot_epoch) {
+           osd.boot_epoch = osd.osdmap->get_epoch();
          }
        });
       } else {
@@ -973,43 +1054,43 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       }
     });
   }).then([m, this] {
-    if (osdmap->is_up(whoami)) {
-      const auto up_from = osdmap->get_up_from(whoami);
+    if (osd.osdmap->is_up(whoami)) {
+      const auto up_from = osd.osdmap->get_up_from(whoami);
       logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
-                    whoami, osdmap->get_epoch(), up_from, bind_epoch,
+                    whoami, osd.osdmap->get_epoch(), up_from, osd.bind_epoch,
                    pg_shard_manager.get_osd_state_string());
-      if (bind_epoch < up_from &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+      if (osd.bind_epoch < up_from &&
+          osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs() &&
           pg_shard_manager.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
         pg_shard_manager.set_active();
-        beacon_timer.arm_periodic(
+        osd.beacon_timer.arm_periodic(
           std::chrono::seconds(local_conf()->osd_beacon_report_interval));
        // timer continuation rearms when complete
-        tick_timer.arm(
+        osd.tick_timer.arm(
           std::chrono::seconds(TICK_INTERVAL));
       }
     } else {
       if (pg_shard_manager.is_prestop()) {
-       got_stop_ack();
+       osd.got_stop_ack();
        return seastar::now();
       }
     }
     return check_osdmap_features().then([this] {
       // yay!
       logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
-                    " to {} epoch to pgs", whoami, osdmap->get_epoch());
-      return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+                    " to {} epoch to pgs", whoami, osd.osdmap->get_epoch());
+      return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch());
     });
   }).then([m, this] {
     if (pg_shard_manager.is_active()) {
       logger().info("osd.{}: now active", whoami);
-      if (!osdmap->exists(whoami) ||
-         osdmap->is_stop(whoami)) {
-        return shutdown();
+      if (!osd.osdmap->exists(whoami) ||
+         osd.osdmap->is_stop(whoami)) {
+        return osd.shutdown();
       }
-      if (should_restart()) {
-        return restart();
+      if (osd.should_restart()) {
+        return osd.restart();
       } else {
         return seastar::now();
       }
@@ -1017,11 +1098,11 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       logger().info("osd.{}: now preboot", whoami);
 
       if (m->get_source().is_mon()) {
-        return _preboot(
+        return osd._preboot(
           m->cluster_osdmap_trim_lower_bound, m->newest_map);
       } else {
         logger().info("osd.{}: start_boot", whoami);
-        return start_boot();
+        return osd.start_boot();
       }
     } else {
       logger().info("osd.{}: now {}", whoami,
@@ -1032,8 +1113,9 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
   });
 }
 
-seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
-                                     Ref<MOSDOp> m)
+seastar::future<> OSD::ShardDispatcher::handle_osd_op(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDOp> m)
 {
   (void) pg_shard_manager.start_pg_operation<ClientRequest>(
     get_shard_services(),
@@ -1042,8 +1124,9 @@ seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
-                                       Ref<MOSDPGCreate2> m)
+seastar::future<> OSD::ShardDispatcher::handle_pg_create(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDPGCreate2> m)
 {
   for (auto& [pgid, when] : m->pgs) {
     const auto &[created, created_stamp] = when;
@@ -1077,7 +1160,7 @@ seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_update_log_missing(
+seastar::future<> OSD::ShardDispatcher::handle_update_log_missing(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPGUpdateLogMissing> m)
 {
@@ -1088,7 +1171,7 @@ seastar::future<> OSD::handle_update_log_missing(
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_update_log_missing_reply(
+seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPGUpdateLogMissingReply> m)
 {
@@ -1099,8 +1182,9 @@ seastar::future<> OSD::handle_update_log_missing_reply(
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
-                                    Ref<MOSDRepOp> m)
+seastar::future<> OSD::ShardDispatcher::handle_rep_op(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDRepOp> m)
 {
   m->finish_decode();
   std::ignore = pg_shard_manager.start_pg_operation<RepRequest>(
@@ -1109,8 +1193,9 @@ seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDRepOpReply> m)
+seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDRepOpReply> m)
 {
   spg_t pgid = m->get_spg();
   return pg_shard_manager.with_pg(
@@ -1126,10 +1211,11 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
     });
 }
 
-seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
-                                   Ref<MOSDScrub2> m)
+seastar::future<> OSD::ShardDispatcher::handle_scrub(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDScrub2> m)
 {
-  if (m->fsid != superblock.cluster_fsid) {
+  if (m->fsid != osd.superblock.cluster_fsid) {
     logger().warn("fsid mismatched");
     return seastar::now();
   }
@@ -1146,17 +1232,20 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
   });
 }
 
-seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDMarkMeDown> m)
+seastar::future<> OSD::ShardDispatcher::handle_mark_me_down(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDMarkMeDown> m)
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   if (pg_shard_manager.is_prestop()) {
-    got_stop_ack();
+    osd.got_stop_ack();
   }
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
-                                  Ref<MOSDFastDispatchOp> m)
+seastar::future<> OSD::ShardDispatcher::handle_recovery_subreq(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDFastDispatchOp> m)
 {
   std::ignore = pg_shard_manager.start_pg_operation<RecoverySubRequest>(
     conn, std::move(m));
@@ -1190,7 +1279,7 @@ seastar::future<> OSD::restart()
 {
   beacon_timer.cancel();
   tick_timer.cancel();
-  return pg_shard_manager.set_up_epoch(
+  return get_pg_shard_manager().set_up_epoch(
     0
   ).then([this] {
     bind_epoch = osdmap->get_epoch();
@@ -1209,7 +1298,7 @@ seastar::future<> OSD::shutdown()
 
 seastar::future<> OSD::send_beacon()
 {
-  if (!pg_shard_manager.is_active()) {
+  if (!get_pg_shard_manager().is_active()) {
     return seastar::now();
   }
   // FIXME: min lec should be calculated from pg_stat
@@ -1224,11 +1313,11 @@ seastar::future<> OSD::send_beacon()
 
 seastar::future<> OSD::update_heartbeat_peers()
 {
-  if (!pg_shard_manager.is_active()) {
+  if (!get_pg_shard_manager().is_active()) {
     return seastar::now();
   }
 
-  pg_shard_manager.for_each_pgid([this](auto &pgid) {
+  get_pg_shard_manager().for_each_pgid([this](auto &pgid) {
     vector<int> up, acting;
     osdmap->pg_to_up_acting_osds(pgid.pgid,
                                  &up, nullptr,
@@ -1245,7 +1334,7 @@ seastar::future<> OSD::update_heartbeat_peers()
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_peering_op(
+seastar::future<> OSD::ShardDispatcher::handle_peering_op(
   crimson::net::ConnectionRef conn,
   Ref<MOSDPeeringOp> m)
 {
@@ -1261,16 +1350,16 @@ seastar::future<> OSD::handle_peering_op(
   return seastar::now();
 }
 
-seastar::future<> OSD::check_osdmap_features()
+seastar::future<> OSD::ShardDispatcher::check_osdmap_features()
 {
   return store.write_meta("require_osd_release",
-                          stringify((int)osdmap->require_osd_release));
+                          stringify((int)osd.osdmap->require_osd_release));
 }
 
 seastar::future<> OSD::prepare_to_stop()
 {
   if (osdmap && osdmap->is_up(whoami)) {
-    pg_shard_manager.set_prestop();
+    get_pg_shard_manager().set_prestop();
     const auto timeout =
       std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::duration<double>(
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index abf4a00844d3c08141b7d7d3f5ea7239c38cfed3..1093cab34105d1caf3169bedbc0cf721e7474f6f 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -61,6 +61,81 @@ class PG;
 class OSD final : public crimson::net::Dispatcher,
                  private crimson::common::AuthHandler,
                  private crimson::mgr::WithStats {
+public:
+  class ShardDispatcher
+    : public seastar::peering_sharded_service<ShardDispatcher> {
+  friend class OSD;
+  public:
+    ShardDispatcher(
+      OSD& osd,
+      int whoami,
+      crimson::os::FuturizedStore& store)
+    : pg_shard_manager(osd.osd_singleton_state, osd.shard_services),
+      osd(osd),
+      whoami(whoami),
+      store(store) {}
+    ~ShardDispatcher() = default;
+
+    // Dispatcher methods
+    seastar::future<> ms_dispatch(crimson::net::ConnectionRef,
+                                  MessageRef);
+
+  private:
+    bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
+
+    seastar::future<> handle_osd_map(Ref<MOSDMap> m);
+    seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
+    seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
+                                       Ref<MOSDPGCreate2> m);
+    seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
+                                    Ref<MOSDOp> m);
+    seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
+                                    Ref<MOSDRepOp> m);
+    seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
+                                          Ref<MOSDRepOpReply> m);
+    seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
+                                        Ref<MOSDPeeringOp> m);
+    seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
+                                             Ref<MOSDFastDispatchOp> m);
+    seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
+                                   Ref<MOSDScrub2> m);
+    seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
+                                          Ref<MOSDMarkMeDown> m);
+
+    seastar::future<> committed_osd_maps(version_t first,
+                                         version_t last,
+                                         Ref<MOSDMap> m);
+
+    seastar::future<> check_osdmap_features();
+
+    seastar::future<> handle_command(crimson::net::ConnectionRef conn,
+                                     Ref<MCommand> m);
+    seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+                                                Ref<MOSDPGUpdateLogMissing> m);
+    seastar::future<> handle_update_log_missing_reply(
+      crimson::net::ConnectionRef conn,
+      Ref<MOSDPGUpdateLogMissingReply> m);
+
+  public:
+    void print(std::ostream&) const;
+
+    auto &get_pg_shard_manager() {
+      return pg_shard_manager;
+    }
+    auto &get_pg_shard_manager() const {
+      return pg_shard_manager;
+    }
+    ShardServices &get_shard_services() {
+      return pg_shard_manager.get_shard_services();
+    }
+
+  private:
+    crimson::osd::PGShardManager pg_shard_manager;
+    OSD& osd;
+    const int whoami;
+    crimson::os::FuturizedStore& store;
+  };
+
   const int whoami;
   const uint32_t nonce;
   seastar::abort_source& abort_source;
@@ -110,7 +185,9 @@ class OSD final : public crimson::net::Dispatcher,
   void handle_authentication(const EntityName& name,
                             const AuthCapsInfo& caps) final;
 
-  crimson::osd::PGShardManager pg_shard_manager;
+  seastar::sharded<OSDSingletonState> osd_singleton_state;
+  seastar::sharded<ShardServices> shard_services;
+  seastar::sharded<ShardDispatcher> shard_dispatchers;
 
   std::unique_ptr<Heartbeat> heartbeat;
   seastar::timer<seastar::lowres_clock> tick_timer;
@@ -148,6 +225,21 @@ public:
   /// @return the seq id of the pg stats being sent
   uint64_t send_pg_stats();
 
+  auto &get_shard_services() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_services.local();
+  }
+
+  auto &get_pg_shard_manager() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_dispatchers.local().get_pg_shard_manager();
+  }
+
+  auto &get_pg_shard_manager() const {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_dispatchers.local().get_pg_shard_manager();
+  }
+
 private:
   static seastar::future<> _write_superblock(
     crimson::os::FuturizedStore &store,
@@ -163,52 +255,11 @@ private:
 
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
+  seastar::future<> start_asok_admin();
+
   void write_superblock(ceph::os::Transaction& t);
   seastar::future<> read_superblock();
 
-  bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
-
-  seastar::future<> handle_osd_map(Ref<MOSDMap> m);
-  seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
-  seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
-                                    Ref<MOSDPGCreate2> m);
-  seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
-                                 Ref<MOSDOp> m);
-  seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
-                                 Ref<MOSDRepOp> m);
-  seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
-                                       Ref<MOSDRepOpReply> m);
-  seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
-                                     Ref<MOSDPeeringOp> m);
-  seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
-                                          Ref<MOSDFastDispatchOp> m);
-  seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
-                                Ref<MOSDScrub2> m);
-  seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
-                                       Ref<MOSDMarkMeDown> m);
-
-  seastar::future<> committed_osd_maps(version_t first,
-                                       version_t last,
-                                       Ref<MOSDMap> m);
-
-  seastar::future<> check_osdmap_features();
-
-  seastar::future<> handle_command(crimson::net::ConnectionRef conn,
-                                  Ref<MCommand> m);
-  seastar::future<> start_asok_admin();
-  seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
-                                              Ref<MOSDPGUpdateLogMissing> m);
-  seastar::future<> handle_update_log_missing_reply(
-    crimson::net::ConnectionRef conn,
-    Ref<MOSDPGUpdateLogMissingReply> m);
-public:
-  auto &get_pg_shard_manager() {
-    return pg_shard_manager;
-  }
-  ShardServices &get_shard_services() {
-    return pg_shard_manager.get_shard_services();
-  }
-
 private:
   crimson::common::Gated gate;
 
@@ -236,8 +287,15 @@ inline std::ostream& operator<<(std::ostream& out, const OSD& osd) {
   return out;
 }
 
+inline std::ostream& operator<<(std::ostream& out,
+                                const OSD::ShardDispatcher& shard_dispatcher) {
+  shard_dispatcher.print(out);
+  return out;
+}
+
 }
 
 #if FMT_VERSION >= 90000
 template <> struct fmt::formatter<crimson::osd::OSD> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::OSD::ShardDispatcher> : fmt::ostream_formatter {};
 #endif
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index 01dbd11c21458204f0c796018c0c0b43654194ec..e0ef237489145a3f679f659d62639581c01d0e27 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -12,39 +12,6 @@ namespace {
 
 namespace crimson::osd {
 
-seastar::future<> PGShardManager::start(
-  const int whoami,
-  crimson::net::Messenger &cluster_msgr,
-  crimson::net::Messenger &public_msgr,
-  crimson::mon::Client &monc,
-  crimson::mgr::Client &mgrc,
-  crimson::os::FuturizedStore &store)
-{
-  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return osd_singleton_state.start_single(
-    whoami, std::ref(cluster_msgr), std::ref(public_msgr),
-    std::ref(monc), std::ref(mgrc)
-  ).then([this, whoami, &store] {
-    ceph::mono_time startup_time = ceph::mono_clock::now();
-    return shard_services.start(
-      std::ref(osd_singleton_state),
-      whoami,
-      startup_time,
-      osd_singleton_state.local().perf,
-      osd_singleton_state.local().recoverystate_perf,
-      std::ref(store));
-  });
-}
-
-seastar::future<> PGShardManager::stop()
-{
-  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return shard_services.stop(
-  ).then([this] {
-    return osd_singleton_state.stop();
-  });
-}
-
 seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index dcd91c10553c77860b071025f419308b901fbc77..9526ddcd05af55c4fb38311c078f736473e71629 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -24,8 +24,8 @@ namespace crimson::osd {
  * etc)
  */
 class PGShardManager {
-  seastar::sharded<OSDSingletonState> osd_singleton_state;
-  seastar::sharded<ShardServices> shard_services;
+  seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  seastar::sharded<ShardServices> &shard_services;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
@@ -46,16 +46,11 @@ public:
   using cached_map_t = OSDMapService::cached_map_t;
   using local_cached_map_t = OSDMapService::local_cached_map_t;
 
-  PGShardManager() = default;
-
-  seastar::future<> start(
-    const int whoami,
-    crimson::net::Messenger &cluster_msgr,
-    crimson::net::Messenger &public_msgr,
-    crimson::mon::Client &monc,
-    crimson::mgr::Client &mgrc,
-    crimson::os::FuturizedStore &store);
-  seastar::future<> stop();
+  PGShardManager(
+    seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    seastar::sharded<ShardServices> &shard_services)
+  : osd_singleton_state(osd_singleton_state),
+    shard_services(shard_services) {}
 
   auto &get_osd_singleton_state() {
     ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 7ae20e88fa4d40a0d039e6d93936c1cf58e5bf9f..34b3a32e34b9817c65711f483247221bb3e1e152 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -201,6 +201,7 @@ public:
 class OSDSingletonState : public md_config_obs_t {
   friend class ShardServices;
   friend class PGShardManager;
+  friend class OSD;
   using cached_map_t = OSDMapService::cached_map_t;
   using local_cached_map_t = OSDMapService::local_cached_map_t;
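
The refactor above leans on three Seastar moves: sharded<T>::start_single()
for a shard-0 singleton, sharded<T>::start() for a per-shard service
constructed with a reference to that singleton, and invoke_on() to hop to
PRIMARY_CORE before dispatching. Below is a minimal, self-contained sketch
of that lifecycle; it is not Ceph code: SingletonState and Dispatcher are
hypothetical stand-ins, and it assumes a Seastar build with coroutine
support.

// Minimal sketch (not Ceph code) of the seastar::sharded<> pattern used
// above; SingletonState and Dispatcher are hypothetical stand-ins.
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <functional>

constexpr unsigned PRIMARY_CORE = 0;

struct SingletonState {
  int epoch = 0;                                     // exists only on shard 0
  seastar::future<> stop() { return seastar::now(); }
};

struct Dispatcher : seastar::peering_sharded_service<Dispatcher> {
  seastar::sharded<SingletonState>& singleton;       // owned by the caller
  explicit Dispatcher(seastar::sharded<SingletonState>& s) : singleton(s) {}
  seastar::future<> dispatch() {
    // Runs on whichever shard invoke_on() targeted; touches shard-0 state
    // by hopping there explicitly, never by dereferencing across shards.
    return singleton.invoke_on(PRIMARY_CORE, [](SingletonState& s) {
      ++s.epoch;
    });
  }
  seastar::future<> stop() { return seastar::now(); }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, []() -> seastar::future<> {
    static seastar::sharded<SingletonState> singleton;
    static seastar::sharded<Dispatcher> dispatchers;
    co_await singleton.start_single();               // one instance, shard 0
    co_await dispatchers.start(std::ref(singleton)); // one instance per shard
    // Hop to the primary core before dispatching, as OSD::ms_dispatch() does.
    co_await dispatchers.invoke_on(PRIMARY_CORE, [](Dispatcher& d) {
      return d.dispatch();
    });
    co_await dispatchers.stop();                     // tear down in reverse
    co_await singleton.stop();
  });
}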