]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: add scan_valid_records
authorSamuel Just <sjust@redhat.com>
Sat, 21 Nov 2020 03:09:25 +0000 (19:09 -0800)
committerSamuel Just <sjust@redhat.com>
Sat, 12 Dec 2020 02:18:16 +0000 (18:18 -0800)
scan_valid_records validates record checksums and uses last_comitted
to avoid checking data crcs except on the final few records.  Will
replace scan_segment in a few patches.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h

index 71fc5eea7452dd6aaccae9d4d740144caf730b34..6dc9b2303851d98612f01bfc7c8a3e84e6a5620f 100644 (file)
@@ -160,6 +160,20 @@ bool Journal::validate_metadata(const bufferlist &bl)
   return test_crc == recorded_crc;
 }
 
+Journal::read_validate_data_ret Journal::read_validate_data(
+  paddr_t record_base,
+  const record_header_t &header)
+{
+  return segment_manager.read(
+    record_base.add_offset(header.mdlength),
+    header.dlength
+  ).safe_then([=, &header](auto bptr) {
+    bufferlist bl;
+    bl.append(bptr);
+    return bl.crc32c(-1) == header.data_crc;
+  });
+}
+
 Journal::write_record_ret Journal::write_record(
   record_size_t rsize,
   record_t &&record)
@@ -722,4 +736,108 @@ Journal::scan_segment_ret Journal::scan_segment(
     });
 }
 
+Journal::scan_valid_records_ret Journal::scan_valid_records(
+    scan_valid_records_cursor &cursor,
+    segment_nonce_t nonce,
+    size_t budget,
+    found_record_handler_t &handler)
+{
+  if (cursor.offset.offset == 0) {
+    cursor.offset.offset = block_size;
+  }
+  auto retref = std::make_unique<size_t>(0);
+  auto budget_used = *retref;
+  return crimson::do_until(
+    [=, &cursor, &budget_used, &handler]() mutable
+    -> scan_valid_records_ertr::future<bool> {
+      return [=, &handler, &cursor, &budget_used] {
+       if (!cursor.last_valid_header_found) {
+         return read_validate_record_metadata(cursor.offset, nonce
+         ).safe_then([=, &cursor, &handler](auto md) {
+           logger().debug(
+             "Journal::scan_valid_records: read complete {}",
+             cursor.offset);
+           if (!md) {
+             logger().debug(
+               "Journal::scan_valid_records: found invalid header at {}, presumably at end",
+               cursor.offset);
+             cursor.last_valid_header_found = true;
+             return scan_valid_records_ertr::now();
+           } else {
+             logger().debug(
+               "Journal::scan_valid_records: valid record read at {}",
+               cursor.offset);
+             cursor.last_committed = paddr_t{
+               cursor.offset.segment,
+               md->first.committed_to};
+             cursor.pending_records.emplace_back(
+               cursor.offset,
+               md->first,
+               md->second);
+             cursor.offset.offset +=
+               md->first.dlength + md->first.mdlength;
+             return scan_valid_records_ertr::now();
+           }
+         }).safe_then([=, &cursor, &budget_used, &handler] {
+           return crimson::do_until(
+             [=, &budget_used, &cursor, &handler] {
+               logger().debug(
+                 "Journal::scan_valid_records: valid record read, processing queue");
+               if (cursor.pending_records.empty()) {
+                 /* This is only possible if the segment is empty.
+                  * A record's last_commited must be prior to its own
+                  * location since it itself cannot yet have been committed
+                  * at its own time of submission.  Thus, the most recently
+                  * read record must always fall after cursor.last_committed */
+                 return scan_valid_records_ertr::make_ready_future<bool>(true);
+               }
+               auto &next = cursor.pending_records.front();
+               if (next.offset > cursor.last_committed) {
+                 return scan_valid_records_ertr::make_ready_future<bool>(true);
+               }
+               budget_used +=
+                 next.header.dlength + next.header.mdlength;
+               return handler(
+                 next.offset,
+                 next.header,
+                 next.mdbuffer
+               ).safe_then([&cursor] {
+                 cursor.pending_records.pop_front();
+                 return scan_valid_records_ertr::make_ready_future<bool>(false);
+               });
+             });
+         });
+       } else {
+         assert(!cursor.pending_records.empty());
+         auto &next = cursor.pending_records.front();
+         return read_validate_data(next.offset, next.header
+         ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+           if (!valid) {
+             cursor.pending_records.clear();
+             return scan_valid_records_ertr::now();
+           }
+           budget_used +=
+             next.header.dlength + next.header.mdlength;
+           return handler(
+             next.offset,
+             next.header,
+             next.mdbuffer
+           ).safe_then([&cursor] {
+             cursor.pending_records.pop_front();
+             return scan_valid_records_ertr::now();
+           });
+         });
+       }
+      }().safe_then([=, &budget_used, &cursor] {
+       return scan_valid_records_ertr::make_ready_future<bool>(
+         cursor.is_complete() || budget_used >= budget);
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+       scan_valid_records_ertr::ready_future_marker{},
+       std::move(*retref));
+    });
+}
+
+
 }
index 383c49155c526d7166e683abe9c1e6cd0ec90b79..5caa380b131487213fc71edf6fc521f05d83fef8 100644 (file)
@@ -366,6 +366,56 @@ private:
     delta_scan_handler_t *delta_handler,
     extent_handler_t *extent_info_handler
   );
+public:
+  /// scan segment for end incrementally
+  struct scan_valid_records_cursor {
+    bool last_valid_header_found = false;
+    paddr_t offset;
+    paddr_t last_committed;
+
+    struct found_record_t {
+      paddr_t offset;
+      record_header_t header;
+      bufferlist mdbuffer;
+
+      found_record_t(
+       paddr_t offset,
+       const record_header_t &header,
+       const bufferlist &mdbuffer)
+       : offset(offset), header(header), mdbuffer(mdbuffer) {}
+    };
+    std::deque<found_record_t> pending_records;
+
+    bool is_complete() const {
+      return last_valid_header_found && pending_records.empty();
+    }
+
+    paddr_t get_offset() const {
+      return offset;
+    }
+
+    scan_valid_records_cursor(
+      paddr_t offset)
+      : offset(offset) {}
+  };
+private:
+
+  using scan_valid_records_ertr = SegmentManager::read_ertr;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      paddr_t record_block_base,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_header_t &header,
+      const bufferlist &bl)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
 
   /// replays records starting at start through end of segment
   replay_ertr::future<>