]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: introduce RecordScanner to generalize scan_valid_records()
authormyoungwon oh <ohmyoungwon@gmail.com>
Thu, 20 Jul 2023 02:50:48 +0000 (02:50 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:49:52 +0000 (11:49 +0000)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
(cherry picked from commit 22c747826b398cc0a0d68aab9bba1fd096a2de9d)

src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/record_scanner.cc [new file with mode: 0644]
src/crimson/os/seastore/record_scanner.h [new file with mode: 0644]
src/crimson/os/seastore/segment_manager_group.cc
src/crimson/os/seastore/segment_manager_group.h

index 5b1c6187ca2a2061343cc41e4ef83ec0a68e409b..5994a17783135f96a29744ca450fa2e2e97e43f0 100644 (file)
@@ -51,6 +51,7 @@ set(crimson_seastore_srcs
   journal.cc
   device.cc
   segment_manager_group.cc
+  record_scanner.cc
   journal/circular_bounded_journal.cc
   ../../../test/crimson/seastore/test_block.cc
   ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
diff --git a/src/crimson/os/seastore/record_scanner.cc b/src/crimson/os/seastore/record_scanner.cc
new file mode 100644 (file)
index 0000000..f3ed54d
--- /dev/null
@@ -0,0 +1,142 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/record_scanner.h"
+
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore {
+
+RecordScanner::scan_valid_records_ret
+RecordScanner::scan_valid_records(
+  scan_valid_records_cursor &cursor,
+  segment_nonce_t nonce,
+  size_t budget,
+  found_record_handler_t &handler)
+{
+  LOG_PREFIX(RecordScanner::scan_valid_records);
+  initialize_cursor(cursor);
+  DEBUG("starting at {}, budget={}", cursor, budget);
+  auto retref = std::make_unique<size_t>(0);
+  auto &budget_used = *retref;
+  return crimson::repeat(
+    [=, &cursor, &budget_used, &handler, this]() mutable
+    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+      return [=, &handler, &cursor, &budget_used, this] {
+       if (!cursor.last_valid_header_found) {
+         return read_validate_record_metadata(cursor.seq.offset, nonce
+         ).safe_then([=, &cursor](auto md) {
+           if (!md) {
+             cursor.last_valid_header_found = true;
+             if (cursor.is_complete()) {
+               INFO("complete at {}, invalid record group metadata",
+                     cursor);
+             } else {
+               DEBUG("found invalid record group metadata at {}, "
+                     "processing {} pending record groups",
+                     cursor.seq,
+                     cursor.pending_record_groups.size());
+             }
+             return scan_valid_records_ertr::now();
+           } else {
+             auto& [header, md_bl] = *md;
+             DEBUG("found valid {} at {}", header, cursor.seq);
+             cursor.emplace_record_group(header, std::move(md_bl));
+             return scan_valid_records_ertr::now();
+           }
+         }).safe_then([=, &cursor, &budget_used, &handler, this] {
+           DEBUG("processing committed record groups until {}, {} pending",
+                 cursor.last_committed,
+                 cursor.pending_record_groups.size());
+           return crimson::repeat(
+             [=, &budget_used, &cursor, &handler, this] {
+               if (cursor.pending_record_groups.empty()) {
+                 /* This is only possible if the segment is empty.
+                  * A record's last_commited must be prior to its own
+                  * location since it itself cannot yet have been committed
+                  * at its own time of submission.  Thus, the most recently
+                  * read record must always fall after cursor.last_committed */
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               auto &next = cursor.pending_record_groups.front();
+               journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
+               if (cursor.last_committed == JOURNAL_SEQ_NULL ||
+                   next_seq > cursor.last_committed) {
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               return consume_next_records(cursor, handler, budget_used
+               ).safe_then([] {
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::no);
+               });
+             });
+         });
+       } else {
+         assert(!cursor.pending_record_groups.empty());
+         auto &next = cursor.pending_record_groups.front();
+         return read_validate_data(next.offset, next.header
+         ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
+           if (!valid) {
+             INFO("complete at {}, invalid record group data at {}, {}",
+                  cursor, next.offset, next.header);
+             cursor.pending_record_groups.clear();
+             return scan_valid_records_ertr::now();
+           }
+            return consume_next_records(cursor, handler, budget_used);
+         });
+       }
+      }().safe_then([=, &budget_used, &cursor] {
+       if (cursor.is_complete() || budget_used >= budget) {
+         DEBUG("finish at {}, budget_used={}, budget={}",
+                cursor, budget_used, budget);
+         return seastar::stop_iteration::yes;
+       } else {
+         return seastar::stop_iteration::no;
+       }
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+       scan_valid_records_ertr::ready_future_marker{},
+       std::move(*retref));
+    });
+}
+
+RecordScanner::consume_record_group_ertr::future<>
+RecordScanner::consume_next_records(
+  scan_valid_records_cursor& cursor,
+  found_record_handler_t& handler,
+  std::size_t& budget_used)
+{
+  LOG_PREFIX(RecordScanner::consume_next_records);
+  auto& next = cursor.pending_record_groups.front();
+  auto total_length = next.header.dlength + next.header.mdlength;
+  budget_used += total_length;
+  auto locator = record_locator_t{
+    next.offset.add_offset(next.header.mdlength),
+    write_result_t{
+      journal_seq_t{
+        cursor.seq.segment_seq,
+        next.offset
+      },
+      total_length
+    }
+  };
+  DEBUG("processing {} at {}, budget_used={}",
+        next.header, locator, budget_used);
+  return handler(
+    locator,
+    next.header,
+    next.mdbuffer
+  ).safe_then([FNAME, &cursor] {
+    cursor.pop_record_group();
+    if (cursor.is_complete()) {
+      INFO("complete at {}, no more record group", cursor);
+    }
+  });
+}
+
+}
diff --git a/src/crimson/os/seastore/record_scanner.h b/src/crimson/os/seastore/record_scanner.h
new file mode 100644 (file)
index 0000000..c8486f5
--- /dev/null
@@ -0,0 +1,65 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+
+namespace crimson::os::seastore {
+
+class RecordScanner {
+public:
+  using read_ertr = SegmentManager::read_ertr;
+  using scan_valid_records_ertr = read_ertr;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      record_locator_t record_locator,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_group_header_t &header,
+      const bufferlist &mdbuf)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
+
+protected:
+  /// read record metadata for record starting at start
+  using read_validate_record_metadata_ertr = read_ertr;
+  using read_validate_record_metadata_ret =
+    read_validate_record_metadata_ertr::future<
+      std::optional<std::pair<record_group_header_t, bufferlist>>
+    >;
+  virtual read_validate_record_metadata_ret read_validate_record_metadata(
+    paddr_t start,
+    segment_nonce_t nonce) = 0;
+
+  /// read and validate data
+  using read_validate_data_ertr = read_ertr;
+  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
+  virtual read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_group_header_t &header  ///< caller must ensure lifetime through
+                                         ///  future resolution
+  ) = 0;
+
+  using consume_record_group_ertr = scan_valid_records_ertr;
+  consume_record_group_ertr::future<> consume_next_records(
+      scan_valid_records_cursor& cursor,
+      found_record_handler_t& handler,
+      std::size_t& budget_used);
+
+  virtual void initialize_cursor(scan_valid_records_cursor &cursor) = 0;
+
+  virtual ~RecordScanner() {}
+
+};
+
+}
index e78e299e71bc410d6332901ad922679798d09cff..6fe56501c8a0aad8bb43df84a4495f240705ceae 100644 (file)
@@ -91,14 +91,10 @@ SegmentManagerGroup::read_segment_header(segment_id_t segment)
   });
 }
 
-SegmentManagerGroup::scan_valid_records_ret
-SegmentManagerGroup::scan_valid_records(
-  scan_valid_records_cursor &cursor,
-  segment_nonce_t nonce,
-  size_t budget,
-  found_record_handler_t &handler)
+void SegmentManagerGroup::initialize_cursor(
+  scan_valid_records_cursor &cursor)
 {
-  LOG_PREFIX(SegmentManagerGroup::scan_valid_records);
+  LOG_PREFIX(SegmentManagerGroup::initialize_cursor);
   assert(has_device(cursor.get_segment_id().device_id()));
   auto& segment_manager =
     *segment_managers[cursor.get_segment_id().device_id()];
@@ -106,91 +102,6 @@ SegmentManagerGroup::scan_valid_records(
     INFO("start to scan segment {}", cursor.get_segment_id());
     cursor.increment_seq(segment_manager.get_block_size());
   }
-  DEBUG("starting at {}, budget={}", cursor, budget);
-  auto retref = std::make_unique<size_t>(0);
-  auto &budget_used = *retref;
-  return crimson::repeat(
-    [=, &cursor, &budget_used, &handler, this]() mutable
-    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
-      return [=, &handler, &cursor, &budget_used, this] {
-       if (!cursor.last_valid_header_found) {
-         return read_validate_record_metadata(cursor.seq.offset, nonce
-         ).safe_then([=, &cursor](auto md) {
-           if (!md) {
-             cursor.last_valid_header_found = true;
-             if (cursor.is_complete()) {
-               INFO("complete at {}, invalid record group metadata",
-                     cursor);
-             } else {
-               DEBUG("found invalid record group metadata at {}, "
-                     "processing {} pending record groups",
-                     cursor.seq,
-                     cursor.pending_record_groups.size());
-             }
-             return scan_valid_records_ertr::now();
-           } else {
-             auto& [header, md_bl] = *md;
-             DEBUG("found valid {} at {}", header, cursor.seq);
-             cursor.emplace_record_group(header, std::move(md_bl));
-             return scan_valid_records_ertr::now();
-           }
-         }).safe_then([=, &cursor, &budget_used, &handler, this] {
-           DEBUG("processing committed record groups until {}, {} pending",
-                 cursor.last_committed,
-                 cursor.pending_record_groups.size());
-           return crimson::repeat(
-             [=, &budget_used, &cursor, &handler, this] {
-               if (cursor.pending_record_groups.empty()) {
-                 /* This is only possible if the segment is empty.
-                  * A record's last_commited must be prior to its own
-                  * location since it itself cannot yet have been committed
-                  * at its own time of submission.  Thus, the most recently
-                  * read record must always fall after cursor.last_committed */
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               auto &next = cursor.pending_record_groups.front();
-               journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
-               if (cursor.last_committed == JOURNAL_SEQ_NULL ||
-                   next_seq > cursor.last_committed) {
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               return consume_next_records(cursor, handler, budget_used
-               ).safe_then([] {
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::no);
-               });
-             });
-         });
-       } else {
-         assert(!cursor.pending_record_groups.empty());
-         auto &next = cursor.pending_record_groups.front();
-         return read_validate_data(next.offset, next.header
-         ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
-           if (!valid) {
-             INFO("complete at {}, invalid record group data at {}, {}",
-                  cursor, next.offset, next.header);
-             cursor.pending_record_groups.clear();
-             return scan_valid_records_ertr::now();
-           }
-            return consume_next_records(cursor, handler, budget_used);
-         });
-       }
-      }().safe_then([=, &budget_used, &cursor] {
-       if (cursor.is_complete() || budget_used >= budget) {
-         DEBUG("finish at {}, budget_used={}, budget={}",
-                cursor, budget_used, budget);
-         return seastar::stop_iteration::yes;
-       } else {
-         return seastar::stop_iteration::no;
-       }
-      });
-    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
-      return scan_valid_records_ret(
-       scan_valid_records_ertr::ready_future_marker{},
-       std::move(*retref));
-    });
 }
 
 SegmentManagerGroup::read_validate_record_metadata_ret
@@ -289,40 +200,6 @@ SegmentManagerGroup::read_validate_data(
   });
 }
 
-SegmentManagerGroup::consume_record_group_ertr::future<>
-SegmentManagerGroup::consume_next_records(
-  scan_valid_records_cursor& cursor,
-  found_record_handler_t& handler,
-  std::size_t& budget_used)
-{
-  LOG_PREFIX(SegmentManagerGroup::consume_next_records);
-  auto& next = cursor.pending_record_groups.front();
-  auto total_length = next.header.dlength + next.header.mdlength;
-  budget_used += total_length;
-  auto locator = record_locator_t{
-    next.offset.add_offset(next.header.mdlength),
-    write_result_t{
-      journal_seq_t{
-        cursor.seq.segment_seq,
-        next.offset
-      },
-      total_length
-    }
-  };
-  DEBUG("processing {} at {}, budget_used={}",
-        next.header, locator, budget_used);
-  return handler(
-    locator,
-    next.header,
-    next.mdbuffer
-  ).safe_then([FNAME, &cursor] {
-    cursor.pop_record_group();
-    if (cursor.is_complete()) {
-      INFO("complete at {}, no more record group", cursor);
-    }
-  });
-}
-
 SegmentManagerGroup::find_journal_segment_headers_ret
 SegmentManagerGroup::find_journal_segment_headers()
 {
index bd5af9601a5673657fcbc7d84f10f72d751868a9..826ab61a7e7a51ef9363b8e496fa601abf31b027 100644 (file)
@@ -8,10 +8,11 @@
 #include "crimson/common/errorator.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/record_scanner.h"
 
 namespace crimson::os::seastore {
 
-class SegmentManagerGroup {
+class SegmentManagerGroup : public RecordScanner {
 public:
   SegmentManagerGroup() {
     segment_managers.resize(DEVICE_ID_MAX, nullptr);
@@ -96,24 +97,6 @@ public:
     segment_tail_t>;
   read_segment_tail_ret  read_segment_tail(segment_id_t segment);
 
-  using read_ertr = SegmentManager::read_ertr;
-  using scan_valid_records_ertr = read_ertr;
-  using scan_valid_records_ret = scan_valid_records_ertr::future<
-    size_t>;
-  using found_record_handler_t = std::function<
-    scan_valid_records_ertr::future<>(
-      record_locator_t record_locator,
-      // callee may assume header and bl will remain valid until
-      // returned future resolves
-      const record_group_header_t &header,
-      const bufferlist &mdbuf)>;
-  scan_valid_records_ret scan_valid_records(
-    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
-    segment_nonce_t nonce,             ///< [in] nonce for segment
-    size_t budget,                     ///< [in] max budget to use
-    found_record_handler_t &handler    ///< [in] handler for records
-  ); ///< @return used budget
-
   /*
    * read journal segment headers
    */
@@ -143,30 +126,17 @@ private:
     return device_ids.count(id) >= 1;
   }
 
-  /// read record metadata for record starting at start
-  using read_validate_record_metadata_ertr = read_ertr;
-  using read_validate_record_metadata_ret =
-    read_validate_record_metadata_ertr::future<
-      std::optional<std::pair<record_group_header_t, bufferlist>>
-    >;
+  void initialize_cursor(scan_valid_records_cursor &cursor) final;
+
   read_validate_record_metadata_ret read_validate_record_metadata(
     paddr_t start,
-    segment_nonce_t nonce);
+    segment_nonce_t nonce) final;
 
-  /// read and validate data
-  using read_validate_data_ertr = read_ertr;
-  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
   read_validate_data_ret read_validate_data(
     paddr_t record_base,
     const record_group_header_t &header  ///< caller must ensure lifetime through
                                          ///  future resolution
-  );
-
-  using consume_record_group_ertr = scan_valid_records_ertr;
-  consume_record_group_ertr::future<> consume_next_records(
-      scan_valid_records_cursor& cursor,
-      found_record_handler_t& handler,
-      std::size_t& budget_used);
+  ) final;
 
   std::vector<SegmentManager*> segment_managers;
   std::set<device_id_t> device_ids;