From: Samuel Just Date: Tue, 2 Jul 2024 22:27:55 +0000 (-0700) Subject: crimson/osd: execute PGListener async operations in order X-Git-Tag: v19.1.1~10^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=ae47b302d6495ce649efec0c0ff9e22bab65a7f0;p=ceph.git crimson/osd: execute PGListener async operations in order - Adds ShardServices::singleton_orderer_t mechanism to ensure that OSDSingleton calls are completed in order. - Updates ShardServices accessors invoked from PeeringListener handlers to use orderer. - Updates PGListener handlers and complete_rctx to use orderer. Fixes: https://tracker.ceph.com/issues/66316 Signed-off-by: Samuel Just (cherry picked from commit e12e92c50fdd0288a7bfcf43c60fba5b938914a1) --- diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index d65535c6d0844..3d588466e0b41 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -166,17 +166,37 @@ public: return std::size(snap_trimq); } + /** + * complete_rctx + * + * complete_rctx is responsible for submitting writes and messages + * resulting from processing a PeeringState event as well as resolving + * any asyncronous actions initiated by the PeeringState::Listener + * callbacks below. The caller is responsible for calling complete_rctx + * and waiting for the future to resolve before exiting the + * PGPeeringPipeline::process stage (see osd_operations/peering_event.h). + * + * orderer below ensures that operations submitted on the OSD-wide + * OSDSingleton instance are completed in the order initiated. This is + * specifically important for operations on the local and remote async + * reserver instances, as well as setting and clearing pg_temp mapping + * requests. + */ + ShardServices::singleton_orderer_t orderer; seastar::future<> complete_rctx(PeeringCtx &&rctx) { - return seastar::when_all_succeed( - get_need_up_thru() - ? shard_services.send_alive( - get_same_interval_since()) - : seastar::now(), + shard_services.send_pg_temp(orderer); + if (get_need_up_thru()) { + shard_services.send_alive(orderer, get_same_interval_since()); + } + + ShardServices::singleton_orderer_t o; + std::swap(o, orderer); + return seastar::when_all( shard_services.dispatch_context( get_collection_ref(), std::move(rctx)), - shard_services.send_pg_temp() - ).then([](auto){}); + shard_services.run_orderer(std::move(o)) + ).then([](auto) {}); } void send_cluster_message( @@ -186,13 +206,21 @@ public: SUBDEBUGDPP( osd, "message {} to {} share_map_update {}", *this, *m, osd, share_map_update); - (void)shard_services.send_to_osd(osd, std::move(m), epoch); + /* We don't bother to queue this one in the orderer because capturing the + * message ref in std::function is problematic as it isn't copyable. This + * is solvable, but it's not quite worth the effort at the moment as we + * aren't worried about ordering of message send events except between + * messages to the same target within an interval, which doesn't really + * happen while processing a single event. It'll probably be worth + * generalizing the orderer structure to fix this in the future, probably + * by using std::move_only_function once widely available. */ + std::ignore = shard_services.send_to_osd(osd, std::move(m), epoch); } void send_pg_created(pg_t pgid) final { LOG_PREFIX(PG::send_pg_created); SUBDEBUGDPP(osd, "pgid {}", *this, pgid); - (void)shard_services.send_pg_created(pgid); + shard_services.send_pg_created(orderer, pgid); } bool try_flush_or_schedule_async() final; @@ -237,9 +265,8 @@ public: SUBDEBUGDPP( osd, "priority {} on_grant {} on_preempt {}", *this, on_grant->get_desc(), on_preempt->get_desc()); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.local_request_reservation( + shard_services.local_request_reservation( + orderer, pgid, on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) { start_peering_event_operation(std::move(*on_grant)); @@ -247,17 +274,17 @@ public: priority, on_preempt ? make_lambda_context( [this, on_preempt=std::move(on_preempt)] (int) { - start_peering_event_operation(std::move(*on_preempt)); - }) : nullptr); + start_peering_event_operation(std::move(*on_preempt)); + }) : nullptr + ); } void update_local_background_io_priority( unsigned priority) final { LOG_PREFIX(PG::update_local_background_io_priority); SUBDEBUGDPP(osd, "priority {}", *this, priority); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.local_update_priority( + shard_services.local_update_priority( + orderer, pgid, priority); } @@ -265,9 +292,8 @@ public: void cancel_local_background_io_reservation() final { LOG_PREFIX(PG::cancel_local_background_io_reservation); SUBDEBUGDPP(osd, "", *this); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.local_cancel_reservation( + shard_services.local_cancel_reservation( + orderer, pgid); } @@ -279,9 +305,8 @@ public: SUBDEBUGDPP( osd, "priority {} on_grant {} on_preempt {}", *this, on_grant->get_desc(), on_preempt->get_desc()); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.remote_request_reservation( + shard_services.remote_request_reservation( + orderer, pgid, on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) { start_peering_event_operation(std::move(*on_grant)); @@ -289,17 +314,15 @@ public: priority, on_preempt ? make_lambda_context( [this, on_preempt=std::move(on_preempt)] (int) { - start_peering_event_operation(std::move(*on_preempt)); - }) : nullptr); + start_peering_event_operation(std::move(*on_preempt)); + }) : nullptr + ); } void cancel_remote_recovery_reservation() final { LOG_PREFIX(PG::cancel_remote_recovery_reservation); SUBDEBUGDPP(osd, "", *this); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.remote_cancel_reservation( - pgid); + shard_services.remote_cancel_reservation(orderer, pgid); } void schedule_event_on_commit( @@ -326,16 +349,12 @@ public: void queue_want_pg_temp(const std::vector &wanted) final { LOG_PREFIX(PG::queue_want_pg_temp); SUBDEBUGDPP(osd, "wanted {}", *this, wanted); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted); + shard_services.queue_want_pg_temp(orderer, pgid.pgid, wanted); } void clear_want_pg_temp() final { LOG_PREFIX(PG::clear_want_pg_temp); SUBDEBUGDPP(osd, "", *this); - // TODO -- we probably want to add a mechanism for blocking on this - // after handling the peering event - std::ignore = shard_services.remove_want_pg_temp(pgid.pgid); + shard_services.remove_want_pg_temp(orderer, pgid.pgid); } void check_recovery_sources(const OSDMapRef& newmap) final { // Not needed yet diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 7163852a7adc3..fb86418aba2a6 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -359,6 +359,60 @@ class ShardServices : public OSDMapService { ); } +public: + /** + * singleton_orderer_t + * + * schedule_for_singleton/run_orderer allows users to queue a sequence + * of operations on the OSDSingleton instance and run them as an ordered + * batch. The user may rely on operations being completed in the order + * submitted. + * + * Generally, users will declare a singleton_orderer_t instance, pass it + * by reference to ShardServices methods implemented with + * schedule_for_singleton or the QUEUE_FOR_OSD_SINGLETON_* macros, + * and finally call run_orderer to submit the batch. + */ + struct singleton_orderer_t { + using remote_func_t = std::function(OSDSingletonState&)>; + std::vector queue; + + singleton_orderer_t() = default; + singleton_orderer_t(singleton_orderer_t &&) = default; + singleton_orderer_t &operator=(singleton_orderer_t &&) = default; + + singleton_orderer_t(const singleton_orderer_t &) = delete; + singleton_orderer_t &operator=(const singleton_orderer_t &) = delete; + }; + + seastar::future<> run_orderer(singleton_orderer_t &&orderer) { + return with_singleton([](auto &singleton, auto &&orderer) { + return seastar::do_with( + std::move(orderer), + [&singleton](auto &orderer) { + return seastar::do_for_each( + orderer.queue, + [&singleton](auto &func) { + return std::invoke(func, singleton); + }); + }); + }, std::move(orderer)); + } + +private: + template + void schedule_for_singleton( + singleton_orderer_t &orderer, F &&f, Args&&... args) { + orderer.queue.push_back( + [f=std::forward(f), + args=std::make_tuple( + std::forward(args)...)](OSDSingletonState &state) -> seastar::future<> { + return seastar::futurize_apply<>( + std::move(f), + std::tuple_cat(std::make_tuple(std::ref(state)), std::move(args))); + }); + } + #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \ template \ auto FROM_METHOD(Args&&... args) const { \ @@ -387,6 +441,19 @@ class ShardServices : public OSDMapService { #define FORWARD_TO_OSD_SINGLETON(METHOD) \ FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD) +#define QUEUE_FOR_OSD_SINGLETON_TARGET(METHOD, TARGET) \ + template \ + auto METHOD(singleton_orderer_t &orderer, Args&&... args) { \ + return schedule_for_singleton( \ + orderer, \ + [](auto &local_state, auto&&... args) { \ + return local_state.TARGET( \ + std::forward(args)...); \ + }, std::forward(args)...); \ + } +#define QUEUE_FOR_OSD_SINGLETON(METHOD) \ + QUEUE_FOR_OSD_SINGLETON_TARGET(METHOD, METHOD) + public: template ShardServices( @@ -527,12 +594,12 @@ public: FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd) FORWARD_TO_OSD_SINGLETON(osdmap_subscribe) - FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp) - FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp) + QUEUE_FOR_OSD_SINGLETON(queue_want_pg_temp) + QUEUE_FOR_OSD_SINGLETON(remove_want_pg_temp) FORWARD_TO_OSD_SINGLETON(requeue_pg_temp) - FORWARD_TO_OSD_SINGLETON(send_pg_created) - FORWARD_TO_OSD_SINGLETON(send_alive) - FORWARD_TO_OSD_SINGLETON(send_pg_temp) + QUEUE_FOR_OSD_SINGLETON(send_pg_created) + QUEUE_FOR_OSD_SINGLETON(send_alive) + QUEUE_FOR_OSD_SINGLETON(send_pg_temp) FORWARD_TO_LOCAL_CONST(get_mnow) FORWARD_TO_LOCAL(get_hb_stamps) FORWARD_TO_LOCAL(update_shard_superblock) @@ -544,41 +611,58 @@ public: snap_dump_reservations, snap_reserver.dump) - auto local_update_priority(spg_t pgid, unsigned newprio) { + + auto local_update_priority( + singleton_orderer_t &orderer, + spg_t pgid, unsigned newprio) { LOG_PREFIX(ShardServices::local_update_priority); SUBDEBUG(osd, "sending to singleton pgid {} newprio {}", pgid, newprio); - return with_singleton([FNAME, pgid, newprio](auto &singleton) { - SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio); - return singleton.local_reserver.update_priority(pgid, newprio); - }); + return schedule_for_singleton( + orderer, + [FNAME, pgid, newprio](auto &singleton) { + SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio); + return singleton.local_reserver.update_priority(pgid, newprio); + }); } - auto local_cancel_reservation(spg_t pgid) { + auto local_cancel_reservation( + singleton_orderer_t &orderer, + spg_t pgid) { LOG_PREFIX(ShardServices::local_cancel_reservation); SUBDEBUG(osd, "sending to singleton pgid {}", pgid); - return with_singleton([FNAME, pgid](auto &singleton) { - SUBDEBUG(osd, "on singleton pgid {}", pgid); - return singleton.local_reserver.cancel_reservation(pgid); - }); + return schedule_for_singleton( + orderer, + [FNAME, pgid](auto &singleton) { + SUBDEBUG(osd, "on singleton pgid {}", pgid); + return singleton.local_reserver.cancel_reservation(pgid); + }); } FORWARD_TO_OSD_SINGLETON_TARGET( local_dump_reservations, local_reserver.dump) - auto remote_update_priority(spg_t pgid, unsigned newprio) { + auto remote_update_priority( + singleton_orderer_t &orderer, + spg_t pgid, unsigned newprio) { LOG_PREFIX(ShardServices::remote_update_priority); SUBDEBUG(osd, "sending to singleton pgid {} newprio {}", pgid, newprio); - return with_singleton([FNAME, pgid, newprio](auto &singleton) { - SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio); - return singleton.remote_reserver.update_priority(pgid, newprio); - }); + return schedule_for_singleton( + orderer, + [FNAME, pgid, newprio](auto &singleton) { + SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio); + return singleton.remote_reserver.update_priority(pgid, newprio); + }); } - auto remote_cancel_reservation(spg_t pgid) { + auto remote_cancel_reservation( + singleton_orderer_t &orderer, + spg_t pgid) { LOG_PREFIX(ShardServices::remote_cancel_reservation); SUBDEBUG(osd, "sending to singleton pgid {}", pgid); - return with_singleton([FNAME, pgid](auto &singleton) { - SUBDEBUG(osd, "on singleton pgid {}", pgid); - return singleton.remote_reserver.cancel_reservation(pgid); - }); + return schedule_for_singleton( + orderer, + [FNAME, pgid](auto &singleton) { + SUBDEBUG(osd, "on singleton pgid {}", pgid); + return singleton.remote_reserver.cancel_reservation(pgid); + }); } FORWARD_TO_OSD_SINGLETON_TARGET( remote_dump_reservations, @@ -594,14 +678,16 @@ public: }); }); } - seastar::future<> local_request_reservation( + void local_request_reservation( + singleton_orderer_t &orderer, spg_t item, Context *on_reserved, unsigned prio, Context *on_preempt) { LOG_PREFIX(ShardServices::local_request_reservation); SUBDEBUG(osd, "sending to singleton pgid {} prio {}", item, prio); - return with_singleton( + return schedule_for_singleton( + orderer, [FNAME, item, prio]( OSDSingletonState &singleton, Context *wrapped_on_reserved, Context *wrapped_on_preempt) { @@ -615,14 +701,16 @@ public: invoke_context_on_core(seastar::this_shard_id(), on_reserved), invoke_context_on_core(seastar::this_shard_id(), on_preempt)); } - seastar::future<> remote_request_reservation( + void remote_request_reservation( + singleton_orderer_t &orderer, spg_t item, Context *on_reserved, unsigned prio, Context *on_preempt) { LOG_PREFIX(ShardServices::remote_request_reservation); SUBDEBUG(osd, "sending to singleton pgid {} prio {}", item, prio); - return with_singleton( + return schedule_for_singleton( + orderer, [FNAME, item, prio]( OSDSingletonState &singleton, Context *wrapped_on_reserved, Context *wrapped_on_preempt) { @@ -636,11 +724,13 @@ public: invoke_context_on_core(seastar::this_shard_id(), on_reserved), invoke_context_on_core(seastar::this_shard_id(), on_preempt)); } - seastar::future<> snap_request_reservation( + void snap_request_reservation( + singleton_orderer_t &orderer, spg_t item, Context *on_reserved, unsigned prio) { - return with_singleton( + return schedule_for_singleton( + orderer, [item, prio](OSDSingletonState &singleton, Context *wrapped_on_reserved) { return singleton.snap_reserver.request_reservation(