From: Samuel Just
Date: Thu, 24 Oct 2024 23:39:18 +0000 (-0700)
Subject: crimson: manage log submission atomicity independently of pipeline stages
X-Git-Tag: v20.0.0~524^2~16
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=54a42bef571b3d83a81e02595dd40edcf5054245;p=ceph.git

crimson: manage log submission atomicity independently of pipeline stages

It's important to construct and submit log entries atomically because the
submission order needs to match the entry numbering. Previously, this
occurred under the pg-wide exclusive process stage. Shortly, each obc will
have its own process stage, so we need to ensure that atomicity separately
from the pipeline stages. Introduce a simple mutex.

Signed-off-by: Samuel Just
---
diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc
index 7f78457167d03..459c98bb9c0f6 100644
--- a/src/crimson/osd/osd_operations/snaptrim_event.cc
+++ b/src/crimson/osd/osd_operations/snaptrim_event.cc
@@ -414,21 +414,31 @@ SnapTrimObjSubEvent::start()
   logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid());
 
   co_await enter_stage<interruptor>(client_pp().process);
+  auto all_completed = interruptor::now();
+  {
+    // as with PG::submit_executer, we need to build the pg log entries
+    // and submit the transaction atomically
+    co_await interruptor::make_interruptible(pg->submit_lock.lock());
+    auto unlocker = seastar::defer([this] {
+      pg->submit_lock.unlock();
+    });
 
-  logger().debug("{}: processing obc={}", *this, obc_manager.get_obc()->get_oid());
-
-  auto txn = co_await remove_or_update(
-    obc_manager.get_obc(), obc_manager.get_head_obc());
-
-  auto [submitted, all_completed] = co_await pg->submit_transaction(
-    ObjectContextRef(obc_manager.get_obc()),
-    nullptr,
-    std::move(txn),
-    std::move(osd_op_p),
-    std::move(log_entries)
-  );
-
-  co_await std::move(submitted);
+    logger().debug("{}: calling remove_or_update obc={}",
+                   *this, obc_manager.get_obc()->get_oid());
+
+    auto txn = co_await remove_or_update(
+      obc_manager.get_obc(), obc_manager.get_head_obc());
+
+    auto submitted = interruptor::now();
+    std::tie(submitted, all_completed) = co_await pg->submit_transaction(
+      ObjectContextRef(obc_manager.get_obc()),
+      nullptr,
+      std::move(txn),
+      std::move(osd_op_p),
+      std::move(log_entries)
+    );
+    co_await std::move(submitted);
+  }
 
   co_await enter_stage<interruptor>(client_pp().wait_repop);
 
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index a276b0e43661a..c677961fe0fc4 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -1039,6 +1039,12 @@ PG::interruptible_future PG::submit_error_log(
   const std::error_code e,
   ceph_tid_t rep_tid)
 {
+  // as with submit_executer, need to ensure that log numbering and submission
+  // are atomic
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
   LOG_PREFIX(PG::submit_error_log);
   DEBUGDPP("{} rep_tid: {} error: {}",
            *this, *m, rep_tid, e);
@@ -1156,8 +1162,15 @@ PG::submit_executer_fut PG::submit_executer(
   OpsExecuter &&ox,
   const std::vector<OSDOp>& ops) {
   LOG_PREFIX(PG::submit_executer);
-  // transaction must commit at this point
-  return std::move(
+
+  // we need to build the pg log entries and submit the transaction
+  // atomically to ensure log ordering
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
+
+  auto [submitted, completed] = co_await std::move(
     ox
   ).flush_changes_n_do_ops_effects(
    ops,
@@ -1177,6 +1190,10 @@ PG::submit_executer_fut PG::submit_executer(
         std::move(osd_op_p),
         std::move(log_entries));
     });
+
+  co_return std::make_tuple(
+    std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+    std::move(completed));
 }
 
 PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 15aeec0e4f35c..2adaf69f26b30 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -663,6 +663,7 @@ private:
     const OpInfo &op_info,
    std::vector<OSDOp>& ops);
 
+  seastar::shared_mutex submit_lock;
   using submit_executer_ret = std::tuple<
     interruptible_future<>,
     interruptible_future<>>;
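
For reference, the locking shape used above (a future-returning lock() paired with a seastar::defer unlocker) is sketched below in standalone form. This is illustrative only and not part of the commit: submit_work() is a hypothetical stand-in for building the pg log entries and calling submit_transaction(), and the interruptor wrappers plus the unlock chained onto the submitted future in submit_executer are omitted.

// Illustrative sketch, not Ceph code: hold submit_lock across entry
// numbering and submission so that submission order matches numbering.
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/util/defer.hh>

seastar::shared_mutex submit_lock;

// Hypothetical stand-in for building log entries and submitting the
// transaction; ordering is fixed once this future has been obtained.
seastar::future<> submit_work() {
  return seastar::make_ready_future<>();
}

seastar::future<> submit_one() {
  // lock() returns a future; the coroutine suspends until the lock is held.
  co_await submit_lock.lock();
  // defer() runs the lambda when `unlocker` goes out of scope, releasing
  // the lock on every exit path from the coroutine body.
  auto unlocker = seastar::defer([] { submit_lock.unlock(); });
  co_await submit_work();
}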