]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: merge records metadata if they are grouped
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 19 Nov 2021 05:17:43 +0000 (13:17 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 9 Dec 2021 01:37:05 +0000 (09:37 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/extent_reader.cc
src/crimson/os/seastore/extent_reader.h
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.cc
src/crimson/os/seastore/seastore_types.h

index 57a3c8cf795eca2ada314dee6650f8718b5ff4f0..9ab7c9e6f19857184f77338531bbf056bdb727f6 100644 (file)
@@ -1087,6 +1087,7 @@ record_t Cache::prepare_record(Transaction &t)
   record_header_fullness.ool_stats.filled_bytes += ool_stats.header_raw_bytes;
   record_header_fullness.ool_stats.total_bytes += ool_stats.header_bytes;
 
+  // TODO: move to Journal to get accurate result
   auto record_size = record_group_size_t(
       record.size, reader.get_block_size());
   auto inline_overhead =
index 46b4165055fe40f2be4fad53a0a7acb5f47a4506..dd245d5eaf7b26415bf24b259ad098186d83c302 100644 (file)
@@ -72,12 +72,14 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
   ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
     auto segment_nonce = segment_header.segment_nonce;
     return seastar::do_with(
-      found_record_handler_t([extents, this](
+      found_record_handler_t([extents](
         record_locator_t locator,
-        const record_header_t& header,
+        const record_group_header_t& header,
         const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
       {
-        auto maybe_record_extent_infos = try_decode_extent_info(header, mdbuf);
+        logger().debug("ExtentReader::scan_extents: decoding {} records",
+                       header.records);
+        auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
         if (!maybe_record_extent_infos) {
           // This should be impossible, we did check the crc on the mdbuf
           logger().error(
@@ -87,17 +89,19 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
         }
 
         paddr_t extent_offset = locator.record_block_base;
-        logger().debug("ExtentReader::scan_extents: decoded {} extents",
-                       maybe_record_extent_infos->extent_infos.size());
-        for (const auto &i : maybe_record_extent_infos->extent_infos) {
-          extents->emplace_back(extent_offset, i);
-          auto& seg_addr = extent_offset.as_seg_paddr();
-          seg_addr.set_segment_off(
-            seg_addr.get_segment_off() + i.len);
+        for (auto& r: *maybe_record_extent_infos) {
+          logger().debug("ExtentReader::scan_extents: decoded {} extents",
+                         r.extent_infos.size());
+          for (const auto &i : r.extent_infos) {
+            extents->emplace_back(extent_offset, i);
+            auto& seg_addr = extent_offset.as_seg_paddr();
+            seg_addr.set_segment_off(
+              seg_addr.get_segment_off() + i.len);
+          }
         }
         return scan_extents_ertr::now();
       }),
-      [=, &cursor](auto &dhandler) {
+      [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
         return scan_valid_records(
           cursor,
           segment_nonce,
@@ -150,7 +154,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
              ceph_assert(cursor.last_committed == journal_seq_t() ||
                          cursor.last_committed <= new_committed_to);
              cursor.last_committed = new_committed_to;
-             cursor.pending_records.emplace_back(
+             cursor.pending_record_groups.emplace_back(
                cursor.seq.offset,
                header,
                std::move(md_bl));
@@ -164,7 +168,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
              [=, &budget_used, &cursor, &handler] {
                logger().debug(
                  "ExtentReader::scan_valid_records: valid record read, processing queue");
-               if (cursor.pending_records.empty()) {
+               if (cursor.pending_record_groups.empty()) {
                  /* This is only possible if the segment is empty.
                  * A record's last_committed must be prior to its own
                   * location since it itself cannot yet have been committed
@@ -173,7 +177,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
                  return scan_valid_records_ertr::make_ready_future<
                    seastar::stop_iteration>(seastar::stop_iteration::yes);
                }
-               auto &next = cursor.pending_records.front();
+               auto &next = cursor.pending_record_groups.front();
                journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
                if (cursor.last_committed == journal_seq_t() ||
                    next_seq > cursor.last_committed) {
@@ -188,12 +192,12 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
              });
          });
        } else {
-         assert(!cursor.pending_records.empty());
-         auto &next = cursor.pending_records.front();
+         assert(!cursor.pending_record_groups.empty());
+         auto &next = cursor.pending_record_groups.front();
          return read_validate_data(next.offset, next.header
          ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
            if (!valid) {
-             cursor.pending_records.clear();
+             cursor.pending_record_groups.clear();
              return scan_valid_records_ertr::now();
            }
             return consume_next_records(cursor, handler, budget_used);
@@ -237,7 +241,7 @@ ExtentReader::read_validate_record_metadata(
         segment_manager.get_block_size());
     bufferlist bl;
     bl.append(bptr);
-    auto maybe_header = try_decode_record_header(bl, nonce);
+    auto maybe_header = try_decode_records_header(bl, nonce);
     if (!maybe_header.has_value()) {
       return read_validate_record_metadata_ret(
         read_validate_record_metadata_ertr::ready_future_marker{},
@@ -275,7 +279,7 @@ ExtentReader::read_validate_record_metadata(
         std::make_pair(std::move(header), std::move(bl)));
     });
   }).safe_then([](auto p) {
-    if (p && validate_record_metadata(p->second)) {
+    if (p && validate_records_metadata(p->second)) {
       return read_validate_record_metadata_ret(
         read_validate_record_metadata_ertr::ready_future_marker{},
         std::move(*p)
@@ -291,7 +295,7 @@ ExtentReader::read_validate_record_metadata(
 ExtentReader::read_validate_data_ret
 ExtentReader::read_validate_data(
   paddr_t record_base,
-  const record_header_t &header)
+  const record_group_header_t &header)
 {
   auto& segment_manager = *segment_managers[record_base.get_device_id()];
   auto data_addr = record_base.add_offset(header.mdlength);
@@ -303,7 +307,7 @@ ExtentReader::read_validate_data(
   ).safe_then([=, &header](auto bptr) {
     bufferlist bl;
     bl.append(bptr);
-    return validate_record_data(header, bl);
+    return validate_records_data(header, bl);
   });
 }
 
@@ -313,7 +317,7 @@ ExtentReader::consume_next_records(
   found_record_handler_t& handler,
   std::size_t& budget_used)
 {
-  auto& next = cursor.pending_records.front();
+  auto& next = cursor.pending_record_groups.front();
   auto total_length = next.header.dlength + next.header.mdlength;
   budget_used += total_length;
   auto locator = record_locator_t{
@@ -331,7 +335,7 @@ ExtentReader::consume_next_records(
     next.header,
     next.mdbuffer
   ).safe_then([&cursor] {
-    cursor.pending_records.pop_front();
+    cursor.pending_record_groups.pop_front();
   });
 }
 
index dfa2cd5b67dc5225b13b18a8675583b0b9cd1f3c..b6e5013cba79f20a30154ed69e7a9fe26ef338b3 100644 (file)
@@ -59,7 +59,7 @@ public:
       record_locator_t record_locator,
       // callee may assume header and bl will remain valid until
       // returned future resolves
-      const record_header_t &header,
+      const record_group_header_t &header,
       const bufferlist &mdbuf)>;
   scan_valid_records_ret scan_valid_records(
     scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
@@ -92,7 +92,7 @@ private:
   using read_validate_record_metadata_ertr = read_ertr;
   using read_validate_record_metadata_ret =
     read_validate_record_metadata_ertr::future<
-      std::optional<std::pair<record_header_t, bufferlist>>
+      std::optional<std::pair<record_group_header_t, bufferlist>>
     >;
   read_validate_record_metadata_ret read_validate_record_metadata(
     paddr_t start,
@@ -103,8 +103,8 @@ private:
   using read_validate_data_ret = read_validate_data_ertr::future<bool>;
   read_validate_data_ret read_validate_data(
     paddr_t record_base,
-    const record_header_t &header  ///< caller must ensure lifetime through
-                                   ///  future resolution
+    const record_group_header_t &header  ///< caller must ensure lifetime through
+                                         ///  future resolution
   );
 
   using consume_record_group_ertr = scan_valid_records_ertr;
index 3d4eb66df561a88fd3c7713d14160a23d894603d..71534cbc1e8ba903f10c6eb7da65de2f551ef83c 100644 (file)
@@ -155,12 +155,14 @@ Journal::replay_segment(
     scan_valid_records_cursor(seq),
     ExtentReader::found_record_handler_t([=, &handler](
       record_locator_t locator,
-      const record_header_t& header,
+      const record_group_header_t& header,
       const bufferlist& mdbuf)
       -> ExtentReader::scan_valid_records_ertr::future<>
     {
+      logger().debug("Journal::replay_segment: decoding {} records",
+                     header.records);
       auto maybe_record_deltas_list = try_decode_deltas(
-        header, mdbuf);
+          header, mdbuf, locator.record_block_base);
       if (!maybe_record_deltas_list) {
         // This should be impossible, we did check the crc on the mdbuf
         logger().error(
@@ -171,37 +173,48 @@ Journal::replay_segment(
 
       return seastar::do_with(
         std::move(*maybe_record_deltas_list),
-        [locator,
+        [write_result=locator.write_result,
          this,
-         &handler](record_deltas_t& record_deltas)
+         &handler](auto& record_deltas_list)
       {
-        logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
-                       record_deltas.deltas.size(),
-                       locator.record_block_base);
         return crimson::do_for_each(
-          record_deltas.deltas,
-          [locator,
+          record_deltas_list,
+          [write_result,
            this,
-           &handler](delta_info_t& delta)
+           &handler](record_deltas_t& record_deltas)
         {
-          /* The journal may validly contain deltas for extents in
-           * since released segments.  We can detect those cases by
-           * checking whether the segment in question currently has a
-           * sequence number > the current journal segment seq. We can
-           * safetly skip these deltas because the extent must already
-           * have been rewritten.
-           *
-           * Note, this comparison exploits the fact that
-           * SEGMENT_SEQ_NULL is a large number.
-           */
-          auto& seg_addr = delta.paddr.as_seg_paddr();
-          if (delta.paddr != P_ADDR_NULL &&
-              (segment_provider->get_seq(seg_addr.get_segment_id()) >
-               locator.write_result.start_seq.segment_seq)) {
-            return replay_ertr::now();
-          } else {
-            return handler(locator, delta);
-          }
+          logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
+                         record_deltas.deltas.size(),
+                         record_deltas.record_block_base);
+          auto locator = record_locator_t{
+            record_deltas.record_block_base,
+            write_result
+          };
+          return crimson::do_for_each(
+            record_deltas.deltas,
+            [locator,
+             this,
+             &handler](delta_info_t& delta)
+          {
+            /* The journal may validly contain deltas for extents in
+             * since released segments.  We can detect those cases by
+             * checking whether the segment in question currently has a
+             * sequence number > the current journal segment seq. We can
+             * safely skip these deltas because the extent must already
+             * have been rewritten.
+             *
+             * Note, this comparison exploits the fact that
+             * SEGMENT_SEQ_NULL is a large number.
+             */
+            auto& seg_addr = delta.paddr.as_seg_paddr();
+            if (delta.paddr != P_ADDR_NULL &&
+                (segment_provider->get_seq(seg_addr.get_segment_id()) >
+                 locator.write_result.start_seq.segment_seq)) {
+              return replay_ertr::now();
+            } else {
+              return handler(locator, delta);
+            }
+          });
         });
       });
     }),
@@ -432,30 +445,29 @@ Journal::RecordBatch::add_pending(
   assert(state != state_t::SUBMITTING);
   assert(can_batch(record, block_size) == new_encoded_length);
 
-  auto block_start_offset = record.size.get_mdlength(block_size);
-  if (state != state_t::EMPTY) {
-    block_start_offset += pending.size.get_encoded_length();
-  }
+  auto dlength_offset = pending.current_dlength;
   pending.push_back(
       std::move(record), block_size);
   assert(pending.size.get_encoded_length() == new_encoded_length);
   if (state == state_t::EMPTY) {
     assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<maybe_result_t>();
+    io_promise = seastar::shared_promise<maybe_promise_result_t>();
   } else {
     assert(io_promise.has_value());
   }
   state = state_t::PENDING;
 
   return io_promise->get_shared_future(
-  ).then([block_start_offset
-         ](auto maybe_write_result) -> add_pending_ret {
-    if (!maybe_write_result.has_value()) {
+  ).then([dlength_offset
+         ](auto maybe_promise_result) -> add_pending_ret {
+    if (!maybe_promise_result.has_value()) {
       return crimson::ct_error::input_output_error::make();
     }
+    auto write_result = maybe_promise_result->write_result;
     auto submit_result = record_locator_t{
-      maybe_write_result->start_seq.offset.add_offset(block_start_offset),
-      *maybe_write_result
+      write_result.start_seq.offset.add_offset(
+          maybe_promise_result->mdlength + dlength_offset),
+      write_result
     };
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
@@ -478,6 +490,7 @@ ceph::bufferlist Journal::RecordBatch::encode_batch(
   state = state_t::SUBMITTING;
   submitting_size = pending.get_size();
   submitting_length = pending.size.get_encoded_length();
+  submitting_mdlength = pending.size.get_mdlength();
   auto bl = encode_records(pending, committed_to, segment_nonce);
   // Note: pending is cleared here
   assert(bl.length() == (std::size_t)submitting_length);
@@ -487,6 +500,7 @@ ceph::bufferlist Journal::RecordBatch::encode_batch(
 void Journal::RecordBatch::set_result(
   maybe_result_t maybe_write_result)
 {
+  maybe_promise_result_t result;
   if (maybe_write_result.has_value()) {
     logger().debug(
       "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
@@ -494,6 +508,10 @@ void Journal::RecordBatch::set_result(
       maybe_write_result->start_seq,
       maybe_write_result->length);
     assert(maybe_write_result->length == submitting_length);
+    result = promise_result_t{
+      *maybe_write_result,
+      submitting_mdlength
+    };
   } else {
     logger().error(
       "Journal::RecordBatch::set_result: batches={}, write is failed!",
@@ -505,7 +523,8 @@ void Journal::RecordBatch::set_result(
   state = state_t::EMPTY;
   submitting_size = 0;
   submitting_length = 0;
-  io_promise->set_value(maybe_write_result);
+  submitting_mdlength = 0;
+  io_promise->set_value(result);
   io_promise.reset();
 }
 
index ba4c4498efee00a75feca4cde06599725f192f22..60484afce613aeda41bd1fed7978acf6106d364b 100644 (file)
@@ -327,8 +327,14 @@ private:
     record_group_t pending;
     std::size_t submitting_size = 0;
     segment_off_t submitting_length = 0;
+    segment_off_t submitting_mdlength = 0;
 
-    std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
+    struct promise_result_t {
+      write_result_t write_result;
+      segment_off_t mdlength;
+    };
+    using maybe_promise_result_t = std::optional<promise_result_t>;
+    std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
   };
 
   class RecordSubmitter {
index 2901ab1a6f6a75104092ae8d0583cb96b9c43c78..e51f9f08689a07219968d91526aa921b4338927e 100644 (file)
@@ -147,11 +147,11 @@ void record_size_t::account(const delta_info_t& delta)
   plain_mdlength += ceph::encoded_sizeof(delta);
 }
 
-extent_len_t record_size_t::get_raw_mdlength() const
+extent_len_t record_group_size_t::get_raw_mdlength() const
 {
   return plain_mdlength +
          sizeof(checksum_t) +
-         ceph::encoded_sizeof_bounded<record_header_t>();
+         ceph::encoded_sizeof_bounded<record_group_header_t>();
 }
 
 void record_group_size_t::account(
@@ -163,8 +163,10 @@ void record_group_size_t::account(
   assert(_block_size > 0);
   assert(rsize.dlength % _block_size == 0);
   assert(block_size == 0 || block_size == _block_size);
-  raw_mdlength += rsize.get_raw_mdlength();
-  mdlength += rsize.get_mdlength(_block_size);
+  plain_mdlength += (
+      rsize.plain_mdlength +
+      ceph::encoded_sizeof_bounded<record_header_t>()
+  );
   dlength += rsize.dlength;
   block_size = _block_size;
 }
@@ -175,17 +177,34 @@ ceph::bufferlist encode_record(
   const journal_seq_t& committed_to,
   segment_nonce_t current_segment_nonce)
 {
+  record_group_t record_group(std::move(record), block_size);
+  return encode_records(
+      record_group,
+      committed_to,
+      current_segment_nonce);
+}
+
+ceph::bufferlist encode_records(
+  record_group_t& record_group,
+  const journal_seq_t& committed_to,
+  segment_nonce_t current_segment_nonce)
+{
+  assert(record_group.size.block_size > 0);
+  assert(record_group.records.size() > 0);
+
   bufferlist data_bl;
-  for (auto &i: record.extents) {
-    data_bl.append(i.bl);
+  for (auto& r: record_group.records) {
+    for (auto& i: r.extents) {
+      assert(i.bl.length());
+      data_bl.append(i.bl);
+    }
   }
 
   bufferlist bl;
-  record_header_t header{
-    record.size.get_mdlength(block_size),
-    record.size.dlength,
-    (extent_len_t)record.deltas.size(),
-    (extent_len_t)record.extents.size(),
+  record_group_header_t header{
+    static_cast<extent_len_t>(record_group.records.size()),
+    record_group.size.get_mdlength(),
+    record_group.size.dlength,
     current_segment_nonce,
     committed_to,
     data_bl.crc32c(-1)
@@ -194,15 +213,26 @@ ceph::bufferlist encode_record(
 
   auto metadata_crc_filler = bl.append_hole(sizeof(checksum_t));
 
-  for (const auto &i: record.extents) {
-    encode(extent_info_t(i), bl);
+  for (auto& r: record_group.records) {
+    record_header_t rheader{
+      (extent_len_t)r.deltas.size(),
+      (extent_len_t)r.extents.size(),
+    };
+    encode(rheader, bl);
   }
-  for (const auto &i: record.deltas) {
-    encode(i, bl);
+  for (auto& r: record_group.records) {
+    for (const auto& i: r.extents) {
+      encode(extent_info_t(i), bl);
+    }
   }
-  ceph_assert(bl.length() == record.size.get_raw_mdlength());
+  for (auto& r: record_group.records) {
+    for (const auto& i: r.deltas) {
+      encode(i, bl);
+    }
+  }
+  ceph_assert(bl.length() == record_group.size.get_raw_mdlength());
 
-  auto aligned_mdlength = record.size.get_mdlength(block_size);
+  auto aligned_mdlength = record_group.size.get_mdlength();
   if (bl.length() != aligned_mdlength) {
     assert(bl.length() < aligned_mdlength);
     bl.append_zero(aligned_mdlength - bl.length());
@@ -210,7 +240,7 @@ ceph::bufferlist encode_record(
 
   auto bliter = bl.cbegin();
   auto metadata_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
+    ceph::encoded_sizeof_bounded<record_group_header_t>(),
     -1);
   bliter += sizeof(checksum_t); /* metadata crc hole */
   metadata_crc = bliter.crc32c(
@@ -223,53 +253,31 @@ ceph::bufferlist encode_record(
     reinterpret_cast<const char *>(&metadata_crc_le));
 
   bl.claim_append(data_bl);
-  ceph_assert(bl.length() == (record.size.get_encoded_length(block_size)));
-
-  return bl;
-}
-
-ceph::bufferlist encode_records(
-  record_group_t& record_group,
-  const journal_seq_t& committed_to,
-  segment_nonce_t current_segment_nonce)
-{
-  assert(record_group.size.block_size > 0);
-  assert(record_group.records.size() > 0);
-
-  bufferlist bl;
-  for (auto& r: record_group.records) {
-    bl.claim_append(
-        encode_record(
-          std::move(r),
-          record_group.size.block_size,
-          committed_to,
-          current_segment_nonce));
-  }
   ceph_assert(bl.length() == record_group.size.get_encoded_length());
 
   record_group.clear();
   return bl;
 }
 
-std::optional<record_header_t>
-try_decode_record_header(
+std::optional<record_group_header_t>
+try_decode_records_header(
     const ceph::bufferlist& header_bl,
     segment_nonce_t expected_nonce)
 {
   auto bp = header_bl.cbegin();
-  record_header_t header;
+  record_group_header_t header;
   try {
     decode(header, bp);
   } catch (ceph::buffer::error &e) {
     logger().debug(
-        "try_decode_record_header: failed, "
-        "cannot decode record_header_t, got {}.",
+        "try_decode_records_header: failed, "
+        "cannot decode record_group_header_t, got {}.",
         e);
     return std::nullopt;
   }
   if (header.segment_nonce != expected_nonce) {
     logger().debug(
-        "try_decode_record_header: failed, record_header nonce mismatch, "
+        "try_decode_records_header: failed, record_group_header nonce mismatch, "
         "read {}, expected {}!",
         header.segment_nonce,
         expected_nonce);
@@ -278,12 +286,12 @@ try_decode_record_header(
   return header;
 }
 
-bool validate_record_metadata(
+bool validate_records_metadata(
     const ceph::bufferlist& md_bl)
 {
   auto bliter = md_bl.cbegin();
   auto test_crc = bliter.crc32c(
-    ceph::encoded_sizeof_bounded<record_header_t>(),
+    ceph::encoded_sizeof_bounded<record_group_header_t>(),
     -1);
   ceph_le32 recorded_crc_le;
   decode(recorded_crc_le, bliter);
@@ -293,71 +301,136 @@ bool validate_record_metadata(
     test_crc);
   bool success = (test_crc == recorded_crc);
   if (!success) {
-    logger().debug("validate_record_metadata: failed, metadata crc mismatch.");
+    logger().debug("validate_records_metadata: failed, metadata crc mismatch.");
   }
   return success;
 }
 
-bool validate_record_data(
-    const record_header_t& header,
+bool validate_records_data(
+    const record_group_header_t& header,
     const ceph::bufferlist& data_bl)
 {
   bool success = (data_bl.crc32c(-1) == header.data_crc);
   if (!success) {
-    logger().debug("validate_record_data: failed, data crc mismatch!");
+    logger().debug("validate_records_data: failed, data crc mismatch!");
   }
   return success;
 }
 
-std::optional<record_extent_infos_t>
-try_decode_extent_info(
-    const record_header_t& header,
+namespace {
+
+std::optional<std::vector<record_header_t>>
+try_decode_record_headers(
+    const record_group_header_t& header,
     const ceph::bufferlist& md_bl)
 {
   auto bliter = md_bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* metadata crc hole */;
-
-  record_extent_infos_t record_extent_info;
-  record_extent_info.extent_infos.resize(header.extents);
-  for (auto& i: record_extent_info.extent_infos) {
+  bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+  bliter += sizeof(checksum_t); /* metadata crc hole */
+  std::vector<record_header_t> record_headers(header.records);
+  for (auto &&i: record_headers) {
     try {
       decode(i, bliter);
     } catch (ceph::buffer::error &e) {
       logger().debug(
-          "try_decode_extent_infos: failed, "
-          "cannot decode extent_info_t, got {}.",
+          "try_decode_record_headers: failed, "
+          "cannot decode record_header_t, got {}.",
           e);
       return std::nullopt;
     }
   }
-  return record_extent_info;
+  return record_headers;
 }
 
-std::optional<record_deltas_t>
-try_decode_deltas(
-    const record_header_t& header,
+}
+
+std::optional<std::vector<record_extent_infos_t> >
+try_decode_extent_infos(
+    const record_group_header_t& header,
     const ceph::bufferlist& md_bl)
 {
+  auto maybe_headers = try_decode_record_headers(header, md_bl);
+  if (!maybe_headers) {
+    logger().debug(
+        "try_decode_extent_infos: failed, cannot decode record headers.");
+    return std::nullopt;
+  }
+
   auto bliter = md_bl.cbegin();
-  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
-  bliter += sizeof(checksum_t) /* metadata crc hole */;
-  bliter += header.extents  * ceph::encoded_sizeof_bounded<extent_info_t>();
+  bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+  bliter += sizeof(checksum_t); /* metadata crc hole */
+  bliter += (ceph::encoded_sizeof_bounded<record_header_t>() *
+             maybe_headers->size());
+
+  std::vector<record_extent_infos_t> record_extent_infos(
+      maybe_headers->size());
+  auto result_iter = record_extent_infos.begin();
+  for (auto& h: *maybe_headers) {
+    result_iter->header = h;
+    result_iter->extent_infos.resize(h.extents);
+    for (auto& i: result_iter->extent_infos) {
+      try {
+        decode(i, bliter);
+      } catch (ceph::buffer::error &e) {
+        logger().debug(
+            "try_decode_extent_infos: failed, "
+            "cannot decode extent_info_t, got {}.",
+            e);
+        return std::nullopt;
+      }
+    }
+    ++result_iter;
+  }
+  return record_extent_infos;
+}
 
-  record_deltas_t record_delta;
-  record_delta.deltas.resize(header.deltas);
-  for (auto& i: record_delta.deltas) {
-    try {
-      decode(i, bliter);
-    } catch (ceph::buffer::error &e) {
-      logger().debug(
-          "try_decode_deltas: failed, "
-          "cannot decode delta_info_t, got {}.",
-          e);
-      return std::nullopt;
+std::optional<std::vector<record_deltas_t> >
+try_decode_deltas(
+    const record_group_header_t& header,
+    const ceph::bufferlist& md_bl,
+    paddr_t record_block_base)
+{
+  auto maybe_record_extent_infos = try_decode_extent_infos(header, md_bl);
+  if (!maybe_record_extent_infos) {
+    logger().debug(
+        "try_decode_deltas: failed, cannot decode extent_infos.");
+    return std::nullopt;
+  }
+
+  auto bliter = md_bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+  bliter += sizeof(checksum_t); /* metadata crc hole */
+  bliter += (ceph::encoded_sizeof_bounded<record_header_t>() *
+             maybe_record_extent_infos->size());
+  for (auto& r: *maybe_record_extent_infos) {
+    bliter += (ceph::encoded_sizeof_bounded<extent_info_t>() *
+               r.extent_infos.size());
+  }
+
+  std::vector<record_deltas_t> record_deltas(
+      maybe_record_extent_infos->size());
+  auto result_iter = record_deltas.begin();
+  for (auto& r: *maybe_record_extent_infos) {
+    result_iter->record_block_base = record_block_base;
+    result_iter->deltas.resize(r.header.deltas);
+    for (auto& i: result_iter->deltas) {
+      try {
+        decode(i, bliter);
+      } catch (ceph::buffer::error &e) {
+        logger().debug(
+            "try_decode_deltas: failed, "
+            "cannot decode delta_info_t, got {}.",
+            e);
+        return std::nullopt;
+      }
+    }
+    for (auto& i: r.extent_infos) {
+      auto& seg_addr = record_block_base.as_seg_paddr();
+      seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len);
     }
+    ++result_iter;
   }
-  return record_delta;
+  return record_deltas;
 }
 
 bool can_delay_allocation(device_type_t type) {
index 63582ce3d3121b4557d26348c79e037097b9960c..9d9119a2ec09364a3cd55757570f999a12a811c4 100644 (file)
@@ -1208,20 +1208,6 @@ struct record_size_t {
   }
 
   void account(const delta_info_t& delta);
-
-  // TODO: remove
-  extent_len_t get_raw_mdlength() const;
-
-  extent_len_t get_mdlength(extent_len_t block_size) const {
-    assert(block_size > 0);
-    return p2roundup(get_raw_mdlength(), block_size);
-  }
-
-  extent_len_t get_encoded_length(extent_len_t block_size) const {
-    assert(block_size > 0);
-    assert(dlength % block_size == 0);
-    return get_mdlength(block_size) + dlength;
-  }
 };
 
 struct record_t {
@@ -1268,23 +1254,33 @@ struct record_t {
 };
 
 struct record_header_t {
-  // Fixed portion
-  extent_len_t  mdlength;       // block aligned, length of metadata
-  extent_len_t  dlength;        // block aligned, length of data
   uint32_t deltas;              // number of deltas
   uint32_t extents;             // number of extents
+
+
+  DENC(record_header_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.deltas, p);
+    denc(v.extents, p);
+    DENC_FINISH(p);
+  }
+};
+
+struct record_group_header_t {
+  uint32_t      records;
+  extent_len_t  mdlength;       // block aligned, length of metadata
+  extent_len_t  dlength;        // block aligned, length of data
   segment_nonce_t segment_nonce;// nonce of containing segment
   journal_seq_t committed_to;   // records prior to committed_to have been
                                 // fully written, maybe in another segment.
   checksum_t data_crc;          // crc of data payload
 
 
-  DENC(record_header_t, v, p) {
+  DENC(record_group_header_t, v, p) {
     DENC_START(1, 1, p);
+    denc(v.records, p);
     denc(v.mdlength, p);
     denc(v.dlength, p);
-    denc(v.deltas, p);
-    denc(v.extents, p);
     denc(v.segment_nonce, p);
     denc(v.committed_to, p);
     denc(v.data_crc, p);
@@ -1293,8 +1289,7 @@ struct record_header_t {
 };
 
 struct record_group_size_t {
-  extent_len_t raw_mdlength = 0;
-  extent_len_t mdlength = 0;
+  extent_len_t plain_mdlength = 0; // mdlength without the group header
   extent_len_t dlength = 0;
   extent_len_t block_size = 0;
 
@@ -1305,15 +1300,11 @@ struct record_group_size_t {
     account(rsize, block_size);
   }
 
-  extent_len_t get_raw_mdlength() const {
-    assert(block_size > 0);
-    return raw_mdlength;
-  }
+  extent_len_t get_raw_mdlength() const;
 
   extent_len_t get_mdlength() const {
     assert(block_size > 0);
-    assert(mdlength % block_size == 0);
-    return mdlength;
+    return p2roundup(get_raw_mdlength(), block_size);
   }
 
   extent_len_t get_encoded_length() const {
@@ -1337,6 +1328,7 @@ struct record_group_size_t {
 struct record_group_t {
   std::vector<record_t> records;
   record_group_size_t size;
+  extent_len_t current_dlength = 0;
 
   record_group_t() = default;
   record_group_t(
@@ -1353,6 +1345,7 @@ struct record_group_t {
       record_t&& record,
       extent_len_t block_size) {
     size.account(record.size, block_size);
+    current_dlength += record.size.dlength;
     records.push_back(std::move(record));
     assert(size.get_encoded_length() < MAX_SEG_OFF);
   }
@@ -1364,6 +1357,7 @@ struct record_group_t {
   void clear() {
     records.clear();
     size = {};
+    current_dlength = 0;
   }
 };
 
@@ -1378,33 +1372,36 @@ ceph::bufferlist encode_records(
   const journal_seq_t& committed_to,
   segment_nonce_t current_segment_nonce);
 
-std::optional<record_header_t>
-try_decode_record_header(
+std::optional<record_group_header_t>
+try_decode_records_header(
     const ceph::bufferlist& header_bl,
     segment_nonce_t expected_nonce);
 
-bool validate_record_metadata(
+bool validate_records_metadata(
     const ceph::bufferlist& md_bl);
 
-bool validate_record_data(
-    const record_header_t& header,
+bool validate_records_data(
+    const record_group_header_t& header,
     const ceph::bufferlist& data_bl);
 
 struct record_extent_infos_t {
+  record_header_t header;
   std::vector<extent_info_t> extent_infos;
 };
-std::optional<record_extent_infos_t>
-try_decode_extent_info(
-    const record_header_t& header,
+std::optional<std::vector<record_extent_infos_t> >
+try_decode_extent_infos(
+    const record_group_header_t& header,
     const ceph::bufferlist& md_bl);
 
 struct record_deltas_t {
+  paddr_t record_block_base;
   std::vector<delta_info_t> deltas;
 };
-std::optional<record_deltas_t>
+std::optional<std::vector<record_deltas_t> >
 try_decode_deltas(
-    const record_header_t& header,
-    const ceph::bufferlist& md_bl);
+    const record_group_header_t& header,
+    const ceph::bufferlist& md_bl,
+    paddr_t record_block_base);
 
 struct write_result_t {
   journal_seq_t start_seq;
@@ -1426,21 +1423,21 @@ struct scan_valid_records_cursor {
   journal_seq_t seq;
   journal_seq_t last_committed;
 
-  struct found_record_t {
+  struct found_record_group_t {
     paddr_t offset;
-    record_header_t header;
+    record_group_header_t header;
     bufferlist mdbuffer;
 
-    found_record_t(
+    found_record_group_t(
       paddr_t offset,
-      const record_header_t &header,
+      const record_group_header_t &header,
       const bufferlist &mdbuffer)
       : offset(offset), header(header), mdbuffer(mdbuffer) {}
   };
-  std::deque<found_record_t> pending_records;
+  std::deque<found_record_group_t> pending_record_groups;
 
   bool is_complete() const {
-    return last_valid_header_found && pending_records.empty();
+    return last_valid_header_found && pending_record_groups.empty();
   }
 
   segment_id_t get_segment_id() const {
@@ -1524,6 +1521,7 @@ WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal_seq_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_group_header_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::extent_info_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
 WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::rbm_alloc_delta_t)