From e725936703f89933fc9fae70f649cb5632480093 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 12 Jul 2022 22:35:44 +0000 Subject: [PATCH] crimson/osd: introduce pg_shard_manager to clarify shard-local vs osd-wide state This commits begins to change ShardServices to be the interface by which PGs access shard local and osd wide state. Future work will further clarify this interface boundary and introduce machinery to mediate cold path access to state on remote shards. Signed-off-by: Samuel Just --- src/crimson/osd/CMakeLists.txt | 1 + src/crimson/osd/osd.cc | 26 +- src/crimson/osd/osd.h | 10 +- .../osd/osd_operation_external_tracking.h | 3 +- .../osd/osd_operations/background_recovery.cc | 2 +- .../osd/osd_operations/client_request.cc | 5 + .../osd/osd_operations/client_request.h | 2 + src/crimson/osd/pg.cc | 4 +- src/crimson/osd/pg.h | 10 +- src/crimson/osd/pg_shard_manager.cc | 28 ++ src/crimson/osd/pg_shard_manager.h | 52 +++ src/crimson/osd/recovery_backend.cc | 2 +- src/crimson/osd/shard_services.cc | 258 ++++++++------- src/crimson/osd/shard_services.h | 295 ++++++++++++------ 14 files changed, 437 insertions(+), 261 deletions(-) create mode 100644 src/crimson/osd/pg_shard_manager.cc create mode 100644 src/crimson/osd/pg_shard_manager.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index cfa9f9d71ab01..779a5beca53ad 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -10,6 +10,7 @@ add_executable(crimson-osd pg_meta.cc replicated_backend.cc shard_services.cc + pg_shard_manager.cc object_context.cc ops_executer.cc osd_operation.cc diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index b08c8dcfa0e73..4c9d92fbfe3df 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -90,7 +90,10 @@ OSD::OSD(int id, uint32_t nonce, monc{new crimson::mon::Client{*public_msgr, *this}}, mgrc{new crimson::mgr::Client{*public_msgr, *this}}, store{store}, - shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, store}, + pg_shard_manager{ + static_cast(*this), whoami, *cluster_msgr, + *public_msgr, *monc, *mgrc, store}, + shard_services{pg_shard_manager.get_shard_services()}, heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}}, // do this in background tick_timer{[this] { @@ -329,8 +332,8 @@ seastar::future<> OSD::start() superblock = std::move(sb); return get_map(superblock.current_epoch); }).then([this](cached_map_t&& map) { - shard_services.update_map(map); osdmap_gate.got_map(map->get_epoch()); + pg_shard_manager.update_map(map); osdmap = std::move(map); bind_epoch = osdmap->get_epoch(); return load_pgs(); @@ -554,12 +557,15 @@ seastar::future<> OSD::start_asok_admin() asok->register_command(make_asok_hook(*this)); asok->register_command(make_asok_hook(*this)); // ops commands - asok->register_command(make_asok_hook( - std::as_const(get_shard_services().registry))); - asok->register_command(make_asok_hook( - std::as_const(get_shard_services().registry))); - asok->register_command(make_asok_hook( - std::as_const(get_shard_services().registry))); + asok->register_command( + make_asok_hook( + std::as_const(get_shard_services().get_registry()))); + asok->register_command( + make_asok_hook( + std::as_const(get_shard_services().get_registry()))); + asok->register_command( + make_asok_hook( + std::as_const(get_shard_services().get_registry()))); }); } @@ -578,7 +584,7 @@ seastar::future<> OSD::stop() return asok->stop().then([this] { return heartbeat->stop(); 
}).then([this] { - return shard_services.stop(); + return pg_shard_manager.stop_registries(); }).then([this] { return store.umount(); }).then([this] { @@ -1150,7 +1156,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first, [this](epoch_t cur) { return get_map(cur).then([this](cached_map_t&& o) { osdmap = std::move(o); - shard_services.update_map(osdmap); + pg_shard_manager.update_map(osdmap); if (up_epoch == 0 && osdmap->is_up(whoami) && osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 809f2dda0dced..a6030e2ac91ec 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -20,8 +20,7 @@ #include "crimson/mgr/client.h" #include "crimson/net/Dispatcher.h" #include "crimson/osd/osdmap_service.h" -#include "crimson/osd/state.h" -#include "crimson/osd/shard_services.h" +#include "crimson/osd/pg_shard_manager.h" #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/pg_map.h" #include "crimson/osd/osd_operations/peering_event.h" @@ -111,7 +110,8 @@ class OSD final : public crimson::net::Dispatcher, void handle_authentication(const EntityName& name, const AuthCapsInfo& caps) final; - crimson::osd::ShardServices shard_services; + crimson::osd::PGShardManager pg_shard_manager; + crimson::osd::ShardServices &shard_services; std::unique_ptr heartbeat; seastar::timer tick_timer; @@ -218,7 +218,7 @@ public: OSD_OSDMapGate osdmap_gate; ShardServices &get_shard_services() { - return shard_services; + return pg_shard_manager.get_shard_services(); } seastar::future<> consume_map(epoch_t epoch); @@ -252,7 +252,7 @@ public: template auto start_pg_operation(Args&&... args) { - auto op = shard_services.registry.create_operation( + auto op = shard_services.get_registry().create_operation( std::forward(args)...); auto &logger = crimson::get_logger(ceph_subsys_osd); logger.debug("{}: starting {}", *op, __func__); diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index b77d0101e4201..a2eec6d39d59e 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -241,8 +241,7 @@ struct HistoricBackend void handle(ClientRequest::CompletionEvent&, const Operation& op) override { if (crimson::common::local_conf()->osd_op_history_size) { - const auto& client_op = to_client_request(op); - client_op.osd.get_shard_services().registry.put_historic(client_op); + to_client_request(op).put_historic(); } } }; diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index 6e8319e92a5b6..116a9010d1c27 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -81,7 +81,7 @@ seastar::future<> BackgroundRecoveryT::start() return maybe_delay.then([ref, this] { return this->template with_blocking_event( [ref, this] (auto&& trigger) { - return ss.throttler.with_throttle_while( + return ss.with_throttle_while( std::move(trigger), this, get_scheduler_params(), [this] { return T::interruptor::with_interruption([this] { diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 7d3ea41b50ec9..db531f7bcb242 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -334,4 +334,9 @@ bool ClientRequest::is_misdirected(const PG& pg) const return true; } +void 
ClientRequest::put_historic() const +{ + osd.get_shard_services().get_registry().put_historic(*this); +} + } diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 365420b4ecbce..1d8b521afbe9d 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -165,6 +165,8 @@ public: auto get_completed() const { return get_event().get_timestamp(); }; + + void put_historic() const; }; } diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 203e4542ae3d8..cac17c2f7aad6 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -999,7 +999,7 @@ PG::load_obc_iertr::future<> PG::with_head_obc(hobject_t oid, with_obc_func_t&& func) { auto [obc, existed] = - shard_services.obc_registry.get_cached_obc(std::move(oid)); + shard_services.get_cached_obc(std::move(oid)); return with_head_obc(std::move(obc), existed, std::move(func)); } @@ -1026,7 +1026,7 @@ PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func) logger().error("with_clone_obc: {} clone not found", coid); return load_obc_ertr::make_ready_future<>(); } - auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid); + auto [clone, existed] = shard_services.get_cached_obc(*coid); return clone->template with_lock( [coid=*coid, existed=existed, head=std::move(head), clone=std::move(clone), diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 236a92b30405b..7a5d6f7075ed6 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -221,7 +221,7 @@ public: unsigned priority, PGPeeringEventURef on_grant, PGPeeringEventURef on_preempt) final { - shard_services.local_reserver.request_reservation( + shard_services.local_request_reservation( pgid, on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) { start_peering_event_operation(std::move(*on_grant)); @@ -235,13 +235,13 @@ public: void update_local_background_io_priority( unsigned priority) final { - shard_services.local_reserver.update_priority( + shard_services.local_update_priority( pgid, priority); } void cancel_local_background_io_reservation() final { - shard_services.local_reserver.cancel_reservation( + shard_services.local_cancel_reservation( pgid); } @@ -249,7 +249,7 @@ public: unsigned priority, PGPeeringEventURef on_grant, PGPeeringEventURef on_preempt) final { - shard_services.remote_reserver.request_reservation( + shard_services.remote_request_reservation( pgid, on_grant ? 
make_lambda_context([this, on_grant=std::move(on_grant)] (int) { start_peering_event_operation(std::move(*on_grant)); @@ -262,7 +262,7 @@ public: } void cancel_remote_recovery_reservation() final { - shard_services.remote_reserver.cancel_reservation( + shard_services.remote_cancel_reservation( pgid); } diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc new file mode 100644 index 0000000000000..5c00651dbac29 --- /dev/null +++ b/src/crimson/osd/pg_shard_manager.cc @@ -0,0 +1,28 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/pg_shard_manager.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +PGShardManager::PGShardManager( + OSDMapService &osdmap_service, + const int whoami, + crimson::net::Messenger &cluster_msgr, + crimson::net::Messenger &public_msgr, + crimson::mon::Client &monc, + crimson::mgr::Client &mgrc, + crimson::os::FuturizedStore &store) + : core_state(whoami, osdmap_service, cluster_msgr, public_msgr, + monc, mgrc, store), + local_state(whoami), + shard_services(core_state, local_state) +{} + +} diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h new file mode 100644 index 0000000000000..571f5cdb47e84 --- /dev/null +++ b/src/crimson/osd/pg_shard_manager.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include + +#include "crimson/osd/shard_services.h" +#include "crimson/osd/pg_map.h" + +namespace crimson::osd { + +/** + * PGShardManager + * + * Manages all state required to partition PGs over seastar reactors + * as well as state required to route messages to pgs. 
Mediates access to + * shared resources required by PGs (objectstore, messenger, monclient, + * etc) + */ +class PGShardManager { + CoreState core_state; + PerShardState local_state; + ShardServices shard_services; + +public: + PGShardManager( + OSDMapService &osdmap_service, + const int whoami, + crimson::net::Messenger &cluster_msgr, + crimson::net::Messenger &public_msgr, + crimson::mon::Client &monc, + crimson::mgr::Client &mgrc, + crimson::os::FuturizedStore &store); + + auto &get_shard_services() { return shard_services; } + + void update_map(OSDMapService::cached_map_t map) { + core_state.update_map(map); + local_state.update_map(map); + } + + auto stop_registries() { + return local_state.stop_registry(); + } + + FORWARD_TO_CORE(send_pg_created) +}; + +} diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index 24d7d00477d6f..76ca1bd8b0dff 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -179,7 +179,7 @@ RecoveryBackend::scan_for_backfill( -> interruptible_future<> { crimson::osd::ObjectContextRef obc; if (pg.is_primary()) { - obc = shard_services.obc_registry.maybe_get_cached_obc(object); + obc = shard_services.maybe_get_cached_obc(object); } if (obc) { if (obc->obs.exists) { diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index b9d97b2b283b9..71c3c96ab8520 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -27,67 +27,49 @@ using std::vector; namespace crimson::osd { -ShardServices::ShardServices( - OSDMapService &osdmap_service, - const int whoami, - crimson::net::Messenger &cluster_msgr, - crimson::net::Messenger &public_msgr, - crimson::mon::Client &monc, - crimson::mgr::Client &mgrc, - crimson::os::FuturizedStore &store) - : osdmap_service(osdmap_service), - whoami(whoami), - cluster_msgr(cluster_msgr), - public_msgr(public_msgr), - monc(monc), - mgrc(mgrc), - store(store), - throttler(crimson::common::local_conf()), - obc_registry(crimson::common::local_conf()), - local_reserver( - &cct, - &finisher, - crimson::common::local_conf()->osd_max_backfills, - crimson::common::local_conf()->osd_min_recovery_priority), - remote_reserver( - &cct, - &finisher, - crimson::common::local_conf()->osd_max_backfills, - crimson::common::local_conf()->osd_min_recovery_priority) +PerShardState::PerShardState( + int whoami) + : whoami(whoami), + throttler(crimson::common::local_conf()), + obc_registry(crimson::common::local_conf()) { perf = build_osd_logger(&cct); cct.get_perfcounters_collection()->add(perf); recoverystate_perf = build_recoverystate_perf(&cct); cct.get_perfcounters_collection()->add(recoverystate_perf); - - crimson::common::local_conf().add_observer(this); -} - -const char** ShardServices::get_tracked_conf_keys() const -{ - static const char* KEYS[] = { - "osd_max_backfills", - "osd_min_recovery_priority", - nullptr - }; - return KEYS; } -void ShardServices::handle_conf_change(const ConfigProxy& conf, - const std::set &changed) +CoreState::CoreState( + int whoami, + OSDMapService &osdmap_service, + crimson::net::Messenger &cluster_msgr, + crimson::net::Messenger &public_msgr, + crimson::mon::Client &monc, + crimson::mgr::Client &mgrc, + crimson::os::FuturizedStore &store) + : whoami(whoami), + osdmap_service(osdmap_service), + cluster_msgr(cluster_msgr), + public_msgr(public_msgr), + monc(monc), + mgrc(mgrc), + store(store), + local_reserver( + &cct, + &finisher, + crimson::common::local_conf()->osd_max_backfills, + 
crimson::common::local_conf()->osd_min_recovery_priority), + remote_reserver( + &cct, + &finisher, + crimson::common::local_conf()->osd_max_backfills, + crimson::common::local_conf()->osd_min_recovery_priority) { - if (changed.count("osd_max_backfills")) { - local_reserver.set_max(conf->osd_max_backfills); - remote_reserver.set_max(conf->osd_max_backfills); - } - if (changed.count("osd_min_recovery_priority")) { - local_reserver.set_min_priority(conf->osd_min_recovery_priority); - remote_reserver.set_min_priority(conf->osd_min_recovery_priority); - } + crimson::common::local_conf().add_observer(this); } -seastar::future<> ShardServices::send_to_osd( +seastar::future<> CoreState::send_to_osd( int peer, MessageURef m, epoch_t from_epoch) { if (osdmap->is_down(peer)) { @@ -104,54 +86,20 @@ seastar::future<> ShardServices::send_to_osd( } } -seastar::future<> ShardServices::dispatch_context_transaction( - crimson::os::CollectionRef col, PeeringCtx &ctx) { - if (ctx.transaction.empty()) { - logger().debug("ShardServices::dispatch_context_transaction: empty transaction"); - return seastar::now(); - } - - logger().debug("ShardServices::dispatch_context_transaction: do_transaction ..."); - auto ret = store.do_transaction( - col, - std::move(ctx.transaction)); - ctx.reset_transaction(); - return ret; -} - -seastar::future<> ShardServices::dispatch_context_messages( - BufferedRecoveryMessages &&ctx) -{ - auto ret = seastar::parallel_for_each(std::move(ctx.message_map), - [this](auto& osd_messages) { - auto& [peer, messages] = osd_messages; - logger().debug("dispatch_context_messages sending messages to {}", peer); - return seastar::parallel_for_each( - std::move(messages), [=, peer=peer](auto& m) { - return send_to_osd(peer, std::move(m), osdmap->get_epoch()); - }); - }); - ctx.message_map.clear(); - return ret; -} - -seastar::future<> ShardServices::dispatch_context( - crimson::os::CollectionRef col, - PeeringCtx &&ctx) +seastar::future<> CoreState::osdmap_subscribe(version_t epoch, bool force_request) { - ceph_assert(col || ctx.transaction.empty()); - return seastar::when_all_succeed( - dispatch_context_messages( - BufferedRecoveryMessages{ctx}), - col ? 
dispatch_context_transaction(col, ctx) : seastar::now() - ).then_unpack([] { + logger().info("{}({})", __func__, epoch); + if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + force_request) { + return monc.renew_subs(); + } else { return seastar::now(); - }); + } } -void ShardServices::queue_want_pg_temp(pg_t pgid, - const vector& want, - bool forced) +void CoreState::queue_want_pg_temp(pg_t pgid, + const vector& want, + bool forced) { auto p = pg_temp_pending.find(pgid); if (p == pg_temp_pending.end() || @@ -161,13 +109,13 @@ void ShardServices::queue_want_pg_temp(pg_t pgid, } } -void ShardServices::remove_want_pg_temp(pg_t pgid) +void CoreState::remove_want_pg_temp(pg_t pgid) { pg_temp_wanted.erase(pgid); pg_temp_pending.erase(pgid); } -void ShardServices::requeue_pg_temp() +void CoreState::requeue_pg_temp() { unsigned old_wanted = pg_temp_wanted.size(); unsigned old_pending = pg_temp_pending.size(); @@ -181,18 +129,7 @@ void ShardServices::requeue_pg_temp() pg_temp_wanted.size()); } -std::ostream& operator<<( - std::ostream& out, - const ShardServices::pg_temp_t& pg_temp) -{ - out << pg_temp.acting; - if (pg_temp.forced) { - out << " (forced)"; - } - return out; -} - -seastar::future<> ShardServices::send_pg_temp() +seastar::future<> CoreState::send_pg_temp() { if (pg_temp_wanted.empty()) return seastar::now(); @@ -218,17 +155,18 @@ seastar::future<> ShardServices::send_pg_temp() }); } -void ShardServices::update_map(cached_map_t new_osdmap) -{ - osdmap = std::move(new_osdmap); -} - -ShardServices::cached_map_t &ShardServices::get_osdmap() +std::ostream& operator<<( + std::ostream& out, + const CoreState::pg_temp_t& pg_temp) { - return osdmap; + out << pg_temp.acting; + if (pg_temp.forced) { + out << " (forced)"; + } + return out; } -seastar::future<> ShardServices::send_pg_created(pg_t pgid) +seastar::future<> CoreState::send_pg_created(pg_t pgid) { logger().debug(__func__); auto o = get_osdmap(); @@ -237,7 +175,7 @@ seastar::future<> ShardServices::send_pg_created(pg_t pgid) return monc.send_message(crimson::make_message(pgid)); } -seastar::future<> ShardServices::send_pg_created() +seastar::future<> CoreState::send_pg_created() { logger().debug(__func__); auto o = get_osdmap(); @@ -248,7 +186,7 @@ seastar::future<> ShardServices::send_pg_created() }); } -void ShardServices::prune_pg_created() +void CoreState::prune_pg_created() { logger().debug(__func__); auto o = get_osdmap(); @@ -265,18 +203,7 @@ void ShardServices::prune_pg_created() } } -seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request) -{ - logger().info("{}({})", __func__, epoch); - if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || - force_request) { - return monc.renew_subs(); - } else { - return seastar::now(); - } -} - -HeartbeatStampsRef ShardServices::get_hb_stamps(int peer) +HeartbeatStampsRef CoreState::get_hb_stamps(int peer) { auto [stamps, added] = heartbeat_stamps.try_emplace(peer); if (added) { @@ -285,7 +212,7 @@ HeartbeatStampsRef ShardServices::get_hb_stamps(int peer) return stamps->second; } -seastar::future<> ShardServices::send_alive(const epoch_t want) +seastar::future<> CoreState::send_alive(const epoch_t want) { logger().info( "{} want={} up_thru_wanted={}", @@ -314,4 +241,73 @@ seastar::future<> ShardServices::send_alive(const epoch_t want) } } +const char** CoreState::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "osd_max_backfills", + "osd_min_recovery_priority", + nullptr + }; + return KEYS; +} + +void 
CoreState::handle_conf_change(const ConfigProxy& conf, + const std::set &changed) +{ + if (changed.count("osd_max_backfills")) { + local_reserver.set_max(conf->osd_max_backfills); + remote_reserver.set_max(conf->osd_max_backfills); + } + if (changed.count("osd_min_recovery_priority")) { + local_reserver.set_min_priority(conf->osd_min_recovery_priority); + remote_reserver.set_min_priority(conf->osd_min_recovery_priority); + } +} + +seastar::future<> ShardServices::dispatch_context_transaction( + crimson::os::CollectionRef col, PeeringCtx &ctx) { + if (ctx.transaction.empty()) { + logger().debug("ShardServices::dispatch_context_transaction: empty transaction"); + return seastar::now(); + } + + logger().debug("ShardServices::dispatch_context_transaction: do_transaction ..."); + auto ret = get_store().do_transaction( + col, + std::move(ctx.transaction)); + ctx.reset_transaction(); + return ret; +} + +seastar::future<> ShardServices::dispatch_context_messages( + BufferedRecoveryMessages &&ctx) +{ + auto ret = seastar::parallel_for_each(std::move(ctx.message_map), + [this](auto& osd_messages) { + auto& [peer, messages] = osd_messages; + logger().debug("dispatch_context_messages sending messages to {}", peer); + return seastar::parallel_for_each( + std::move(messages), [=, peer=peer](auto& m) { + return send_to_osd(peer, std::move(m), local_state.osdmap->get_epoch()); + }); + }); + ctx.message_map.clear(); + return ret; +} + +seastar::future<> ShardServices::dispatch_context( + crimson::os::CollectionRef col, + PeeringCtx &&ctx) +{ + ceph_assert(col || ctx.transaction.empty()); + return seastar::when_all_succeed( + dispatch_context_messages( + BufferedRecoveryMessages{ctx}), + col ? dispatch_context_transaction(col, ctx) : seastar::now() + ).then_unpack([] { + return seastar::now(); + }); +} + + }; diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 917c06303865c..d3df2c8f794c5 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -13,7 +13,9 @@ #include "crimson/os/futurized_collection.h" #include "osd/PeeringState.h" #include "crimson/osd/osdmap_service.h" +#include "crimson/osd/osdmap_gate.h" #include "crimson/osd/object_context.h" +#include "crimson/osd/state.h" #include "common/AsyncReserver.h" namespace crimson::net { @@ -39,59 +41,44 @@ class BufferedRecoveryMessages; namespace crimson::osd { /** - * Represents services available to each PG + * PerShardState + * + * Per-shard state holding instances local to each shard. 
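+ * For now this covers the shard's operation registry and throttler, its
+ * perf counters, a cached osdmap and the object context registry.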
*/ -class ShardServices : public md_config_obs_t { - using cached_map_t = boost::local_shared_ptr; - OSDMapService &osdmap_service; - const int whoami; - crimson::net::Messenger &cluster_msgr; - crimson::net::Messenger &public_msgr; - crimson::mon::Client &monc; - crimson::mgr::Client &mgrc; - crimson::os::FuturizedStore &store; +class PerShardState { + friend class ShardServices; + friend class PGShardManager; + const int whoami; crimson::common::CephContext cct; PerfCounters *perf = nullptr; PerfCounters *recoverystate_perf = nullptr; - const char** get_tracked_conf_keys() const final; - void handle_conf_change(const ConfigProxy& conf, - const std::set &changed) final; - -public: - ShardServices( - OSDMapService &osdmap_service, - const int whoami, - crimson::net::Messenger &cluster_msgr, - crimson::net::Messenger &public_msgr, - crimson::mon::Client &monc, - crimson::mgr::Client &mgrc, - crimson::os::FuturizedStore &store); - - seastar::future<> send_to_osd( - int peer, - MessageURef m, - epoch_t from_epoch); + // Op Management + OSDOperationRegistry registry; + OperationThrottler throttler; - crimson::os::FuturizedStore &get_store() { - return store; + OSDMapService::cached_map_t osdmap; + OSDMapService::cached_map_t &get_osdmap() { return osdmap; } + void update_map(OSDMapService::cached_map_t new_osdmap) { + osdmap = std::move(new_osdmap); } - crimson::common::CephContext *get_cct() { - return &cct; - } + crimson::osd::ObjectContextRegistry obc_registry; - // OSDMapService - const OSDMapService &get_osdmap_service() const { - return osdmap_service; + // prevent creating new osd operations when system is shutting down, + // this is necessary because there are chances that a new operation + // is created, after the interruption of all ongoing operations, and + // creats and waits on a new and may-never-resolve future, in which + // case the shutdown may never succeed. + bool stopping = false; + seastar::future<> stop_registry() { + crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__); + stopping = true; + return registry.stop(); } - // Op Management - OSDOperationRegistry registry; - OperationThrottler throttler; - template auto start_operation(Args&&... args) { if (__builtin_expect(stopping, false)) { @@ -107,50 +94,58 @@ public: return std::make_pair(std::move(op), std::move(fut)); } - seastar::future<> stop() { - crimson::get_logger(ceph_subsys_osd).info("ShardServices::{}", __func__); - stopping = true; - return registry.stop(); - } + PerShardState(int whoami); +}; - // Loggers - PerfCounters &get_recoverystate_perf_logger() { - return *recoverystate_perf; - } - PerfCounters &get_perf_logger() { - return *perf; - } +/** + * CoreState + * + * OSD-wide singleton holding instances that need to be accessible + * from all PGs. 
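+ * This currently covers the messengers, the mon and mgr clients, the
+ * backing store, pg_temp bookkeeping and the local/remote recovery
+ * reservers.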
+ */ +class CoreState : public md_config_obs_t { + friend class ShardServices; + friend class PGShardManager; + CoreState( + int whoami, + OSDMapService &osdmap_service, + crimson::net::Messenger &cluster_msgr, + crimson::net::Messenger &public_msgr, + crimson::mon::Client &monc, + crimson::mgr::Client &mgrc, + crimson::os::FuturizedStore &store); - /// Dispatch and reset ctx transaction - seastar::future<> dispatch_context_transaction( - crimson::os::CollectionRef col, PeeringCtx &ctx); + const int whoami; - /// Dispatch and reset ctx messages - seastar::future<> dispatch_context_messages( - BufferedRecoveryMessages &&ctx); + crimson::common::CephContext cct; - /// Dispatch ctx and dispose of context - seastar::future<> dispatch_context( - crimson::os::CollectionRef col, - PeeringCtx &&ctx); - /// Dispatch ctx and dispose of ctx, transaction must be empty - seastar::future<> dispatch_context( - PeeringCtx &&ctx) { - return dispatch_context({}, std::move(ctx)); + OSDMapService &osdmap_service; + OSDMapService::cached_map_t osdmap; + OSDMapService::cached_map_t &get_osdmap() { return osdmap; } + void update_map(OSDMapService::cached_map_t new_osdmap) { + osdmap = std::move(new_osdmap); } - // -- tids -- - // for ops i issue + crimson::net::Messenger &cluster_msgr; + crimson::net::Messenger &public_msgr; + + seastar::future<> send_to_osd(int peer, MessageURef m, epoch_t from_epoch); + + crimson::mon::Client &monc; + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + + crimson::mgr::Client &mgrc; + + crimson::os::FuturizedStore &store; + + // tids for ops i issue unsigned int next_tid{0}; ceph_tid_t get_tid() { return (ceph_tid_t)next_tid++; } - // PG Temp State -private: - // TODO: hook into map processing and some kind of heartbeat/peering - // message processing + // global pg temp state struct pg_temp_t { std::vector acting; bool forced = false; @@ -158,28 +153,14 @@ private: std::map pg_temp_wanted; std::map pg_temp_pending; friend std::ostream& operator<<(std::ostream&, const pg_temp_t&); -public: + void queue_want_pg_temp(pg_t pgid, const std::vector& want, bool forced = false); void remove_want_pg_temp(pg_t pgid); void requeue_pg_temp(); seastar::future<> send_pg_temp(); - // Shard-local OSDMap -private: - cached_map_t osdmap; -public: - void update_map(cached_map_t new_osdmap); - cached_map_t &get_osdmap(); - - // PG Created State -private: - std::set pg_created; -public: - seastar::future<> send_pg_created(pg_t pgid); - seastar::future<> send_pg_created(); - void prune_pg_created(); - + unsigned num_pgs = 0; unsigned get_pg_num() const { return num_pgs; } @@ -190,41 +171,147 @@ public: --num_pgs; } - seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + std::set pg_created; + seastar::future<> send_pg_created(pg_t pgid); + seastar::future<> send_pg_created(); + void prune_pg_created(); // Time state ceph::mono_time startup_time = ceph::mono_clock::now(); ceph::signedspan get_mnow() const { return ceph::mono_clock::now() - startup_time; } + HeartbeatStampsRef get_hb_stamps(int peer); std::map heartbeat_stamps; - crimson::osd::ObjectContextRegistry obc_registry; - - // Async Reservers -private: - unsigned num_pgs = 0; - struct DirectFinisher { void queue(Context *c) { c->complete(0); } } finisher; - // prevent creating new osd operations when system is shutting down, - // this is necessary because there are chances that a new operation - // is created, after the interruption of all ongoing operations, and - // creats and waits on a new and 
may-never-resolve future, in which - // case the shutdown may never succeed. - bool stopping = false; -public: AsyncReserver local_reserver; AsyncReserver remote_reserver; -private: epoch_t up_thru_wanted = 0; -public: seastar::future<> send_alive(epoch_t want); + + const char** get_tracked_conf_keys() const final; + void handle_conf_change( + const ConfigProxy& conf, + const std::set &changed) final; +}; + +#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \ + template \ + auto FROM_METHOD(Args&&... args) const { \ + return TARGET.TO_METHOD(std::forward(args)...); \ + } + +#define FORWARD(FROM_METHOD, TO_METHOD, TARGET) \ + template \ + auto FROM_METHOD(Args&&... args) { \ + return TARGET.TO_METHOD(std::forward(args)...); \ + } + +#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state) +#define FORWARD_TO_CORE(METHOD) FORWARD(METHOD, METHOD, core_state) + +/** + * Represents services available to each PG + */ +class ShardServices { + using cached_map_t = boost::local_shared_ptr; + + CoreState &core_state; + PerShardState &local_state; +public: + ShardServices( + CoreState &core_state, + PerShardState &local_state) + : core_state(core_state), local_state(local_state) {} + + FORWARD_TO_CORE(send_to_osd) + + crimson::os::FuturizedStore &get_store() { + return core_state.store; + } + + crimson::common::CephContext *get_cct() { + return &(local_state.cct); + } + + // OSDMapService + const OSDMapService &get_osdmap_service() const { + return core_state.osdmap_service; + } + + template + auto start_operation(Args&&... args) { + return local_state.start_operation(std::forward(args)...); + } + + auto &get_registry() { return local_state.registry; } + + // Loggers + PerfCounters &get_recoverystate_perf_logger() { + return *local_state.recoverystate_perf; + } + PerfCounters &get_perf_logger() { + return *local_state.perf; + } + + /// Dispatch and reset ctx transaction + seastar::future<> dispatch_context_transaction( + crimson::os::CollectionRef col, PeeringCtx &ctx); + + /// Dispatch and reset ctx messages + seastar::future<> dispatch_context_messages( + BufferedRecoveryMessages &&ctx); + + /// Dispatch ctx and dispose of context + seastar::future<> dispatch_context( + crimson::os::CollectionRef col, + PeeringCtx &&ctx); + + /// Dispatch ctx and dispose of ctx, transaction must be empty + seastar::future<> dispatch_context( + PeeringCtx &&ctx) { + return dispatch_context({}, std::move(ctx)); + } + + FORWARD_TO_LOCAL(get_osdmap) + FORWARD_TO_CORE(get_pg_num) + FORWARD(with_throttle_while, with_throttle_while, local_state.throttler) + + FORWARD_TO_CORE(osdmap_subscribe) + FORWARD_TO_CORE(get_tid) + FORWARD_TO_CORE(queue_want_pg_temp) + FORWARD_TO_CORE(remove_want_pg_temp) + FORWARD_TO_CORE(requeue_pg_temp) + FORWARD_TO_CORE(send_pg_created) + FORWARD_TO_CORE(inc_pg_num) + FORWARD_TO_CORE(dec_pg_num) + FORWARD_TO_CORE(send_alive) + FORWARD_TO_CORE(send_pg_temp) + FORWARD_CONST(get_mnow, get_mnow, core_state) + FORWARD_TO_CORE(get_hb_stamps) + + FORWARD( + maybe_get_cached_obc, maybe_get_cached_obc, local_state.obc_registry) + FORWARD( + get_cached_obc, get_cached_obc, local_state.obc_registry) + + FORWARD( + local_request_reservation, request_reservation, core_state.local_reserver) + FORWARD( + local_update_priority, update_priority, core_state.local_reserver) + FORWARD( + local_cancel_reservation, cancel_reservation, core_state.local_reserver) + FORWARD( + remote_request_reservation, request_reservation, core_state.remote_reserver) + FORWARD( + remote_cancel_reservation, 
cancel_reservation, core_state.remote_reserver) }; } -- 2.39.5
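The FORWARD/FORWARD_CONST/FORWARD_TO_LOCAL/FORWARD_TO_CORE macros added to
shard_services.h carry the new interface boundary: ShardServices keeps the
flat API PGs already use and forwards each call either to the shard-local
PerShardState or to the OSD-wide CoreState. A standalone sketch of that
pattern follows; every name in it (Core, Local, Facade, get_tid, note_op)
is invented for illustration, and only the macro shape mirrors the patch.

  #include <utility>

  // FROM_METHOD on the facade perfect-forwards its arguments to
  // TO_METHOD on TARGET, so call sites never name the target object.
  #define FORWARD(FROM_METHOD, TO_METHOD, TARGET)             \
    template <typename... Args>                               \
    auto FROM_METHOD(Args&&... args) {                        \
      return TARGET.TO_METHOD(std::forward<Args>(args)...);   \
    }

  struct Core {                   // stands in for CoreState: OSD-wide state
    unsigned next_tid = 0;
    unsigned get_tid() { return next_tid++; }
  };

  struct Local {                  // stands in for PerShardState: per-reactor state
    unsigned ops = 0;
    void note_op() { ++ops; }
  };

  class Facade {                  // stands in for ShardServices
    Core &core;
    Local &local;
  public:
    Facade(Core &c, Local &l) : core(c), local(l) {}
    FORWARD(get_tid, get_tid, core)    // forwarded to OSD-wide state
    FORWARD(note_op, note_op, local)   // forwarded to shard-local state
  };

  int main() {
    Core core;
    Local local;
    Facade svc{core, local};
    svc.note_op();                            // handled by Local
    return static_cast<int>(svc.get_tid());   // handled by Core
  }

Because PGs only ever hold the facade, the boundary between shard-local and
OSD-wide state can keep moving behind it, which is presumably what the
planned machinery for cold-path access to state on remote shards will build
on.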