]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/.../transaction_manager: convert do_submit_transaction to coroutine
authorSamuel Just <sjust@redhat.com>
Tue, 26 Aug 2025 16:04:05 +0000 (09:04 -0700)
committerSamuel Just <sjust@redhat.com>
Wed, 17 Sep 2025 16:23:45 +0000 (09:23 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/transaction_manager.cc

index 24f8df98332c389bd907ef599e6449e07cdc1b03..9184a9a27da5c554ed80b134bad9a35149e3b639 100644 (file)
@@ -562,79 +562,67 @@ TransactionManager::do_submit_transaction(
 {
   LOG_PREFIX(TransactionManager::do_submit_transaction);
   SUBDEBUGT(seastore_t, "start, entering ool_writes", tref);
-  return trans_intr::make_interruptible(
+  co_await trans_intr::make_interruptible(
     tref.get_handle().enter(write_pipeline.ool_writes_and_lba_updates)
-  ).then_interruptible([this, FNAME, &tref,
-                       dispatch_result = std::move(dispatch_result)] {
-    return seastar::do_with(std::move(dispatch_result),
-                           [this, FNAME, &tref](auto &dispatch_result) {
-      SUBTRACET(seastore_t, "write delayed ool extents", tref);
-      return epm->write_delayed_ool_extents(tref, dispatch_result.alloc_map
-      ).handle_error_interruptible(
-        crimson::ct_error::input_output_error::pass_further(),
-        crimson::ct_error::assert_all("invalid error")
-      );
-    });
-  }).si_then([&tref, FNAME, this] {
-    return seastar::do_with(
-      tref.get_valid_pre_alloc_list(),
-      [this, FNAME, &tref](auto &allocated_extents) {
-      return update_lba_mappings(tref, allocated_extents
-      ).si_then([this, FNAME, &tref, &allocated_extents] {
-        auto num_extents = allocated_extents.size();
-        SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
-        return epm->write_preallocated_ool_extents(tref, allocated_extents
-        ).handle_error_interruptible(
-          crimson::ct_error::input_output_error::pass_further(),
-          crimson::ct_error::assert_all("invalid error")
-        );
-      });
-    });
-  }).si_then([this, FNAME, &tref] {
-    SUBTRACET(seastore_t, "entering prepare", tref);
-    return tref.get_handle().enter(write_pipeline.prepare);
-  }).si_then([this, FNAME, &tref, trim_alloc_to=std::move(trim_alloc_to)]() mutable
-             -> submit_transaction_iertr::future<> {
-    if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
-      SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to);
-      cache->trim_backref_bufs(*trim_alloc_to);
-    }
+  );
 
-    auto record = cache->prepare_record(
-      tref,
-      journal->get_trimmer().get_journal_head(),
-      journal->get_trimmer().get_dirty_tail());
+  SUBTRACET(seastore_t, "write delayed ool extents", tref);
+  co_await epm->write_delayed_ool_extents(
+    tref, dispatch_result.alloc_map
+  );
 
-    tref.get_handle().maybe_release_collection_lock();
-    if (tref.get_src() == Transaction::src_t::MUTATE) {
-      --(shard_stats.processing_inlock_io_num);
-      ++(shard_stats.processing_postlock_io_num);
-    }
+  auto allocated_extents = tref.get_valid_pre_alloc_list();
+  co_await update_lba_mappings(tref, allocated_extents);
 
-    SUBTRACET(seastore_t, "submitting record", tref);
-    return journal->submit_record(
-      std::move(record),
-      tref.get_handle(),
-      tref.get_src(),
-      [this, FNAME, &tref](record_locator_t submit_result)
-    {
-      SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
-      auto start_seq = submit_result.write_result.start_seq;
-      journal->get_trimmer().set_journal_head(start_seq);
-      cache->complete_commit(
-          tref,
-          submit_result.record_block_base,
-          start_seq);
-      journal->get_trimmer().update_journal_tails(
-       cache->get_oldest_dirty_from().value_or(start_seq),
-       cache->get_oldest_backref_dirty_from().value_or(start_seq));
-    }).safe_then([&tref] {
-      return tref.get_handle().complete();
+  auto num_extents = allocated_extents.size();
+  SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
+  co_await epm->write_preallocated_ool_extents(tref, allocated_extents);
+
+  SUBTRACET(seastore_t, "entering prepare", tref);
+  co_await trans_intr::make_interruptible(
+    tref.get_handle().enter(write_pipeline.prepare)
+  );
+
+  if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
+    SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to);
+    cache->trim_backref_bufs(*trim_alloc_to);
+  }
+
+  auto record = cache->prepare_record(
+    tref,
+    journal->get_trimmer().get_journal_head(),
+    journal->get_trimmer().get_dirty_tail());
+
+  tref.get_handle().maybe_release_collection_lock();
+  if (tref.get_src() == Transaction::src_t::MUTATE) {
+    --(shard_stats.processing_inlock_io_num);
+    ++(shard_stats.processing_postlock_io_num);
+  }
+
+  SUBTRACET(seastore_t, "submitting record", tref);
+  co_await journal->submit_record(
+    std::move(record),
+    tref.get_handle(),
+    tref.get_src(),
+    [this, FNAME, &tref](record_locator_t submit_result) {
+    SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
+    auto start_seq = submit_result.write_result.start_seq;
+    journal->get_trimmer().set_journal_head(start_seq);
+    cache->complete_commit(
+      tref,
+      submit_result.record_block_base,
+      start_seq);
+    journal->get_trimmer().update_journal_tails(
+      cache->get_oldest_dirty_from().value_or(start_seq),
+      cache->get_oldest_backref_dirty_from().value_or(start_seq));
     }).handle_error(
       submit_transaction_iertr::pass_further{},
       crimson::ct_error::assert_all{"Hit error submitting to journal"}
     );
-  });
+
+  co_await trans_intr::make_interruptible(
+    tref.get_handle().complete()
+  );
 }
 
 seastar::future<> TransactionManager::flush(OrderingHandle &handle)