From a4edc9db2dc47c46ce83329c6620cb7d2c863de4 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 20 Aug 2025 01:48:44 +0000 Subject: [PATCH] crimson/.../seastore: simplify do_transaction_no_callbacks Signed-off-by: Samuel Just --- src/crimson/os/seastore/seastore.cc | 134 ++++++++++++++++------------ src/crimson/os/seastore/seastore.h | 54 ----------- 2 files changed, 76 insertions(+), 112 deletions(-) diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index 196c6bd82752d..1acaf268c96ce 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -1521,70 +1521,88 @@ seastar::future<> SeaStore::Shard::do_transaction_no_callbacks( CollectionRef _ch, ceph::os::Transaction&& _t) { + LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks); ++(shard_stats.io_num); ++(shard_stats.pending_io_num); ++(shard_stats.starting_io_num); - // repeat_with_internal_context ensures ordering via collection lock - auto num_bytes = _t.get_num_bytes(); - return repeat_with_internal_context( - _ch, - std::move(_t), - Transaction::src_t::MUTATE, - "do_transaction", - op_type_t::DO_TRANSACTION, - [this, num_bytes](auto &ctx) { - LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks); - return with_trans_intr(*ctx.transaction, [&ctx, this, FNAME, num_bytes](auto &t) { - DEBUGT("cid={}, {} operations, 0x{:x} bytes, {} colls, {} objects ...", - t, ctx.ch->get_cid(), - ctx.ext_transaction.get_num_ops(), - num_bytes, - ctx.iter.colls.size(), - ctx.iter.objects.size()); + auto flags = _t.get_fadvise_flags(); + internal_context_t ctx{ + _ch, std::move(_t), + transaction_manager->create_transaction( + Transaction::src_t::MUTATE, + "do_transaction", + flags) + }; + + assert(shard_stats.starting_io_num); + --(shard_stats.starting_io_num); + ++(shard_stats.waiting_collock_io_num); + + co_await ctx.transaction->get_handle().take_collection_lock( + static_cast(*(ctx.ch)).ordering_lock + ); + + assert(shard_stats.waiting_collock_io_num); + --(shard_stats.waiting_collock_io_num); + ++(shard_stats.waiting_throttler_io_num); + + co_await throttler.get(1); + + assert(shard_stats.waiting_throttler_io_num); + --(shard_stats.waiting_throttler_io_num); + ++(shard_stats.processing_inlock_io_num); + + co_await with_repeat_trans_intr( + *ctx.transaction, + seastar::coroutine::lambda([&ctx, this, FNAME](auto &t) + -> tm_ret { + ++(shard_stats.repeat_io_num); #ifndef NDEBUG - TRACET(" transaction dump:\n", t); - JSONFormatter f(true); - f.open_object_section("transaction"); - ctx.ext_transaction.dump(&f); - f.close_section(); - std::stringstream str; - f.flush(str); - TRACET("{}", t, str.str()); + TRACET(" transaction dump:\n", t); + JSONFormatter f(true); + f.open_object_section("transaction"); + ctx.ext_transaction.dump(&f); + f.close_section(); + std::stringstream str; + f.flush(str); + TRACET("{}", t, str.str()); #endif - return seastar::do_with( - std::vector(ctx.iter.objects.size()), - [this, &ctx](auto& onodes) - { - return trans_intr::repeat( - [this, &ctx, &onodes]() - -> tm_iertr::future - { - if (ctx.iter.have_op()) { - return _do_transaction_step( - ctx, ctx.ch, onodes, ctx.iter - ).si_then([] { - return seastar::make_ready_future( - seastar::stop_iteration::no); - }); - } else { - return seastar::make_ready_future( - seastar::stop_iteration::yes); - }; - }); - }).si_then([this, &ctx] { - return transaction_manager->submit_transaction(*ctx.transaction); - }); - }).safe_then([FNAME, &ctx] { - DEBUGT("done", *ctx.transaction); - }); - } - ).finally([this] { - assert(shard_stats.pending_io_num); - --(shard_stats.pending_io_num); - // XXX: it's wrong to assume no failure - --(shard_stats.processing_postlock_io_num); - }); + + DEBUGT("cid={}, {} operations, 0x{:x} bytes, {} colls, {} objects ...", + t, ctx.ch->get_cid(), + ctx.ext_transaction.get_num_ops(), + ctx.ext_transaction.get_num_bytes(), + ctx.iter.colls.size(), + ctx.iter.objects.size()); + + ctx.reset_preserve_handle(*transaction_manager); + std::vector onodes(ctx.iter.objects.size()); + while (ctx.iter.have_op()) { + co_await _do_transaction_step( + ctx, ctx.ch, onodes, ctx.iter); + } + + co_await transaction_manager->submit_transaction(*ctx.transaction); + }) + ).handle_error( + crimson::ct_error::all_same_way([&ctx](auto e) { + on_error(ctx.ext_transaction); + return seastar::now(); + }) + ); + + DEBUGT("done", *ctx.transaction); + add_latency_sample( + op_type_t::DO_TRANSACTION, + std::chrono::steady_clock::now() - ctx.begin_timestamp); + + throttler.put(); + + assert(shard_stats.pending_io_num); + --(shard_stats.pending_io_num); + // XXX: it's wrong to assume no failure + --(shard_stats.processing_postlock_io_num); } diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h index c7f2b9da45463..ec847302c4c84 100644 --- a/src/crimson/os/seastore/seastore.h +++ b/src/crimson/os/seastore/seastore.h @@ -245,60 +245,6 @@ public: static void on_error(ceph::os::Transaction &t); - template - auto repeat_with_internal_context( - CollectionRef ch, - ceph::os::Transaction &&t, - Transaction::src_t src, - const char* tname, - op_type_t op_type, - F &&f) { - // The below repeat_io_num requires MUTATE - assert(src == Transaction::src_t::MUTATE); - return seastar::do_with( - internal_context_t( - ch, std::move(t), - transaction_manager->create_transaction( - src, tname, t.get_fadvise_flags())), - std::forward(f), - [this, op_type](auto &ctx, auto &f) { - assert(shard_stats.starting_io_num); - --(shard_stats.starting_io_num); - ++(shard_stats.waiting_collock_io_num); - - return ctx.transaction->get_handle().take_collection_lock( - static_cast(*(ctx.ch)).ordering_lock - ).then([this] { - assert(shard_stats.waiting_collock_io_num); - --(shard_stats.waiting_collock_io_num); - ++(shard_stats.waiting_throttler_io_num); - - return throttler.get(1); - }).then([&, this] { - assert(shard_stats.waiting_throttler_io_num); - --(shard_stats.waiting_throttler_io_num); - ++(shard_stats.processing_inlock_io_num); - - return repeat_eagain([&, this] { - ++(shard_stats.repeat_io_num); - - ctx.reset_preserve_handle(*transaction_manager); - return std::invoke(f, ctx); - }).handle_error( - crimson::ct_error::all_same_way([&ctx](auto e) { - on_error(ctx.ext_transaction); - return seastar::now(); - }) - ); - }).then([this, op_type, &ctx] { - add_latency_sample(op_type, - std::chrono::steady_clock::now() - ctx.begin_timestamp); - }).finally([this] { - throttler.put(); - }); - }); - } - template auto repeat_with_onode( CollectionRef ch, -- 2.39.5