git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: scan records based on record_locator_t
author: Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 16 Nov 2021 08:47:12 +0000 (16:47 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 9 Dec 2021 01:37:05 +0000 (09:37 +0800)
Record may not have its own base if headers are merged.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_reader.cc
src/crimson/os/seastore/extent_reader.h
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.h

index e3c3a5a45c5cf8fe27c0564e155289de6026fb94..b257d266ea66a944bf6e6dd26ab13dce4bf092bd 100644 (file)
@@ -73,7 +73,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
     auto segment_nonce = segment_header.segment_nonce;
     return seastar::do_with(
       found_record_handler_t([extents, this](
-        paddr_t base,
+        record_locator_t locator,
         const record_header_t& header,
         const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
       {
@@ -82,11 +82,11 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
           // This should be impossible, we did check the crc on the mdbuf
           logger().error(
             "ExtentReader::scan_extents: unable to decode extents for record {}",
-            base);
+            locator.record_block_base);
           return crimson::ct_error::input_output_error::make();
         }
 
-        paddr_t extent_offset = base.add_offset(header.mdlength);
+        paddr_t extent_offset = locator.record_block_base;
         logger().debug("ExtentReader::scan_extents: decoded {} extents",
                        maybe_record_extent_infos->size());
         for (const auto &i : *maybe_record_extent_infos) {
@@ -358,8 +358,18 @@ ExtentReader::consume_next_records(
   auto& next = cursor.pending_records.front();
   auto total_length = next.header.dlength + next.header.mdlength;
   budget_used += total_length;
+  auto locator = record_locator_t{
+    next.offset.add_offset(next.header.mdlength),
+    write_result_t{
+      journal_seq_t{
+        cursor.seq.segment_seq,
+        next.offset
+      },
+      static_cast<segment_off_t>(total_length)
+    }
+  };
   return handler(
-    next.offset,
+    locator,
     next.header,
     next.mdbuffer
   ).safe_then([&cursor] {
index 99c5895847dce4f1ee24af0cad7537bc37741eef..243f3dc86d63067778f38be86c1c36eb06f291fa 100644 (file)
@@ -56,11 +56,11 @@ public:
     size_t>;
   using found_record_handler_t = std::function<
     scan_valid_records_ertr::future<>(
-      paddr_t record_block_base,
+      record_locator_t record_locator,
       // callee may assume header and bl will remain valid until
       // returned future resolves
       const record_header_t &header,
-      const bufferlist &bl)>;
+      const bufferlist &mdbuf)>;
   scan_valid_records_ret scan_valid_records(
     scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
     segment_nonce_t nonce,             ///< [in] nonce for segment
index c83cd4a494d133a5081862198ace92d9ee937452..f0331970236afc5993dc6060c190e9482df714b8 100644 (file)
@@ -174,7 +174,7 @@ Journal::replay_segment(
   return seastar::do_with(
     scan_valid_records_cursor(seq),
     ExtentReader::found_record_handler_t([=, &handler](
-      paddr_t base,
+      record_locator_t locator,
       const record_header_t& header,
       const bufferlist& mdbuf)
       -> ExtentReader::scan_valid_records_ertr::future<>
@@ -185,7 +185,7 @@ Journal::replay_segment(
         // This should be impossible, we did check the crc on the mdbuf
         logger().error(
           "Journal::replay_segment: unable to decode deltas for record {}",
-          base);
+          locator.record_block_base);
         return crimson::ct_error::input_output_error::make();
       }
 
@@ -193,9 +193,9 @@ Journal::replay_segment(
         std::move(*maybe_record_deltas_list),
         [=](auto &deltas)
       {
-        logger().debug("Journal::replay_segment: decoded {} deltas at base {}",
+        logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
                        deltas.size(),
-                       base);
+                       locator.record_block_base);
         return crimson::do_for_each(
           deltas,
           [=](auto &delta)
@@ -216,14 +216,7 @@ Journal::replay_segment(
                seq.segment_seq)) {
             return replay_ertr::now();
           } else {
-            auto offsets = submit_result_t{
-              base.add_offset(header.mdlength),
-              write_result_t{
-                journal_seq_t{seq.segment_seq, base},
-                static_cast<segment_off_t>(header.mdlength + header.dlength)
-              }
-            };
-            return handler(offsets, delta);
+            return handler(locator, delta);
           }
         });
       });
@@ -474,7 +467,7 @@ Journal::RecordBatch::add_pending(
     if (!maybe_write_result.has_value()) {
       return crimson::ct_error::input_output_error::make();
     }
-    auto submit_result = submit_result_t{
+    auto submit_result = record_locator_t{
       maybe_write_result->start_seq.offset.add_offset(block_start_offset),
       *maybe_write_result
     };
@@ -681,7 +674,7 @@ Journal::RecordSubmitter::submit_pending(
         journal_segment_manager.get_nonce());
       return journal_segment_manager.write(to_write
       ).safe_then([rsize](auto write_result) {
-        return submit_result_t{
+        return record_locator_t{
           write_result.start_seq.offset.add_offset(rsize.mdlength),
           write_result
         };
index a8bf5372f05f54fba4c68eec4cfa10d2e5011f98..5aed2ed57e64b7a652a46e553b42c436b169b6ab 100644 (file)
@@ -85,24 +85,12 @@ public:
    *
    * write record with the ordering handle
    */
-  struct write_result_t {
-    journal_seq_t start_seq;
-    segment_off_t length;
-
-    journal_seq_t get_end_seq() const {
-      return start_seq.add_offset(length);
-    }
-  };
-  struct submit_result_t {
-    paddr_t record_block_base;
-    write_result_t write_result;
-  };
   using submit_record_ertr = crimson::errorator<
     crimson::ct_error::erange,
     crimson::ct_error::input_output_error
     >;
   using submit_record_ret = submit_record_ertr::future<
-    submit_result_t
+    record_locator_t
     >;
   submit_record_ret submit_record(
     record_t &&record,
@@ -120,7 +108,7 @@ public:
   using replay_ertr = SegmentManager::read_ertr;
   using replay_ret = replay_ertr::future<>;
   using delta_handler_t = std::function<
-    replay_ret(const submit_result_t&,
+    replay_ret(const record_locator_t&,
               const delta_info_t&)>;
   replay_ret replay(
     std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
@@ -294,7 +282,7 @@ private:
     // Set write_result_t::write_length to 0 if the record is not the first one
     // in the batch.
     using add_pending_ertr = JournalSegmentManager::write_ertr;
-    using add_pending_ret = add_pending_ertr::future<submit_result_t>;
+    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
     add_pending_ret add_pending(record_t&&, const record_size_t&);
 
     // Encode the batched records for write.
@@ -426,7 +414,7 @@ private:
 
     using submit_pending_ertr = JournalSegmentManager::write_ertr;
     using submit_pending_ret = submit_pending_ertr::future<
-      submit_result_t>;
+      record_locator_t>;
     submit_pending_ret submit_pending(
         record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
 
index b8275cda4a0a9731f99399c50b6967fe9d58620c..2c115df23c9194550b35759c0294aa37afc639de 100644 (file)
@@ -1262,6 +1262,20 @@ ceph::bufferlist encode_record(
   const journal_seq_t& committed_to,
   segment_nonce_t current_segment_nonce = 0);
 
+struct write_result_t {
+  journal_seq_t start_seq;
+  segment_off_t length;
+
+  journal_seq_t get_end_seq() const {
+    return start_seq.add_offset(length);
+  }
+};
+
+struct record_locator_t {
+  paddr_t record_block_base;
+  write_result_t write_result;
+};
+
 /// scan segment for end incrementally
 struct scan_valid_records_cursor {
   bool last_valid_header_found = false;