From ce65d7e6fa13bb8ee88ac94431b7b57627b99470 Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Tue, 13 Sep 2022 19:09:58 -0700
Subject: [PATCH] crimson/osd: shard PerShardState across cores

Signed-off-by: Samuel Just
---
 src/crimson/osd/pg.h                |  28 +++++--
 src/crimson/osd/pg_shard_manager.cc |  54 +++++++++----
 src/crimson/osd/pg_shard_manager.h  |  82 +++++++++++++------
 src/crimson/osd/shard_services.cc   |   4 +-
 src/crimson/osd/shard_services.h    | 119 +++++++++++++++++++++-------
 5 files changed, 207 insertions(+), 80 deletions(-)

diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 3a9b709b019..98e2d2103bc 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -220,7 +220,9 @@ public:
     unsigned priority,
     PGPeeringEventURef on_grant,
     PGPeeringEventURef on_preempt) final {
-    shard_services.local_request_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.local_request_reservation(
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
         start_peering_event_operation(std::move(*on_grant));
@@ -234,13 +236,17 @@ public:
 
   void update_local_background_io_priority(
     unsigned priority) final {
-    shard_services.local_update_priority(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.local_update_priority(
       pgid,
       priority);
   }
 
   void cancel_local_background_io_reservation() final {
-    shard_services.local_cancel_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.local_cancel_reservation(
       pgid);
   }
 
@@ -248,7 +254,9 @@ public:
     unsigned priority,
     PGPeeringEventURef on_grant,
     PGPeeringEventURef on_preempt) final {
-    shard_services.remote_request_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.remote_request_reservation(
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
         start_peering_event_operation(std::move(*on_grant));
@@ -261,7 +269,9 @@ public:
   }
 
   void cancel_remote_recovery_reservation() final {
-    shard_services.remote_cancel_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.remote_cancel_reservation(
       pgid);
   }
 
@@ -285,10 +295,14 @@ public:
     // Not needed yet
   }
   void queue_want_pg_temp(const std::vector<pg_t> &wanted) final {
-    shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted);
   }
   void clear_want_pg_temp() final {
-    shard_services.remove_want_pg_temp(pgid.pgid);
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.remove_want_pg_temp(pgid.pgid);
   }
   void check_recovery_sources(const OSDMapRef& newmap) final {
     // Not needed yet
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index fef77d6166a..84b17e721f9 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -20,19 +20,24 @@ seastar::future<> PGShardManager::start(
   crimson::mgr::Client &mgrc,
   crimson::os::FuturizedStore &store)
 {
-  osd_singleton_state.reset(
-    new OSDSingletonState(whoami, cluster_msgr, public_msgr,
-                          monc, mgrc));
-  shard_services.reset(
-    new ShardServices(*osd_singleton_state, whoami, store));
-  return seastar::now();
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return osd_singleton_state.start_single(
+    whoami, std::ref(cluster_msgr), std::ref(public_msgr),
+    std::ref(monc), std::ref(mgrc)
+  ).then([this, whoami, &store] {
+    ceph::mono_time startup_time = ceph::mono_clock::now();
+    return shard_services.start(
+      std::ref(osd_singleton_state), whoami, startup_time, std::ref(store));
+  });
 }
 
 seastar::future<> PGShardManager::stop()
 {
-  shard_services.reset();
-  osd_singleton_state.reset();
-  return seastar::now();
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return shard_services.stop(
+  ).then([this] {
+    return osd_singleton_state.stop();
+  });
 }
 
 seastar::future<> PGShardManager::load_pgs()
@@ -76,29 +81,44 @@
 seastar::future<> PGShardManager::stop_pgs()
 {
-  return get_local_state().stop_pgs();
+  return shard_services.invoke_on_all([](auto &local_service) {
+    return local_service.local_state.stop_pgs();
+  });
 }
 
 seastar::future<std::map<pg_t, pg_stat_t>> PGShardManager::get_pg_stats() const
 {
-  return seastar::make_ready_future<std::map<pg_t, pg_stat_t>>(
-    get_local_state().get_pg_stats());
+  return shard_services.map_reduce0(
+    [](auto &local) {
+      return local.local_state.get_pg_stats();
+    },
+    std::map<pg_t, pg_stat_t>(),
+    [](auto &&left, auto &&right) {
+      left.merge(std::move(right));
+      return std::move(left);
+    });
 }
 
 seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
 {
-  return get_local_state().broadcast_map_to_pgs(
-    get_shard_services(), epoch
-  ).then([this, epoch] {
+  return shard_services.invoke_on_all([epoch](auto &local_service) {
+    return local_service.local_state.broadcast_map_to_pgs(
+      local_service, epoch
+    );
+  }).then([this, epoch] {
     get_osd_singleton_state().osdmap_gate.got_map(epoch);
     return seastar::now();
   });
 }
 
 seastar::future<> PGShardManager::set_up_epoch(epoch_t e)
 {
-  local_state.set_up_epoch(e);
-  return seastar::now();
+  return shard_services.invoke_on_all(
+    seastar::smp_submit_to_options{},
+    [e](auto &local_service) {
+      local_service.local_state.set_up_epoch(e);
+      return seastar::now();
+    });
 }
 
 }
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index 2135719fa06..66b289a073e 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -21,8 +21,8 @@ namespace crimson::osd {
  * etc)
  */
 class PGShardManager {
-  std::unique_ptr<OSDSingletonState> osd_singleton_state;
-  std::unique_ptr<ShardServices> shard_services;
+  seastar::sharded<OSDSingletonState> osd_singleton_state;
+  seastar::sharded<ShardServices> shard_services;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
   template <typename... Args> \
@@ -54,22 +54,53 @@ public:
     crimson::os::FuturizedStore &store);
   seastar::future<> stop();
 
-  auto &get_osd_singleton_state() { return *osd_singleton_state; }
-  auto &get_osd_singleton_state() const { return *osd_singleton_state; }
-  auto &get_shard_services() { return *shard_services; }
-  auto &get_shard_services() const { return *shard_services; }
-  auto &get_local_state() { return shard_services->local_state; }
-  auto &get_local_state() const { return shard_services->local_state; }
+  auto &get_osd_singleton_state() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return osd_singleton_state.local();
+  }
+  auto &get_osd_singleton_state() const {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return osd_singleton_state.local();
+  }
+  auto &get_shard_services() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_services.local();
+  }
+  auto &get_shard_services() const {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_services.local();
+  }
+  auto &get_local_state() { return get_shard_services().local_state; }
+  auto &get_local_state() const { return get_shard_services().local_state; }
 
   seastar::future<> update_map(local_cached_map_t &&map) {
-    auto fmap = make_local_shared_foreign(std::move(map));
-    get_osd_singleton_state().update_map(fmap);
-    get_local_state().update_map(std::move(fmap));
-    return seastar::now();
+    get_osd_singleton_state().update_map(
+      make_local_shared_foreign(local_cached_map_t(map))
+    );
+    /* We need each core to get its own foreign_ptr.
+     * foreign_ptr can't be cheaply copied, so we make one for each core
+     * up front.
+     */
+    return seastar::do_with(
+      std::vector<seastar::foreign_ptr<local_cached_map_t>>(),
+      [this, map](auto &fmaps) {
+        fmaps.resize(seastar::smp::count);
+        for (auto &i: fmaps) {
+          i = seastar::foreign_ptr(map);
+        }
+        return shard_services.invoke_on_all(
+          [&fmaps](auto &local) mutable {
+            local.local_state.update_map(
+              make_local_shared_foreign(
+                std::move(fmaps[seastar::this_shard_id()])
+              ));
+          });
+      });
   }
 
-  auto stop_registries() {
-    return get_local_state().stop_registry();
+  seastar::future<> stop_registries() {
+    return shard_services.invoke_on_all([](auto &local) {
+      return local.local_state.stop_registry();
+    });
   }
 
   FORWARD_TO_OSD_SINGLETON(send_pg_created)
@@ -106,14 +137,11 @@ public:
 
   template <typename F>
   auto with_remote_shard_state(core_id_t core, F &&f) {
-    ceph_assert(core == 0);
-    auto &local_state_ref = get_local_state();
-    auto &shard_services_ref = get_shard_services();
-    return seastar::smp::submit_to(
-      core,
-      [f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
+    return shard_services.invoke_on(
+      core, [f=std::move(f)](auto &target_shard_services) mutable {
         return std::invoke(
-          std::move(f), local_state_ref, shard_services_ref);
+          std::move(f), target_shard_services.local_state,
+          target_shard_services);
       });
   }
 
@@ -202,10 +230,14 @@ public:
    */
  template <typename F>
  seastar::future<> for_each_pg(F &&f) const {
-    for (auto &&pg: get_local_state().pg_map.get_pgs()) {
-      std::apply(f, pg);
-    }
-    return seastar::now();
+    return sharded_map_seq(
+      shard_services,
+      [f=std::move(f)](const auto &local_service) mutable {
+        for (auto &pg: local_service.local_state.pg_map.get_pgs()) {
+          std::apply(f, pg);
+        }
+        return seastar::now();
+      });
   }
 
   auto get_num_pgs() const {
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index aadc55fea95..f9f0cb393b0 100644
--- a/src/crimson/osd/shard_services.cc
+++ b/src/crimson/osd/shard_services.cc
@@ -35,6 +35,7 @@ namespace crimson::osd {
 
 PerShardState::PerShardState(
   int whoami,
+  ceph::mono_time startup_time,
   crimson::os::FuturizedStore &store)
   : whoami(whoami),
     store(store),
@@ -42,7 +43,8 @@ PerShardState::PerShardState(
     obc_registry(crimson::common::local_conf()),
     next_tid(
       static_cast<ceph_tid_t>(seastar::this_shard_id()) <<
-      (std::numeric_limits<ceph_tid_t>::digits - 8))
+      (std::numeric_limits<ceph_tid_t>::digits - 8)),
+    startup_time(startup_time)
 {
   perf = build_osd_logger(&cct);
   cct.get_perfcounters_collection()->add(perf);
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 54b62dcb2d7..b5f9a388c61 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -45,6 +45,9 @@ class BufferedRecoveryMessages;
 
 namespace crimson::osd {
 
+// seastar::sharded puts start_single on core 0
+constexpr core_id_t PRIMARY_CORE = 0;
+
 class PGShardManager;
 
 /**
@@ -132,9 +135,16 @@ class PerShardState {
   HeartbeatStampsRef get_hb_stamps(int peer);
   std::map<int, HeartbeatStampsRef> heartbeat_stamps;
 
+  // Time state
+  const ceph::mono_time startup_time;
+  ceph::signedspan get_mnow() const {
+    return ceph::mono_clock::now() - startup_time;
+  }
+
 public:
   PerShardState(
     int whoami,
+    ceph::mono_time startup_time,
     crimson::os::FuturizedStore &store);
 };
 
@@ -158,6 +168,7 @@ public:
     crimson::mon::Client &monc,
     crimson::mgr::Client &mgrc);
 
+private:
   const int whoami;
 
   crimson::common::CephContext cct;
@@ -214,19 +225,13 @@ public:
   seastar::future<> send_pg_temp();
 
   // TODO: add config to control mapping
-  PGShardMapping pg_to_shard_mapping{0, 1};
+  PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
 
   std::set<pg_t> pg_created;
   seastar::future<> send_pg_created(pg_t pgid);
   seastar::future<> send_pg_created();
   void prune_pg_created();
 
-  // Time state
-  ceph::mono_time startup_time = ceph::mono_clock::now();
-  ceph::signedspan get_mnow() const {
-    return ceph::mono_clock::now() - startup_time;
-  }
-
   struct DirectFinisher {
     void queue(Context *c) {
       c->complete(0);
     }
@@ -263,7 +268,16 @@ class ShardServices : public OSDMapService {
   using local_cached_map_t = OSDMapService::local_cached_map_t;
 
   PerShardState local_state;
-  OSDSingletonState &osd_singleton_state;
+  seastar::sharded<OSDSingletonState> &osd_singleton_state;
+
+  template <typename F, typename... Args>
+  auto with_singleton(F &&f, Args&&... args) {
+    return osd_singleton_state.invoke_on(
+      PRIMARY_CORE,
+      std::forward<F>(f),
+      std::forward<Args>(args)...
+    );
+  }
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
   template <typename... Args> \
@@ -278,17 +292,23 @@ class ShardServices : public OSDMapService {
   }
 
 #define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
-#define FORWARD_TO_OSD_SINGLETON(METHOD) \
-  FORWARD(METHOD, METHOD, osd_singleton_state)
 
-  template <typename F, typename... Args>
-  auto with_singleton(F &&f, Args&&... args) {
-    return std::invoke(f, osd_singleton_state, std::forward<Args>(args)...);
+#define FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, TARGET) \
+  template <typename... Args> \
+  auto METHOD(Args&&... args) { \
+    return with_singleton( \
+      [](auto &local_state, auto&&... args) { \
+        return local_state.TARGET( \
+          std::forward<decltype(args)>(args)...); \
+      }, std::forward<Args>(args)...); \
   }
+#define FORWARD_TO_OSD_SINGLETON(METHOD) \
+  FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
+
 public:
   template <typename... PSSArgs>
   ShardServices(
-    OSDSingletonState &osd_singleton_state,
+    seastar::sharded<OSDSingletonState> &osd_singleton_state,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
       osd_singleton_state(osd_singleton_state) {}
@@ -387,7 +407,7 @@ public:
   FORWARD_TO_OSD_SINGLETON(send_pg_created)
   FORWARD_TO_OSD_SINGLETON(send_alive)
   FORWARD_TO_OSD_SINGLETON(send_pg_temp)
-  FORWARD_CONST(get_mnow, get_mnow, osd_singleton_state)
+  FORWARD_CONST(get_mnow, get_mnow, local_state)
   FORWARD_TO_LOCAL(get_hb_stamps)
 
   FORWARD(pg_created, pg_created, local_state.pg_map)
@@ -397,21 +417,60 @@ public:
   FORWARD(
     get_cached_obc, get_cached_obc, local_state.obc_registry)
 
-  FORWARD(
-    local_request_reservation, request_reservation,
-    osd_singleton_state.local_reserver)
-  FORWARD(
-    local_update_priority, update_priority,
-    osd_singleton_state.local_reserver)
-  FORWARD(
-    local_cancel_reservation, cancel_reservation,
-    osd_singleton_state.local_reserver)
-  FORWARD(
-    remote_request_reservation, request_reservation,
-    osd_singleton_state.remote_reserver)
-  FORWARD(
-    remote_cancel_reservation, cancel_reservation,
-    osd_singleton_state.remote_reserver)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_update_priority,
+    local_reserver.update_priority)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_cancel_reservation,
+    local_reserver.cancel_reservation)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    remote_cancel_reservation,
+    remote_reserver.cancel_reservation)
+
+  Context *invoke_context_on_core(core_id_t core, Context *c) {
+    if (!c) return nullptr;
+    return new LambdaContext([core, c](int code) {
+      std::ignore = seastar::smp::submit_to(
+        core,
+        [c, code] {
+          c->complete(code);
+        });
+    });
+  }
+  seastar::future<> local_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio,
+    Context *on_preempt) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                   Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+        return singleton.local_reserver.request_reservation(
+          item,
+          wrapped_on_reserved,
+          prio,
+          wrapped_on_preempt);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+  }
+  seastar::future<> remote_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio,
+    Context *on_preempt) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                   Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+        return singleton.remote_reserver.request_reservation(
+          item,
+          wrapped_on_reserved,
+          prio,
+          wrapped_on_preempt);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+  }
 
 #undef FORWARD_CONST
 #undef FORWARD
-- 
2.39.5
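
Not part of the patch itself: the short sketch below illustrates the seastar::sharded<> pattern the change adopts -- start_single() pins a service instance to core 0 (the role PRIMARY_CORE plays for OSDSingletonState), start() gives every reactor core its own instance (as PerShardState now gets), and invoke_on_all()/invoke_on() are the cross-core access paths that replace direct member calls. The per_shard_state and singleton_state types are hypothetical stand-ins, not the crimson classes.

// Minimal sketch of the seastar::sharded<> pattern used by the patch.
#include <seastar/core/app-template.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <iostream>

struct per_shard_state {
  unsigned events = 0;                      // core-local, so no locking needed
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

struct singleton_state {                    // exists only on shard 0
  unsigned total = 0;
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

seastar::sharded<per_shard_state> shards;
seastar::sharded<singleton_state> singleton;

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return singleton.start_single().then([] {   // one instance, on core 0
      return shards.start();                    // one instance per reactor core
    }).then([] {
      // touch each core's local copy; no cross-core synchronization involved
      return shards.invoke_on_all([](per_shard_state& local) {
        local.events = seastar::this_shard_id() + 1;
      });
    }).then([] {
      // funnel per-core values back to the shard-0 singleton, analogous to
      // ShardServices::with_singleton() calling invoke_on(PRIMARY_CORE, ...)
      return shards.invoke_on_all([](per_shard_state& local) {
        return singleton.invoke_on(0, [n = local.events](singleton_state& s) {
          s.total += n;
        });
      });
    }).then([] {
      return singleton.invoke_on(0, [](singleton_state& s) {
        std::cout << "summed across " << seastar::smp::count
                  << " shards: " << s.total << "\n";
      });
    }).then([] {
      return shards.stop();                     // stop() runs on every core
    }).then([] {
      return singleton.stop();
    });
  });
}

invoke_on_all() is the same mechanism PGShardManager::stop_pgs(), broadcast_map_to_pgs() and set_up_epoch() use after this change, while invoke_on(PRIMARY_CORE, ...) is the direction taken by with_singleton() and the reserver forwarding macros.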