]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/../record_submitter: account submission from record_group_t
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 24 May 2024 06:52:55 +0000 (14:52 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 16 Jun 2024 10:22:27 +0000 (13:22 +0300)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit c245900cf3f7784d9622799c2dbd6189e5d52258)

src/crimson/os/seastore/journal/record_submitter.cc
src/crimson/os/seastore/journal/record_submitter.h

index 5c892dcee22202a3b5d6a36c6487e9650debb5a3..418a453aac7d4a9b3d03c34c7e7e1072f8e675cf 100644 (file)
@@ -61,8 +61,7 @@ RecordBatch::add_pending(
   });
 }
 
-std::pair<ceph::bufferlist, record_group_size_t>
-RecordBatch::encode_batch(
+ceph::bufferlist RecordBatch::encode_batch(
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
@@ -72,13 +71,12 @@ RecordBatch::encode_batch(
 
   state = state_t::SUBMITTING;
   submitting_size = pending.get_size();
-  auto gsize = pending.size;
-  submitting_length = gsize.get_encoded_length();
-  submitting_mdlength = gsize.get_mdlength();
+  submitting_length = pending.size.get_encoded_length();
+  submitting_mdlength = pending.size.get_mdlength();
   auto bl = encode_records(pending, committed_to, segment_nonce);
   // Note: pending is cleared here
   assert(bl.length() == submitting_length);
-  return std::make_pair(bl, gsize);
+  return bl;
 }
 
 void RecordBatch::set_result(
@@ -103,24 +101,24 @@ void RecordBatch::set_result(
   io_promise.reset();
 }
 
-std::pair<ceph::bufferlist, record_group_size_t>
+ceph::bufferlist
 RecordBatch::submit_pending_fast(
-  record_t&& record,
+  record_group_t&& group,
   extent_len_t block_size,
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
+  assert(group.get_size() == 1);
+  auto& record = group.records[0];
   auto new_size = get_encoded_length_after(record, block_size);
   std::ignore = new_size;
   assert(state == state_t::EMPTY);
   assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
-  auto group = record_group_t(std::move(record), block_size);
-  auto size = group.size;
-  assert(size == new_size);
+  assert(group.size == new_size);
   auto bl = encode_records(group, committed_to, segment_nonce);
-  assert(bl.length() == size.get_encoded_length());
-  return std::make_pair(std::move(bl), size);
+  // Note: group is cleared here
+  assert(bl.length() == new_size.get_encoded_length());
+  return bl;
 }
 
 RecordSubmitter::RecordSubmitter(
@@ -292,14 +290,17 @@ RecordSubmitter::submit(
       state != state_t::FULL) {
     // fast path with direct write
     increment_io();
-    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
-      std::move(record),
-      journal_allocator.get_block_size(),
+    auto block_size = journal_allocator.get_block_size();
+    auto rg = record_group_t(std::move(record), block_size);
+    account_submission(rg);
+    record_group_size_t sizes = rg.size;
+    auto to_write = p_current_batch->submit_pending_fast(
+      std::move(rg),
+      block_size,
       get_committed_to(),
       journal_allocator.get_nonce());
     DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
           get_name(), sizes, get_committed_to(), num_outstanding_io);
-    account_submission(1, sizes);
     return journal_allocator.write(std::move(to_write)
     ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
       return record_locator_t{
@@ -478,14 +479,13 @@ void RecordSubmitter::decrement_io_with_flush()
 }
 
 void RecordSubmitter::account_submission(
-  std::size_t num,
-  const record_group_size_t& size)
+  const record_group_t& rg)
 {
   stats.record_group_padding_bytes +=
-    (size.get_mdlength() - size.get_raw_mdlength());
-  stats.record_group_metadata_bytes += size.get_raw_mdlength();
-  stats.record_group_data_bytes += size.dlength;
-  stats.record_batch_stats.increment(num);
+    (rg.size.get_mdlength() - rg.size.get_raw_mdlength());
+  stats.record_group_metadata_bytes += rg.size.get_raw_mdlength();
+  stats.record_group_data_bytes += rg.size.dlength;
+  stats.record_batch_stats.increment(rg.get_size());
 }
 
 void RecordSubmitter::finish_submit_batch(
@@ -508,24 +508,28 @@ void RecordSubmitter::flush_current_batch()
 
   increment_io();
   auto num = p_batch->get_num_records();
-  auto [to_write, sizes] = p_batch->encode_batch(
+  const auto& rg = p_batch->get_record_group();
+  assert(rg.get_size() == num);
+  record_group_size_t sizes = rg.size;
+  account_submission(rg);
+  auto to_write = p_batch->encode_batch(
     get_committed_to(), journal_allocator.get_nonce());
+  // Note: rg is cleared
   DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
         get_name(), num, sizes, get_committed_to(), num_outstanding_io);
-  account_submission(num, sizes);
   std::ignore = journal_allocator.write(std::move(to_write)
-  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+  ).safe_then([this, p_batch, FNAME, num, sizes](auto write_result) {
     TRACE("{} {} records, {}, write done with {}",
           get_name(), num, sizes, write_result);
     finish_submit_batch(p_batch, write_result);
   }).handle_error(
-    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes](auto e) {
       ERROR("{} {} records, {}, got error {}",
             get_name(), num, sizes, e);
       finish_submit_batch(p_batch, std::nullopt);
       return seastar::now();
     })
-  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+  ).handle_exception([this, p_batch, FNAME, num, sizes](auto e) {
     ERROR("{} {} records, {}, got exception {}",
           get_name(), num, sizes, e);
     finish_submit_batch(p_batch, std::nullopt);
index 4381c9a34225ba7df40d1e4c0833d6f2f233936a..1ca19f0457bcd88aa3c93cba1fe74dfe306d2a06 100644 (file)
@@ -113,6 +113,10 @@ public:
     }
   }
 
+  const record_group_t& get_record_group() const {
+    return pending;
+  }
+
   struct evaluation_t {
     record_group_size_t submit_size;
     bool is_full;
@@ -150,7 +154,7 @@ public:
       extent_len_t block_size);
 
   // Encode the batched records for write.
-  std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+  ceph::bufferlist encode_batch(
       const journal_seq_t& committed_to,
       segment_nonce_t segment_nonce);
 
@@ -165,8 +169,8 @@ public:
   // the intervention of the shared io_promise.
   //
   // Note the current RecordBatch can be reused afterwards.
-  std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
-      record_t&&,
+  ceph::bufferlist submit_pending_fast(
+      record_group_t&&,
       extent_len_t block_size,
       const journal_seq_t& committed_to,
       segment_nonce_t segment_nonce);
@@ -296,7 +300,7 @@ private:
     free_batch_ptrs.pop_front();
   }
 
-  void account_submission(std::size_t, const record_group_size_t&);
+  void account_submission(const record_group_t&);
 
   using maybe_result_t = RecordBatch::maybe_result_t;
   void finish_submit_batch(RecordBatch*, maybe_result_t);