]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: hide RBM specific finish_commit()
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 20 Dec 2024 08:23:26 +0000 (16:23 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Tue, 24 Dec 2024 08:32:28 +0000 (16:32 +0800)
By introducing a callback upon completing submission.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/circular_bounded_journal.h
src/crimson/os/seastore/journal/segmented_journal.cc
src/crimson/os/seastore/journal/segmented_journal.h
src/crimson/os/seastore/transaction_manager.cc
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_cbjournal.cc
src/test/crimson/seastore/test_seastore_journal.cc

index a5c9029c43cb21e99ceae0b587e80c08c6bf313e..298935bd22e2da8f99966106f39815b9560a706d 100644 (file)
@@ -59,13 +59,13 @@ public:
     crimson::ct_error::erange,
     crimson::ct_error::input_output_error
     >;
-  using submit_record_ret = submit_record_ertr::future<
-    record_locator_t
-    >;
-  virtual submit_record_ret submit_record(
+  using on_submission_func_t = std::function<
+    void(record_locator_t)>;
+  virtual submit_record_ertr::future<> submit_record(
     record_t &&record,
-    OrderingHandle &handle
-  ) = 0;
+    OrderingHandle &handle,
+    transaction_type_t t_src,
+    on_submission_func_t &&on_submission) = 0;
 
   /**
    * flush
@@ -101,9 +101,6 @@ public:
   virtual replay_ret replay(
     delta_handler_t &&delta_handler) = 0;
 
-  virtual seastar::future<> finish_commit(
-    transaction_type_t type) = 0;
-
   virtual ~Journal() {}
 
   virtual backend_type_t get_type() = 0;
index 9ee8b1b997f0ae39118a19e1c91802eec6504984..41ff8318aba0bc3bcb3d7e160dc4677cb5487aa3 100644 (file)
@@ -58,35 +58,52 @@ CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close()
   return record_submitter.close();
 }
 
-CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::submit_record_ertr::future<>
 CircularBoundedJournal::submit_record(
     record_t &&record,
-    OrderingHandle &handle)
+    OrderingHandle &handle,
+    transaction_type_t t_src,
+    on_submission_func_t &&on_submission)
 {
   LOG_PREFIX(CircularBoundedJournal::submit_record);
   DEBUG("H{} {} start ...", (void*)&handle, record);
   assert(write_pipeline);
-  return do_submit_record(std::move(record), handle);
+  return do_submit_record(
+    std::move(record), handle, std::move(on_submission)
+  ).safe_then([this, t_src] {
+    if (is_trim_transaction(t_src)) {
+      return update_journal_tail(
+       trimmer.get_dirty_tail(),
+       trimmer.get_alloc_tail());
+    } else {
+      return seastar::now();
+    }
+  });
 }
 
-CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::submit_record_ertr::future<>
 CircularBoundedJournal::do_submit_record(
   record_t &&record,
-  OrderingHandle &handle)
+  OrderingHandle &handle,
+  on_submission_func_t &&on_submission)
 {
   LOG_PREFIX(CircularBoundedJournal::do_submit_record);
   if (!record_submitter.is_available()) {
     DEBUG("H{} wait ...", (void*)&handle);
     return record_submitter.wait_available(
-    ).safe_then([this, record=std::move(record), &handle]() mutable {
-      return do_submit_record(std::move(record), handle);
+    ).safe_then([this, record=std::move(record), &handle,
+                on_submission=std::move(on_submission)]() mutable {
+      return do_submit_record(
+       std::move(record), handle, std::move(on_submission));
     });
   }
   auto action = record_submitter.check_action(record.size);
   if (action == RecordSubmitter::action_t::ROLL) {
     return record_submitter.roll_segment(
-    ).safe_then([this, record=std::move(record), &handle]() mutable {
-      return do_submit_record(std::move(record), handle);
+    ).safe_then([this, record=std::move(record), &handle,
+                on_submission=std::move(on_submission)]() mutable {
+      return do_submit_record(
+       std::move(record), handle, std::move(on_submission));
     });
   }
 
@@ -99,13 +116,16 @@ CircularBoundedJournal::do_submit_record(
   return handle.enter(write_pipeline->device_submission
   ).then([submit_fut=std::move(submit_ret.future)]() mutable {
     return std::move(submit_fut);
-  }).safe_then([FNAME, this, &handle](record_locator_t result) {
+  }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission)
+              ](record_locator_t result) mutable {
     return handle.enter(write_pipeline->finalize
-    ).then([FNAME, this, result, &handle] {
+    ).then([FNAME, this, result, &handle,
+           on_submission=std::move(on_submission)] {
       DEBUG("H{} finish with {}", (void*)&handle, result);
       auto new_committed_to = result.write_result.get_end_seq();
       record_submitter.update_committed_to(new_committed_to);
-      return result;
+      std::invoke(on_submission, result);
+      return seastar::now();
     });
   });
 }
@@ -392,13 +412,4 @@ Journal::replay_ret CircularBoundedJournal::replay(
   });
 }
 
-seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) {
-  if (is_trim_transaction(type)) {
-    return update_journal_tail(
-      trimmer.get_dirty_tail(),
-      trimmer.get_alloc_tail());
-  }
-  return seastar::now();
-}
-
 }
index 874bd8dc086d9d48444e763d644818e8c771405d..16278df6cfe1c94e129c55bc0817273e296e907e 100644 (file)
@@ -80,9 +80,11 @@ public:
     return backend_type_t::RANDOM_BLOCK;
   }
 
-  submit_record_ret submit_record(
+  submit_record_ertr::future<> submit_record(
     record_t &&record,
-    OrderingHandle &handle
+    OrderingHandle &handle,
+    transaction_type_t t_src,
+    on_submission_func_t &&on_submission
   ) final;
 
   seastar::future<> flush(
@@ -148,8 +150,6 @@ public:
     return cjs.get_records_start();
   }
 
-  seastar::future<> finish_commit(transaction_type_t type) final;
-
   using cbj_delta_handler_t = std::function<
   replay_ertr::future<bool>(
     const record_locator_t&,
@@ -160,7 +160,10 @@ public:
     cbj_delta_handler_t &&delta_handler,
     journal_seq_t tail);
 
-  submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle);
+  submit_record_ertr::future<> do_submit_record(
+    record_t &&record,
+    OrderingHandle &handle,
+    on_submission_func_t &&on_submission);
 
   void try_read_rolled_header(scan_valid_records_cursor &cursor) {
     paddr_t addr = convert_abs_addr_to_paddr(
index 6be2ad4936a0ae477b104bc16034273cecda3643..67c0b3fb8ac01e29430c21bff7a20d245229aa1c 100644 (file)
@@ -368,25 +368,30 @@ seastar::future<> SegmentedJournal::flush(OrderingHandle &handle)
   });
 }
 
-SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record_ertr::future<>
 SegmentedJournal::do_submit_record(
   record_t &&record,
-  OrderingHandle &handle)
+  OrderingHandle &handle,
+  on_submission_func_t &&on_submission)
 {
   LOG_PREFIX(SegmentedJournal::do_submit_record);
   if (!record_submitter.is_available()) {
     DEBUG("H{} wait ...", (void*)&handle);
     return record_submitter.wait_available(
-    ).safe_then([this, record=std::move(record), &handle]() mutable {
-      return do_submit_record(std::move(record), handle);
+    ).safe_then([this, record=std::move(record), &handle,
+                on_submission=std::move(on_submission)]() mutable {
+      return do_submit_record(
+       std::move(record), handle, std::move(on_submission));
     });
   }
   auto action = record_submitter.check_action(record.size);
   if (action == RecordSubmitter::action_t::ROLL) {
     DEBUG("H{} roll, unavailable ...", (void*)&handle);
     return record_submitter.roll_segment(
-    ).safe_then([this, record=std::move(record), &handle]() mutable {
-      return do_submit_record(std::move(record), handle);
+    ).safe_then([this, record=std::move(record), &handle,
+                on_submission=std::move(on_submission)]() mutable {
+      return do_submit_record(
+       std::move(record), handle, std::move(on_submission));
     });
   } else { // SUBMIT_FULL/NOT_FULL
     DEBUG("H{} submit {} ...",
@@ -398,22 +403,27 @@ SegmentedJournal::do_submit_record(
     return handle.enter(write_pipeline->device_submission
     ).then([submit_fut=std::move(submit_ret.future)]() mutable {
       return std::move(submit_fut);
-    }).safe_then([FNAME, this, &handle](record_locator_t result) {
+    }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission)
+                ](record_locator_t result) mutable {
       return handle.enter(write_pipeline->finalize
-      ).then([FNAME, this, result, &handle] {
+      ).then([FNAME, this, result, &handle,
+             on_submission=std::move(on_submission)] {
         DEBUG("H{} finish with {}", (void*)&handle, result);
         auto new_committed_to = result.write_result.get_end_seq();
         record_submitter.update_committed_to(new_committed_to);
-        return result;
+        std::invoke(on_submission, result);
+       return seastar::now();
       });
     });
   }
 }
 
-SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record_ertr::future<>
 SegmentedJournal::submit_record(
     record_t &&record,
-    OrderingHandle &handle)
+    OrderingHandle &handle,
+    transaction_type_t t_src,
+    on_submission_func_t &&on_submission)
 {
   LOG_PREFIX(SegmentedJournal::submit_record);
   DEBUG("H{} {} start ...", (void*)&handle, record);
@@ -429,7 +439,8 @@ SegmentedJournal::submit_record(
     return crimson::ct_error::erange::make();
   }
 
-  return do_submit_record(std::move(record), handle);
+  return do_submit_record(
+    std::move(record), handle, std::move(on_submission));
 }
 
 }
index 891de7ec306978277cab2cdafa8bd574e377963d..3f51de70fb35159e607a46eabf55abdc7441281a 100644 (file)
@@ -44,9 +44,11 @@ public:
 
   close_ertr::future<> close() final;
 
-  submit_record_ret submit_record(
+  submit_record_ertr::future<> submit_record(
     record_t &&record,
-    OrderingHandle &handle) final;
+    OrderingHandle &handle,
+    transaction_type_t t_src,
+    on_submission_func_t &&on_submission) final;
 
   seastar::future<> flush(OrderingHandle &handle) final;
 
@@ -59,9 +61,6 @@ public:
   backend_type_t get_type() final {
     return backend_type_t::SEGMENTED;
   }
-  seastar::future<> finish_commit(transaction_type_t type) {
-    return seastar::now();
-  }
 
   bool is_checksum_needed() final {
     // segmented journal always requires checksum
@@ -69,10 +68,10 @@ public:
   }
 
 private:
-  submit_record_ret do_submit_record(
+  submit_record_ertr::future<> do_submit_record(
     record_t &&record,
-    OrderingHandle &handle
-  );
+    OrderingHandle &handle,
+    on_submission_func_t &&on_submission);
 
   SegmentSeqAllocatorRef segment_seq_allocator;
   SegmentAllocator journal_segment_allocator;
index 94e9b3b9ab1536f48a216cae9349965033c45f03..753bd5d6ff62a8c27e78b66b4f2e9b7d8b44d2c2 100644 (file)
@@ -461,8 +461,12 @@ TransactionManager::do_submit_transaction(
     }
 
     SUBTRACET(seastore_t, "submitting record", tref);
-    return journal->submit_record(std::move(record), tref.get_handle()
-    ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
+    return journal->submit_record(
+      std::move(record),
+      tref.get_handle(),
+      tref.get_src(),
+      [this, FNAME, &tref](record_locator_t submit_result)
+    {
       SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
       auto start_seq = submit_result.write_result.start_seq;
       journal->get_trimmer().set_journal_head(start_seq);
@@ -473,10 +477,8 @@ TransactionManager::do_submit_transaction(
       journal->get_trimmer().update_journal_tails(
        cache->get_oldest_dirty_from().value_or(start_seq),
        cache->get_oldest_backref_dirty_from().value_or(start_seq));
-      return journal->finish_commit(tref.get_src()
-      ).then([&tref] {
-       return tref.get_handle().complete();
-      });
+    }).safe_then([&tref] {
+      return tref.get_handle().complete();
     }).handle_error(
       submit_transaction_iertr::pass_further{},
       crimson::ct_error::assert_all{"Hit error submitting to journal"}
index 9988df3a1246198b8fcb875bae81110e819228d8..8b1f7435c87bfcfa46034977b737e86805fd12db 100644 (file)
@@ -112,14 +112,22 @@ struct btree_test_base :
   seastar::future<> submit_transaction(TransactionRef t)
   {
     auto record = cache->prepare_record(*t, JOURNAL_SEQ_NULL, JOURNAL_SEQ_NULL);
-    return journal->submit_record(std::move(record), t->get_handle()).safe_then(
-      [this, t=std::move(t)](auto submit_result) mutable {
-       cache->complete_commit(
-            *t,
+    return seastar::do_with(
+       std::move(t), [this, record=std::move(record)](auto& _t) mutable {
+      auto& t = *_t;
+      return journal->submit_record(
+        std::move(record),
+        t.get_handle(),
+        t.get_src(),
+        [this, &t](auto submit_result) {
+          cache->complete_commit(
+            t,
             submit_result.record_block_base,
             submit_result.write_result.start_seq);
-       complete_commit(*t);
-      }).handle_error(crimson::ct_error::assert_all{});
+          complete_commit(t);
+        }
+      ).handle_error(crimson::ct_error::assert_all{});
+    });
   }
 
   virtual LBAManager::mkfs_ret test_structure_setup(Transaction &t) = 0;
index d00a0f42729acdf5a518561d10364a17f378c795..47a08d68cbb4552390eb906b104e3db65765affc 100644 (file)
@@ -181,15 +181,20 @@ struct cbjournal_test_t : public seastar_test_suite_t, JournalTrimmer
 
   auto submit_record(record_t&& record) {
     entries.push_back(record);
+    entry_validator_t& back = entries.back();
     OrderingHandle handle = get_dummy_ordering_handle();
-    auto [addr, w_result] = cbj->submit_record(
-         std::move(record),
-         handle).unsafe_get();
-    entries.back().seq = w_result.start_seq;
-    entries.back().entries = 1;
-    entries.back().magic = cbj->get_cjs().get_cbj_header().magic;
-    logger().debug("submit entry to addr {}", entries.back().seq);
-    return convert_paddr_to_abs_addr(entries.back().seq.offset);
+    cbj->submit_record(
+      std::move(record),
+      handle,
+      transaction_type_t::MUTATE,
+      [this, &back](auto locator) {
+        back.seq = locator.write_result.start_seq;
+        back.entries = 1;
+        back.magic = cbj->get_cjs().get_cbj_header().magic;
+        logger().debug("submit entry to addr {}", back.seq);
+      }
+    ).unsafe_get();
+    return convert_paddr_to_abs_addr(back.seq.offset);
   }
 
   seastar::future<> tear_down_fut() final {
index 2eb791b1d46ab13909c5332bcf836d24b4e75fc4..04a99319b1179cf30feee4e313d15a48c0fcf0b2 100644 (file)
@@ -233,12 +233,17 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider, JournalTrimmer {
   auto submit_record(T&&... _record) {
     auto record{std::forward<T>(_record)...};
     records.push_back(record);
+    record_validator_t& back = records.back();
     OrderingHandle handle = get_dummy_ordering_handle();
-    auto [addr, _] = journal->submit_record(
+    journal->submit_record(
       std::move(record),
-      handle).unsafe_get();
-    records.back().record_final_offset = addr;
-    return addr;
+      handle,
+      transaction_type_t::MUTATE,
+      [&back](auto locator) {
+        back.record_final_offset = locator.record_block_base;
+      }
+    ).unsafe_get();
+    return back.record_final_offset;
   }
 
   extent_t generate_extent(size_t blocks) {