From 2b9a6a7a3099b32689a896cabc68ece29ee9c905 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 7 Aug 2023 16:58:12 +0800 Subject: [PATCH] crimson/osd: cleanup and drop OSD::ShardDispatcher Signed-off-by: Yingxin Cheng (cherry picked from commit e3d910a16c4b6239c7cf523c5510bc76fadcb832) --- src/crimson/osd/osd.cc | 278 +++++++++++----------------- src/crimson/osd/osd.h | 128 ++++--------- src/crimson/osd/pg_shard_manager.cc | 2 +- src/crimson/osd/pg_shard_manager.h | 16 +- 4 files changed, 162 insertions(+), 262 deletions(-) diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 60caf3a164c62..cfe4f54ab2e5e 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -95,6 +95,9 @@ OSD::OSD(int id, uint32_t nonce, monc{new crimson::mon::Client{*public_msgr, *this}}, mgrc{new crimson::mgr::Client{*public_msgr, *this}}, store{store}, + pg_shard_manager{osd_singleton_state, + shard_services, + pg_to_shard_mappings}, // do this in background -- continuation rearms timer when complete tick_timer{[this] { std::ignore = update_heartbeat_peers( @@ -160,11 +163,11 @@ CompatSet get_osd_initial_compat_set() seastar::future<> OSD::open_meta_coll() { + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); return store.get_sharded_store().open_collection( coll_t::meta() ).then([this](auto ch) { - ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); - get_pg_shard_manager().init_meta_coll(ch, store.get_sharded_store()); + pg_shard_manager.init_meta_coll(ch, store.get_sharded_store()); return seastar::now(); }); } @@ -376,10 +379,6 @@ seastar::future<> OSD::start() osd_singleton_state.local().recoverystate_perf, std::ref(store), std::ref(osd_states)); - }).then([this] { - return shard_dispatchers.start( - std::ref(*this), - std::ref(pg_to_shard_mappings)); }); }).then([this] { heartbeat.reset(new Heartbeat{ @@ -395,24 +394,24 @@ seastar::future<> OSD::start() }).then([this] { return open_meta_coll(); }).then([this] { - return get_pg_shard_manager().get_meta_coll().load_superblock( + return pg_shard_manager.get_meta_coll().load_superblock( ).handle_error( crimson::ct_error::assert_all("open_meta_coll error") ); }).then([this](OSDSuperblock&& sb) { superblock = std::move(sb); - get_pg_shard_manager().set_superblock(superblock); - return get_pg_shard_manager().get_local_map(superblock.current_epoch); + pg_shard_manager.set_superblock(superblock); + return pg_shard_manager.get_local_map(superblock.current_epoch); }).then([this](OSDMapService::local_cached_map_t&& map) { osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map)); - return get_pg_shard_manager().update_map(std::move(map)); + return pg_shard_manager.update_map(std::move(map)); }).then([this] { return shard_services.invoke_on_all([this](auto &local_service) { local_service.local_state.osdmap_gate.got_map(osdmap->get_epoch()); }); }).then([this] { bind_epoch = osdmap->get_epoch(); - return get_pg_shard_manager().load_pgs(store); + return pg_shard_manager.load_pgs(store); }).then([this] { uint64_t osd_required = CEPH_FEATURE_UID | @@ -486,7 +485,7 @@ seastar::future<> OSD::start() seastar::future<> OSD::start_boot() { - get_pg_shard_manager().set_preboot(); + pg_shard_manager.set_preboot(); return monc->get_version("osdmap").then([this](auto&& ret) { auto [newest, oldest] = ret; return _preboot(oldest, newest); @@ -528,7 +527,7 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest) seastar::future<> OSD::_send_boot() { - get_pg_shard_manager().set_booting(); + pg_shard_manager.set_booting(); entity_addrvec_t public_addrs = public_msgr->get_myaddrs(); entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs(); @@ -608,12 +607,12 @@ seastar::future<> OSD::_add_me_to_crush() }); } -seastar::future<> OSD::ShardDispatcher::handle_command( +seastar::future<> OSD::handle_command( crimson::net::ConnectionRef conn, Ref m) { ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); - return osd.asok->handle_command(conn, std::move(m)); + return asok->handle_command(conn, std::move(m)); } /* @@ -633,7 +632,7 @@ seastar::future<> OSD::start_asok_admin() asok->register_command(make_asok_hook(*this)); asok->register_command(make_asok_hook(*this)); asok->register_command( - make_asok_hook(std::as_const(get_pg_shard_manager()))); + make_asok_hook(std::as_const(pg_shard_manager))); asok->register_command(make_asok_hook()); asok->register_command(make_asok_hook()); asok->register_command(make_asok_hook(get_shard_services())); @@ -644,7 +643,7 @@ seastar::future<> OSD::start_asok_admin() // ops commands asok->register_command( make_asok_hook( - std::as_const(get_pg_shard_manager()))); + std::as_const(pg_shard_manager))); asok->register_command( make_asok_hook( std::as_const(get_shard_services().get_registry()))); @@ -663,7 +662,7 @@ seastar::future<> OSD::stop() tick_timer.cancel(); // see also OSD::shutdown() return prepare_to_stop().then([this] { - return get_pg_shard_manager().set_stopping(); + return pg_shard_manager.set_stopping(); }).then([this] { logger().debug("prepared to stop"); public_msgr->stop(); @@ -672,20 +671,17 @@ seastar::future<> OSD::stop() return asok->stop().then([this] { return heartbeat->stop(); }).then([this] { - return get_pg_shard_manager().stop_registries(); + return pg_shard_manager.stop_registries(); }).then([this] { return store.umount(); }).then([this] { return store.stop(); }).then([this] { - return get_pg_shard_manager().stop_pgs(); + return pg_shard_manager.stop_pgs(); }).then([this] { return monc->stop(); }).then([this] { return mgrc->stop(); - }).then([this] { - ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); - return shard_dispatchers.stop(); }).then([this] { return shard_services.stop(); }).then([this] { @@ -711,12 +707,12 @@ void OSD::dump_status(Formatter* f) const f->dump_stream("cluster_fsid") << superblock.cluster_fsid; f->dump_stream("osd_fsid") << superblock.osd_fsid; f->dump_unsigned("whoami", superblock.whoami); - f->dump_string("state", get_pg_shard_manager().get_osd_state_string()); + f->dump_string("state", pg_shard_manager.get_osd_state_string()); f->dump_unsigned("oldest_map", superblock.oldest_map); f->dump_unsigned("cluster_osdmap_trim_lower_bound", superblock.cluster_osdmap_trim_lower_bound); f->dump_unsigned("newest_map", superblock.newest_map); - f->dump_unsigned("num_pgs", get_pg_shard_manager().get_num_pgs()); + f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs()); } void OSD::print(std::ostream& out) const @@ -725,70 +721,30 @@ void OSD::print(std::ostream& out) const << superblock.osd_fsid << " [" << superblock.oldest_map << "," << superblock.newest_map << "] " << "tlb:" << superblock.cluster_osdmap_trim_lower_bound - << " pgs:" << get_pg_shard_manager().get_num_pgs() - << "}"; -} - -void OSD::ShardDispatcher::print(std::ostream& out) const -{ - out << "{osd." << osd.superblock.whoami << " " - << osd.superblock.osd_fsid << " [" << osd.superblock.oldest_map - << "," << osd.superblock.newest_map << "] " - << " pgs:" << get_pg_shard_manager().get_num_pgs() + << " pgs:" << pg_shard_manager.get_num_pgs() << "}"; } std::optional> OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { - if (get_pg_shard_manager().is_stopping()) { + if (pg_shard_manager.is_stopping()) { return seastar::now(); } - bool dispatched = true; - gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn), - m=std::move(m), &dispatched]() mutable { - switch (m->get_type()) { - case CEPH_MSG_OSD_MAP: - case CEPH_MSG_OSD_OP: - case MSG_OSD_PG_CREATE2: - case MSG_COMMAND: - case MSG_OSD_MARK_ME_DOWN: - case MSG_OSD_PG_PULL: - case MSG_OSD_PG_PUSH: - case MSG_OSD_PG_PUSH_REPLY: - case MSG_OSD_PG_RECOVERY_DELETE: - case MSG_OSD_PG_RECOVERY_DELETE_REPLY: - case MSG_OSD_PG_SCAN: - case MSG_OSD_PG_BACKFILL: - case MSG_OSD_PG_BACKFILL_REMOVE: - case MSG_OSD_PG_LEASE: - case MSG_OSD_PG_LEASE_ACK: - case MSG_OSD_PG_NOTIFY2: - case MSG_OSD_PG_INFO2: - case MSG_OSD_PG_QUERY2: - case MSG_OSD_BACKFILL_RESERVE: - case MSG_OSD_RECOVERY_RESERVE: - case MSG_OSD_PG_LOG: - case MSG_OSD_REPOP: - case MSG_OSD_REPOPREPLY: - case MSG_OSD_SCRUB2: - case MSG_OSD_PG_UPDATE_LOG_MISSING: - case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: - { - return shard_dispatchers.local().ms_dispatch(conn, std::move(m)); - } - default: - { - dispatched = false; - return seastar::now(); - } - } + auto maybe_ret = do_ms_dispatch(conn, std::move(m)); + if (!maybe_ret.has_value()) { + return std::nullopt; + } + + gate.dispatch_in_background( + __func__, *this, [ret=std::move(maybe_ret.value())]() mutable { + return std::move(ret); }); - return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); + return seastar::now(); } -seastar::future<> -OSD::ShardDispatcher::ms_dispatch( +std::optional> +OSD::do_ms_dispatch( crimson::net::ConnectionRef conn, MessageRef m) { @@ -800,11 +756,12 @@ OSD::ShardDispatcher::ms_dispatch( // FIXME: order is not guaranteed in this path return conn.get_foreign( ).then([this, m=std::move(m)](auto f_conn) { - return container().invoke_on(PRIMARY_CORE, - [f_conn=std::move(f_conn), m=std::move(m)] - (auto& local_dispatcher) mutable { + return seastar::smp::submit_to(PRIMARY_CORE, + [f_conn=std::move(f_conn), m=std::move(m), this]() mutable { auto conn = make_local_shared_foreign(std::move(f_conn)); - return local_dispatcher.ms_dispatch(conn, std::move(m)); + auto ret = do_ms_dispatch(conn, std::move(m)); + assert(ret.has_value()); + return std::move(ret.value()); }); }); } @@ -868,7 +825,7 @@ OSD::ShardDispatcher::ms_dispatch( return handle_update_log_missing_reply(conn, boost::static_pointer_cast< MOSDPGUpdateLogMissingReply>(m)); default: - return seastar::now(); + return std::nullopt; } } @@ -929,7 +886,7 @@ seastar::future OSD::get_stats() const // MPGStats::had_map_for is not used since PGMonitor was removed auto m = crimson::make_message(monc->get_fsid(), osdmap->get_epoch()); m->osd_stat = osd_stat; - return get_pg_shard_manager().get_pg_stats( + return pg_shard_manager.get_pg_stats( ).then([m=std::move(m)](auto &&stats) mutable { m->pg_stat = std::move(stats); return seastar::make_ready_future(std::move(m)); @@ -943,21 +900,7 @@ uint64_t OSD::send_pg_stats() return osd_stat.seq; } -bool OSD::ShardDispatcher::require_mon_peer( - crimson::net::Connection *conn, - Ref m) -{ - if (!conn->peer_is_mon()) { - logger().info("{} received from non-mon {}, {}", - __func__, - conn->get_peer_addr(), - *m); - return false; - } - return true; -} - -seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref m) +seastar::future<> OSD::handle_osd_map(Ref m) { /* Ensure that only one MOSDMap is processed at a time. Allowing concurrent * processing may eventually be worthwhile, but such an implementation would @@ -969,17 +912,17 @@ seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref m) * See https://tracker.ceph.com/issues/59165 */ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); - return osd.handle_osd_map_lock.lock().then([this, m] { + return handle_osd_map_lock.lock().then([this, m] { return _handle_osd_map(m); }).finally([this] { - return osd.handle_osd_map_lock.unlock(); + return handle_osd_map_lock.unlock(); }); } -seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref m) +seastar::future<> OSD::_handle_osd_map(Ref m) { logger().info("handle_osd_map {}", *m); - if (m->fsid != osd.superblock.cluster_fsid) { + if (m->fsid != superblock.cluster_fsid) { logger().warn("fsid mismatched"); return seastar::now(); } @@ -991,16 +934,16 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref m) const auto first = m->get_first(); const auto last = m->get_last(); logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]", - first, last, osd.superblock.newest_map, + first, last, superblock.newest_map, m->cluster_osdmap_trim_lower_bound, m->newest_map); // make sure there is something new, here, before we bother flushing // the queues and such - if (last <= osd.superblock.newest_map) { + if (last <= superblock.newest_map) { return seastar::now(); } // missing some? bool skip_maps = false; - epoch_t start = osd.superblock.newest_map + 1; + epoch_t start = superblock.newest_map + 1; if (first > start) { logger().info("handle_osd_map message skips epochs {}..{}", start, first - 1); @@ -1023,22 +966,22 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref m) [=, this](auto& t) { return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] { // even if this map isn't from a mon, we may have satisfied our subscription - osd.monc->sub_got("osdmap", last); - if (!osd.superblock.oldest_map || skip_maps) { - osd.superblock.oldest_map = first; + monc->sub_got("osdmap", last); + if (!superblock.oldest_map || skip_maps) { + superblock.oldest_map = first; } - osd.superblock.newest_map = last; - osd.superblock.current_epoch = last; + superblock.newest_map = last; + superblock.current_epoch = last; // note in the superblock that we were clean thru the prior epoch - if (osd.boot_epoch && osd.boot_epoch >= osd.superblock.mounted) { - osd.superblock.mounted = osd.boot_epoch; - osd.superblock.clean_thru = last; + if (boot_epoch && boot_epoch >= superblock.mounted) { + superblock.mounted = boot_epoch; + superblock.clean_thru = last; } - pg_shard_manager.get_meta_coll().store_superblock(t, osd.superblock); - pg_shard_manager.set_superblock(osd.superblock); + pg_shard_manager.get_meta_coll().store_superblock(t, superblock); + pg_shard_manager.set_superblock(superblock); logger().debug("OSD::handle_osd_map: do_transaction..."); - return osd.store.get_sharded_store().do_transaction( + return store.get_sharded_store().do_transaction( pg_shard_manager.get_meta_coll().collection(), std::move(t)); }); @@ -1048,13 +991,13 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref m) }); } -seastar::future<> OSD::ShardDispatcher::committed_osd_maps( +seastar::future<> OSD::committed_osd_maps( version_t first, version_t last, Ref m) { ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); - logger().info("osd.{}: committed_osd_maps({}, {})", osd.whoami, first, last); + logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last); // advance through the new maps return seastar::do_for_each(boost::make_counting_iterator(first), boost::make_counting_iterator(last + 1), @@ -1062,17 +1005,17 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps( return pg_shard_manager.get_local_map( cur ).then([this](OSDMapService::local_cached_map_t&& o) { - osd.osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o)); + osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o)); return pg_shard_manager.update_map(std::move(o)); }).then([this] { if (get_shard_services().get_up_epoch() == 0 && - osd.osdmap->is_up(osd.whoami) && - osd.osdmap->get_addrs(osd.whoami) == osd.public_msgr->get_myaddrs()) { + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { return pg_shard_manager.set_up_epoch( - osd.osdmap->get_epoch() + osdmap->get_epoch() ).then([this] { - if (!osd.boot_epoch) { - osd.boot_epoch = osd.osdmap->get_epoch(); + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); } }); } else { @@ -1081,26 +1024,26 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps( }); }).then([m, this] { auto fut = seastar::now(); - if (osd.osdmap->is_up(osd.whoami)) { - const auto up_from = osd.osdmap->get_up_from(osd.whoami); + if (osdmap->is_up(whoami)) { + const auto up_from = osdmap->get_up_from(whoami); logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}", - osd.whoami, osd.osdmap->get_epoch(), up_from, osd.bind_epoch, + whoami, osdmap->get_epoch(), up_from, bind_epoch, pg_shard_manager.get_osd_state_string()); - if (osd.bind_epoch < up_from && - osd.osdmap->get_addrs(osd.whoami) == osd.public_msgr->get_myaddrs() && + if (bind_epoch < up_from && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && pg_shard_manager.is_booting()) { - logger().info("osd.{}: activating...", osd.whoami); + logger().info("osd.{}: activating...", whoami); fut = pg_shard_manager.set_active().then([this] { - osd.beacon_timer.arm_periodic( + beacon_timer.arm_periodic( std::chrono::seconds(local_conf()->osd_beacon_report_interval)); // timer continuation rearms when complete - osd.tick_timer.arm( + tick_timer.arm( std::chrono::seconds(TICK_INTERVAL)); }); } } else { if (pg_shard_manager.is_prestop()) { - osd.got_stop_ack(); + got_stop_ack(); return seastar::now(); } } @@ -1108,34 +1051,34 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps( return check_osdmap_features().then([this] { // yay! logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up" - " to {} epoch to pgs", osd.whoami, osd.osdmap->get_epoch()); - return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch()); + " to {} epoch to pgs", whoami, osdmap->get_epoch()); + return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch()); }); }); }).then([m, this] { if (pg_shard_manager.is_active()) { - logger().info("osd.{}: now active", osd.whoami); - if (!osd.osdmap->exists(osd.whoami) || - osd.osdmap->is_stop(osd.whoami)) { - return osd.shutdown(); + logger().info("osd.{}: now active", whoami); + if (!osdmap->exists(whoami) || + osdmap->is_stop(whoami)) { + return shutdown(); } - if (osd.should_restart()) { - return osd.restart(); + if (should_restart()) { + return restart(); } else { return seastar::now(); } } else if (pg_shard_manager.is_preboot()) { - logger().info("osd.{}: now preboot", osd.whoami); + logger().info("osd.{}: now preboot", whoami); if (m->get_source().is_mon()) { - return osd._preboot( + return _preboot( m->cluster_osdmap_trim_lower_bound, m->newest_map); } else { - logger().info("osd.{}: start_boot", osd.whoami); - return osd.start_boot(); + logger().info("osd.{}: start_boot", whoami); + return start_boot(); } } else { - logger().info("osd.{}: now {}", osd.whoami, + logger().info("osd.{}: now {}", whoami, pg_shard_manager.get_osd_state_string()); // XXX return seastar::now(); @@ -1143,7 +1086,7 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps( }); } -seastar::future<> OSD::ShardDispatcher::handle_osd_op( +seastar::future<> OSD::handle_osd_op( crimson::net::ConnectionRef conn, Ref m) { @@ -1153,7 +1096,7 @@ seastar::future<> OSD::ShardDispatcher::handle_osd_op( std::move(m)).second; } -seastar::future<> OSD::ShardDispatcher::handle_pg_create( +seastar::future<> OSD::handle_pg_create( crimson::net::ConnectionRef conn, Ref m) { @@ -1190,7 +1133,7 @@ seastar::future<> OSD::ShardDispatcher::handle_pg_create( }); } -seastar::future<> OSD::ShardDispatcher::handle_update_log_missing( +seastar::future<> OSD::handle_update_log_missing( crimson::net::ConnectionRef conn, Ref m) { @@ -1200,7 +1143,7 @@ seastar::future<> OSD::ShardDispatcher::handle_update_log_missing( std::move(m)).second; } -seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply( +seastar::future<> OSD::handle_update_log_missing_reply( crimson::net::ConnectionRef conn, Ref m) { @@ -1210,7 +1153,7 @@ seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply( std::move(m)).second; } -seastar::future<> OSD::ShardDispatcher::handle_rep_op( +seastar::future<> OSD::handle_rep_op( crimson::net::ConnectionRef conn, Ref m) { @@ -1220,7 +1163,7 @@ seastar::future<> OSD::ShardDispatcher::handle_rep_op( std::move(m)).second; } -seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply( +seastar::future<> OSD::handle_rep_op_reply( crimson::net::ConnectionRef conn, Ref m) { @@ -1238,11 +1181,11 @@ seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply( }); } -seastar::future<> OSD::ShardDispatcher::handle_scrub( +seastar::future<> OSD::handle_scrub( crimson::net::ConnectionRef conn, Ref m) { - if (m->fsid != osd.superblock.cluster_fsid) { + if (m->fsid != superblock.cluster_fsid) { logger().warn("fsid mismatched"); return seastar::now(); } @@ -1259,18 +1202,18 @@ seastar::future<> OSD::ShardDispatcher::handle_scrub( }); } -seastar::future<> OSD::ShardDispatcher::handle_mark_me_down( +seastar::future<> OSD::handle_mark_me_down( crimson::net::ConnectionRef conn, Ref m) { ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); if (pg_shard_manager.is_prestop()) { - osd.got_stop_ack(); + got_stop_ack(); } return seastar::now(); } -seastar::future<> OSD::ShardDispatcher::handle_recovery_subreq( +seastar::future<> OSD::handle_recovery_subreq( crimson::net::ConnectionRef conn, Ref m) { @@ -1305,7 +1248,7 @@ seastar::future<> OSD::restart() { beacon_timer.cancel(); tick_timer.cancel(); - return get_pg_shard_manager().set_up_epoch( + return pg_shard_manager.set_up_epoch( 0 ).then([this] { bind_epoch = osdmap->get_epoch(); @@ -1324,7 +1267,7 @@ seastar::future<> OSD::shutdown() seastar::future<> OSD::send_beacon() { - if (!get_pg_shard_manager().is_active()) { + if (!pg_shard_manager.is_active()) { return seastar::now(); } // FIXME: min lec should be calculated from pg_stat @@ -1339,11 +1282,11 @@ seastar::future<> OSD::send_beacon() seastar::future<> OSD::update_heartbeat_peers() { - if (!get_pg_shard_manager().is_active()) { + if (!pg_shard_manager.is_active()) { return seastar::now();; } - get_pg_shard_manager().for_each_pgid([this](auto &pgid) { + pg_shard_manager.for_each_pgid([this](auto &pgid) { vector up, acting; osdmap->pg_to_up_acting_osds(pgid.pgid, &up, nullptr, @@ -1360,7 +1303,7 @@ seastar::future<> OSD::update_heartbeat_peers() return seastar::now(); } -seastar::future<> OSD::ShardDispatcher::handle_peering_op( +seastar::future<> OSD::handle_peering_op( crimson::net::ConnectionRef conn, Ref m) { @@ -1375,17 +1318,18 @@ seastar::future<> OSD::ShardDispatcher::handle_peering_op( std::move(*evt)).second; } -seastar::future<> OSD::ShardDispatcher::check_osdmap_features() +seastar::future<> OSD::check_osdmap_features() { - return osd.store.write_meta( + assert(seastar::this_shard_id() == PRIMARY_CORE); + return store.write_meta( "require_osd_release", - stringify((int)osd.osdmap->require_osd_release)); + stringify((int)osdmap->require_osd_release)); } seastar::future<> OSD::prepare_to_stop() { if (osdmap && osdmap->is_up(whoami)) { - get_pg_shard_manager().set_prestop(); + pg_shard_manager.set_prestop(); const auto timeout = std::chrono::duration_cast( std::chrono::duration( diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 49216909c900f..10ff60d47017f 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -62,76 +62,6 @@ class PG; class OSD final : public crimson::net::Dispatcher, private crimson::common::AuthHandler, private crimson::mgr::WithStats { -public: - class ShardDispatcher - : public seastar::peering_sharded_service { - friend class OSD; - public: - ShardDispatcher( - OSD& osd, - PGShardMapping& pg_to_shard_mapping) - : pg_shard_manager(osd.osd_singleton_state, - osd.shard_services, pg_to_shard_mapping), - osd(osd) {} - ~ShardDispatcher() = default; - - // Dispatcher methods - seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef); - - private: - bool require_mon_peer(crimson::net::Connection *conn, Ref m); - - seastar::future<> handle_osd_map(Ref m); - seastar::future<> _handle_osd_map(Ref m); - seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_scrub(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn, - Ref m); - - seastar::future<> committed_osd_maps(version_t first, - version_t last, - Ref m); - - seastar::future<> check_osdmap_features(); - - seastar::future<> handle_command(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn, - Ref m); - seastar::future<> handle_update_log_missing_reply( - crimson::net::ConnectionRef conn, - Ref m); - - public: - void print(std::ostream&) const; - - auto &get_pg_shard_manager() { - return pg_shard_manager; - } - auto &get_pg_shard_manager() const { - return pg_shard_manager; - } - ShardServices &get_shard_services() { - return pg_shard_manager.get_shard_services(); - } - - private: - crimson::osd::PGShardManager pg_shard_manager; - OSD& osd; - }; - const int whoami; const uint32_t nonce; seastar::abort_source& abort_source; @@ -170,6 +100,8 @@ public: void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; + std::optional> do_ms_dispatch(crimson::net::ConnectionRef, MessageRef); + // mgr::WithStats methods // pg statistics including osd ones osd_stat_t osd_stat; @@ -185,7 +117,8 @@ public: seastar::sharded osd_singleton_state; seastar::sharded osd_states; seastar::sharded shard_services; - seastar::sharded shard_dispatchers; + + crimson::osd::PGShardManager pg_shard_manager; std::unique_ptr heartbeat; seastar::timer tick_timer; @@ -203,6 +136,10 @@ public: crimson::net::MessengerRef hb_back_msgr); ~OSD() final; + auto &get_pg_shard_manager() { + return pg_shard_manager; + } + seastar::future<> open_meta_coll(); static seastar::future open_or_create_meta_coll( crimson::os::FuturizedStore &store @@ -224,18 +161,9 @@ public: uint64_t send_pg_stats(); auto &get_shard_services() { - ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); return shard_services.local(); } - auto &get_pg_shard_manager() { - return shard_dispatchers.local().get_pg_shard_manager(); - } - - auto &get_pg_shard_manager() const { - return shard_dispatchers.local().get_pg_shard_manager(); - } - private: static seastar::future<> _write_superblock( crimson::os::FuturizedStore &store, @@ -256,6 +184,39 @@ private: void write_superblock(ceph::os::Transaction& t); seastar::future<> read_superblock(); + seastar::future<> handle_osd_map(Ref m); + seastar::future<> _handle_osd_map(Ref m); + seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_scrub(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn, + Ref m); + + seastar::future<> committed_osd_maps(version_t first, + version_t last, + Ref m); + + seastar::future<> check_osdmap_features(); + + seastar::future<> handle_command(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_update_log_missing_reply( + crimson::net::ConnectionRef conn, + Ref m); + private: crimson::common::Gated gate; @@ -283,15 +244,8 @@ inline std::ostream& operator<<(std::ostream& out, const OSD& osd) { return out; } -inline std::ostream& operator<<(std::ostream& out, - const OSD::ShardDispatcher& shard_dispatcher) { - shard_dispatcher.print(out); - return out; -} - } #if FMT_VERSION >= 90000 template <> struct fmt::formatter : fmt::ostream_formatter {}; -template <> struct fmt::formatter : fmt::ostream_formatter {}; #endif diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc index 472376fbfb460..6061c856be263 100644 --- a/src/crimson/osd/pg_shard_manager.cc +++ b/src/crimson/osd/pg_shard_manager.cc @@ -23,7 +23,7 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store) auto[coll, shard_core] = coll_core; spg_t pgid; if (coll.is_pg(&pgid)) { - return pg_to_shard_mapping.maybe_create_pg( + return get_pg_to_shard_mapping().maybe_create_pg( pgid, shard_core ).then([this, pgid] (auto core) { return this->template with_remote_shard_state( diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index 949d3aedfde6f..2f3a3015d1cd6 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -26,7 +26,7 @@ namespace crimson::osd { class PGShardManager { seastar::sharded &osd_singleton_state; seastar::sharded &shard_services; - PGShardMapping &pg_to_shard_mapping; + seastar::sharded &pg_to_shard_mapping; #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \ template \ @@ -50,7 +50,7 @@ public: PGShardManager( seastar::sharded &osd_singleton_state, seastar::sharded &shard_services, - PGShardMapping &pg_to_shard_mapping) + seastar::sharded &pg_to_shard_mapping) : osd_singleton_state(osd_singleton_state), shard_services(shard_services), pg_to_shard_mapping(pg_to_shard_mapping) {} @@ -71,6 +71,8 @@ public: } auto &get_local_state() { return get_shard_services().local_state; } auto &get_local_state() const { return get_shard_services().local_state; } + auto &get_pg_to_shard_mapping() { return pg_to_shard_mapping.local(); } + auto &get_pg_to_shard_mapping() const { return pg_to_shard_mapping.local(); } seastar::future<> update_map(local_cached_map_t &&map) { get_osd_singleton_state().update_map( @@ -187,7 +189,7 @@ public: logger.debug("{}: can_create", *op); get_local_state().registry.remove_from_registry(*op); - return pg_to_shard_mapping.maybe_create_pg( + return get_pg_to_shard_mapping().maybe_create_pg( op->get_pgid() ).then([this, op = std::move(op)](auto core) mutable { return this->template with_remote_shard_state_and_op( @@ -231,7 +233,7 @@ public: logger.debug("{}: !can_create", *op); get_local_state().registry.remove_from_registry(*op); - return pg_to_shard_mapping.maybe_create_pg( + return get_pg_to_shard_mapping().maybe_create_pg( op->get_pgid() ).then([this, op = std::move(op)](auto core) mutable { return this->template with_remote_shard_state_and_op( @@ -307,19 +309,19 @@ public: */ template void for_each_pgid(F &&f) const { - return pg_to_shard_mapping.for_each_pgid( + return get_pg_to_shard_mapping().for_each_pgid( std::forward(f)); } auto get_num_pgs() const { - return pg_to_shard_mapping.get_num_pgs(); + return get_pg_to_shard_mapping().get_num_pgs(); } seastar::future<> broadcast_map_to_pgs(epoch_t epoch); template auto with_pg(spg_t pgid, F &&f) { - core_id_t core = pg_to_shard_mapping.get_pg_mapping(pgid); + core_id_t core = get_pg_to_shard_mapping().get_pg_mapping(pgid); return with_remote_shard_state( core, [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable { -- 2.39.5