git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: refactor, introduce record_t and record_group_t with sizes
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 19 Nov 2021 05:17:16 +0000 (13:17 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 9 Dec 2021 01:37:05 +0000 (09:37 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h
src/crimson/os/seastore/extent_reader.cc
src/crimson/os/seastore/extent_reader.h
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.cc
src/crimson/os/seastore/seastore_types.h

index 0e3cd7661f40753e9f11e18155207d9642d1778f..c24a157727775c915b2515b3d8eea03dd6841a86 100644 (file)
@@ -977,7 +977,7 @@ record_t Cache::prepare_record(Transaction &t)
     if (i->get_type() == extent_types_t::ROOT) {
       root = t.root;
       DEBUGT("writing out root delta for {}", t, *t.root);
-      record.deltas.push_back(
+      record.push_back(
        delta_info_t{
          extent_types_t::ROOT,
          paddr_t{},
@@ -989,7 +989,7 @@ record_t Cache::prepare_record(Transaction &t)
          t.root->get_delta()
        });
     } else {
-      record.deltas.push_back(
+      record.push_back(
        delta_info_t{
          i->get_type(),
          i->get_paddr(),
@@ -1047,7 +1047,7 @@ record_t Cache::prepare_record(Transaction &t)
     }
 
     assert(bl.length() == i->get_length());
-    record.extents.push_back(extent_t{
+    record.push_back(extent_t{
        i->get_type(),
        i->is_logical()
        ? i->cast<LogicalCachedExtent>()->get_laddr()
@@ -1062,7 +1062,7 @@ record_t Cache::prepare_record(Transaction &t)
     delta_info_t delta;
     delta.type = extent_types_t::RBM_ALLOC_INFO;
     delta.bl = bl;
-    record.deltas.push_back(delta);
+    record.push_back(std::move(delta));
   }
 
   for (auto &i: t.ool_block_list) {
@@ -1087,13 +1087,13 @@ record_t Cache::prepare_record(Transaction &t)
   record_header_fullness.ool_stats.filled_bytes += ool_stats.header_raw_bytes;
   record_header_fullness.ool_stats.total_bytes += ool_stats.header_bytes;
 
-  auto record_size = get_encoded_record_length(
-      record, reader.get_block_size());
+  auto record_size = record_group_size_t(
+      record.size, reader.get_block_size());
   auto inline_overhead =
-      record_size.mdlength + record_size.dlength - record.get_raw_data_size();
+      record_size.get_encoded_length() - record.get_raw_data_size();
   efforts.inline_record_overhead_bytes += inline_overhead;
-  record_header_fullness.inline_stats.filled_bytes += record_size.raw_mdlength;
-  record_header_fullness.inline_stats.total_bytes += record_size.mdlength;
+  record_header_fullness.inline_stats.filled_bytes += record_size.get_raw_mdlength();
+  record_header_fullness.inline_stats.total_bytes += record_size.get_mdlength();
 
   return record;
 }
index b78ec32eebc6993c10423e72ea0778d41a332b03..a4d335a637a05edbc1455259e2fbd31a2c38ef48 100644 (file)
@@ -71,10 +71,9 @@ SegmentedAllocator::Writer::_write(
   Transaction& t,
   ool_record_t& record)
 {
-  record_size_t record_size = record.get_encoded_record_length();
-  allocated_to += record_size.mdlength + record_size.dlength;
+  auto record_size = record.get_encoded_record_length();
+  allocated_to += record_size.get_encoded_length();
   bufferlist bl = record.encode(
-      record_size,
       current_segment->segment->get_segment_id(),
       0);
   seastar::promise<> pr;
@@ -93,8 +92,8 @@ SegmentedAllocator::Writer::_write(
   auto& stats = t.get_ool_write_stats();
   stats.extents.num += record.get_num_extents();
   stats.extents.bytes += record_size.dlength;
-  stats.header_raw_bytes += record_size.raw_mdlength;
-  stats.header_bytes += record_size.mdlength;
+  stats.header_raw_bytes += record_size.get_raw_mdlength();
+  stats.header_bytes += record_size.get_mdlength();
   stats.num_records += 1;
 
   return trans_intr::make_interruptible(
index 0b040d72f76ac827652177d36ad54e337696855e..506edde8147db1cf3e4370d6bd4375b9a43ead6c 100644 (file)
@@ -47,39 +47,38 @@ class ool_record_t {
 
 public:
   ool_record_t(size_t block_size) : block_size(block_size) {}
-  record_size_t get_encoded_record_length() {
+  record_group_size_t get_encoded_record_length() {
     assert(extents.size() == record.extents.size());
-    return crimson::os::seastore::get_encoded_record_length(record, block_size);
+    return record_group_size_t(record.size, block_size);
   }
   size_t get_wouldbe_encoded_record_length(LogicalCachedExtentRef& extent) {
-    auto raw_mdlength = get_encoded_record_raw_mdlength(record, block_size);
-    auto wouldbe_mdlength = p2roundup(
-      raw_mdlength + ceph::encoded_sizeof_bounded<extent_info_t>(),
-      block_size);
-    return wouldbe_mdlength + extent_buf_len + extent->get_bptr().length();
+    record_size_t rsize = record.size;
+    rsize.account_extent(extent->get_bptr().length());
+    return record_group_size_t(rsize, block_size).get_encoded_length();
   }
-  ceph::bufferlist encode(const record_size_t& rsize,
-                          segment_id_t segment,
+  ceph::bufferlist encode(segment_id_t segment,
                           segment_nonce_t nonce) {
     assert(extents.size() == record.extents.size());
-    segment_off_t extent_offset = base + rsize.mdlength;
+    assert(!record.deltas.size());
+    auto record_group = record_group_t(std::move(record), block_size);
+    segment_off_t extent_offset = base + record_group.size.get_mdlength();
     for (auto& extent : extents) {
       extent.set_ool_paddr(
         paddr_t::make_seg_paddr(segment, extent_offset));
       extent_offset += extent.get_bptr().length();
     }
-    assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
-    return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce);
+    assert(extent_offset ==
+           (segment_off_t)(base + record_group.size.get_encoded_length()));
+    return encode_records(record_group, journal_seq_t(), nonce);
   }
   void add_extent(LogicalCachedExtentRef& extent) {
     extents.emplace_back(extent);
     ceph::bufferlist bl;
     bl.append(extent->get_bptr());
-    record.extents.emplace_back(extent_t{
+    record.push_back(extent_t{
       extent->get_type(),
       extent->get_laddr(),
       std::move(bl)});
-    extent_buf_len += extent->get_bptr().length();
   }
   std::vector<OolExtent>& get_extents() {
     return extents;
@@ -91,10 +90,8 @@ public:
     return base;
   }
   void clear() {
-    record.extents.clear();
+    record = {};
     extents.clear();
-    assert(!record.deltas.size());
-    extent_buf_len = 0;
     base = MAX_SEG_OFF;
   }
   uint64_t get_num_extents() const {
@@ -105,7 +102,6 @@ private:
   std::vector<OolExtent> extents;
   record_t record;
   size_t block_size;
-  segment_off_t extent_buf_len = 0;
   segment_off_t base = MAX_SEG_OFF;
 };
 
index b257d266ea66a944bf6e6dd26ab13dce4bf092bd..46b4165055fe40f2be4fad53a0a7acb5f47a4506 100644 (file)
@@ -77,7 +77,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
         const record_header_t& header,
         const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
       {
-        auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
+        auto maybe_record_extent_infos = try_decode_extent_info(header, mdbuf);
         if (!maybe_record_extent_infos) {
           // This should be impossible, we did check the crc on the mdbuf
           logger().error(
@@ -88,8 +88,8 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
 
         paddr_t extent_offset = locator.record_block_base;
         logger().debug("ExtentReader::scan_extents: decoded {} extents",
-                       maybe_record_extent_infos->size());
-        for (const auto &i : *maybe_record_extent_infos) {
+                       maybe_record_extent_infos->extent_infos.size());
+        for (const auto &i : maybe_record_extent_infos->extent_infos) {
           extents->emplace_back(extent_offset, i);
           auto& seg_addr = extent_offset.as_seg_paddr();
           seg_addr.set_segment_off(
@@ -237,21 +237,14 @@ ExtentReader::read_validate_record_metadata(
         segment_manager.get_block_size());
     bufferlist bl;
     bl.append(bptr);
-    auto bp = bl.cbegin();
-    record_header_t header;
-    try {
-      decode(header, bp);
-    } catch (ceph::buffer::error &e) {
-      return read_validate_record_metadata_ret(
-        read_validate_record_metadata_ertr::ready_future_marker{},
-        std::nullopt);
-    }
-    if (header.segment_nonce != nonce) {
+    auto maybe_header = try_decode_record_header(bl, nonce);
+    if (!maybe_header.has_value()) {
       return read_validate_record_metadata_ret(
         read_validate_record_metadata_ertr::ready_future_marker{},
         std::nullopt);
     }
     auto& seg_addr = start.as_seg_paddr();
+    auto& header = *maybe_header;
     if (header.mdlength < block_size ||
         header.mdlength % block_size != 0 ||
         header.dlength % block_size != 0 ||
@@ -281,8 +274,8 @@ ExtentReader::read_validate_record_metadata(
         read_validate_record_metadata_ertr::ready_future_marker{},
         std::make_pair(std::move(header), std::move(bl)));
     });
-  }).safe_then([this](auto p) {
-    if (p && validate_metadata(p->second)) {
+  }).safe_then([](auto p) {
+    if (p && validate_record_metadata(p->second)) {
       return read_validate_record_metadata_ret(
         read_validate_record_metadata_ertr::ready_future_marker{},
         std::move(*p)
@@ -295,26 +288,6 @@ ExtentReader::read_validate_record_metadata(
   });
 }
 
-std::optional<std::vector<extent_info_t>>
-ExtentReader::try_decode_extent_infos(
-  record_header_t header,
-  const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* crc */;
-  logger().debug("{}: decoding {} extents", __func__, header.extents);
-  std::vector<extent_info_t> extent_infos(header.extents);
-  for (auto &&i : extent_infos) {
-    try {
-      decode(i, bliter);
-    } catch (ceph::buffer::error &e) {
-      return std::nullopt;
-    }
-  }
-  return extent_infos;
-}
-
 ExtentReader::read_validate_data_ret
 ExtentReader::read_validate_data(
   paddr_t record_base,
@@ -330,25 +303,10 @@ ExtentReader::read_validate_data(
   ).safe_then([=, &header](auto bptr) {
     bufferlist bl;
     bl.append(bptr);
-    return bl.crc32c(-1) == header.data_crc;
+    return validate_record_data(header, bl);
   });
 }
 
-bool ExtentReader::validate_metadata(const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  auto test_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
-    -1);
-  ceph_le32 recorded_crc_le;
-  decode(recorded_crc_le, bliter);
-  uint32_t recorded_crc = recorded_crc_le;
-  test_crc = bliter.crc32c(
-    bliter.get_remaining(),
-    test_crc);
-  return test_crc == recorded_crc;
-}
-
 ExtentReader::consume_record_group_ertr::future<>
 ExtentReader::consume_next_records(
   scan_valid_records_cursor& cursor,
index 243f3dc86d63067778f38be86c1c36eb06f291fa..dfa2cd5b67dc5225b13b18a8675583b0b9cd1f3c 100644 (file)
@@ -98,11 +98,6 @@ private:
     paddr_t start,
     segment_nonce_t nonce);
 
-  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
-  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
-    record_header_t header,
-    const bufferlist &bl);
-
   /// read and validate data
   using read_validate_data_ertr = read_ertr;
   using read_validate_data_ret = read_validate_data_ertr::future<bool>;
@@ -118,10 +113,6 @@ private:
       found_record_handler_t& handler,
       std::size_t& budget_used);
 
-
-  /// validate embedded metadata checksum
-  static bool validate_metadata(const bufferlist &bl);
-
   friend class TransactionManager;
 };
 
index f0331970236afc5993dc6060c190e9482df714b8..3d4eb66df561a88fd3c7713d14160a23d894603d 100644 (file)
@@ -144,26 +144,6 @@ Journal::prep_replay_segments(
     std::move(ret));
 }
 
-std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
-  record_header_t header,
-  const bufferlist &bl)
-{
-  auto bliter = bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* crc */;
-  bliter += header.extents  * ceph::encoded_sizeof_bounded<extent_info_t>();
-  logger().debug("Journal::try_decode_deltas: decoding {} deltas", header.deltas);
-  std::vector<delta_info_t> deltas(header.deltas);
-  for (auto &&i : deltas) {
-    try {
-      decode(i, bliter);
-    } catch (ceph::buffer::error &e) {
-      return std::nullopt;
-    }
-  }
-  return deltas;
-}
-
 Journal::replay_ertr::future<>
 Journal::replay_segment(
   journal_seq_t seq,
@@ -191,14 +171,18 @@ Journal::replay_segment(
 
       return seastar::do_with(
         std::move(*maybe_record_deltas_list),
-        [=](auto &deltas)
+        [locator,
+         this,
+         &handler](record_deltas_t& record_deltas)
       {
         logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
-                       deltas.size(),
+                       record_deltas.deltas.size(),
                        locator.record_block_base);
         return crimson::do_for_each(
-          deltas,
-          [=](auto &delta)
+          record_deltas.deltas,
+          [locator,
+           this,
+           &handler](delta_info_t& delta)
         {
           /* The journal may validly contain deltas for extents in
            * since released segments.  We can detect those cases by
@@ -213,7 +197,7 @@ Journal::replay_segment(
           auto& seg_addr = delta.paddr.as_seg_paddr();
           if (delta.paddr != P_ADDR_NULL &&
               (segment_provider->get_seq(seg_addr.get_segment_id()) >
-               seq.segment_seq)) {
+               locator.write_result.start_seq.segment_seq)) {
             return replay_ertr::now();
           } else {
             return handler(locator, delta);
@@ -438,21 +422,23 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
 Journal::RecordBatch::add_pending_ret
 Journal::RecordBatch::add_pending(
   record_t&& record,
-  const record_size_t& rsize)
+  extent_len_t block_size)
 {
+  auto new_encoded_length = get_encoded_length_after(record, block_size);
   logger().debug(
     "Journal::RecordBatch::add_pending: batches={}, write_size={}",
-    records.size() + 1,
-    get_encoded_length(rsize));
+    pending.get_size() + 1,
+    new_encoded_length);
   assert(state != state_t::SUBMITTING);
-  assert(can_batch(rsize));
-
-  auto block_start_offset = encoded_length + rsize.mdlength;
-  records.push_back(std::move(record));
-  record_sizes.push_back(rsize);
-  auto new_encoded_length = get_encoded_length(rsize);
-  assert(new_encoded_length < MAX_SEG_OFF);
-  encoded_length = new_encoded_length;
+  assert(can_batch(record, block_size) == new_encoded_length);
+
+  auto block_start_offset = record.size.get_mdlength(block_size);
+  if (state != state_t::EMPTY) {
+    block_start_offset += pending.size.get_encoded_length();
+  }
+  pending.push_back(
+      std::move(record), block_size);
+  assert(pending.size.get_encoded_length() == new_encoded_length);
   if (state == state_t::EMPTY) {
     assert(!io_promise.has_value());
     io_promise = seastar::shared_promise<maybe_result_t>();
@@ -477,33 +463,24 @@ Journal::RecordBatch::add_pending(
   });
 }
 
-ceph::bufferlist Journal::RecordBatch::encode_records(
-  size_t block_size,
+ceph::bufferlist Journal::RecordBatch::encode_batch(
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
   logger().debug(
-    "Journal::RecordBatch::encode_records: batches={}, committed_to={}",
-    records.size(),
+    "Journal::RecordBatch::encode_batch: batches={}, committed_to={}",
+    pending.get_size(),
     committed_to);
   assert(state == state_t::PENDING);
-  assert(records.size());
-  assert(records.size() == record_sizes.size());
+  assert(pending.get_size() > 0);
   assert(io_promise.has_value());
 
   state = state_t::SUBMITTING;
-  ceph::bufferlist bl;
-  std::size_t i = 0;
-  do {
-    auto record_bl = encode_record(
-        record_sizes[i],
-        std::move(records[i]),
-        block_size,
-        committed_to,
-        segment_nonce);
-    bl.claim_append(record_bl);
-  } while ((++i) < records.size());
-  assert(bl.length() == (std::size_t)encoded_length);
+  submitting_size = pending.get_size();
+  submitting_length = pending.size.get_encoded_length();
+  auto bl = encode_records(pending, committed_to, segment_nonce);
+  // Note: pending is cleared here
+  assert(bl.length() == (std::size_t)submitting_length);
   return bl;
 }
 
@@ -513,47 +490,45 @@ void Journal::RecordBatch::set_result(
   if (maybe_write_result.has_value()) {
     logger().debug(
       "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
-      records.size(),
+      submitting_size,
       maybe_write_result->start_seq,
       maybe_write_result->length);
-    assert(maybe_write_result->length == encoded_length);
+    assert(maybe_write_result->length == submitting_length);
   } else {
     logger().error(
       "Journal::RecordBatch::set_result: batches={}, write is failed!",
-      records.size());
+      submitting_size);
   }
   assert(state == state_t::SUBMITTING);
   assert(io_promise.has_value());
 
   state = state_t::EMPTY;
-  encoded_length = 0;
-  records.clear();
-  record_sizes.clear();
+  submitting_size = 0;
+  submitting_length = 0;
   io_promise->set_value(maybe_write_result);
   io_promise.reset();
 }
 
-ceph::bufferlist Journal::RecordBatch::submit_pending_fast(
+std::pair<ceph::bufferlist, record_group_size_t>
+Journal::RecordBatch::submit_pending_fast(
   record_t&& record,
-  const record_size_t& rsize,
-  size_t block_size,
+  extent_len_t block_size,
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
+  auto encoded_length = get_encoded_length_after(record, block_size);
   logger().debug(
     "Journal::RecordBatch::submit_pending_fast: write_size={}",
-    get_encoded_length(rsize));
+    encoded_length);
   assert(state == state_t::EMPTY);
-  assert(can_batch(rsize));
-
-  auto bl = encode_record(
-      rsize,
-      std::move(record),
-      block_size,
-      committed_to,
-      segment_nonce);
-  assert(bl.length() == get_encoded_length(rsize));
-  return bl;
+  assert(can_batch(record, block_size) == encoded_length);
+
+  auto group = record_group_t(std::move(record), block_size);
+  auto size = group.size;
+  auto bl = encode_records(group, committed_to, segment_nonce);
+  assert(bl.length() == encoded_length);
+  assert(bl.length() == size.get_encoded_length());
+  return std::make_pair(bl, size);
 }
 
 Journal::RecordSubmitter::RecordSubmitter(
@@ -583,20 +558,21 @@ Journal::RecordSubmitter::submit(
   OrderingHandle& handle)
 {
   assert(write_pipeline);
-  auto rsize = get_encoded_record_length(
-    record, journal_segment_manager.get_block_size());
-  auto total = rsize.mdlength + rsize.dlength;
+  auto expected_size = record_group_size_t(
+      record.size,
+      journal_segment_manager.get_block_size()
+  ).get_encoded_length();
   auto max_record_length = journal_segment_manager.get_max_write_length();
-  if (total > max_record_length) {
+  if (expected_size > max_record_length) {
     logger().error(
       "Journal::RecordSubmitter::submit: record size {} exceeds max {}",
-      total,
+      expected_size,
       max_record_length
     );
     return crimson::ct_error::erange::make();
   }
 
-  return do_submit(std::move(record), rsize, handle);
+  return do_submit(std::move(record), handle);
 }
 
 void Journal::RecordSubmitter::update_state()
@@ -630,8 +606,7 @@ void Journal::RecordSubmitter::flush_current_batch()
   pop_free_batch();
 
   increment_io();
-  ceph::bufferlist to_write = p_batch->encode_records(
-    journal_segment_manager.get_block_size(),
+  ceph::bufferlist to_write = p_batch->encode_batch(
     journal_segment_manager.get_committed_to(),
     journal_segment_manager.get_nonce());
   std::ignore = journal_segment_manager.write(to_write
@@ -655,27 +630,25 @@ void Journal::RecordSubmitter::flush_current_batch()
 Journal::RecordSubmitter::submit_pending_ret
 Journal::RecordSubmitter::submit_pending(
   record_t&& record,
-  const record_size_t& rsize,
   OrderingHandle& handle,
   bool flush)
 {
   assert(!p_current_batch->is_submitting());
   record_batch_stats.increment(
       p_current_batch->get_num_records() + 1);
-  auto write_fut = [this, flush, record=std::move(record), &rsize]() mutable {
+  auto write_fut = [this, flush, record=std::move(record)]() mutable {
     if (flush && p_current_batch->is_empty()) {
       // fast path with direct write
       increment_io();
-      ceph::bufferlist to_write = p_current_batch->submit_pending_fast(
+      auto [to_write, sizes] = p_current_batch->submit_pending_fast(
         std::move(record),
-        rsize,
         journal_segment_manager.get_block_size(),
         journal_segment_manager.get_committed_to(),
         journal_segment_manager.get_nonce());
       return journal_segment_manager.write(to_write
-      ).safe_then([rsize](auto write_result) {
+      ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
         return record_locator_t{
-          write_result.start_seq.offset.add_offset(rsize.mdlength),
+          write_result.start_seq.offset.add_offset(mdlength),
           write_result
         };
       }).finally([this] {
@@ -684,7 +657,7 @@ Journal::RecordSubmitter::submit_pending(
     } else {
       // indirect write with or without the existing pending records
       auto write_fut = p_current_batch->add_pending(
-        std::move(record), rsize);
+        std::move(record), journal_segment_manager.get_block_size());
       if (flush) {
         flush_current_batch();
       }
@@ -707,35 +680,36 @@ Journal::RecordSubmitter::submit_pending(
 Journal::RecordSubmitter::do_submit_ret
 Journal::RecordSubmitter::do_submit(
   record_t&& record,
-  const record_size_t& rsize,
   OrderingHandle& handle)
 {
   assert(!p_current_batch->is_submitting());
   if (state <= state_t::PENDING) {
     // can increment io depth
     assert(!wait_submit_promise.has_value());
-    auto batched_size = p_current_batch->can_batch(rsize);
+    auto batched_size = p_current_batch->can_batch(
+        record, journal_segment_manager.get_block_size());
     if (batched_size == 0 ||
         batched_size > journal_segment_manager.get_max_write_length()) {
       assert(p_current_batch->is_pending());
       flush_current_batch();
-      return do_submit(std::move(record), rsize, handle);
+      return do_submit(std::move(record), handle);
     } else if (journal_segment_manager.needs_roll(batched_size)) {
       if (p_current_batch->is_pending()) {
         flush_current_batch();
       }
       return journal_segment_manager.roll(
-      ).safe_then([this, record=std::move(record), rsize, &handle]() mutable {
-        return do_submit(std::move(record), rsize, handle);
+      ).safe_then([this, record=std::move(record), &handle]() mutable {
+        return do_submit(std::move(record), handle);
       });
     } else {
-      return submit_pending(std::move(record), rsize, handle, true);
+      return submit_pending(std::move(record), handle, true);
     }
   }
 
   assert(state == state_t::FULL);
   // cannot increment io depth
-  auto batched_size = p_current_batch->can_batch(rsize);
+  auto batched_size = p_current_batch->can_batch(
+      record, journal_segment_manager.get_block_size());
   if (batched_size == 0 ||
       batched_size > journal_segment_manager.get_max_write_length() ||
       journal_segment_manager.needs_roll(batched_size)) {
@@ -743,11 +717,11 @@ Journal::RecordSubmitter::do_submit(
       wait_submit_promise = seastar::promise<>();
     }
     return wait_submit_promise->get_future(
-    ).then([this, record=std::move(record), rsize, &handle]() mutable {
-      return do_submit(std::move(record), rsize, handle);
+    ).then([this, record=std::move(record), &handle]() mutable {
+      return do_submit(std::move(record), handle);
     });
   } else {
-    return submit_pending(std::move(record), rsize, handle, false);
+    return submit_pending(std::move(record), handle, false);
   }
 }
 
index 5aed2ed57e64b7a652a46e553b42c436b169b6ab..ba4c4498efee00a75feca4cde06599725f192f22 100644 (file)
@@ -250,19 +250,22 @@ private:
     }
 
     std::size_t get_num_records() const {
-      return records.size();
+      return pending.get_size();
     }
 
     // return the expected write size if allows to batch,
     // otherwise, return 0
-    std::size_t can_batch(const record_size_t& rsize) const {
+    std::size_t can_batch(
+        const record_t& record,
+        extent_len_t block_size) const {
       assert(state != state_t::SUBMITTING);
-      if (records.size() >= batch_capacity ||
-          static_cast<std::size_t>(encoded_length) > batch_flush_size) {
+      if (pending.get_size() >= batch_capacity ||
+          (pending.get_size() > 0 &&
+           pending.size.get_encoded_length() > batch_flush_size)) {
         assert(state == state_t::PENDING);
         return 0;
       }
-      return get_encoded_length(rsize);
+      return get_encoded_length_after(record, block_size);
     }
 
     void initialize(std::size_t i,
@@ -272,8 +275,7 @@ private:
       index = i;
       batch_capacity = _batch_capacity;
       batch_flush_size = _batch_flush_size;
-      records.reserve(batch_capacity);
-      record_sizes.reserve(batch_capacity);
+      pending.reserve(batch_capacity);
     }
 
     // Add to the batch, the future will be resolved after the batch is
@@ -283,11 +285,12 @@ private:
     // in the batch.
     using add_pending_ertr = JournalSegmentManager::write_ertr;
     using add_pending_ret = add_pending_ertr::future<record_locator_t>;
-    add_pending_ret add_pending(record_t&&, const record_size_t&);
+    add_pending_ret add_pending(
+        record_t&&,
+        extent_len_t block_size);
 
     // Encode the batched records for write.
-    ceph::bufferlist encode_records(
-        size_t block_size,
+    ceph::bufferlist encode_batch(
         const journal_seq_t& committed_to,
         segment_nonce_t segment_nonce);
 
@@ -298,31 +301,33 @@ private:
     // The fast path that is equivalent to submit a single record as a batch.
     //
     // Essentially, equivalent to the combined logic of:
-    // add_pending(), encode_records() and set_result() above without
+    // add_pending(), encode_batch() and set_result() above without
     // the intervention of the shared io_promise.
     //
     // Note the current RecordBatch can be reused afterwards.
-    ceph::bufferlist submit_pending_fast(
+    std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
         record_t&&,
-        const record_size_t&,
-        size_t block_size,
+        extent_len_t block_size,
         const journal_seq_t& committed_to,
         segment_nonce_t segment_nonce);
 
   private:
-    std::size_t get_encoded_length(const record_size_t& rsize) const {
-      auto ret = encoded_length + rsize.mdlength + rsize.dlength;
-      assert(ret > 0);
-      return ret;
+    std::size_t get_encoded_length_after(
+        const record_t& record,
+        extent_len_t block_size) const {
+      return pending.size.get_encoded_length_after(
+          record.size, block_size);
     }
 
     state_t state = state_t::EMPTY;
     std::size_t index = 0;
     std::size_t batch_capacity = 0;
     std::size_t batch_flush_size = 0;
-    segment_off_t encoded_length = 0;
-    std::vector<record_t> records;
-    std::vector<record_size_t> record_sizes;
+
+    record_group_t pending;
+    std::size_t submitting_size = 0;
+    segment_off_t submitting_length = 0;
+
     std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
   };
 
@@ -416,11 +421,11 @@ private:
     using submit_pending_ret = submit_pending_ertr::future<
       record_locator_t>;
     submit_pending_ret submit_pending(
-        record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
+        record_t&&, OrderingHandle &handle, bool flush);
 
     using do_submit_ret = submit_pending_ret;
     do_submit_ret do_submit(
-        record_t&&, const record_size_t&, OrderingHandle&);
+        record_t&&, OrderingHandle&);
 
     state_t state = state_t::IDLE;
     std::size_t num_outstanding_io = 0;
@@ -456,11 +461,6 @@ private:
   prep_replay_segments_fut prep_replay_segments(
     std::vector<std::pair<segment_id_t, segment_header_t>> segments);
 
-  /// attempts to decode deltas from bl, return nullopt if unsuccessful
-  std::optional<std::vector<delta_info_t>> try_decode_deltas(
-    record_header_t header,
-    const bufferlist &bl);
-
   /// replays records starting at start through end of segment
   replay_ertr::future<>
   replay_segment(
index 6f77b3f1f8e1ebe1ee5b341e7d77d8b8010dce2c..839b2cc1c3cf31a352270a85b99f0845b32e12e8 100644 (file)
@@ -2,6 +2,15 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "crimson/os/seastore/seastore_types.h"
+#include "crimson/common/log.h"
+
+namespace {
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_seastore);
+}
+
+}
 
 namespace crimson::os::seastore {
 
@@ -125,38 +134,42 @@ std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs)
             << ")";
 }
 
-extent_len_t get_encoded_record_raw_mdlength(
-  const record_t &record,
-  size_t block_size) {
-  extent_len_t metadata =
-    (extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
-  metadata += sizeof(checksum_t) /* crc */;
-  metadata += record.extents.size() *
-    ceph::encoded_sizeof_bounded<extent_info_t>();
-  for (const auto &i: record.deltas) {
-    metadata += ceph::encoded_sizeof(i);
-  }
-  return metadata;
+void record_size_t::account_extent(extent_len_t extent_len)
+{
+  assert(extent_len);
+  plain_mdlength += ceph::encoded_sizeof_bounded<extent_info_t>();
+  dlength += extent_len;
 }
 
-record_size_t get_encoded_record_length(
-  const record_t &record,
-  size_t block_size) {
-  extent_len_t raw_mdlength =
-    get_encoded_record_raw_mdlength(record, block_size);
-  extent_len_t mdlength =
-    p2roundup(raw_mdlength, (extent_len_t)block_size);
-  extent_len_t dlength = 0;
-  for (const auto &i: record.extents) {
-    dlength += i.bl.length();
-  }
-  return record_size_t{raw_mdlength, mdlength, dlength};
+void record_size_t::account(const delta_info_t& delta)
+{
+  assert(delta.bl.length());
+  plain_mdlength += ceph::encoded_sizeof(delta);
+}
+
+extent_len_t record_size_t::get_raw_mdlength() const
+{
+  return plain_mdlength +
+         sizeof(checksum_t) +
+         ceph::encoded_sizeof_bounded<record_header_t>();
+}
+
+void record_group_size_t::account(
+    const record_size_t& rsize,
+    extent_len_t _block_size)
+{
+  assert(_block_size > 0);
+  assert(rsize.dlength % _block_size == 0);
+  assert(block_size == 0 || block_size == _block_size);
+  raw_mdlength += rsize.get_raw_mdlength();
+  mdlength += rsize.get_mdlength(_block_size);
+  dlength += rsize.dlength;
+  block_size = _block_size;
 }
 
 ceph::bufferlist encode_record(
-  record_size_t rsize,
-  record_t &&record,
-  size_t block_size,
+  record_t&& record,
+  extent_len_t block_size,
   const journal_seq_t& committed_to,
   segment_nonce_t current_segment_nonce)
 {
@@ -167,17 +180,17 @@ ceph::bufferlist encode_record(
 
   bufferlist bl;
   record_header_t header{
-    rsize.mdlength,
-    rsize.dlength,
-    (uint32_t)record.deltas.size(),
-    (uint32_t)record.extents.size(),
+    record.size.get_mdlength(block_size),
+    record.size.dlength,
+    (extent_len_t)record.deltas.size(),
+    (extent_len_t)record.extents.size(),
     current_segment_nonce,
     committed_to,
     data_bl.crc32c(-1)
   };
   encode(header, bl);
 
-  auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t));
+  auto metadata_crc_filler = bl.append_hole(sizeof(checksum_t));
 
   for (const auto &i: record.extents) {
     encode(extent_info_t(i), bl);
@@ -185,19 +198,19 @@ ceph::bufferlist encode_record(
   for (const auto &i: record.deltas) {
     encode(i, bl);
   }
-  ceph_assert(bl.length() == rsize.raw_mdlength);
+  ceph_assert(bl.length() == record.size.get_raw_mdlength());
 
-  if (bl.length() % block_size != 0) {
-    bl.append_zero(
-      block_size - (bl.length() % block_size));
+  auto aligned_mdlength = record.size.get_mdlength(block_size);
+  if (bl.length() != aligned_mdlength) {
+    assert(bl.length() < aligned_mdlength);
+    bl.append_zero(aligned_mdlength - bl.length());
   }
-  ceph_assert(bl.length() == rsize.mdlength);
 
   auto bliter = bl.cbegin();
   auto metadata_crc = bliter.crc32c(
     ceph::encoded_sizeof_bounded<record_header_t>(),
     -1);
-  bliter += sizeof(checksum_t); /* crc hole again */
+  bliter += sizeof(checksum_t); /* metadata crc hole */
   metadata_crc = bliter.crc32c(
     bliter.get_remaining(),
     metadata_crc);
@@ -208,11 +221,143 @@ ceph::bufferlist encode_record(
     reinterpret_cast<const char *>(&metadata_crc_le));
 
   bl.claim_append(data_bl);
-  ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength));
+  ceph_assert(bl.length() == (record.size.get_encoded_length(block_size)));
+
+  return bl;
+}
+
+ceph::bufferlist encode_records(
+  record_group_t& record_group,
+  const journal_seq_t& committed_to,
+  segment_nonce_t current_segment_nonce)
+{
+  assert(record_group.size.block_size > 0);
+  assert(record_group.records.size() > 0);
+
+  bufferlist bl;
+  for (auto& r: record_group.records) {
+    bl.claim_append(
+        encode_record(
+          std::move(r),
+          record_group.size.block_size,
+          committed_to,
+          current_segment_nonce));
+  }
+  ceph_assert(bl.length() == record_group.size.get_encoded_length());
 
+  record_group.clear();
   return bl;
 }
 
+std::optional<record_header_t>
+try_decode_record_header(
+    const ceph::bufferlist& header_bl,
+    segment_nonce_t expected_nonce)
+{
+  auto bp = header_bl.cbegin();
+  record_header_t header;
+  try {
+    decode(header, bp);
+  } catch (ceph::buffer::error &e) {
+    logger().debug(
+        "try_decode_record_header: failed, "
+        "cannot decode record_header_t, got {}.",
+        e);
+    return std::nullopt;
+  }
+  if (header.segment_nonce != expected_nonce) {
+    logger().debug(
+        "try_decode_record_header: failed, record_header nonce mismatch, "
+        "read {}, expected {}!",
+        header.segment_nonce,
+        expected_nonce);
+    return std::nullopt;
+  }
+  return header;
+}
+
+bool validate_record_metadata(
+    const ceph::bufferlist& md_bl)
+{
+  auto bliter = md_bl.cbegin();
+  auto test_crc = bliter.crc32c(
+    ceph::encoded_sizeof_bounded<record_header_t>(),
+    -1);
+  ceph_le32 recorded_crc_le;
+  decode(recorded_crc_le, bliter);
+  uint32_t recorded_crc = recorded_crc_le;
+  test_crc = bliter.crc32c(
+    bliter.get_remaining(),
+    test_crc);
+  bool success = (test_crc == recorded_crc);
+  if (!success) {
+    logger().debug("validate_record_metadata: failed, metadata crc mismatch.");
+  }
+  return success;
+}
+
+bool validate_record_data(
+    const record_header_t& header,
+    const ceph::bufferlist& data_bl)
+{
+  bool success = (data_bl.crc32c(-1) == header.data_crc);
+  if (!success) {
+    logger().debug("validate_record_data: failed, data crc mismatch!");
+  }
+  return success;
+}
+
+std::optional<record_extent_infos_t>
+try_decode_extent_info(
+    const record_header_t& header,
+    const ceph::bufferlist& md_bl)
+{
+  auto bliter = md_bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+  bliter += sizeof(checksum_t) /* metadata crc hole */;
+
+  record_extent_infos_t record_extent_info;
+  record_extent_info.extent_infos.resize(header.extents);
+  for (auto& i: record_extent_info.extent_infos) {
+    try {
+      decode(i, bliter);
+    } catch (ceph::buffer::error &e) {
+      logger().debug(
+          "try_decode_extent_infos: failed, "
+          "cannot decode extent_info_t, got {}.",
+          e);
+      return std::nullopt;
+    }
+  }
+  return record_extent_info;
+}
+
+std::optional<record_deltas_t>
+try_decode_deltas(
+    const record_header_t& header,
+    const ceph::bufferlist& md_bl)
+{
+  auto bliter = md_bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+  bliter += sizeof(checksum_t) /* metadata crc hole */;
+  bliter += header.extents  * ceph::encoded_sizeof_bounded<extent_info_t>();
+
+  record_deltas_t record_delta;
+  record_delta.deltas.resize(header.deltas);
+  for (auto& i: record_delta.deltas) {
+    try {
+      decode(i, bliter);
+    } catch (ceph::buffer::error &e) {
+      logger().debug(
+          "try_decode_deltas: failed, "
+          "cannot decode delta_info_t, got {}.",
+          e);
+      return std::nullopt;
+    }
+  }
+  return record_delta;
+}
+
 bool can_delay_allocation(device_type_t type) {
   // Some types of device may not support delayed allocation, for example PMEM.
   return type <= RANDOM_BLOCK;
index 2c115df23c9194550b35759c0294aa37afc639de..bd05d2271592526cb929f3f5899d8523760b599d 100644 (file)
@@ -5,7 +5,9 @@
 
 #include <limits>
 #include <numeric>
+#include <optional>
 #include <iostream>
+#include <vector>
 
 #include "include/byteorder.h"
 #include "include/denc.h"
@@ -822,27 +824,6 @@ struct delta_info_t {
 
 std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs);
 
-struct record_t {
-  std::vector<extent_t> extents;
-  std::vector<delta_info_t> deltas;
-
-  std::size_t get_raw_data_size() const {
-    auto extent_size = std::accumulate(
-        extents.begin(), extents.end(), 0,
-        [](uint64_t sum, auto& extent) {
-          return sum + extent.bl.length();
-        }
-    );
-    auto delta_size = std::accumulate(
-        deltas.begin(), deltas.end(), 0,
-        [](uint64_t sum, auto& delta) {
-          return sum + delta.bl.length();
-        }
-    );
-    return extent_size + delta_size;
-  }
-};
-
 class object_data_t {
   laddr_t reserved_data_base = L_ADDR_NULL;
   extent_len_t reserved_data_len = 0;
@@ -1179,6 +1160,7 @@ struct extent_info_t {
     DENC_FINISH(p);
   }
 };
+std::ostream &operator<<(std::ostream &out, const extent_info_t &header);
 
 using segment_nonce_t = uint32_t;
 
@@ -1210,6 +1192,71 @@ struct segment_header_t {
 };
 std::ostream &operator<<(std::ostream &out, const segment_header_t &header);
 
// Sizes of a single in-progress record_t, maintained incrementally as
// extents and deltas are appended (see record_t::push_back()).
struct record_size_t {
  extent_len_t plain_mdlength = 0; // mdlength without the record header
  extent_len_t dlength = 0;        // sum of extent payload lengths

  // account one extent's payload length (defined in .cc)
  void account_extent(extent_len_t extent_len);

  void account(const extent_t& extent) {
    account_extent(extent.bl.length());
  }

  // deltas are encoded inline into the metadata section (defined in .cc)
  void account(const delta_info_t& delta);

  // TODO: remove
  // unaligned metadata length: record header + crc + plain_mdlength
  extent_len_t get_raw_mdlength() const;

  // metadata length rounded up to the device block size
  extent_len_t get_mdlength(extent_len_t block_size) const {
    assert(block_size > 0);
    return p2roundup(get_raw_mdlength(), block_size);
  }

  // total encoded length: block-aligned metadata followed by the data
  extent_len_t get_encoded_length(extent_len_t block_size) const {
    assert(block_size > 0);
    assert(dlength % block_size == 0);
    return get_mdlength(block_size) + dlength;
  }
};
+
+struct record_t {
+  std::vector<extent_t> extents;
+  std::vector<delta_info_t> deltas;
+  record_size_t size;
+
+  record_t() = default;
+  record_t(std::vector<extent_t>&& _extents,
+           std::vector<delta_info_t>&& _deltas) {
+    for (auto& e: _extents) {
+      push_back(std::move(e));
+    }
+    for (auto& d: _deltas) {
+      push_back(std::move(d));
+    }
+  }
+
+  // the size of extents and delta buffers
+  std::size_t get_raw_data_size() const {
+    auto delta_size = std::accumulate(
+        deltas.begin(), deltas.end(), 0,
+        [](uint64_t sum, auto& delta) {
+          return sum + delta.bl.length();
+        }
+    );
+    return size.dlength + delta_size;
+  }
+
+  void push_back(extent_t&& extent) {
+    size.account(extent);
+    extents.push_back(std::move(extent));
+  }
+
+  void push_back(delta_info_t&& delta) {
+    size.account(delta);
+    deltas.push_back(std::move(delta));
+  }
+};
+
 struct record_header_t {
   // Fixed portion
   extent_len_t  mdlength;       // block aligned, length of metadata
@@ -1235,32 +1282,119 @@ struct record_header_t {
   }
 };
 
-std::ostream &operator<<(std::ostream &out, const extent_info_t &header);
-
-struct record_size_t {
// Aggregated sizes of a group of records written together; filled in
// by account() (see .cc), which also latches the common block size.
struct record_group_size_t {
  extent_len_t raw_mdlength = 0; // sum of unaligned record md lengths
  extent_len_t mdlength = 0;     // sum of block-aligned md lengths
  extent_len_t dlength = 0;      // sum of record data lengths
  extent_len_t block_size = 0;   // 0 until the first account() call

  record_group_size_t() = default;
  record_group_size_t(
      const record_size_t& rsize,
      extent_len_t block_size) {
    account(rsize, block_size);
  }

  extent_len_t get_raw_mdlength() const {
    assert(block_size > 0);
    return raw_mdlength;
  }

  extent_len_t get_mdlength() const {
    assert(block_size > 0);
    assert(mdlength % block_size == 0);
    return mdlength;
  }

  extent_len_t get_encoded_length() const {
    assert(block_size > 0);
    assert(dlength % block_size == 0);
    return get_mdlength() + dlength;
  }

  // what the total encoded length would become if rsize were added;
  // works on a copy, so *this is left unchanged
  extent_len_t get_encoded_length_after(
      const record_size_t& rsize,
      extent_len_t block_size) const {
    record_group_size_t tmp = *this;
    tmp.account(rsize, block_size);
    return tmp.get_encoded_length();
  }

  void account(const record_size_t& rsize,
               extent_len_t block_size);
};
 
-extent_len_t get_encoded_record_raw_mdlength(
-  const record_t &record,
-  size_t block_size);
// A batch of records to be encoded and written as one unit, with the
// aggregated group sizes maintained incrementally by push_back().
struct record_group_t {
  std::vector<record_t> records;
  record_group_size_t size;

  record_group_t() = default;
  record_group_t(
      record_t&& record,
      extent_len_t block_size) {
    push_back(std::move(record), block_size);
  }

  // number of records currently in the group
  std::size_t get_size() const {
    return records.size();
  }

  void push_back(
      record_t&& record,
      extent_len_t block_size) {
    size.account(record.size, block_size);
    records.push_back(std::move(record));
    // the cumulative encoded group must still fit in a segment offset
    assert(size.get_encoded_length() < MAX_SEG_OFF);
  }

  void reserve(std::size_t limit) {
    records.reserve(limit);
  }

  // drop all records and reset the aggregated sizes for reuse
  void clear() {
    records.clear();
    size = {};
  }
};
 
 ceph::bufferlist encode_record(
-  record_size_t rsize,
-  record_t &&record,
-  size_t block_size,
+  record_t&& record,
+  extent_len_t block_size,
+  const journal_seq_t& committed_to,
+  segment_nonce_t current_segment_nonce);
+
+ceph::bufferlist encode_records(
+  record_group_t& record_group,
   const journal_seq_t& committed_to,
-  segment_nonce_t current_segment_nonce = 0);
+  segment_nonce_t current_segment_nonce);
+
+std::optional<record_header_t>
+try_decode_record_header(
+    const ceph::bufferlist& header_bl,
+    segment_nonce_t expected_nonce);
+
+bool validate_record_metadata(
+    const ceph::bufferlist& md_bl);
+
+bool validate_record_data(
+    const record_header_t& header,
+    const ceph::bufferlist& data_bl);
+
// Decoded extent metadata of a single record (see try_decode_extent_info()).
struct record_extent_infos_t {
  std::vector<extent_info_t> extent_infos;
};
+std::optional<record_extent_infos_t>
+try_decode_extent_info(
+    const record_header_t& header,
+    const ceph::bufferlist& md_bl);
+
// Decoded deltas of a single record (see try_decode_deltas()).
struct record_deltas_t {
  std::vector<delta_info_t> deltas;
};
+std::optional<record_deltas_t>
+try_decode_deltas(
+    const record_header_t& header,
+    const ceph::bufferlist& md_bl);
 
 struct write_result_t {
   journal_seq_t start_seq;