From 5f82fbbc72f840fa8642015a120485648aaf507c Mon Sep 17 00:00:00 2001
From: chunmei
Date: Fri, 23 Jun 2023 03:18:15 +0000
Subject: [PATCH] crimson/osd: make osd sharded

Own OSDSingletonState, ShardServices and the new OSD::ShardDispatcher
as seastar::sharded<> services inside OSD instead of having
PGShardManager own them.  Message handling moves into ShardDispatcher,
with cross-core messages forwarded to the primary core, and
PGShardManager becomes a lightweight facade holding references to the
sharded services.

Signed-off-by: chunmei
---
 src/crimson/osd/osd.cc              | 413 +++++++++++++++++-----------
 src/crimson/osd/osd.h               | 146 +++++++---
 src/crimson/osd/pg_shard_manager.cc |  33 ---
 src/crimson/osd/pg_shard_manager.h  |  19 +-
 src/crimson/osd/shard_services.h    |   1 +
 5 files changed, 361 insertions(+), 251 deletions(-)

diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index b9f96ade818ee..bf3a9e2d7c411 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -162,7 +162,8 @@ seastar::future<> OSD::open_meta_coll()
   return store.get_sharded_store().open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
-    pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    get_pg_shard_manager().init_meta_coll(ch, store.get_sharded_store());
     return seastar::now();
   });
 }
@@ -354,11 +355,26 @@ seastar::future<> OSD::start()
   logger().info("start");
 
   startup_time = ceph::mono_clock::now();
-
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.start().then([this] {
-    return pg_shard_manager.start(
-      whoami, *cluster_msgr,
-      *public_msgr, *monc, *mgrc, store);
+    return osd_singleton_state.start_single(
+      whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+      std::ref(*monc), std::ref(*mgrc)
+    ).then([this] {
+      ceph::mono_time startup_time = ceph::mono_clock::now();
+      return shard_services.start(
+        std::ref(osd_singleton_state),
+        whoami,
+        startup_time,
+        osd_singleton_state.local().perf,
+        osd_singleton_state.local().recoverystate_perf,
+        std::ref(store));
+    }).then([this] {
+      return shard_dispatchers.start(
+        std::ref(*this),
+        whoami,
+        std::ref(store));
+    });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
       whoami, get_shard_services(),
@@ -373,21 +389,21 @@ seastar::future<> OSD::start()
   }).then([this] {
     return open_meta_coll();
   }).then([this] {
-    return pg_shard_manager.get_meta_coll().load_superblock(
+    return get_pg_shard_manager().get_meta_coll().load_superblock(
     ).handle_error(
       crimson::ct_error::assert_all("open_meta_coll error")
     );
   }).then([this](OSDSuperblock&& sb) {
     superblock = std::move(sb);
-    pg_shard_manager.set_superblock(superblock);
-    return pg_shard_manager.get_local_map(superblock.current_epoch);
+    get_pg_shard_manager().set_superblock(superblock);
+    return get_pg_shard_manager().get_local_map(superblock.current_epoch);
   }).then([this](OSDMapService::local_cached_map_t&& map) {
     osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
-    return pg_shard_manager.update_map(std::move(map));
+    return get_pg_shard_manager().update_map(std::move(map));
   }).then([this] {
-    pg_shard_manager.got_map(osdmap->get_epoch());
+    get_pg_shard_manager().got_map(osdmap->get_epoch());
     bind_epoch = osdmap->get_epoch();
-    return pg_shard_manager.load_pgs(store);
+    return get_pg_shard_manager().load_pgs(store);
   }).then([this] {
 
     uint64_t osd_required =
@@ -462,7 +478,7 @@ seastar::future<> OSD::start()
 
 seastar::future<> OSD::start_boot()
 {
-  pg_shard_manager.set_preboot();
+  get_pg_shard_manager().set_preboot();
   return monc->get_version("osdmap").then([this](auto&& ret) {
     auto [newest, oldest] = ret;
     return _preboot(oldest, newest);
@@ -504,7 +520,7 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
 
 seastar::future<> OSD::_send_boot()
 {
-  pg_shard_manager.set_booting();
+  get_pg_shard_manager().set_booting();
 
   entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
   entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
@@ -584,10 +600,12 @@ seastar::future<> OSD::_add_me_to_crush()
   });
 }
 
-seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
-                                      Ref<MCommand> m)
+seastar::future<> OSD::ShardDispatcher::handle_command(
+  crimson::net::ConnectionRef conn,
+  Ref<MCommand> m)
 {
-  return asok->handle_command(conn, std::move(m));
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return osd.asok->handle_command(conn, std::move(m));
 }
 
 /*
@@ -607,7 +625,7 @@ seastar::future<> OSD::start_asok_admin()
   asok->register_command(make_asok_hook<SendBeaconHook>(*this));
   asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
   asok->register_command(
-    make_asok_hook<DumpPGStateHistory>(std::as_const(pg_shard_manager)));
+    make_asok_hook<DumpPGStateHistory>(std::as_const(get_pg_shard_manager())));
   asok->register_command(make_asok_hook<DumpMetricsHook>());
   asok->register_command(make_asok_hook<DumpPerfCountersHook>());
   asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
@@ -618,7 +636,7 @@ seastar::future<> OSD::start_asok_admin()
   // ops commands
   asok->register_command(
     make_asok_hook<DumpInFlightOpsHook>(
-      std::as_const(pg_shard_manager)));
+      std::as_const(get_pg_shard_manager())));
   asok->register_command(
     make_asok_hook<DumpHistoricOpsHook>(
       std::as_const(get_shard_services().get_registry())));
@@ -637,7 +655,7 @@ seastar::future<> OSD::stop()
   tick_timer.cancel();
   // see also OSD::shutdown()
   return prepare_to_stop().then([this] {
-    pg_shard_manager.set_stopping();
+    get_pg_shard_manager().set_stopping();
     logger().debug("prepared to stop");
     public_msgr->stop();
     cluster_msgr->stop();
@@ -645,19 +663,24 @@ seastar::future<> OSD::stop()
   return asok->stop().then([this] {
     return heartbeat->stop();
   }).then([this] {
-    return pg_shard_manager.stop_registries();
+    return get_pg_shard_manager().stop_registries();
   }).then([this] {
     return store.umount();
   }).then([this] {
     return store.stop();
   }).then([this] {
-    return pg_shard_manager.stop_pgs();
+    return get_pg_shard_manager().stop_pgs();
   }).then([this] {
     return monc->stop();
  }).then([this] {
     return mgrc->stop();
   }).then([this] {
-    return pg_shard_manager.stop();
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_dispatchers.stop();
+  }).then([this] {
+    return shard_services.stop();
+  }).then([this] {
+    return osd_singleton_state.stop();
   }).then([fut=std::move(gate_close_fut)]() mutable {
     return std::move(fut);
   }).then([this] {
@@ -675,12 +698,12 @@ void OSD::dump_status(Formatter* f) const
   f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
   f->dump_stream("osd_fsid") << superblock.osd_fsid;
   f->dump_unsigned("whoami", superblock.whoami);
-  f->dump_string("state", pg_shard_manager.get_osd_state_string());
+  f->dump_string("state", get_pg_shard_manager().get_osd_state_string());
   f->dump_unsigned("oldest_map", superblock.oldest_map);
   f->dump_unsigned("cluster_osdmap_trim_lower_bound",
                    superblock.cluster_osdmap_trim_lower_bound);
   f->dump_unsigned("newest_map", superblock.newest_map);
-  f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
+  f->dump_unsigned("num_pgs", get_pg_shard_manager().get_num_pgs());
 }
 
 void OSD::print(std::ostream& out) const
@@ -689,87 +712,140 @@ void OSD::print(std::ostream& out) const
     << superblock.osd_fsid << " [" << superblock.oldest_map
     << "," << superblock.newest_map << "] "
     << "tlb:" << superblock.cluster_osdmap_trim_lower_bound
-    << " pgs:" << pg_shard_manager.get_num_pgs()
+    << " pgs:" << get_pg_shard_manager().get_num_pgs()
+    << "}";
+}
+
+void OSD::ShardDispatcher::print(std::ostream& out) const
+{
+  out << "{osd."
+      << osd.superblock.whoami << " "
+      << osd.superblock.osd_fsid << " [" << osd.superblock.oldest_map
+      << "," << osd.superblock.newest_map << "] "
+      << " pgs:" << get_pg_shard_manager().get_num_pgs()
     << "}";
 }
 
 std::optional<seastar::future<>> OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
-  if (pg_shard_manager.is_stopping()) {
-    return {};
-  }
-  // XXX: we're assuming the `switch` part is executed immediately, and thus
-  // we won't smash the stack. Taking into account how `seastar::with_gate`
-  // is currently implemented, this seems to be the case (Summer 2022).
+  assert(seastar::this_shard_id() == PRIMARY_CORE);
   bool dispatched = true;
   gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
                                                 m=std::move(m), &dispatched] {
     switch (m->get_type()) {
-    case CEPH_MSG_OSD_MAP:
-      return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
-    case CEPH_MSG_OSD_OP:
-      return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
-    case MSG_OSD_PG_CREATE2:
-      return handle_pg_create(
-        conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
-      return seastar::now();
-    case MSG_COMMAND:
-      return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
-    case MSG_OSD_MARK_ME_DOWN:
-      return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
-    case MSG_OSD_PG_PULL:
-      [[fallthrough]];
-    case MSG_OSD_PG_PUSH:
-      [[fallthrough]];
-    case MSG_OSD_PG_PUSH_REPLY:
-      [[fallthrough]];
-    case MSG_OSD_PG_RECOVERY_DELETE:
-      [[fallthrough]];
-    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
-      [[fallthrough]];
-    case MSG_OSD_PG_SCAN:
-      [[fallthrough]];
-    case MSG_OSD_PG_BACKFILL:
-      [[fallthrough]];
-    case MSG_OSD_PG_BACKFILL_REMOVE:
-      return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
-    case MSG_OSD_PG_LEASE:
-      [[fallthrough]];
-    case MSG_OSD_PG_LEASE_ACK:
-      [[fallthrough]];
-    case MSG_OSD_PG_NOTIFY2:
-      [[fallthrough]];
-    case MSG_OSD_PG_INFO2:
-      [[fallthrough]];
-    case MSG_OSD_PG_QUERY2:
-      [[fallthrough]];
-    case MSG_OSD_BACKFILL_RESERVE:
-      [[fallthrough]];
-    case MSG_OSD_RECOVERY_RESERVE:
-      [[fallthrough]];
-    case MSG_OSD_PG_LOG:
-      return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
-    case MSG_OSD_REPOP:
-      return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
-    case MSG_OSD_REPOPREPLY:
-      return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
-    case MSG_OSD_SCRUB2:
-      return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
-    case MSG_OSD_PG_UPDATE_LOG_MISSING:
-      return handle_update_log_missing(conn, boost::static_pointer_cast<
-        MOSDPGUpdateLogMissing>(m));
-    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
-      return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
-        MOSDPGUpdateLogMissingReply>(m));
-    default:
-      dispatched = false;
-      return seastar::now();
+    case CEPH_MSG_OSD_MAP:
+    case CEPH_MSG_OSD_OP:
+    case MSG_OSD_PG_CREATE2:
+    case MSG_COMMAND:
+    case MSG_OSD_MARK_ME_DOWN:
+    case MSG_OSD_PG_PULL:
+    case MSG_OSD_PG_PUSH:
+    case MSG_OSD_PG_PUSH_REPLY:
+    case MSG_OSD_PG_RECOVERY_DELETE:
+    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    case MSG_OSD_PG_SCAN:
+    case MSG_OSD_PG_BACKFILL:
+    case MSG_OSD_PG_BACKFILL_REMOVE:
+    case MSG_OSD_PG_LEASE:
+    case MSG_OSD_PG_LEASE_ACK:
+    case MSG_OSD_PG_NOTIFY2:
+    case MSG_OSD_PG_INFO2:
+    case MSG_OSD_PG_QUERY2:
+    case MSG_OSD_BACKFILL_RESERVE:
+    case MSG_OSD_RECOVERY_RESERVE:
+    case MSG_OSD_PG_LOG:
+    case MSG_OSD_REPOP:
+    case MSG_OSD_REPOPREPLY:
+    case MSG_OSD_SCRUB2:
+    case MSG_OSD_PG_UPDATE_LOG_MISSING:
+    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    {
+      return shard_dispatchers.invoke_on(0,
+        [conn, m = std::move(m)]
+        (auto &local_dispatcher)
+        mutable ->seastar::future<> {
+          return local_dispatcher.ms_dispatch(std::move(conn), m);
+      });
+    }
+    default:
+    {
+      dispatched = false;
+      return seastar::now();
+    }
     }
   });
   return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
+seastar::future<>
+OSD::ShardDispatcher::ms_dispatch(
+  crimson::net::ConnectionRef conn,
+  MessageRef m)
+{
+  if (pg_shard_manager.is_stopping()) {
+    return seastar::now();
+  }
+  switch (m->get_type()) {
+  case CEPH_MSG_OSD_MAP:
+    return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
+  case CEPH_MSG_OSD_OP:
+    return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+  case MSG_OSD_PG_CREATE2:
+    return handle_pg_create(
+      conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
+    return seastar::now();
+  case MSG_COMMAND:
+    return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+  case MSG_OSD_MARK_ME_DOWN:
+    return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+  case MSG_OSD_PG_PULL:
+    [[fallthrough]];
+  case MSG_OSD_PG_PUSH:
+    [[fallthrough]];
+  case MSG_OSD_PG_PUSH_REPLY:
+    [[fallthrough]];
+  case MSG_OSD_PG_RECOVERY_DELETE:
+    [[fallthrough]];
+  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    [[fallthrough]];
+  case MSG_OSD_PG_SCAN:
+    [[fallthrough]];
+  case MSG_OSD_PG_BACKFILL:
+    [[fallthrough]];
+  case MSG_OSD_PG_BACKFILL_REMOVE:
+    return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+  case MSG_OSD_PG_LEASE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LEASE_ACK:
+    [[fallthrough]];
+  case MSG_OSD_PG_NOTIFY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_INFO2:
+    [[fallthrough]];
+  case MSG_OSD_PG_QUERY2:
+    [[fallthrough]];
+  case MSG_OSD_BACKFILL_RESERVE:
+    [[fallthrough]];
+  case MSG_OSD_RECOVERY_RESERVE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LOG:
+    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+  case MSG_OSD_REPOP:
+    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+  case MSG_OSD_REPOPREPLY:
+    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+  case MSG_OSD_SCRUB2:
+    return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+  case MSG_OSD_PG_UPDATE_LOG_MISSING:
+    return handle_update_log_missing(conn, boost::static_pointer_cast<
+      MOSDPGUpdateLogMissing>(m));
+  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+      MOSDPGUpdateLogMissingReply>(m));
+  default:
+    return seastar::now();
+  }
+}
+
 void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
   // TODO: cleanup the session attached to this connection
@@ -827,7 +903,7 @@ seastar::future<MessageURef> OSD::get_stats() const
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
-  return pg_shard_manager.get_pg_stats(
+  return get_pg_shard_manager().get_pg_stats(
   ).then([m=std::move(m)](auto &&stats) mutable {
     m->pg_stat = std::move(stats);
     return seastar::make_ready_future<MessageURef>(std::move(m));
@@ -841,7 +917,9 @@ uint64_t OSD::send_pg_stats()
   return osd_stat.seq;
 }
 
-bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
+bool OSD::ShardDispatcher::require_mon_peer(
+  crimson::net::Connection *conn,
+  Ref<Message> m)
 {
   if (!conn->peer_is_mon()) {
     logger().info("{} received from non-mon {}, {}",
@@ -853,7 +931,7 @@ bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
   return true;
 }
 
-seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref<MOSDMap> m)
 {
   /* Ensure that only one MOSDMap is processed at a time.  Allowing concurrent
    * processing may eventually be worthwhile, but such an implementation would
@@ -864,17 +942,18 @@ seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
    * simpler invariant for now.
    * See https://tracker.ceph.com/issues/59165
    */
-  return handle_osd_map_lock.lock().then([this, m] {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return osd.handle_osd_map_lock.lock().then([this, m] {
     return _handle_osd_map(m);
   }).finally([this] {
-    return handle_osd_map_lock.unlock();
+    return osd.handle_osd_map_lock.unlock();
   });
 }
 
-seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
 {
   logger().info("handle_osd_map {}", *m);
-  if (m->fsid != superblock.cluster_fsid) {
+  if (m->fsid != osd.superblock.cluster_fsid) {
     logger().warn("fsid mismatched");
     return seastar::now();
   }
@@ -886,16 +965,16 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   const auto first = m->get_first();
   const auto last = m->get_last();
   logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
-                first, last, superblock.newest_map,
+                first, last, osd.superblock.newest_map,
                 m->cluster_osdmap_trim_lower_bound, m->newest_map);
   // make sure there is something new, here, before we bother flushing
   // the queues and such
-  if (last <= superblock.newest_map) {
+  if (last <= osd.superblock.newest_map) {
     return seastar::now();
   }
   // missing some?
   bool skip_maps = false;
-  epoch_t start = superblock.newest_map + 1;
+  epoch_t start = osd.superblock.newest_map + 1;
   if (first > start) {
     logger().info("handle_osd_map message skips epochs {}..{}",
                   start, first - 1);
@@ -918,20 +997,20 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
     [=, this](auto& t) {
       return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] {
         // even if this map isn't from a mon, we may have satisfied our subscription
-        monc->sub_got("osdmap", last);
-        if (!superblock.oldest_map || skip_maps) {
-          superblock.oldest_map = first;
+        osd.monc->sub_got("osdmap", last);
+        if (!osd.superblock.oldest_map || skip_maps) {
+          osd.superblock.oldest_map = first;
         }
-        superblock.newest_map = last;
-        superblock.current_epoch = last;
+        osd.superblock.newest_map = last;
+        osd.superblock.current_epoch = last;
 
         // note in the superblock that we were clean thru the prior epoch
-        if (boot_epoch && boot_epoch >= superblock.mounted) {
-          superblock.mounted = boot_epoch;
-          superblock.clean_thru = last;
+        if (osd.boot_epoch && osd.boot_epoch >= osd.superblock.mounted) {
+          osd.superblock.mounted = osd.boot_epoch;
+          osd.superblock.clean_thru = last;
         }
-        pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
-        pg_shard_manager.set_superblock(superblock);
+        pg_shard_manager.get_meta_coll().store_superblock(t, osd.superblock);
+        pg_shard_manager.set_superblock(osd.superblock);
         logger().debug("OSD::handle_osd_map: do_transaction...");
         return store.get_sharded_store().do_transaction(
           pg_shard_manager.get_meta_coll().collection(),
@@ -943,10 +1022,12 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   });
 }
 
-seastar::future<> OSD::committed_osd_maps(version_t first,
-                                          version_t last,
-                                          Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
+  version_t first,
+  version_t last,
+  Ref<MOSDMap> m)
 {
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
   // advance through the new maps
   return seastar::do_for_each(boost::make_counting_iterator(first),
@@ -955,17 +1036,17 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     return pg_shard_manager.get_local_map(
       cur
     ).then([this](OSDMapService::local_cached_map_t&& o) {
-      osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+      osd.osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
       return pg_shard_manager.update_map(std::move(o));
     }).then([this] {
       if (get_shard_services().get_up_epoch() == 0 &&
-          osdmap->is_up(whoami) &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+          osd.osdmap->is_up(whoami) &&
+          osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs()) {
         return pg_shard_manager.set_up_epoch(
-          osdmap->get_epoch()
+          osd.osdmap->get_epoch()
         ).then([this] {
-          if (!boot_epoch) {
-            boot_epoch = osdmap->get_epoch();
+          if (!osd.boot_epoch) {
+            osd.boot_epoch = osd.osdmap->get_epoch();
           }
         });
       } else {
@@ -973,43 +1054,43 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       }
     });
   }).then([m, this] {
-    if (osdmap->is_up(whoami)) {
-      const auto up_from = osdmap->get_up_from(whoami);
+    if (osd.osdmap->is_up(whoami)) {
+      const auto up_from = osd.osdmap->get_up_from(whoami);
       logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
-                    whoami, osdmap->get_epoch(), up_from, bind_epoch,
+                    whoami, osd.osdmap->get_epoch(), up_from, osd.bind_epoch,
                     pg_shard_manager.get_osd_state_string());
-      if (bind_epoch < up_from &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+      if (osd.bind_epoch < up_from &&
+          osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs() &&
           pg_shard_manager.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
         pg_shard_manager.set_active();
-        beacon_timer.arm_periodic(
+        osd.beacon_timer.arm_periodic(
          std::chrono::seconds(local_conf()->osd_beacon_report_interval));
         // timer continuation rearms when complete
-        tick_timer.arm(
+        osd.tick_timer.arm(
          std::chrono::seconds(TICK_INTERVAL));
       }
     } else {
      if (pg_shard_manager.is_prestop()) {
-        got_stop_ack();
+        osd.got_stop_ack();
        return seastar::now();
      }
    }
    return check_osdmap_features().then([this] {
      // yay!
logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up" - " to {} epoch to pgs", whoami, osdmap->get_epoch()); - return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch()); + " to {} epoch to pgs", whoami, osd.osdmap->get_epoch()); + return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch()); }); }).then([m, this] { if (pg_shard_manager.is_active()) { logger().info("osd.{}: now active", whoami); - if (!osdmap->exists(whoami) || - osdmap->is_stop(whoami)) { - return shutdown(); + if (!osd.osdmap->exists(whoami) || + osd.osdmap->is_stop(whoami)) { + return osd.shutdown(); } - if (should_restart()) { - return restart(); + if (osd.should_restart()) { + return osd.restart(); } else { return seastar::now(); } @@ -1017,11 +1098,11 @@ seastar::future<> OSD::committed_osd_maps(version_t first, logger().info("osd.{}: now preboot", whoami); if (m->get_source().is_mon()) { - return _preboot( + return osd._preboot( m->cluster_osdmap_trim_lower_bound, m->newest_map); } else { logger().info("osd.{}: start_boot", whoami); - return start_boot(); + return osd.start_boot(); } } else { logger().info("osd.{}: now {}", whoami, @@ -1032,8 +1113,9 @@ seastar::future<> OSD::committed_osd_maps(version_t first, }); } -seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_osd_op( + crimson::net::ConnectionRef conn, + Ref m) { (void) pg_shard_manager.start_pg_operation( get_shard_services(), @@ -1042,8 +1124,9 @@ seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn, return seastar::now(); } -seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_pg_create( + crimson::net::ConnectionRef conn, + Ref m) { for (auto& [pgid, when] : m->pgs) { const auto &[created, created_stamp] = when; @@ -1077,7 +1160,7 @@ seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn, return seastar::now(); } -seastar::future<> OSD::handle_update_log_missing( +seastar::future<> OSD::ShardDispatcher::handle_update_log_missing( crimson::net::ConnectionRef conn, Ref m) { @@ -1088,7 +1171,7 @@ seastar::future<> OSD::handle_update_log_missing( return seastar::now(); } -seastar::future<> OSD::handle_update_log_missing_reply( +seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply( crimson::net::ConnectionRef conn, Ref m) { @@ -1099,8 +1182,9 @@ seastar::future<> OSD::handle_update_log_missing_reply( return seastar::now(); } -seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_rep_op( + crimson::net::ConnectionRef conn, + Ref m) { m->finish_decode(); std::ignore = pg_shard_manager.start_pg_operation( @@ -1109,8 +1193,9 @@ seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn, return seastar::now(); } -seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply( + crimson::net::ConnectionRef conn, + Ref m) { spg_t pgid = m->get_spg(); return pg_shard_manager.with_pg( @@ -1126,10 +1211,11 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn, }); } -seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_scrub( + crimson::net::ConnectionRef conn, + Ref m) { - if (m->fsid != superblock.cluster_fsid) { + if (m->fsid != osd.superblock.cluster_fsid) { 
logger().warn("fsid mismatched"); return seastar::now(); } @@ -1146,17 +1232,20 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn, }); } -seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_mark_me_down( + crimson::net::ConnectionRef conn, + Ref m) { + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); if (pg_shard_manager.is_prestop()) { - got_stop_ack(); + osd.got_stop_ack(); } return seastar::now(); } -seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn, - Ref m) +seastar::future<> OSD::ShardDispatcher::handle_recovery_subreq( + crimson::net::ConnectionRef conn, + Ref m) { std::ignore = pg_shard_manager.start_pg_operation( conn, std::move(m)); @@ -1190,7 +1279,7 @@ seastar::future<> OSD::restart() { beacon_timer.cancel(); tick_timer.cancel(); - return pg_shard_manager.set_up_epoch( + return get_pg_shard_manager().set_up_epoch( 0 ).then([this] { bind_epoch = osdmap->get_epoch(); @@ -1209,7 +1298,7 @@ seastar::future<> OSD::shutdown() seastar::future<> OSD::send_beacon() { - if (!pg_shard_manager.is_active()) { + if (!get_pg_shard_manager().is_active()) { return seastar::now(); } // FIXME: min lec should be calculated from pg_stat @@ -1224,11 +1313,11 @@ seastar::future<> OSD::send_beacon() seastar::future<> OSD::update_heartbeat_peers() { - if (!pg_shard_manager.is_active()) { + if (!get_pg_shard_manager().is_active()) { return seastar::now();; } - pg_shard_manager.for_each_pgid([this](auto &pgid) { + get_pg_shard_manager().for_each_pgid([this](auto &pgid) { vector up, acting; osdmap->pg_to_up_acting_osds(pgid.pgid, &up, nullptr, @@ -1245,7 +1334,7 @@ seastar::future<> OSD::update_heartbeat_peers() return seastar::now(); } -seastar::future<> OSD::handle_peering_op( +seastar::future<> OSD::ShardDispatcher::handle_peering_op( crimson::net::ConnectionRef conn, Ref m) { @@ -1261,16 +1350,16 @@ seastar::future<> OSD::handle_peering_op( return seastar::now(); } -seastar::future<> OSD::check_osdmap_features() +seastar::future<> OSD::ShardDispatcher::check_osdmap_features() { return store.write_meta("require_osd_release", - stringify((int)osdmap->require_osd_release)); + stringify((int)osd.osdmap->require_osd_release)); } seastar::future<> OSD::prepare_to_stop() { if (osdmap && osdmap->is_up(whoami)) { - pg_shard_manager.set_prestop(); + get_pg_shard_manager().set_prestop(); const auto timeout = std::chrono::duration_cast( std::chrono::duration( diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index abf4a00844d3c..1093cab34105d 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -61,6 +61,81 @@ class PG; class OSD final : public crimson::net::Dispatcher, private crimson::common::AuthHandler, private crimson::mgr::WithStats { +public: + class ShardDispatcher + : public seastar::peering_sharded_service { + friend class OSD; + public: + ShardDispatcher( + OSD& osd, + int whoami, + crimson::os::FuturizedStore& store) + : pg_shard_manager(osd.osd_singleton_state, osd.shard_services), + osd(osd), + whoami(whoami), + store(store) {} + ~ShardDispatcher() = default; + + // Dispatcher methods + seastar::future<> ms_dispatch(crimson::net::ConnectionRef, + MessageRef); + + private: + bool require_mon_peer(crimson::net::Connection *conn, Ref m); + + seastar::future<> handle_osd_map(Ref m); + seastar::future<> _handle_osd_map(Ref m); + seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> 
+    seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
+                                    Ref<MOSDOp> m);
+    seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
+                                    Ref<MOSDRepOp> m);
+    seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
+                                          Ref<MOSDRepOpReply> m);
+    seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
+                                        Ref<MOSDPeeringOp> m);
+    seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
+                                             Ref<MOSDFastDispatchOp> m);
+    seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
+                                   Ref<MOSDScrub2> m);
+    seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
+                                          Ref<MOSDMarkMeDown> m);
+
+    seastar::future<> committed_osd_maps(version_t first,
+                                         version_t last,
+                                         Ref<MOSDMap> m);
+
+    seastar::future<> check_osdmap_features();
+
+    seastar::future<> handle_command(crimson::net::ConnectionRef conn,
+                                     Ref<MCommand> m);
+    seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+                                                Ref<MOSDPGUpdateLogMissing> m);
+    seastar::future<> handle_update_log_missing_reply(
+      crimson::net::ConnectionRef conn,
+      Ref<MOSDPGUpdateLogMissingReply> m);
+
+  public:
+    void print(std::ostream&) const;
+
+    auto &get_pg_shard_manager() {
+      return pg_shard_manager;
+    }
+    auto &get_pg_shard_manager() const {
+      return pg_shard_manager;
+    }
+    ShardServices &get_shard_services() {
+      return pg_shard_manager.get_shard_services();
+    }
+
+  private:
+    crimson::osd::PGShardManager pg_shard_manager;
+    OSD& osd;
+    const int whoami;
+    crimson::os::FuturizedStore& store;
+  };
+
   const int whoami;
   const uint32_t nonce;
   seastar::abort_source& abort_source;
@@ -110,7 +185,9 @@ class OSD final : public crimson::net::Dispatcher,
   void handle_authentication(const EntityName& name,
                              const AuthCapsInfo& caps) final;
 
-  crimson::osd::PGShardManager pg_shard_manager;
+  seastar::sharded<OSDSingletonState> osd_singleton_state;
+  seastar::sharded<ShardServices> shard_services;
+  seastar::sharded<ShardDispatcher> shard_dispatchers;
 
   std::unique_ptr<Heartbeat> heartbeat;
   seastar::timer<seastar::lowres_clock> tick_timer;
@@ -148,6 +225,21 @@ public:
   /// @return the seq id of the pg stats being sent
   uint64_t send_pg_stats();
 
+  auto &get_shard_services() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_services.local();
+  }
+
+  auto &get_pg_shard_manager() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_dispatchers.local().get_pg_shard_manager();
+  }
+
+  auto &get_pg_shard_manager() const {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_dispatchers.local().get_pg_shard_manager();
+  }
+
 private:
   static seastar::future<> _write_superblock(
     crimson::os::FuturizedStore &store,
@@ -163,52 +255,11 @@ private:
 
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
+  seastar::future<> start_asok_admin();
+
   void write_superblock(ceph::os::Transaction& t);
   seastar::future<> read_superblock();
 
-  bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
-
-  seastar::future<> handle_osd_map(Ref<MOSDMap> m);
-  seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
-  seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
-                                     Ref<MOSDPGCreate2> m);
-  seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
-                                  Ref<MOSDOp> m);
-  seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
-                                  Ref<MOSDRepOp> m);
-  seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
-                                        Ref<MOSDRepOpReply> m);
-  seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
-                                      Ref<MOSDPeeringOp> m);
-  seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
-                                           Ref<MOSDFastDispatchOp> m);
-  seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
-                                 Ref<MOSDScrub2> m);
-  seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
-                                        Ref<MOSDMarkMeDown> m);
-
-  seastar::future<> committed_osd_maps(version_t first,
-                                       version_t last,
-                                       Ref<MOSDMap> m);
-
-  seastar::future<> check_osdmap_features();
-
-  seastar::future<> handle_command(crimson::net::ConnectionRef conn,
-                                   Ref<MCommand> m);
-  seastar::future<> start_asok_admin();
-  seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
-                                              Ref<MOSDPGUpdateLogMissing> m);
-  seastar::future<> handle_update_log_missing_reply(
-    crimson::net::ConnectionRef conn,
-    Ref<MOSDPGUpdateLogMissingReply> m);
-public:
-  auto &get_pg_shard_manager() {
-    return pg_shard_manager;
-  }
-  ShardServices &get_shard_services() {
-    return pg_shard_manager.get_shard_services();
-  }
-
 private:
   crimson::common::Gated gate;
 
@@ -236,8 +287,15 @@ inline std::ostream& operator<<(std::ostream& out, const OSD& osd) {
   return out;
 }
 
+inline std::ostream& operator<<(std::ostream& out,
+                                const OSD::ShardDispatcher& shard_dispatcher) {
+  shard_dispatcher.print(out);
+  return out;
+}
+
 }
 
 #if FMT_VERSION >= 90000
 template <> struct fmt::formatter<crimson::osd::OSD> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::OSD::ShardDispatcher> : fmt::ostream_formatter {};
 #endif
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index 01dbd11c21458..e0ef237489145 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -12,39 +12,6 @@ namespace {
 
 namespace crimson::osd {
 
-seastar::future<> PGShardManager::start(
-  const int whoami,
-  crimson::net::Messenger &cluster_msgr,
-  crimson::net::Messenger &public_msgr,
-  crimson::mon::Client &monc,
-  crimson::mgr::Client &mgrc,
-  crimson::os::FuturizedStore &store)
-{
-  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return osd_singleton_state.start_single(
-    whoami, std::ref(cluster_msgr), std::ref(public_msgr),
-    std::ref(monc), std::ref(mgrc)
-  ).then([this, whoami, &store] {
-    ceph::mono_time startup_time = ceph::mono_clock::now();
-    return shard_services.start(
-      std::ref(osd_singleton_state),
-      whoami,
-      startup_time,
-      osd_singleton_state.local().perf,
-      osd_singleton_state.local().recoverystate_perf,
-      std::ref(store));
-  });
-}
-
-seastar::future<> PGShardManager::stop()
-{
-  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return shard_services.stop(
-  ).then([this] {
-    return osd_singleton_state.stop();
-  });
-}
-
 seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index dcd91c10553c7..9526ddcd05af5 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -24,8 +24,8 @@ namespace crimson::osd {
  * etc)
  */
 class PGShardManager {
-  seastar::sharded<OSDSingletonState> osd_singleton_state;
-  seastar::sharded<ShardServices> shard_services;
+  seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  seastar::sharded<ShardServices> &shard_services;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)    \
   template <typename... Args>                            \
@@ -46,16 +46,11 @@ public:
   using cached_map_t = OSDMapService::cached_map_t;
   using local_cached_map_t = OSDMapService::local_cached_map_t;
 
-  PGShardManager() = default;
-
-  seastar::future<> start(
-    const int whoami,
-    crimson::net::Messenger &cluster_msgr,
-    crimson::net::Messenger &public_msgr,
-    crimson::mon::Client &monc,
-    crimson::mgr::Client &mgrc,
-    crimson::os::FuturizedStore &store);
-  seastar::future<> stop();
+  PGShardManager(
+    seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    seastar::sharded<ShardServices> &shard_services)
+    : osd_singleton_state(osd_singleton_state),
+      shard_services(shard_services) {}
 
   auto &get_osd_singleton_state() {
     ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
diff --git
 a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 7ae20e88fa4d4..34b3a32e34b98 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -201,6 +201,7 @@ public:
 class OSDSingletonState : public md_config_obs_t {
   friend class ShardServices;
   friend class PGShardManager;
+  friend class OSD;
   using cached_map_t = OSDMapService::cached_map_t;
   using local_cached_map_t = OSDMapService::local_cached_map_t;
 
-- 
2.39.5
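
Note on the pattern (not part of the patch): the change above manages
OSDSingletonState, ShardServices and ShardDispatcher as seastar::sharded<>
services and funnels message handling onto the primary core via
invoke_on(0, ...).  The stand-alone sketch below illustrates that
lifecycle under the assumption that a Seastar toolchain is available;
SingletonState and Dispatcher are hypothetical stand-ins, not Ceph types.

// sharded_lifecycle_sketch.cc -- illustrative only.
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <functional>
#include <iostream>

struct SingletonState {
  int whoami;
  explicit SingletonState(int id) : whoami(id) {}
  // sharded<>::stop() requires a stop() member returning a future.
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

struct Dispatcher : seastar::peering_sharded_service<Dispatcher> {
  seastar::sharded<SingletonState>& singleton;
  explicit Dispatcher(seastar::sharded<SingletonState>& s) : singleton(s) {}
  seastar::future<> handle(int msg_type) {
    // Runs on whichever shard invoke_on() targeted.
    std::cout << "shard " << seastar::this_shard_id()
              << " handling message type " << msg_type << "\n";
    return seastar::make_ready_future<>();
  }
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  seastar::sharded<SingletonState> singleton;   // one instance, one shard
  seastar::sharded<Dispatcher> dispatchers;     // one instance per shard
  return app.run(argc, argv, [&] {
    // start_single() constructs the singleton on the calling shard only;
    // start() constructs a Dispatcher on every reactor shard -- the same
    // ordering OSD::start() uses for its three sharded services.
    return singleton.start_single(0).then([&] {
      return dispatchers.start(std::ref(singleton));
    }).then([&] {
      // Cross-shard hop: like OSD::ms_dispatch(), forward the work to a
      // fixed shard so mutable state is only touched on one core.
      return dispatchers.invoke_on(0, [](Dispatcher& d) {
        return d.handle(42);
      });
    }).finally([&] {
      // Tear down in reverse dependency order, as OSD::stop() does.
      return dispatchers.stop().finally([&] { return singleton.stop(); });
    });
  });
}

Pinning the dispatch path to one shard, as the patch does with its
PRIMARY_CORE asserts, keeps superblock and osdmap mutation effectively
single-threaded while per-shard ShardServices stay local to their cores.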