]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: misc cleanup and reformat
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 16 Nov 2021 06:23:15 +0000 (14:23 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 9 Dec 2021 01:37:05 +0000 (09:37 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_reader.cc
src/crimson/os/seastore/extent_reader.h
src/crimson/os/seastore/journal.cc

index 8b407eb2c69181e920f0fbd43eb802011e8d8a68..bd95eb9dae7730ae340605a25675d8c5a3a0299c 100644 (file)
@@ -72,39 +72,40 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
   ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
     auto segment_nonce = segment_header.segment_nonce;
     return seastar::do_with(
-      found_record_handler_t(
-       [extents, this](
-         paddr_t base,
-         const record_header_t &header,
-         const bufferlist &mdbuf) mutable {
+      found_record_handler_t([extents, this](
+        paddr_t base,
+        const record_header_t& header,
+        const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
+      {
+        auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
+        if (!maybe_record_extent_infos) {
+          // This should be impossible, we did check the crc on the mdbuf
+          logger().error(
+            "ExtentReader::scan_extents: unable to decode extents for record {}",
+            base);
+          return crimson::ct_error::input_output_error::make();
+        }
 
-         auto infos = try_decode_extent_infos(
-           header,
-           mdbuf);
-         if (!infos) {
-           // This should be impossible, we did check the crc on the mdbuf
-           logger().error(
-             "ExtentReader::scan_extents unable to decode extents for record {}",
-             base);
-           assert(infos);
-         }
-
-         paddr_t extent_offset = base.add_offset(header.mdlength);
-         for (const auto &i : *infos) {
-           extents->emplace_back(extent_offset, i);
-           auto& seg_addr = extent_offset.as_seg_paddr();
-           seg_addr.set_segment_off(
-             seg_addr.get_segment_off() + i.len);
-         }
-         return scan_extents_ertr::now();
-       }),
+        paddr_t extent_offset = base.add_offset(header.mdlength);
+        logger().debug("ExtentReader::scan_extents: decoded {} extents",
+                       maybe_record_extent_infos->size());
+        for (const auto &i : *maybe_record_extent_infos) {
+          extents->emplace_back(extent_offset, i);
+          auto& seg_addr = extent_offset.as_seg_paddr();
+          seg_addr.set_segment_off(
+            seg_addr.get_segment_off() + i.len);
+        }
+        return scan_extents_ertr::now();
+      }),
       [=, &cursor](auto &dhandler) {
-       return scan_valid_records(
-         cursor,
-         segment_nonce,
-         bytes_to_read,
-         dhandler).discard_result();
-      });
+        return scan_valid_records(
+          cursor,
+          segment_nonce,
+          bytes_to_read,
+          dhandler
+        ).discard_result();
+      }
+    );
   }).safe_then([ret=std::move(ret)] {
     return std::move(*ret);
   });
@@ -140,7 +141,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
              cursor.last_valid_header_found = true;
              return scan_valid_records_ertr::now();
            } else {
-             auto new_committed_to = md->first.committed_to;
+             auto& [header, md_bl] = *md;
+             auto new_committed_to = header.committed_to;
              logger().debug(
                "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
                cursor.seq,
@@ -150,9 +152,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
              cursor.last_committed = new_committed_to;
              cursor.pending_records.emplace_back(
                cursor.seq.offset,
-               md->first,
-               md->second);
-             cursor.increment(md->first.dlength + md->first.mdlength);
+               header,
+               std::move(md_bl));
+             cursor.increment(header.dlength + header.mdlength);
              ceph_assert(new_committed_to == journal_seq_t() ||
                          new_committed_to < cursor.seq);
              return scan_valid_records_ertr::now();
@@ -178,14 +180,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
                  return scan_valid_records_ertr::make_ready_future<
                    seastar::stop_iteration>(seastar::stop_iteration::yes);
                }
-               budget_used +=
-                 next.header.dlength + next.header.mdlength;
-               return handler(
-                 next.offset,
-                 next.header,
-                 next.mdbuffer
-               ).safe_then([&cursor] {
-                 cursor.pending_records.pop_front();
+               return consume_next_records(cursor, handler, budget_used
+               ).safe_then([] {
                  return scan_valid_records_ertr::make_ready_future<
                    seastar::stop_iteration>(seastar::stop_iteration::no);
                });
@@ -195,21 +191,12 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
          assert(!cursor.pending_records.empty());
          auto &next = cursor.pending_records.front();
          return read_validate_data(next.offset, next.header
-         ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+         ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
            if (!valid) {
              cursor.pending_records.clear();
              return scan_valid_records_ertr::now();
            }
-           budget_used +=
-             next.header.dlength + next.header.mdlength;
-           return handler(
-             next.offset,
-             next.header,
-             next.mdbuffer
-           ).safe_then([&cursor] {
-             cursor.pending_records.pop_front();
-             return scan_valid_records_ertr::now();
-           });
+            return consume_next_records(cursor, handler, budget_used);
          });
        }
       }().safe_then([=, &budget_used, &cursor] {
@@ -244,61 +231,63 @@ ExtentReader::read_validate_record_metadata(
   logger().debug("read_validate_record_metadata: reading header block {}...",
                  start);
   return segment_manager.read(start, block_size
-  ).safe_then(
-    [=, &segment_manager](bufferptr bptr) mutable
-    -> read_validate_record_metadata_ret {
-      auto block_size = segment_manager.get_block_size();
-      bufferlist bl;
-      bl.append(bptr);
-      auto bp = bl.cbegin();
-      record_header_t header;
-      try {
-       decode(header, bp);
-      } catch (ceph::buffer::error &e) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-      if (header.segment_nonce != nonce) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
-      auto& seg_addr = start.as_seg_paddr();
-      if (header.mdlength > (extent_len_t)block_size) {
-       if (seg_addr.get_segment_off() + header.mdlength >
-           (int64_t)segment_manager.get_segment_size()) {
-         return crimson::ct_error::input_output_error::make();
-       }
-       return segment_manager.read(
-         paddr_t::make_seg_paddr(seg_addr.get_segment_id(),
-          seg_addr.get_segment_off() + (segment_off_t)block_size),
-         header.mdlength - block_size).safe_then(
-           [header=std::move(header), bl=std::move(bl)](
-             auto &&bptail) mutable {
-             bl.push_back(bptail);
-             return read_validate_record_metadata_ret(
-               read_validate_record_metadata_ertr::ready_future_marker{},
-               std::make_pair(std::move(header), std::move(bl)));
-           });
-      } else {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::make_pair(std::move(header), std::move(bl))
-       );
-      }
-    }).safe_then([=](auto p) {
-      if (p && validate_metadata(p->second)) {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::move(*p)
-       );
-      } else {
-       return read_validate_record_metadata_ret(
-         read_validate_record_metadata_ertr::ready_future_marker{},
-         std::nullopt);
-      }
+  ).safe_then([=, &segment_manager](bufferptr bptr) mutable
+              -> read_validate_record_metadata_ret {
+    auto block_size = static_cast<extent_len_t>(
+        segment_manager.get_block_size());
+    bufferlist bl;
+    bl.append(bptr);
+    auto bp = bl.cbegin();
+    record_header_t header;
+    try {
+      decode(header, bp);
+    } catch (ceph::buffer::error &e) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+    if (header.segment_nonce != nonce) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+    auto& seg_addr = start.as_seg_paddr();
+    if (seg_addr.get_segment_off() + header.mdlength >
+        (int64_t)segment_manager.get_segment_size()) {
+      logger().error("read_validate_record_metadata: failed, invalid header");
+      return crimson::ct_error::input_output_error::make();
+    }
+    if (header.mdlength == block_size) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::make_pair(std::move(header), std::move(bl))
+      );
+    }
+    return segment_manager.read(
+      paddr_t::make_seg_paddr(
+        seg_addr.get_segment_id(),
+        seg_addr.get_segment_off() + (segment_off_t)block_size
+      ),
+      header.mdlength - block_size
+    ).safe_then([header=std::move(header), bl=std::move(bl)
+                ](auto&& bptail) mutable {
+      bl.push_back(bptail);
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::make_pair(std::move(header), std::move(bl)));
     });
+  }).safe_then([this](auto p) {
+    if (p && validate_metadata(p->second)) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::move(*p)
+      );
+    } else {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+  });
 }
 
 std::optional<std::vector<extent_info_t>>
@@ -355,4 +344,22 @@ bool ExtentReader::validate_metadata(const bufferlist &bl)
   return test_crc == recorded_crc;
 }
 
+ExtentReader::consume_record_group_ertr::future<>
+ExtentReader::consume_next_records(
+  scan_valid_records_cursor& cursor,
+  found_record_handler_t& handler,
+  std::size_t& budget_used)
+{
+  auto& next = cursor.pending_records.front();
+  auto total_length = next.header.dlength + next.header.mdlength;
+  budget_used += total_length;
+  return handler(
+    next.offset,
+    next.header,
+    next.mdbuffer
+  ).safe_then([&cursor] {
+    cursor.pending_records.pop_front();
+  });
+}
+
 } // namespace crimson::os::seastore
index e0d66495af55959dd7910b74ef0496055bf08366..99c5895847dce4f1ee24af0cad7537bc37741eef 100644 (file)
@@ -112,6 +112,13 @@ private:
                                    ///  future resolution
   );
 
+  using consume_record_group_ertr = scan_valid_records_ertr;
+  consume_record_group_ertr::future<> consume_next_records(
+      scan_valid_records_cursor& cursor,
+      found_record_handler_t& handler,
+      std::size_t& budget_used);
+
+
   /// validate embedded metadata checksum
   static bool validate_metadata(const bufferlist &bl);
 
index 90bf37dcd1ee1bd6bf7d0b642945d04224365b46..c83cd4a494d133a5081862198ace92d9ee937452 100644 (file)
@@ -121,7 +121,7 @@ Journal::prep_replay_segments(
   } else {
     replay_from = paddr_t::make_seg_paddr(
       from->first,
-      (segment_off_t)journal_segment_manager.get_block_size());
+      journal_segment_manager.get_block_size());
   }
   auto ret = replay_segments_t(segments.end() - from);
   std::transform(
@@ -131,7 +131,8 @@ Journal::prep_replay_segments(
        p.second.journal_segment_seq,
        paddr_t::make_seg_paddr(
          p.first,
-         (segment_off_t)journal_segment_manager.get_block_size())};
+         journal_segment_manager.get_block_size())
+      };
       logger().debug(
        "Journal::prep_replay_segments: replaying from  {}",
        ret);
@@ -172,57 +173,61 @@ Journal::replay_segment(
   logger().debug("Journal::replay_segment: starting at {}", seq);
   return seastar::do_with(
     scan_valid_records_cursor(seq),
-    ExtentReader::found_record_handler_t(
-      [=, &handler](paddr_t base,
-                   const record_header_t &header,
-                   const bufferlist &mdbuf) {
-       auto deltas = try_decode_deltas(
-         header,
-         mdbuf);
-       if (!deltas) {
-         // This should be impossible, we did check the crc on the mdbuf
-         logger().error(
-           "Journal::replay_segment: unable to decode deltas for record {}",
-           base);
-         assert(deltas);
-       }
+    ExtentReader::found_record_handler_t([=, &handler](
+      paddr_t base,
+      const record_header_t& header,
+      const bufferlist& mdbuf)
+      -> ExtentReader::scan_valid_records_ertr::future<>
+    {
+      auto maybe_record_deltas_list = try_decode_deltas(
+        header, mdbuf);
+      if (!maybe_record_deltas_list) {
+        // This should be impossible, we did check the crc on the mdbuf
+        logger().error(
+          "Journal::replay_segment: unable to decode deltas for record {}",
+          base);
+        return crimson::ct_error::input_output_error::make();
+      }
 
-       return seastar::do_with(
-         std::move(*deltas),
-         [=](auto &deltas) {
-           return crimson::do_for_each(
-             deltas,
-             [=](auto &delta) {
-               /* The journal may validly contain deltas for extents in
-                * since released segments.  We can detect those cases by
-                * checking whether the segment in question currently has a
-                * sequence number > the current journal segment seq. We can
-                * safetly skip these deltas because the extent must already
-                * have been rewritten.
-                *
-                * Note, this comparison exploits the fact that
-                * SEGMENT_SEQ_NULL is a large number.
-                */
-               auto& seg_addr = delta.paddr.as_seg_paddr();
-               if (delta.paddr != P_ADDR_NULL &&
-                   (segment_provider->get_seq(seg_addr.get_segment_id()) >
-                    seq.segment_seq)) {
-                 return replay_ertr::now();
-               } else {
-                 auto offsets = submit_result_t{
-                   base.add_offset(header.mdlength),
-                   write_result_t{
-                     journal_seq_t{seq.segment_seq, base},
-                     static_cast<segment_off_t>(header.mdlength + header.dlength)
-                   }
-                 };
-                 return handler(
-                   offsets,
-                   delta);
-               }
-             });
-         });
-      }),
+      return seastar::do_with(
+        std::move(*maybe_record_deltas_list),
+        [=](auto &deltas)
+      {
+        logger().debug("Journal::replay_segment: decoded {} deltas at base {}",
+                       deltas.size(),
+                       base);
+        return crimson::do_for_each(
+          deltas,
+          [=](auto &delta)
+        {
+          /* The journal may validly contain deltas for extents in
+           * since released segments.  We can detect those cases by
+           * checking whether the segment in question currently has a
+           * sequence number > the current journal segment seq. We can
+           * safetly skip these deltas because the extent must already
+           * have been rewritten.
+           *
+           * Note, this comparison exploits the fact that
+           * SEGMENT_SEQ_NULL is a large number.
+           */
+          auto& seg_addr = delta.paddr.as_seg_paddr();
+          if (delta.paddr != P_ADDR_NULL &&
+              (segment_provider->get_seq(seg_addr.get_segment_id()) >
+               seq.segment_seq)) {
+            return replay_ertr::now();
+          } else {
+            auto offsets = submit_result_t{
+              base.add_offset(header.mdlength),
+              write_result_t{
+                journal_seq_t{seq.segment_seq, base},
+                static_cast<segment_off_t>(header.mdlength + header.dlength)
+              }
+            };
+            return handler(offsets, delta);
+          }
+        });
+      });
+    }),
     [=](auto &cursor, auto &dhandler) {
       return scanner.scan_valid_records(
        cursor,
@@ -234,8 +239,9 @@ Journal::replay_segment(
        crimson::ct_error::assert_all{
          "shouldn't meet with any other error other replay_ertr"
        }
-      );;
-    });
+      );
+    }
+  );
 }
 
 Journal::replay_ret Journal::replay(