]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../seastore: simplify do_transaction_no_callbacks
authorSamuel Just <sjust@redhat.com>
Wed, 20 Aug 2025 01:48:44 +0000 (01:48 +0000)
committerSamuel Just <sjust@redhat.com>
Fri, 5 Sep 2025 22:22:06 +0000 (15:22 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore.h

index 196c6bd82752d3dbde5abb9a4874061ddcbc4d3e..1acaf268c96ced8e01893f498f3e1ca6a727bb4f 100644 (file)
@@ -1521,70 +1521,88 @@ seastar::future<> SeaStore::Shard::do_transaction_no_callbacks(
   CollectionRef _ch,
   ceph::os::Transaction&& _t)
 {
+  LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks);
   ++(shard_stats.io_num);
   ++(shard_stats.pending_io_num);
   ++(shard_stats.starting_io_num);
 
-  // repeat_with_internal_context ensures ordering via collection lock
-  auto num_bytes = _t.get_num_bytes();
-  return repeat_with_internal_context(
-    _ch,
-    std::move(_t),
-    Transaction::src_t::MUTATE,
-    "do_transaction",
-    op_type_t::DO_TRANSACTION,
-    [this, num_bytes](auto &ctx) {
-      LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks);
-      return with_trans_intr(*ctx.transaction, [&ctx, this, FNAME, num_bytes](auto &t) {
-        DEBUGT("cid={}, {} operations, 0x{:x} bytes, {} colls, {} objects ...",
-               t, ctx.ch->get_cid(),
-               ctx.ext_transaction.get_num_ops(),
-               num_bytes,
-               ctx.iter.colls.size(),
-               ctx.iter.objects.size());
+  auto flags = _t.get_fadvise_flags();
+  internal_context_t ctx{
+    _ch, std::move(_t),
+    transaction_manager->create_transaction(
+      Transaction::src_t::MUTATE,
+      "do_transaction",
+      flags)
+  };
+
+  assert(shard_stats.starting_io_num);
+  --(shard_stats.starting_io_num);
+  ++(shard_stats.waiting_collock_io_num);
+
+  co_await ctx.transaction->get_handle().take_collection_lock(
+    static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
+  );
+
+  assert(shard_stats.waiting_collock_io_num);
+  --(shard_stats.waiting_collock_io_num);
+  ++(shard_stats.waiting_throttler_io_num);
+
+  co_await throttler.get(1);
+
+  assert(shard_stats.waiting_throttler_io_num);
+  --(shard_stats.waiting_throttler_io_num);
+  ++(shard_stats.processing_inlock_io_num);
+
+  co_await with_repeat_trans_intr(
+    *ctx.transaction,
+    seastar::coroutine::lambda([&ctx, this, FNAME](auto &t)
+                              -> tm_ret {
+      ++(shard_stats.repeat_io_num);
 #ifndef NDEBUG
-       TRACET(" transaction dump:\n", t);
-       JSONFormatter f(true);
-       f.open_object_section("transaction");
-       ctx.ext_transaction.dump(&f);
-       f.close_section();
-       std::stringstream str;
-       f.flush(str);
-       TRACET("{}", t, str.str());
+      TRACET(" transaction dump:\n", t);
+      JSONFormatter f(true);
+      f.open_object_section("transaction");
+      ctx.ext_transaction.dump(&f);
+      f.close_section();
+      std::stringstream str;
+      f.flush(str);
+      TRACET("{}", t, str.str());
 #endif
-        return seastar::do_with(
-         std::vector<OnodeRef>(ctx.iter.objects.size()),
-          [this, &ctx](auto& onodes)
-        {
-          return trans_intr::repeat(
-            [this, &ctx, &onodes]()
-            -> tm_iertr::future<seastar::stop_iteration>
-          {
-            if (ctx.iter.have_op()) {
-              return _do_transaction_step(
-                ctx, ctx.ch, onodes, ctx.iter
-              ).si_then([] {
-                return seastar::make_ready_future<seastar::stop_iteration>(
-                  seastar::stop_iteration::no);
-              });
-            } else {
-              return seastar::make_ready_future<seastar::stop_iteration>(
-                seastar::stop_iteration::yes);
-            };
-          });
-        }).si_then([this, &ctx] {
-          return transaction_manager->submit_transaction(*ctx.transaction);
-        });
-      }).safe_then([FNAME, &ctx] {
-        DEBUGT("done", *ctx.transaction);
-      });
-    }
-  ).finally([this] {
-    assert(shard_stats.pending_io_num);
-    --(shard_stats.pending_io_num);
-    // XXX: it's wrong to assume no failure
-    --(shard_stats.processing_postlock_io_num);
-  });
+
+      DEBUGT("cid={}, {} operations, 0x{:x} bytes, {} colls, {} objects ...",
+            t, ctx.ch->get_cid(),
+            ctx.ext_transaction.get_num_ops(),
+            ctx.ext_transaction.get_num_bytes(),
+            ctx.iter.colls.size(),
+            ctx.iter.objects.size());
+
+      ctx.reset_preserve_handle(*transaction_manager);
+      std::vector<OnodeRef> onodes(ctx.iter.objects.size());
+      while (ctx.iter.have_op()) {
+       co_await _do_transaction_step(
+         ctx, ctx.ch, onodes, ctx.iter);
+      }
+
+      co_await transaction_manager->submit_transaction(*ctx.transaction);
+    })
+  ).handle_error(
+    crimson::ct_error::all_same_way([&ctx](auto e) {
+      on_error(ctx.ext_transaction);
+      return seastar::now();
+    })
+  );
+
+  DEBUGT("done", *ctx.transaction);
+  add_latency_sample(
+    op_type_t::DO_TRANSACTION,
+    std::chrono::steady_clock::now() - ctx.begin_timestamp);
+
+  throttler.put();
+
+  assert(shard_stats.pending_io_num);
+  --(shard_stats.pending_io_num);
+  // XXX: it's wrong to assume no failure
+  --(shard_stats.processing_postlock_io_num);
 }
 
 
index c7f2b9da45463f8281418d9081265a0f89a43bbc..ec847302c4c8463ed1fc654bb27fb5d10dcaa451 100644 (file)
@@ -245,60 +245,6 @@ public:
 
     static void on_error(ceph::os::Transaction &t);
 
-    template <typename F>
-    auto repeat_with_internal_context(
-      CollectionRef ch,
-      ceph::os::Transaction &&t,
-      Transaction::src_t src,
-      const char* tname,
-      op_type_t op_type,
-      F &&f) {
-      // The below repeat_io_num requires MUTATE
-      assert(src == Transaction::src_t::MUTATE);
-      return seastar::do_with(
-        internal_context_t(
-          ch, std::move(t),
-          transaction_manager->create_transaction(
-           src, tname, t.get_fadvise_flags())),
-        std::forward<F>(f),
-        [this, op_type](auto &ctx, auto &f) {
-        assert(shard_stats.starting_io_num);
-        --(shard_stats.starting_io_num);
-        ++(shard_stats.waiting_collock_io_num);
-
-       return ctx.transaction->get_handle().take_collection_lock(
-         static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
-       ).then([this] {
-         assert(shard_stats.waiting_collock_io_num);
-         --(shard_stats.waiting_collock_io_num);
-         ++(shard_stats.waiting_throttler_io_num);
-
-         return throttler.get(1);
-       }).then([&, this] {
-         assert(shard_stats.waiting_throttler_io_num);
-         --(shard_stats.waiting_throttler_io_num);
-         ++(shard_stats.processing_inlock_io_num);
-
-         return repeat_eagain([&, this] {
-           ++(shard_stats.repeat_io_num);
-
-           ctx.reset_preserve_handle(*transaction_manager);
-           return std::invoke(f, ctx);
-         }).handle_error(
-           crimson::ct_error::all_same_way([&ctx](auto e) {
-             on_error(ctx.ext_transaction);
-             return seastar::now();
-           })
-         );
-       }).then([this, op_type, &ctx] {
-         add_latency_sample(op_type,
-             std::chrono::steady_clock::now() - ctx.begin_timestamp);
-       }).finally([this] {
-         throttler.put();
-       });
-      });
-    }
-
     template <typename Ret, typename F>
     auto repeat_with_onode(
       CollectionRef ch,