]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: generalize replay scan
author Samuel Just <sjust@redhat.com>
Fri, 4 Sep 2020 07:46:19 +0000 (00:46 -0700)
committer Samuel Just <sjust@redhat.com>
Tue, 20 Oct 2020 19:27:08 +0000 (12:27 -0700)
We'll need to do at least two forms of scan:
- deltas for replay
- extents for gc

Most of the mechanics are common, however, so this patch
hoists the common machinery into scan_segment.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h

index 1489919b6c3bceb5099180615f846b5e8579b861..1b6578c93b471da48044c9a3db292d6e12b8e34c 100644 (file)
@@ -339,28 +339,87 @@ std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
   return deltas;
 }
 
+std::optional<std::vector<extent_info_t>> Journal::try_decode_extent_infos(
+  record_header_t header,
+  bufferlist &bl)
+{
+  auto bliter = bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+  logger().debug("{}: decoding {} extents", __func__, header.extents);
+  std::vector<extent_info_t> extent_infos(header.extents);
+  for (auto &&i : extent_infos) {
+    try {
+      ::decode(i, bliter);
+    } catch (ceph::buffer::error &e) {
+      return std::nullopt;
+    }
+  }
+  return extent_infos;
+}
+
 Journal::replay_ertr::future<>
 Journal::replay_segment(
   journal_seq_t seq,
-  delta_handler_t &delta_handler)
+  delta_handler_t &handler)
 {
   logger().debug("replay_segment: starting at {}", seq);
   return seastar::do_with(
-    paddr_t(seq.offset),
-    [=, &delta_handler](paddr_t &current) {
+    delta_scan_handler_t(
+      [&handler, seq](auto addr, auto base, const auto &delta) {
+       return handler(
+         journal_seq_t{seq.segment_seq, addr},
+         base,
+         delta);
+      }),
+    [this, seq](auto &dhandler) {
+      return scan_segment(
+       seq.offset,
+       segment_manager.get_segment_size(),
+       &dhandler,
+       nullptr).safe_then([](auto){});
+    });
+}
+
+Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+{
+  return seastar::do_with(
+    std::move(delta_handler), std::vector<journal_seq_t>(),
+    [this](auto &handler, auto &segments) mutable -> replay_ret {
+      return find_replay_segments().safe_then(
+        [this, &handler, &segments](auto replay_segs) mutable {
+          logger().debug("replay: found {} segments", replay_segs.size());
+          segments = std::move(replay_segs);
+          return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
+            return replay_segment(i, handler);
+          });
+        });
+    });
+}
+
+Journal::scan_segment_ret Journal::scan_segment(
+  paddr_t addr,
+  extent_len_t bytes_to_read,
+  delta_scan_handler_t *delta_handler,
+  extent_handler_t *extent_info_handler)
+{
+  logger().debug("Journal::scan_segment: starting at {}", addr);
+  return seastar::do_with(
+    addr,
+    [=](paddr_t &current) {
       return crimson::do_until(
-       [=, &current, &delta_handler]() -> replay_ertr::future<bool> {
+       [=, &current]() -> scan_segment_ertr::future<bool> {
          return read_record_metadata(current).safe_then
-           ([=, &current, &delta_handler](auto p)
-            -> replay_ertr::future<bool> {
+           ([=, &current](auto p)
+            -> scan_segment_ertr::future<bool> {
              if (!p.has_value()) {
-               return replay_ertr::make_ready_future<bool>(true);
+               current = P_ADDR_NULL;
+               return scan_segment_ertr::make_ready_future<bool>(true);
              }
 
              auto &[header, bl] = *p;
 
              logger().debug(
-               "replay_segment: next record offset {} mdlength {} dlength {}",
+               "Journal::scan_segment: next record offset {} mdlength {} dlength {}",
                current,
                header.mdlength,
                header.dlength);
@@ -368,48 +427,87 @@ Journal::replay_segment(
              auto record_start = current;
              current.offset += header.mdlength + header.dlength;
 
-             auto deltas = try_decode_deltas(
-               header,
-               bl);
-             if (!deltas) {
-               return replay_ertr::make_ready_future<bool>(true);
-             }
-
              return seastar::do_with(
-               std::move(*deltas),
-               [=, &delta_handler](auto &deltas) {
-                 return crimson::do_for_each(
-                   deltas,
-                   [=, &delta_handler](auto &info) {
-                     return delta_handler(
-                       journal_seq_t{
-                         seq.segment_seq,
-                         record_start},
-                       record_start.add_offset(block_size),
-                       info);
-                   });
-               }).safe_then([] {
-                 return replay_ertr::make_ready_future<bool>(false);
+               header,
+               bl,
+               [=, &current](auto &header, auto &bl) {
+                 return scan_segment_ertr::now(
+                 ).safe_then(
+                   [=, &current, &header, &bl]()
+                   -> scan_segment_ertr::future<> {
+                   if (!delta_handler) {
+                     return scan_segment_ertr::now();
+                   }
+
+                   auto deltas = try_decode_deltas(
+                     header,
+                     bl);
+                   if (!deltas) {
+                     logger().error(
+                       "Journal::scan_segment unable to decode deltas for record {}",
+                       addr);
+                     return crimson::ct_error::input_output_error::make();
+                   }
+
+                   return seastar::do_with(
+                     std::move(*deltas),
+                     [=](auto &deltas) {
+                       return crimson::do_for_each(
+                         deltas,
+                         [=](auto &info) {
+                           return (*delta_handler)(
+                             record_start,
+                             record_start.add_offset(header.mdlength),
+                             info);
+                         });
+                     });
+                 }).safe_then(
+                   [=, &header, &bl]() -> scan_segment_ertr::future<> {
+                   if (!extent_info_handler) {
+                     return scan_segment_ertr::now();
+                   }
+
+                   auto infos = try_decode_extent_infos(
+                     header,
+                     bl);
+                   if (!infos) {
+                     logger().error(
+                       "Journal::scan_segment unable to decode extent infos for record {}",
+                       addr);
+                     return crimson::ct_error::input_output_error::make();
+                   }
+
+                   return seastar::do_with(
+                     segment_off_t(0),
+                     std::move(*infos),
+                     [=](auto &pos, auto &deltas) {
+                       return crimson::do_for_each(
+                         deltas,
+                         [=, &pos](auto &info) {
+                           auto addr = record_start
+                             .add_offset(header.mdlength)
+                             .add_offset(pos);
+                           pos += info.len;
+                           return (*extent_info_handler)(
+                             addr,
+                             info);
+                         });
+                     });
+                   return scan_segment_ertr::now();
+                 });
+               }).safe_then([=, &current] {
+                 if ((segment_off_t)(addr.offset + bytes_to_read)
+                     <= current.offset) {
+                   return scan_segment_ertr::make_ready_future<bool>(true);
+                 } else {
+                   return scan_segment_ertr::make_ready_future<bool>(false);
+                 }
                });
            });
+       }).safe_then([this, &current] {
+         return scan_segment_ertr::make_ready_future<paddr_t>(current);
        });
     });
 }
 
-Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
-{
-  return seastar::do_with(
-    std::move(delta_handler), std::vector<journal_seq_t>(),
-    [this](auto&& handler, auto&& segments) mutable -> replay_ret {
-      return find_replay_segments().safe_then(
-        [this, &handler, &segments](auto replay_segs) {
-          logger().debug("replay: found {} segments", replay_segs.size());
-          segments = std::move(replay_segs);
-          return crimson::do_for_each(segments, [this, &handler](auto i) {
-            return replay_segment(i, handler);
-          });
-        });
-    });
-}
-
 }
index f1bdaff3a2b0eeba12d82c90fe85bd50c63ea2c0..bedfa3ab5638db92714d50c1afc642bff77575af 100644 (file)
@@ -273,12 +273,42 @@ private:
     record_header_t header,
     bufferlist &bl);
 
+  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
+  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
+    record_header_t header,
+    bufferlist &bl);
+
+  /**
+   * scan_segment
+   *
+   * Scans bytes_to_read forward from addr to the first record after
+   * addr+bytes_to_read invoking delta_handler and extent_info_handler
+   * on deltas and extent_infos respectively.  deltas, extent_infos
+   * will only be decoded if the corresponding handler is included.
+   */
+  using scan_segment_ertr = SegmentManager::read_ertr;
+  using scan_segment_ret = scan_segment_ertr::future<paddr_t>;
+  using delta_scan_handler_t = std::function<
+    replay_ret(paddr_t record_start,
+              paddr_t record_block_base,
+              const delta_info_t&)>;
+  using extent_handler_t = std::function<
+    scan_segment_ertr::future<>(paddr_t addr,
+                               const extent_info_t &info)>;
+  scan_segment_ret scan_segment(
+    paddr_t addr,
+    extent_len_t bytes_to_read,
+    delta_scan_handler_t *delta_handler,
+    extent_handler_t *extent_info_handler
+  );
+
   /// replays records starting at start through end of segment
   replay_ertr::future<>
   replay_segment(
-    journal_seq_t start,           ///< [in] starting addr, seq
-    delta_handler_t &delta_handler ///< [in] processes deltas in order
+    journal_seq_t start,             ///< [in] starting addr, seq
+    delta_handler_t &delta_handler   ///< [in] processes deltas in order
   );
+
 };
 
 }