git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/record_submitter: refactor to make write base available
author Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 16 Jul 2024 06:36:22 +0000 (14:36 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 5 Aug 2024 03:15:13 +0000 (11:15 +0800)
Make sure the write base is available upon RecordSubmitter::submit().

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal/record_submitter.cc
src/crimson/os/seastore/journal/record_submitter.h

index 8d71cb994a32132912656f79d277d21f48c810f3..e5fa90b4b22b0ade676a4818e088ba77be5b3292 100644 (file)
@@ -17,7 +17,8 @@ RecordBatch::add_pending_ret
 RecordBatch::add_pending(
   const std::string& name,
   record_t&& record,
-  extent_len_t block_size)
+  extent_len_t block_size,
+  std::optional<journal_seq_t> maybe_write_base)
 {
   LOG_PREFIX(RecordBatch::add_pending);
   auto new_size = get_encoded_length_after(record, block_size);
@@ -36,10 +37,14 @@ RecordBatch::add_pending(
   if (state == state_t::EMPTY) {
     assert(!io_promise.has_value());
     io_promise = seastar::shared_promise<maybe_promise_result_t>();
+    assert(maybe_write_base.has_value());
+    assert(!write_base.has_value());
+    write_base = maybe_write_base;
   } else {
     assert(io_promise.has_value());
   }
   state = state_t::PENDING;
+  assert(write_base.has_value());
 
   return io_promise->get_shared_future(
   ).then([dlength_offset, FNAME, &name
@@ -61,22 +66,25 @@ RecordBatch::add_pending(
   });
 }
 
-ceph::bufferlist RecordBatch::encode_batch(
+RecordBatch::encode_ret_t RecordBatch::encode_batch(
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
   assert(state == state_t::PENDING);
   assert(pending.get_size() > 0);
   assert(io_promise.has_value());
+  assert(write_base.has_value());
 
   state = state_t::SUBMITTING;
+  auto _write_base = *write_base;
+  write_base.reset();
   submitting_size = pending.get_size();
   submitting_length = pending.size.get_encoded_length();
   submitting_mdlength = pending.size.get_mdlength();
   auto bl = encode_records(pending, committed_to, segment_nonce);
   // Note: pending is cleared here
   assert(bl.length() == submitting_length);
-  return bl;
+  return {_write_base, std::move(bl)};
 }
 
 void RecordBatch::set_result(
@@ -92,6 +100,7 @@ void RecordBatch::set_result(
   }
   assert(state == state_t::SUBMITTING);
   assert(io_promise.has_value());
+  assert(!write_base.has_value());
 
   state = state_t::EMPTY;
   submitting_size = 0;
@@ -325,10 +334,19 @@ RecordSubmitter::submit(
     });
   }
   // indirect batched write
+  std::optional<journal_seq_t> maybe_write_base;
+  if (p_current_batch->is_empty()) {
+    maybe_write_base = journal_allocator.get_written_to();
+  } else {
+    assert(p_current_batch->get_write_base().has_value());
+    assert(*p_current_batch->get_write_base() ==
+           journal_allocator.get_written_to());
+  }
   auto write_fut = p_current_batch->add_pending(
     get_name(),
     std::move(record),
-    journal_allocator.get_block_size());
+    journal_allocator.get_block_size(),
+    maybe_write_base);
   if (needs_flush) {
     if (state == state_t::FULL) {
       // #2 block concurrent submissions due to lack of resource
@@ -531,15 +549,16 @@ void RecordSubmitter::flush_current_batch()
   account_submission(rg);
   assert(stats.record_batch_stats.num_io ==
          stats.io_depth_stats.num_io);
-  auto to_write = p_batch->encode_batch(
+  auto encode_ret = p_batch->encode_batch(
     get_committed_to(), journal_allocator.get_nonce());
   // Note: rg is cleared
   DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
         get_name(), num, sizes, get_committed_to(), num_outstanding_io);
+  assert(encode_ret.write_base == journal_allocator.get_written_to());
   write_result_t result{
-      journal_allocator.get_written_to(),
-      to_write.length()};
-  std::ignore = journal_allocator.write(std::move(to_write)
+      encode_ret.write_base,
+      encode_ret.bl.length()};
+  std::ignore = journal_allocator.write(std::move(encode_ret.bl)
   ).safe_then([this, p_batch, FNAME, num, sizes, result] {
     TRACE("{} {} records, {}, write done with {}",
           get_name(), num, sizes, result);
index 62047f6bdcd760cc1438a0bfe3d43bbba6804f93..4b38b292073879590593dbafb650e8453f949942 100644 (file)
@@ -102,6 +102,10 @@ public:
     return pending.size;
   }
 
+  std::optional<journal_seq_t> get_write_base() const {
+    return write_base;
+  }
+
   bool needs_flush() const {
     assert(state != state_t::SUBMITTING);
     assert(pending.get_size() <= batch_capacity);
@@ -147,15 +151,22 @@ public:
   //
   // Set write_result_t::write_length to 0 if the record is not the first one
   // in the batch.
+  //
+  // write_base must be assigned when the state is empty
   using add_pending_ertr = JournalAllocator::write_ertr;
   using add_pending_ret = add_pending_ertr::future<record_locator_t>;
   add_pending_ret add_pending(
       const std::string& name,
       record_t&&,
-      extent_len_t block_size);
+      extent_len_t block_size,
+      std::optional<journal_seq_t> maybe_write_base);
 
   // Encode the batched records for write.
-  ceph::bufferlist encode_batch(
+  struct encode_ret_t {
+    journal_seq_t write_base;
+    ceph::bufferlist bl;
+  };
+  encode_ret_t encode_batch(
       const journal_seq_t& committed_to,
       segment_nonce_t segment_nonce);
 
@@ -188,6 +199,8 @@ private:
   std::size_t index = 0;
   std::size_t batch_capacity = 0;
   std::size_t batch_flush_size = 0;
+  // Valid at state_t::PENDING
+  std::optional<journal_seq_t> write_base;
 
   record_group_t pending;
   std::size_t submitting_size = 0;