]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/record_submitter: return record-base immediately upon submit
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 17 Jul 2024 05:41:54 +0000 (13:41 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 5 Aug 2024 03:22:43 +0000 (11:22 +0800)
For out-of-line write only.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/record_submitter.cc
src/crimson/os/seastore/journal/record_submitter.h
src/crimson/os/seastore/journal/segmented_journal.cc

index 1603b9179b720fbecccf4ca4a91142c37d299746..eaa3dd94a32aa9336d126d67270e7312c7498a30 100644 (file)
@@ -47,15 +47,20 @@ SegmentedOolWriter::write_record(
   stats.md_bytes += record.size.get_raw_mdlength();
   stats.num_records += 1;
 
-  return record_submitter.submit(
+  auto ret = record_submitter.submit(
     std::move(record),
-    with_atomic_roll_segment
-  ).safe_then([this, FNAME, &t, extents=std::move(extents)
+    with_atomic_roll_segment);
+  return std::move(ret.future
+  ).safe_then([this, FNAME, &t,
+               record_base=ret.record_base_regardless_md,
+               extents=std::move(extents)
               ](record_locator_t ret) mutable {
-    DEBUGT("{} finish with {} and {} extents",
+    DEBUGT("{} finish with {}={} and {} extents",
            t, segment_allocator.get_name(),
-           ret, extents.size());
+           ret, record_base, extents.size());
     paddr_t extent_addr = ret.record_block_base;
+    // ool won't write metadata, so the paddrs must be equal
+    assert(record_base.offset == extent_addr);
     for (auto& extent : extents) {
       TRACET("{} ool extent written at {} -- {}",
              t, segment_allocator.get_name(),
index a848811c1f81d5442ec3fb794cb229dd0ae1131a..9ee8b1b997f0ae39118a19e1c91802eec6504984 100644 (file)
@@ -94,9 +94,10 @@ CircularBoundedJournal::do_submit_record(
        (void*)&handle,
        action == RecordSubmitter::action_t::SUBMIT_FULL ?
        "FULL" : "NOT_FULL");
-  auto submit_fut = record_submitter.submit(std::move(record));
+  auto submit_ret = record_submitter.submit(std::move(record));
+  // submit_ret.record_base_regardless_md is wrong for journaling
   return handle.enter(write_pipeline->device_submission
-  ).then([submit_fut=std::move(submit_fut)]() mutable {
+  ).then([submit_fut=std::move(submit_ret.future)]() mutable {
     return std::move(submit_fut);
   }).safe_then([FNAME, this, &handle](record_locator_t result) {
     return handle.enter(write_pipeline->finalize
index c93d11085aea6e42066cbebcf2c86aa2f331fe29..27fc3e4a6ccf299afd3e50a77a866c44e2ff9fcc 100644 (file)
@@ -13,7 +13,7 @@ SET_SUBSYS(seastore_journal);
 
 namespace crimson::os::seastore::journal {
 
-RecordBatch::add_pending_ret
+RecordBatch::add_pending_ret_t
 RecordBatch::add_pending(
   const std::string& name,
   record_t&& record,
@@ -45,23 +45,26 @@ RecordBatch::add_pending(
   assert(write_base.has_value());
   assert(io_promise.has_value());
 
-  return io_promise->get_shared_future(
-  ).then([dlength_offset, FNAME, &name, write_base=*write_base
-         ](auto maybe_promise_result) -> add_pending_ret {
+  auto _write_base = *write_base;
+  auto fut = io_promise->get_shared_future(
+  ).then([dlength_offset, FNAME, &name, _write_base
+         ](auto maybe_promise_result) -> add_pending_fut {
     if (!maybe_promise_result.has_value()) {
       ERROR("{} write failed", name);
       return crimson::ct_error::input_output_error::make();
     }
     auto submit_result = record_locator_t{
-      write_base.offset.add_offset(
+      _write_base.offset.add_offset(
           maybe_promise_result->mdlength + dlength_offset),
-      write_result_t{write_base, maybe_promise_result->write_length}
+      write_result_t{_write_base, maybe_promise_result->write_length}
     };
     TRACE("{} write finish with {}", name, submit_result);
-    return add_pending_ret(
+    return add_pending_fut(
       add_pending_ertr::ready_future_marker{},
       submit_result);
   });
+  _write_base.offset = _write_base.offset.add_offset(dlength_offset);
+  return {_write_base, std::move(fut)};
 }
 
 RecordBatch::encode_ret_t RecordBatch::encode_batch(
@@ -321,7 +324,7 @@ RecordSubmitter::submit(
     write_result_t result{
         journal_allocator.get_written_to(),
         to_write.length()};
-    return journal_allocator.write(std::move(to_write)
+    auto write_fut = journal_allocator.write(std::move(to_write)
     ).safe_then([mdlength=sizes.get_mdlength(), result] {
       return record_locator_t{
         result.start_seq.offset.add_offset(mdlength),
@@ -330,6 +333,7 @@ RecordSubmitter::submit(
     }).finally([this] {
       decrement_io_with_flush();
     });
+    return {result.start_seq, std::move(write_fut)};
   }
   // indirect batched write
   std::optional<journal_seq_t> maybe_write_base;
@@ -340,7 +344,7 @@ RecordSubmitter::submit(
     assert(*p_current_batch->get_write_base() ==
            journal_allocator.get_written_to());
   }
-  auto write_fut = p_current_batch->add_pending(
+  auto ret = p_current_batch->add_pending(
     get_name(),
     std::move(record),
     journal_allocator.get_block_size(),
@@ -380,7 +384,7 @@ RecordSubmitter::submit(
           num_outstanding_io);
     assert(!p_current_batch->needs_flush());
   }
-  return write_fut;
+  return ret;
 }
 
 RecordSubmitter::open_ret
index 7c76ac78c32a2b811a6aceca4c5b5b78c352504c..656022ee09ca785ef7bc915ecad95739746cd24d 100644 (file)
@@ -151,8 +151,14 @@ public:
   //
   // write_base must be assigned when the state is empty
   using add_pending_ertr = JournalAllocator::write_ertr;
-  using add_pending_ret = add_pending_ertr::future<record_locator_t>;
-  add_pending_ret add_pending(
+  using add_pending_fut = add_pending_ertr::future<record_locator_t>;
+  struct add_pending_ret_t {
+    // The supposed record base if no metadata,
+    // only useful in case of ool.
+    journal_seq_t record_base_regardless_md;
+    add_pending_fut future;
+  };
+  add_pending_ret_t add_pending(
       const std::string& name,
       record_t&&,
       extent_len_t block_size,
@@ -275,8 +281,7 @@ public:
   roll_segment_ertr::future<> roll_segment();
 
   // when available, submit the record if possible
-  using submit_ertr = base_ertr;
-  using submit_ret = submit_ertr::future<record_locator_t>;
+  using submit_ret = RecordBatch::add_pending_ret_t;
   submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
 
   void update_committed_to(const journal_seq_t& new_committed_to) {
index 683481f15b4cb1a9ad1b2ec45291a1b36b9e6cc1..eca45f113c25c2f18c4f9c445346d8246d965964 100644 (file)
@@ -393,9 +393,10 @@ SegmentedJournal::do_submit_record(
           (void*)&handle,
           action == RecordSubmitter::action_t::SUBMIT_FULL ?
           "FULL" : "NOT_FULL");
-    auto submit_fut = record_submitter.submit(std::move(record));
+    auto submit_ret = record_submitter.submit(std::move(record));
+    // submit_ret.record_base_regardless_md is wrong for journaling
     return handle.enter(write_pipeline->device_submission
-    ).then([submit_fut=std::move(submit_fut)]() mutable {
+    ).then([submit_fut=std::move(submit_ret.future)]() mutable {
       return std::move(submit_fut);
     }).safe_then([FNAME, this, &handle](record_locator_t result) {
       return handle.enter(write_pipeline->finalize