]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: rename ExtentReader to SegmentManagerGroup
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 24 Mar 2022 06:31:03 +0000 (14:31 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 6 Apr 2022 02:39:14 +0000 (10:39 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
15 files changed:
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/extent_reader.cc [deleted file]
src/crimson/os/seastore/extent_reader.h [deleted file]
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/journal/segmented_journal.cc
src/crimson/os/seastore/journal/segmented_journal.h
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h
src/crimson/os/seastore/segment_manager_group.cc [new file with mode: 0644]
src/crimson/os/seastore/segment_manager_group.h [new file with mode: 0644]
src/crimson/os/seastore/transaction_manager.cc
src/crimson/os/seastore/transaction_manager.h
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_seastore_journal.cc

index a10d1c677d8db9e887c00a13a0a02a45109867e0..2e3ec4a5fe0366ffa204af6a9e4652d2fb2e19bd 100644 (file)
@@ -7,7 +7,6 @@ set(crimson_seastore_srcs
   transaction_manager.cc
   transaction.cc
   cache.cc
-  extent_reader.cc
   lba_manager.cc
   segment_cleaner.cc
   lba_manager/btree/btree_lba_manager.cc
@@ -40,6 +39,7 @@ set(crimson_seastore_srcs
   journal/segment_allocator.cc
   journal.cc
   device.cc
+  segment_manager_group.cc
   ../../../test/crimson/seastore/test_block.cc
   ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
        )
diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc
deleted file mode 100644 (file)
index a2f3c10..0000000
+++ /dev/null
@@ -1,383 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#include "crimson/os/seastore/extent_reader.h"
-
-#include "crimson/os/seastore/logging.h"
-
-SET_SUBSYS(seastore_journal);
-
-namespace crimson::os::seastore {
-
-ExtentReader::read_segment_tail_ret
-ExtentReader::read_segment_tail(segment_id_t segment)
-{
-  auto& segment_manager = *segment_managers[segment.device_id()];
-  return segment_manager.read(
-    paddr_t::make_seg_paddr(
-      segment,
-      segment_manager.get_segment_size() -
-        segment_manager.get_rounded_tail_length()),
-    segment_manager.get_rounded_tail_length()
-  ).handle_error(
-    read_segment_header_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in ExtentReader::read_segment_tail"
-    }
-  ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret {
-    LOG_PREFIX(ExtentReader::read_segment_tail);
-    DEBUG("segment {} bptr size {}", segment, bptr.length());
-
-    segment_tail_t tail;
-    bufferlist bl;
-    bl.push_back(bptr);
-
-    DEBUG("segment {} block crc {}",
-          segment,
-          bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
-    auto bp = bl.cbegin();
-    try {
-      decode(tail, bp);
-    } catch (ceph::buffer::error &e) {
-      DEBUG("segment {} unable to decode tail, skipping -- {}",
-            segment, e);
-      return crimson::ct_error::enodata::make();
-    }
-    DEBUG("segment {} tail {}", segment, tail);
-    return read_segment_tail_ret(
-      read_segment_tail_ertr::ready_future_marker{},
-      tail);
-  });
-}
-
-ExtentReader::read_segment_header_ret
-ExtentReader::read_segment_header(segment_id_t segment)
-{
-  auto& segment_manager = *segment_managers[segment.device_id()];
-  return segment_manager.read(
-    paddr_t::make_seg_paddr(segment, 0),
-    segment_manager.get_block_size()
-  ).handle_error(
-    read_segment_header_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in ExtentReader::read_segment_header"
-    }
-  ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret {
-    LOG_PREFIX(ExtentReader::read_segment_header);
-    DEBUG("segment {} bptr size {}", segment, bptr.length());
-
-    segment_header_t header;
-    bufferlist bl;
-    bl.push_back(bptr);
-
-    DEBUG("segment {} block crc {}",
-          segment,
-          bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
-    auto bp = bl.cbegin();
-    try {
-      decode(header, bp);
-    } catch (ceph::buffer::error &e) {
-      DEBUG("segment {} unable to decode header, skipping -- {}",
-            segment, e);
-      return crimson::ct_error::enodata::make();
-    }
-    DEBUG("segment {} header {}", segment, header);
-    return read_segment_header_ret(
-      read_segment_header_ertr::ready_future_marker{},
-      header);
-  });
-}
-
-ExtentReader::scan_extents_ret ExtentReader::scan_extents(
-  scan_extents_cursor &cursor,
-  extent_len_t bytes_to_read)
-{
-  auto ret = std::make_unique<scan_extents_ret_bare>();
-  auto* extents = ret.get();
-  return read_segment_header(cursor.get_segment_id()
-  ).handle_error(
-    scan_extents_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in ExtentReader::scan_extents"
-    }
-  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
-    auto segment_nonce = segment_header.segment_nonce;
-    return seastar::do_with(
-      found_record_handler_t([extents](
-        record_locator_t locator,
-        const record_group_header_t& header,
-        const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
-      {
-        LOG_PREFIX(ExtentReader::scan_extents);
-        DEBUG("decoding {} records", header.records);
-        auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
-        if (!maybe_record_extent_infos) {
-          // This should be impossible, we did check the crc on the mdbuf
-          ERROR("unable to decode extents for record {}",
-                locator.record_block_base);
-          return crimson::ct_error::input_output_error::make();
-        }
-
-        paddr_t extent_offset = locator.record_block_base;
-        for (auto& r: *maybe_record_extent_infos) {
-          DEBUG("decoded {} extents", r.extent_infos.size());
-          for (const auto &i : r.extent_infos) {
-            extents->emplace_back(
-              extent_offset,
-              std::pair<commit_info_t, extent_info_t>(
-                {r.header.commit_time,
-                r.header.commit_type},
-                i));
-            auto& seg_addr = extent_offset.as_seg_paddr();
-            seg_addr.set_segment_off(
-              seg_addr.get_segment_off() + i.len);
-          }
-        }
-        return scan_extents_ertr::now();
-      }),
-      [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
-        return scan_valid_records(
-          cursor,
-          segment_nonce,
-          bytes_to_read,
-          dhandler
-        ).discard_result();
-      }
-    );
-  }).safe_then([ret=std::move(ret)] {
-    return std::move(*ret);
-  });
-}
-
-ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
-  scan_valid_records_cursor &cursor,
-  segment_nonce_t nonce,
-  size_t budget,
-  found_record_handler_t &handler)
-{
-  LOG_PREFIX(ExtentReader::scan_valid_records);
-  auto& segment_manager =
-    *segment_managers[cursor.get_segment_id().device_id()];
-  if (cursor.get_segment_offset() == 0) {
-    INFO("start to scan segment {}", cursor.get_segment_id());
-    cursor.increment_seq(segment_manager.get_block_size());
-  }
-  DEBUG("starting at {}, budget={}", cursor, budget);
-  auto retref = std::make_unique<size_t>(0);
-  auto &budget_used = *retref;
-  return crimson::repeat(
-    [=, &cursor, &budget_used, &handler]() mutable
-    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
-      return [=, &handler, &cursor, &budget_used] {
-       if (!cursor.last_valid_header_found) {
-         return read_validate_record_metadata(cursor.seq.offset, nonce
-         ).safe_then([=, &cursor](auto md) {
-           if (!md) {
-             cursor.last_valid_header_found = true;
-             if (cursor.is_complete()) {
-               INFO("complete at {}, invalid record group metadata",
-                     cursor);
-             } else {
-               DEBUG("found invalid record group metadata at {}, "
-                     "processing {} pending record groups",
-                     cursor.seq,
-                     cursor.pending_record_groups.size());
-             }
-             return scan_valid_records_ertr::now();
-           } else {
-             auto& [header, md_bl] = *md;
-             DEBUG("found valid {} at {}", header, cursor.seq);
-             cursor.emplace_record_group(header, std::move(md_bl));
-             return scan_valid_records_ertr::now();
-           }
-         }).safe_then([=, &cursor, &budget_used, &handler] {
-           DEBUG("processing committed record groups until {}, {} pending",
-                 cursor.last_committed,
-                 cursor.pending_record_groups.size());
-           return crimson::repeat(
-             [=, &budget_used, &cursor, &handler] {
-               if (cursor.pending_record_groups.empty()) {
-                 /* This is only possible if the segment is empty.
-                  * A record's last_commited must be prior to its own
-                  * location since it itself cannot yet have been committed
-                  * at its own time of submission.  Thus, the most recently
-                  * read record must always fall after cursor.last_committed */
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               auto &next = cursor.pending_record_groups.front();
-               journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
-               if (cursor.last_committed == JOURNAL_SEQ_NULL ||
-                   next_seq > cursor.last_committed) {
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::yes);
-               }
-               return consume_next_records(cursor, handler, budget_used
-               ).safe_then([] {
-                 return scan_valid_records_ertr::make_ready_future<
-                   seastar::stop_iteration>(seastar::stop_iteration::no);
-               });
-             });
-         });
-       } else {
-         assert(!cursor.pending_record_groups.empty());
-         auto &next = cursor.pending_record_groups.front();
-         return read_validate_data(next.offset, next.header
-         ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
-           if (!valid) {
-             INFO("complete at {}, invalid record group data at {}, {}",
-                  cursor, next.offset, next.header);
-             cursor.pending_record_groups.clear();
-             return scan_valid_records_ertr::now();
-           }
-            return consume_next_records(cursor, handler, budget_used);
-         });
-       }
-      }().safe_then([=, &budget_used, &cursor] {
-       if (cursor.is_complete() || budget_used >= budget) {
-         DEBUG("finish at {}, budget_used={}, budget={}",
-                cursor, budget_used, budget);
-         return seastar::stop_iteration::yes;
-       } else {
-         return seastar::stop_iteration::no;
-       }
-      });
-    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
-      return scan_valid_records_ret(
-       scan_valid_records_ertr::ready_future_marker{},
-       std::move(*retref));
-    });
-}
-
-ExtentReader::read_validate_record_metadata_ret
-ExtentReader::read_validate_record_metadata(
-  paddr_t start,
-  segment_nonce_t nonce)
-{
-  LOG_PREFIX(ExtentReader::read_validate_record_metadata);
-  auto& seg_addr = start.as_seg_paddr();
-  auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
-  auto block_size = segment_manager.get_block_size();
-  auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
-  if (seg_addr.get_segment_off() + block_size > segment_size) {
-    DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
-    return read_validate_record_metadata_ret(
-      read_validate_record_metadata_ertr::ready_future_marker{},
-      std::nullopt);
-  }
-  TRACE("reading record group header block {}~4096", start);
-  return segment_manager.read(start, block_size
-  ).safe_then([=, &segment_manager](bufferptr bptr) mutable
-              -> read_validate_record_metadata_ret {
-    auto block_size = static_cast<extent_len_t>(
-        segment_manager.get_block_size());
-    bufferlist bl;
-    bl.append(bptr);
-    auto maybe_header = try_decode_records_header(bl, nonce);
-    if (!maybe_header.has_value()) {
-      return read_validate_record_metadata_ret(
-        read_validate_record_metadata_ertr::ready_future_marker{},
-        std::nullopt);
-    }
-    auto& seg_addr = start.as_seg_paddr();
-    auto& header = *maybe_header;
-    if (header.mdlength < block_size ||
-        header.mdlength % block_size != 0 ||
-        header.dlength % block_size != 0 ||
-        (header.committed_to != JOURNAL_SEQ_NULL &&
-         header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
-        (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
-      ERROR("failed, invalid record group header {}", start);
-      return crimson::ct_error::input_output_error::make();
-    }
-    if (header.mdlength == block_size) {
-      return read_validate_record_metadata_ret(
-        read_validate_record_metadata_ertr::ready_future_marker{},
-        std::make_pair(std::move(header), std::move(bl))
-      );
-    }
-
-    auto rest_start = paddr_t::make_seg_paddr(
-        seg_addr.get_segment_id(),
-        seg_addr.get_segment_off() + (seastore_off_t)block_size
-    );
-    auto rest_len = header.mdlength - block_size;
-    TRACE("reading record group header rest {}~{}", rest_start, rest_len);
-    return segment_manager.read(rest_start, rest_len
-    ).safe_then([header=std::move(header), bl=std::move(bl)
-                ](auto&& bptail) mutable {
-      bl.push_back(bptail);
-      return read_validate_record_metadata_ret(
-        read_validate_record_metadata_ertr::ready_future_marker{},
-        std::make_pair(std::move(header), std::move(bl)));
-    });
-  }).safe_then([](auto p) {
-    if (p && validate_records_metadata(p->second)) {
-      return read_validate_record_metadata_ret(
-        read_validate_record_metadata_ertr::ready_future_marker{},
-        std::move(*p)
-      );
-    } else {
-      return read_validate_record_metadata_ret(
-        read_validate_record_metadata_ertr::ready_future_marker{},
-        std::nullopt);
-    }
-  });
-}
-
-ExtentReader::read_validate_data_ret
-ExtentReader::read_validate_data(
-  paddr_t record_base,
-  const record_group_header_t &header)
-{
-  LOG_PREFIX(ExtentReader::read_validate_data);
-  auto& segment_manager = *segment_managers[record_base.get_device_id()];
-  auto data_addr = record_base.add_offset(header.mdlength);
-  TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
-  return segment_manager.read(
-    data_addr,
-    header.dlength
-  ).safe_then([=, &header](auto bptr) {
-    bufferlist bl;
-    bl.append(bptr);
-    return validate_records_data(header, bl);
-  });
-}
-
-ExtentReader::consume_record_group_ertr::future<>
-ExtentReader::consume_next_records(
-  scan_valid_records_cursor& cursor,
-  found_record_handler_t& handler,
-  std::size_t& budget_used)
-{
-  LOG_PREFIX(ExtentReader::consume_next_records);
-  auto& next = cursor.pending_record_groups.front();
-  auto total_length = next.header.dlength + next.header.mdlength;
-  budget_used += total_length;
-  auto locator = record_locator_t{
-    next.offset.add_offset(next.header.mdlength),
-    write_result_t{
-      journal_seq_t{
-        cursor.seq.segment_seq,
-        next.offset
-      },
-      static_cast<seastore_off_t>(total_length)
-    }
-  };
-  DEBUG("processing {} at {}, budget_used={}",
-        next.header, locator, budget_used);
-  return handler(
-    locator,
-    next.header,
-    next.mdbuffer
-  ).safe_then([FNAME, &cursor] {
-    cursor.pop_record_group();
-    if (cursor.is_complete()) {
-      INFO("complete at {}, no more record group", cursor);
-    }
-  });
-}
-
-} // namespace crimson::os::seastore
diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h
deleted file mode 100644 (file)
index b32ae81..0000000
+++ /dev/null
@@ -1,127 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#pragma once
-
-#include "crimson/common/errorator.h"
-#include "crimson/os/seastore/seastore_types.h"
-#include "crimson/os/seastore/segment_manager.h"
-#include "crimson/os/seastore/logging.h"
-
-namespace crimson::os::seastore {
-
-class SegmentCleaner;
-class TransactionManager;
-
-class ExtentReader {
-public:
-  std::vector<SegmentManager*>& get_segment_managers() {
-    return segment_managers;
-  }
-
-  using read_ertr = SegmentManager::read_ertr;
-  ExtentReader() {
-    segment_managers.resize(DEVICE_ID_MAX, nullptr);
-  }
-  using read_segment_header_ertr = crimson::errorator<
-    crimson::ct_error::enoent,
-    crimson::ct_error::enodata,
-    crimson::ct_error::input_output_error
-    >;
-  using read_segment_header_ret = read_segment_header_ertr::future<
-    segment_header_t>;
-  read_segment_header_ret read_segment_header(segment_id_t segment);
-
-  using read_segment_tail_ertr = read_segment_header_ertr;
-  using read_segment_tail_ret = read_segment_tail_ertr::future<
-    segment_tail_t>;
-  read_segment_tail_ret  read_segment_tail(segment_id_t segment);
-
-  struct commit_info_t {
-    mod_time_point_t commit_time;
-    record_commit_type_t commit_type;
-  };
-
-  /**
-   * scan_extents
-   *
-   * Scans records beginning at addr until the first record boundary after
-   * addr + bytes_to_read.
-   *
-   * Returns list<extent, extent_info>
-   * cursor.is_complete() will be true when no further extents exist in segment.
-   */
-  using scan_extents_cursor = scan_valid_records_cursor;
-  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
-  using scan_extents_ret_bare =
-    std::list<std::pair<paddr_t, std::pair<commit_info_t, extent_info_t>>>;
-  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
-  scan_extents_ret scan_extents(
-    scan_extents_cursor &cursor,
-    extent_len_t bytes_to_read
-  );
-
-  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
-  using scan_valid_records_ret = scan_valid_records_ertr::future<
-    size_t>;
-  using found_record_handler_t = std::function<
-    scan_valid_records_ertr::future<>(
-      record_locator_t record_locator,
-      // callee may assume header and bl will remain valid until
-      // returned future resolves
-      const record_group_header_t &header,
-      const bufferlist &mdbuf)>;
-  scan_valid_records_ret scan_valid_records(
-    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
-    segment_nonce_t nonce,             ///< [in] nonce for segment
-    size_t budget,                     ///< [in] max budget to use
-    found_record_handler_t &handler    ///< [in] handler for records
-  ); ///< @return used budget
-
-  using release_ertr = SegmentManager::release_ertr;
-  release_ertr::future<> release_segment(segment_id_t id) {
-    assert(segment_managers[id.device_id()] != nullptr);
-    return segment_managers[id.device_id()]->release(id);
-  }
-
-  void add_segment_manager(SegmentManager* segment_manager) {
-    ceph_assert(!segment_managers[segment_manager->get_device_id()]);
-    segment_managers[segment_manager->get_device_id()] = segment_manager;
-  }
-
-  void reset() {
-    segment_managers.clear();
-    segment_managers.resize(DEVICE_ID_MAX, nullptr);
-  }
-
-private:
-  std::vector<SegmentManager*> segment_managers;
-  /// read record metadata for record starting at start
-  using read_validate_record_metadata_ertr = read_ertr;
-  using read_validate_record_metadata_ret =
-    read_validate_record_metadata_ertr::future<
-      std::optional<std::pair<record_group_header_t, bufferlist>>
-    >;
-  read_validate_record_metadata_ret read_validate_record_metadata(
-    paddr_t start,
-    segment_nonce_t nonce);
-
-  /// read and validate data
-  using read_validate_data_ertr = read_ertr;
-  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
-  read_validate_data_ret read_validate_data(
-    paddr_t record_base,
-    const record_group_header_t &header  ///< caller must ensure lifetime through
-                                         ///  future resolution
-  );
-
-  using consume_record_group_ertr = scan_valid_records_ertr;
-  consume_record_group_ertr::future<> consume_next_records(
-      scan_valid_records_cursor& cursor,
-      found_record_handler_t& handler,
-      std::size_t& budget_used);
-};
-
-using ExtentReaderRef = std::unique_ptr<ExtentReader>;
-
-} // namespace crimson::os::seastore
index 56e582390dc273f248bdd3ac350cf7c108de5991..2ed4d49739a72e582718a6a7296f7b4f3317f5f1 100644 (file)
@@ -8,10 +8,10 @@ namespace crimson::os::seastore::journal {
 
 JournalRef make_segmented(
   SegmentManager &sm,
-  ExtentReader &reader,
+  SegmentManagerGroup &sms,
   SegmentProvider &provider)
 {
-  return std::make_unique<SegmentedJournal>(sm, reader, provider);
+  return std::make_unique<SegmentedJournal>(sm, sms, provider);
 }
 
 }
index 77ecc34df93b728e97be4858aee0588f69f335ff..b0448d7e9508f63c128d734e5fad69cf8b2e254e 100644 (file)
@@ -16,7 +16,7 @@ class NVMeBlockDevice;
 }
 
 class SegmentManager;
-class ExtentReader;
+class SegmentManagerGroup;
 class SegmentProvider;
 
 class Journal {
@@ -93,7 +93,7 @@ namespace journal {
 
 JournalRef make_segmented(
   SegmentManager &sm,
-  ExtentReader &reader,
+  SegmentManagerGroup &sms,
   SegmentProvider &provider);
 
 }
index e4a29d795ef6a99087880bfbaacfc37d9845d108..18b29e50fab1e3a628fb21040a80137846a7c226 100644 (file)
@@ -28,7 +28,7 @@ namespace crimson::os::seastore::journal {
 
 SegmentedJournal::SegmentedJournal(
   SegmentManager &segment_manager,
-  ExtentReader &scanner,
+  SegmentManagerGroup &sms,
   SegmentProvider &segment_provider)
   : segment_provider(segment_provider),
     segment_seq_allocator(
@@ -47,7 +47,7 @@ SegmentedJournal::SegmentedJournal(
                      crimson::common::get_conf<double>(
                        "seastore_journal_batch_preferred_fullness"),
                      journal_segment_allocator),
-    scanner(scanner)
+    sms(sms)
 {
 }
 
@@ -150,12 +150,12 @@ SegmentedJournal::replay_segment(
   INFO("starting at {} -- {}", seq, header);
   return seastar::do_with(
     scan_valid_records_cursor(seq),
-    ExtentReader::found_record_handler_t(
+    SegmentManagerGroup::found_record_handler_t(
       [s_type=header.type, &handler, this](
       record_locator_t locator,
       const record_group_header_t& header,
       const bufferlist& mdbuf)
-      -> ExtentReader::scan_valid_records_ertr::future<>
+      -> SegmentManagerGroup::scan_valid_records_ertr::future<>
     {
       LOG_PREFIX(Journal::replay_segment);
       auto maybe_record_deltas_list = try_decode_deltas(
@@ -233,7 +233,7 @@ SegmentedJournal::replay_segment(
       });
     }),
     [=](auto &cursor, auto &dhandler) {
-      return scanner.scan_valid_records(
+      return sms.scan_valid_records(
        cursor,
        header.segment_nonce,
        std::numeric_limits<size_t>::max(),
@@ -262,7 +262,7 @@ SegmentedJournal::find_journal_segments()
          segment_id_t segment_id{
            journal_segment_allocator.get_device_id(),
            d_segment_id};
-         return scanner.read_segment_header(
+         return sms.read_segment_header(
            segment_id
          ).safe_then([segment_id, &ret](auto &&header) {
            if (header.get_type() == segment_type_t::JOURNAL) {
index e571583bcb99fd3dcf054d66192d1f7df0376176..973d04fd6f1ecbd6df0fe3b2477110d949ad2ba3 100644 (file)
@@ -11,7 +11,7 @@
 
 #include "crimson/os/seastore/segment_cleaner.h"
 #include "crimson/os/seastore/journal.h"
-#include "crimson/os/seastore/extent_reader.h"
+#include "crimson/os/seastore/segment_manager_group.h"
 #include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/osd/exceptions.h"
@@ -26,7 +26,7 @@ class SegmentedJournal : public Journal {
 public:
   SegmentedJournal(
     SegmentManager &segment_manager,
-    ExtentReader& scanner,
+    SegmentManagerGroup& sms,
     SegmentProvider& cleaner);
   ~SegmentedJournal() {}
 
@@ -56,10 +56,10 @@ private:
   SegmentSeqAllocatorRef segment_seq_allocator;
   SegmentAllocator journal_segment_allocator;
   RecordSubmitter record_submitter;
-  ExtentReader& scanner;
+  SegmentManagerGroup& sms;
   WritePipeline* write_pipeline = nullptr;
 
-  /// read journal segment headers from scanner
+  /// read journal segment headers from sms
   using find_journal_segments_ertr = crimson::errorator<
     crimson::ct_error::input_output_error>;
   using find_journal_segments_ret_bare = std::vector<
index f629f7e818108bfaf7bcd83bb4043f7afcf28862..085fe74994ffecd685bd74d1a709b55b1b4f2768 100644 (file)
@@ -178,11 +178,11 @@ void SpaceTrackerSimple::dump_usage(segment_id_t id) const
 
 SegmentCleaner::SegmentCleaner(
   config_t config,
-  ExtentReaderRef&& scr,
+  SegmentManagerGroupRef&& sm_group,
   bool detailed)
   : detailed(detailed),
     config(config),
-    scanner(std::move(scr)),
+    sm_group(std::move(sm_group)),
     ool_segment_seq_allocator(
       new SegmentSeqAllocator(segment_type_t::OOL)),
     gc_process(*this)
@@ -391,7 +391,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
       return seastar::now();
     }
     scan_cursor =
-      std::make_unique<ExtentReader::scan_extents_cursor>(
+      std::make_unique<SegmentManagerGroup::scan_extents_cursor>(
        next);
     logger().debug(
       "SegmentCleaner::do_gc: starting gc on segment {}",
@@ -400,7 +400,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
     ceph_assert(!scan_cursor->is_complete());
   }
 
-  return scanner->scan_extents(
+  return sm_group->scan_extents(
     *scan_cursor,
     config.reclaim_bytes_stride
   ).safe_then([this](auto &&_extents) {
@@ -505,7 +505,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
 SegmentCleaner::mount_ret SegmentCleaner::mount(
   device_id_t pdevice_id)
 {
-  auto& sms = scanner->get_segment_managers();
+  auto& sms = sm_group->get_segment_managers();
   logger().debug(
     "SegmentCleaner::mount: {} segment managers", sms.size());
   init_complete = false;
@@ -545,20 +545,20 @@ SegmentCleaner::mount_ret SegmentCleaner::mount(
       segments.end(),
       [this, &segment_set](auto& it) {
        auto segment_id = it.first;
-       return scanner->read_segment_header(
+       return sm_group->read_segment_header(
          segment_id
        ).safe_then([segment_id, this, &segment_set](auto header) {
          logger().debug(
-           "ExtentReader::mount: segment_id={} -- {}",
+           "SegmentCleaner::mount: segment_id={} -- {}",
            segment_id, header);
          auto s_type = header.get_type();
          if (s_type == segment_type_t::NULL_SEG) {
            logger().error(
-             "ExtentReader::mount: got null segment, segment_id={} -- {}",
+             "SegmentCleaner::mount: got null segment, segment_id={} -- {}",
              segment_id, header);
            ceph_abort();
          }
-         return scanner->read_segment_tail(
+         return sm_group->read_segment_tail(
            segment_id
          ).safe_then([this, segment_id, &segment_set, header](auto tail)
            -> scan_extents_ertr::future<> {
@@ -608,7 +608,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment(
 {
   if (header.get_type() == segment_type_t::OOL) {
     logger().info(
-      "ExtentReader::init_segments: out-of-line segment {}",
+      "SegmentCleaner::scan_nonfull_segment: out-of-line segment {}",
       segment_id);
     return seastar::do_with(
       scan_valid_records_cursor({
@@ -616,11 +616,11 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment(
        paddr_t::make_seg_paddr(segment_id, 0)}),
       [this, segment_id, header](auto& cursor) {
       return seastar::do_with(
-       ExtentReader::found_record_handler_t([this, segment_id](
+       SegmentManagerGroup::found_record_handler_t([this, segment_id](
            record_locator_t locator,
            const record_group_header_t& header,
            const bufferlist& mdbuf
-         ) mutable -> ExtentReader::scan_valid_records_ertr::future<> {
+         ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<> {
          LOG_PREFIX(SegmentCleaner::scan_nonfull_segment);
          DEBUG("decodeing {} records", header.records);
          auto maybe_headers = try_decode_record_headers(header, mdbuf);
@@ -634,7 +634,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment(
            mod_time_point_t ctime = header.commit_time;
            auto commit_type = header.commit_type;
            if (!ctime) {
-             ERROR("Scanner::init_segments: extent {} 0 commit_time",
+             ERROR("SegmentCleaner::scan_nonfull_segment: extent {} 0 commit_time",
                ctime);
              ceph_abort("0 commit_time");
            }
@@ -654,7 +654,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment(
          return seastar::now();
        }),
        [&cursor, header, segment_id, this](auto& handler) {
-         return scanner->scan_valid_records(
+         return sm_group->scan_valid_records(
            cursor,
            header.segment_nonce,
            segments[segment_id.device_id()]->segment_size,
@@ -670,7 +670,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment(
     });
   } else if (header.get_type() == segment_type_t::JOURNAL) {
     logger().info(
-      "ExtentReader::init_segments: journal segment {}",
+      "SEgmentCleaner::scan_nonfull_segment: journal segment {}",
       segment_id);
     segment_set.emplace_back(std::make_pair(segment_id, std::move(header)));
   } else {
@@ -690,7 +690,7 @@ SegmentCleaner::maybe_release_segment(Transaction &t)
   if (to_release != NULL_SEG_ID) {
     LOG_PREFIX(SegmentCleaner::maybe_release_segment);
     INFOT("releasing segment {}", t, to_release);
-    return scanner->release_segment(to_release
+    return sm_group->release_segment(to_release
     ).safe_then([this, to_release] {
       stats.segments_released++;
       mark_empty(to_release);
index e703b19793d68dacb7e4fe369698259be3070e53..df7c225ef8a508530e0fd0b09ff8d49b4a149180 100644 (file)
@@ -12,9 +12,9 @@
 
 #include "crimson/common/log.h"
 #include "crimson/os/seastore/cached_extent.h"
-#include "crimson/os/seastore/extent_reader.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/segment_manager_group.h"
 #include "crimson/os/seastore/transaction.h"
 #include "crimson/os/seastore/segment_seq_allocator.h"
 
@@ -667,7 +667,7 @@ private:
   const bool detailed;
   const config_t config;
 
-  ExtentReaderRef scanner;
+  SegmentManagerGroupRef sm_group;
 
   SpaceTrackerIRef space_tracker;
   segment_info_set_t segments;
@@ -716,7 +716,7 @@ private:
 public:
   SegmentCleaner(
     config_t config,
-    ExtentReaderRef&& scanner,
+    SegmentManagerGroupRef&& sm_group,
     bool detailed = false);
 
   SegmentSeqAllocator& get_ool_segment_seq_allocator() {
@@ -766,7 +766,7 @@ public:
     return segments[id].get_type();
   }
 
-  using release_ertr = ExtentReader::release_ertr;
+  using release_ertr = SegmentManagerGroup::release_ertr;
   release_ertr::future<> maybe_release_segment(Transaction &t);
 
   void adjust_segment_util(double old_usage, double new_usage) {
@@ -968,7 +968,7 @@ private:
 
   // GC status helpers
   std::unique_ptr<
-    ExtentReader::scan_extents_cursor
+    SegmentManagerGroup::scan_extents_cursor
     > scan_cursor;
 
   /**
@@ -1046,7 +1046,7 @@ private:
   } gc_process;
 
   using gc_ertr = work_ertr::extend_ertr<
-    ExtentReader::scan_extents_ertr
+    SegmentManagerGroup::scan_extents_ertr
     >;
 
   gc_cycle_ret do_gc_cycle();
@@ -1278,7 +1278,7 @@ private:
 
   using scan_extents_ret_bare =
     std::vector<std::pair<segment_id_t, segment_header_t>>;
-  using scan_extents_ertr = ExtentReader::scan_extents_ertr;
+  using scan_extents_ertr = SegmentManagerGroup::scan_extents_ertr;
   using scan_extents_ret = scan_extents_ertr::future<>;
   scan_extents_ret scan_nonfull_segment(
     const segment_header_t& header,
diff --git a/src/crimson/os/seastore/segment_manager_group.cc b/src/crimson/os/seastore/segment_manager_group.cc
new file mode 100644 (file)
index 0000000..3e53021
--- /dev/null
@@ -0,0 +1,385 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/segment_manager_group.h"
+
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore {
+
+SegmentManagerGroup::read_segment_tail_ret
+SegmentManagerGroup::read_segment_tail(segment_id_t segment)
+{
+  auto& segment_manager = *segment_managers[segment.device_id()];
+  return segment_manager.read(
+    paddr_t::make_seg_paddr(
+      segment,
+      segment_manager.get_segment_size() -
+        segment_manager.get_rounded_tail_length()),
+    segment_manager.get_rounded_tail_length()
+  ).handle_error(
+    read_segment_header_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentManagerGroup::read_segment_tail"
+    }
+  ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret {
+    LOG_PREFIX(SegmentManagerGroup::read_segment_tail);
+    DEBUG("segment {} bptr size {}", segment, bptr.length());
+
+    segment_tail_t tail;
+    bufferlist bl;
+    bl.push_back(bptr);
+
+    DEBUG("segment {} block crc {}",
+          segment,
+          bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+    auto bp = bl.cbegin();
+    try {
+      decode(tail, bp);
+    } catch (ceph::buffer::error &e) {
+      DEBUG("segment {} unable to decode tail, skipping -- {}",
+            segment, e);
+      return crimson::ct_error::enodata::make();
+    }
+    DEBUG("segment {} tail {}", segment, tail);
+    return read_segment_tail_ret(
+      read_segment_tail_ertr::ready_future_marker{},
+      tail);
+  });
+}
+
+SegmentManagerGroup::read_segment_header_ret
+SegmentManagerGroup::read_segment_header(segment_id_t segment)
+{
+  auto& segment_manager = *segment_managers[segment.device_id()];
+  return segment_manager.read(
+    paddr_t::make_seg_paddr(segment, 0),
+    segment_manager.get_block_size()
+  ).handle_error(
+    read_segment_header_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentManagerGroup::read_segment_header"
+    }
+  ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret {
+    LOG_PREFIX(SegmentManagerGroup::read_segment_header);
+    DEBUG("segment {} bptr size {}", segment, bptr.length());
+
+    segment_header_t header;
+    bufferlist bl;
+    bl.push_back(bptr);
+
+    DEBUG("segment {} block crc {}",
+          segment,
+          bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+    auto bp = bl.cbegin();
+    try {
+      decode(header, bp);
+    } catch (ceph::buffer::error &e) {
+      DEBUG("segment {} unable to decode header, skipping -- {}",
+            segment, e);
+      return crimson::ct_error::enodata::make();
+    }
+    DEBUG("segment {} header {}", segment, header);
+    return read_segment_header_ret(
+      read_segment_header_ertr::ready_future_marker{},
+      header);
+  });
+}
+
+SegmentManagerGroup::scan_extents_ret
+SegmentManagerGroup::scan_extents(
+  scan_extents_cursor &cursor,
+  extent_len_t bytes_to_read)
+{
+  auto ret = std::make_unique<scan_extents_ret_bare>();
+  auto* extents = ret.get();
+  return read_segment_header(cursor.get_segment_id()
+  ).handle_error(
+    scan_extents_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentManagerGroup::scan_extents"
+    }
+  ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
+    auto segment_nonce = segment_header.segment_nonce;
+    return seastar::do_with(
+      found_record_handler_t([extents](
+        record_locator_t locator,
+        const record_group_header_t& header,
+        const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
+      {
+        LOG_PREFIX(SegmentManagerGroup::scan_extents);
+        DEBUG("decoding {} records", header.records);
+        auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
+        if (!maybe_record_extent_infos) {
+          // This should be impossible, we did check the crc on the mdbuf
+          ERROR("unable to decode extents for record {}",
+                locator.record_block_base);
+          return crimson::ct_error::input_output_error::make();
+        }
+
+        paddr_t extent_offset = locator.record_block_base;
+        for (auto& r: *maybe_record_extent_infos) {
+          DEBUG("decoded {} extents", r.extent_infos.size());
+          for (const auto &i : r.extent_infos) {
+            extents->emplace_back(
+              extent_offset,
+              std::pair<commit_info_t, extent_info_t>(
+                {r.header.commit_time,
+                r.header.commit_type},
+                i));
+            auto& seg_addr = extent_offset.as_seg_paddr();
+            seg_addr.set_segment_off(
+              seg_addr.get_segment_off() + i.len);
+          }
+        }
+        return scan_extents_ertr::now();
+      }),
+      [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
+        return scan_valid_records(
+          cursor,
+          segment_nonce,
+          bytes_to_read,
+          dhandler
+        ).discard_result();
+      }
+    );
+  }).safe_then([ret=std::move(ret)] {
+    return std::move(*ret);
+  });
+}
+
+SegmentManagerGroup::scan_valid_records_ret
+SegmentManagerGroup::scan_valid_records(
+  scan_valid_records_cursor &cursor,
+  segment_nonce_t nonce,
+  size_t budget,
+  found_record_handler_t &handler)
+{
+  LOG_PREFIX(SegmentManagerGroup::scan_valid_records);
+  auto& segment_manager =
+    *segment_managers[cursor.get_segment_id().device_id()];
+  if (cursor.get_segment_offset() == 0) {
+    INFO("start to scan segment {}", cursor.get_segment_id());
+    cursor.increment_seq(segment_manager.get_block_size());
+  }
+  DEBUG("starting at {}, budget={}", cursor, budget);
+  auto retref = std::make_unique<size_t>(0);
+  auto &budget_used = *retref;
+  return crimson::repeat(
+    [=, &cursor, &budget_used, &handler]() mutable
+    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+      return [=, &handler, &cursor, &budget_used] {
+       if (!cursor.last_valid_header_found) {
+         return read_validate_record_metadata(cursor.seq.offset, nonce
+         ).safe_then([=, &cursor](auto md) {
+           if (!md) {
+             cursor.last_valid_header_found = true;
+             if (cursor.is_complete()) {
+               INFO("complete at {}, invalid record group metadata",
+                     cursor);
+             } else {
+               DEBUG("found invalid record group metadata at {}, "
+                     "processing {} pending record groups",
+                     cursor.seq,
+                     cursor.pending_record_groups.size());
+             }
+             return scan_valid_records_ertr::now();
+           } else {
+             auto& [header, md_bl] = *md;
+             DEBUG("found valid {} at {}", header, cursor.seq);
+             cursor.emplace_record_group(header, std::move(md_bl));
+             return scan_valid_records_ertr::now();
+           }
+         }).safe_then([=, &cursor, &budget_used, &handler] {
+           DEBUG("processing committed record groups until {}, {} pending",
+                 cursor.last_committed,
+                 cursor.pending_record_groups.size());
+           return crimson::repeat(
+             [=, &budget_used, &cursor, &handler] {
+               if (cursor.pending_record_groups.empty()) {
+                 /* This is only possible if the segment is empty.
+                  * A record's last_commited must be prior to its own
+                  * location since it itself cannot yet have been committed
+                  * at its own time of submission.  Thus, the most recently
+                  * read record must always fall after cursor.last_committed */
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               auto &next = cursor.pending_record_groups.front();
+               journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
+               if (cursor.last_committed == JOURNAL_SEQ_NULL ||
+                   next_seq > cursor.last_committed) {
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::yes);
+               }
+               return consume_next_records(cursor, handler, budget_used
+               ).safe_then([] {
+                 return scan_valid_records_ertr::make_ready_future<
+                   seastar::stop_iteration>(seastar::stop_iteration::no);
+               });
+             });
+         });
+       } else {
+         assert(!cursor.pending_record_groups.empty());
+         auto &next = cursor.pending_record_groups.front();
+         return read_validate_data(next.offset, next.header
+         ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
+           if (!valid) {
+             INFO("complete at {}, invalid record group data at {}, {}",
+                  cursor, next.offset, next.header);
+             cursor.pending_record_groups.clear();
+             return scan_valid_records_ertr::now();
+           }
+            return consume_next_records(cursor, handler, budget_used);
+         });
+       }
+      }().safe_then([=, &budget_used, &cursor] {
+       if (cursor.is_complete() || budget_used >= budget) {
+         DEBUG("finish at {}, budget_used={}, budget={}",
+                cursor, budget_used, budget);
+         return seastar::stop_iteration::yes;
+       } else {
+         return seastar::stop_iteration::no;
+       }
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+       scan_valid_records_ertr::ready_future_marker{},
+       std::move(*retref));
+    });
+}
+
+SegmentManagerGroup::read_validate_record_metadata_ret
+SegmentManagerGroup::read_validate_record_metadata(
+  paddr_t start,
+  segment_nonce_t nonce)
+{
+  LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata);
+  auto& seg_addr = start.as_seg_paddr();
+  auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
+  auto block_size = segment_manager.get_block_size();
+  auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
+  if (seg_addr.get_segment_off() + block_size > segment_size) {
+    DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
+    return read_validate_record_metadata_ret(
+      read_validate_record_metadata_ertr::ready_future_marker{},
+      std::nullopt);
+  }
+  TRACE("reading record group header block {}~4096", start);
+  return segment_manager.read(start, block_size
+  ).safe_then([=, &segment_manager](bufferptr bptr) mutable
+              -> read_validate_record_metadata_ret {
+    auto block_size = static_cast<extent_len_t>(
+        segment_manager.get_block_size());
+    bufferlist bl;
+    bl.append(bptr);
+    auto maybe_header = try_decode_records_header(bl, nonce);
+    if (!maybe_header.has_value()) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+    auto& seg_addr = start.as_seg_paddr();
+    auto& header = *maybe_header;
+    if (header.mdlength < block_size ||
+        header.mdlength % block_size != 0 ||
+        header.dlength % block_size != 0 ||
+        (header.committed_to != JOURNAL_SEQ_NULL &&
+         header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
+        (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
+      ERROR("failed, invalid record group header {}", start);
+      return crimson::ct_error::input_output_error::make();
+    }
+    if (header.mdlength == block_size) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::make_pair(std::move(header), std::move(bl))
+      );
+    }
+
+    auto rest_start = paddr_t::make_seg_paddr(
+        seg_addr.get_segment_id(),
+        seg_addr.get_segment_off() + (seastore_off_t)block_size
+    );
+    auto rest_len = header.mdlength - block_size;
+    TRACE("reading record group header rest {}~{}", rest_start, rest_len);
+    return segment_manager.read(rest_start, rest_len
+    ).safe_then([header=std::move(header), bl=std::move(bl)
+                ](auto&& bptail) mutable {
+      bl.push_back(bptail);
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::make_pair(std::move(header), std::move(bl)));
+    });
+  }).safe_then([](auto p) {
+    if (p && validate_records_metadata(p->second)) {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::move(*p)
+      );
+    } else {
+      return read_validate_record_metadata_ret(
+        read_validate_record_metadata_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+  });
+}
+
+SegmentManagerGroup::read_validate_data_ret
+SegmentManagerGroup::read_validate_data(
+  paddr_t record_base,
+  const record_group_header_t &header)
+{
+  LOG_PREFIX(SegmentManagerGroup::read_validate_data);
+  auto& segment_manager = *segment_managers[record_base.get_device_id()];
+  auto data_addr = record_base.add_offset(header.mdlength);
+  TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
+  return segment_manager.read(
+    data_addr,
+    header.dlength
+  ).safe_then([=, &header](auto bptr) {
+    bufferlist bl;
+    bl.append(bptr);
+    return validate_records_data(header, bl);
+  });
+}
+
+SegmentManagerGroup::consume_record_group_ertr::future<>
+SegmentManagerGroup::consume_next_records(
+  scan_valid_records_cursor& cursor,
+  found_record_handler_t& handler,
+  std::size_t& budget_used)
+{
+  LOG_PREFIX(SegmentManagerGroup::consume_next_records);
+  auto& next = cursor.pending_record_groups.front();
+  auto total_length = next.header.dlength + next.header.mdlength;
+  budget_used += total_length;
+  auto locator = record_locator_t{
+    next.offset.add_offset(next.header.mdlength),
+    write_result_t{
+      journal_seq_t{
+        cursor.seq.segment_seq,
+        next.offset
+      },
+      static_cast<seastore_off_t>(total_length)
+    }
+  };
+  DEBUG("processing {} at {}, budget_used={}",
+        next.header, locator, budget_used);
+  return handler(
+    locator,
+    next.header,
+    next.mdbuffer
+  ).safe_then([FNAME, &cursor] {
+    cursor.pop_record_group();
+    if (cursor.is_complete()) {
+      INFO("complete at {}, no more record group", cursor);
+    }
+  });
+}
+
+} // namespace crimson::os::seastore
diff --git a/src/crimson/os/seastore/segment_manager_group.h b/src/crimson/os/seastore/segment_manager_group.h
new file mode 100644 (file)
index 0000000..522eed1
--- /dev/null
@@ -0,0 +1,123 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class SegmentManagerGroup {
+public:
+  std::vector<SegmentManager*>& get_segment_managers() {
+    return segment_managers;
+  }
+
+  using read_ertr = SegmentManager::read_ertr;
+  SegmentManagerGroup() {
+    segment_managers.resize(DEVICE_ID_MAX, nullptr);
+  }
+  using read_segment_header_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::enodata,
+    crimson::ct_error::input_output_error
+    >;
+  using read_segment_header_ret = read_segment_header_ertr::future<
+    segment_header_t>;
+  read_segment_header_ret read_segment_header(segment_id_t segment);
+
+  using read_segment_tail_ertr = read_segment_header_ertr;
+  using read_segment_tail_ret = read_segment_tail_ertr::future<
+    segment_tail_t>;
+  read_segment_tail_ret  read_segment_tail(segment_id_t segment);
+
+  struct commit_info_t {
+    mod_time_point_t commit_time;
+    record_commit_type_t commit_type;
+  };
+
+  /**
+   * scan_extents
+   *
+   * Scans records beginning at addr until the first record boundary after
+   * addr + bytes_to_read.
+   *
+   * Returns list<extent, extent_info>
+   * cursor.is_complete() will be true when no further extents exist in segment.
+   */
+  using scan_extents_cursor = scan_valid_records_cursor;
+  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_extents_ret_bare =
+    std::list<std::pair<paddr_t, std::pair<commit_info_t, extent_info_t>>>;
+  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
+  scan_extents_ret scan_extents(
+    scan_extents_cursor &cursor,
+    extent_len_t bytes_to_read
+  );
+
+  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      record_locator_t record_locator,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_group_header_t &header,
+      const bufferlist &mdbuf)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
+
+  using release_ertr = SegmentManager::release_ertr;
+  release_ertr::future<> release_segment(segment_id_t id) {
+    assert(segment_managers[id.device_id()] != nullptr);
+    return segment_managers[id.device_id()]->release(id);
+  }
+
+  void add_segment_manager(SegmentManager* segment_manager) {
+    ceph_assert(!segment_managers[segment_manager->get_device_id()]);
+    segment_managers[segment_manager->get_device_id()] = segment_manager;
+  }
+
+  void reset() {
+    segment_managers.clear();
+    segment_managers.resize(DEVICE_ID_MAX, nullptr);
+  }
+
+private:
+  std::vector<SegmentManager*> segment_managers;
+  /// read record metadata for record starting at start
+  using read_validate_record_metadata_ertr = read_ertr;
+  using read_validate_record_metadata_ret =
+    read_validate_record_metadata_ertr::future<
+      std::optional<std::pair<record_group_header_t, bufferlist>>
+    >;
+  read_validate_record_metadata_ret read_validate_record_metadata(
+    paddr_t start,
+    segment_nonce_t nonce);
+
+  /// read and validate data
+  using read_validate_data_ertr = read_ertr;
+  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
+  read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_group_header_t &header  ///< caller must ensure lifetime through
+                                         ///  future resolution
+  );
+
+  using consume_record_group_ertr = scan_valid_records_ertr;
+  consume_record_group_ertr::future<> consume_next_records(
+      scan_valid_records_cursor& cursor,
+      found_record_handler_t& handler,
+      std::size_t& budget_used);
+};
+
+using SegmentManagerGroupRef = std::unique_ptr<SegmentManagerGroup>;
+
+} // namespace crimson::os::seastore
index b922aedb5523418809c2507e2286c939f854d7a1..768007027233d8d4e4800e532ea79e1d6529787d 100644 (file)
@@ -27,13 +27,13 @@ TransactionManager::TransactionManager(
   CacheRef _cache,
   LBAManagerRef _lba_manager,
   ExtentPlacementManagerRef&& epm,
-  ExtentReader& scanner)
+  SegmentManagerGroup& sms)
   : segment_cleaner(std::move(_segment_cleaner)),
     cache(std::move(_cache)),
     lba_manager(std::move(_lba_manager)),
     journal(std::move(_journal)),
     epm(std::move(epm)),
-    scanner(scanner)
+    sms(sms)
 {
   segment_cleaner->set_extent_callback(this);
   journal->set_write_pipeline(&write_pipeline);
@@ -156,7 +156,7 @@ TransactionManager::close_ertr::future<> TransactionManager::close() {
     cache->dump_contents();
     return journal->close();
   }).safe_then([this] {
-    scanner.reset();
+    sms.reset();
     return epm->close();
   }).safe_then([FNAME] {
     INFO("completed");
@@ -545,16 +545,16 @@ TransactionManagerRef make_transaction_manager(
     Device &device,
     bool detailed)
 {
-  auto scanner = std::make_unique<ExtentReader>();
-  auto& scanner_ref = *scanner.get();
+  auto sms = std::make_unique<SegmentManagerGroup>();
+  auto& sms_ref = *sms.get();
   auto segment_cleaner = std::make_unique<SegmentCleaner>(
     SegmentCleaner::config_t::get_default(),
-    std::move(scanner),
+    std::move(sms),
     detailed);
   ceph_assert(device.get_device_type() == device_type_t::SEGMENTED);
   auto sm = dynamic_cast<SegmentManager*>(&device);
   ceph_assert(sm != nullptr);
-  auto journal = journal::make_segmented(*sm, scanner_ref, *segment_cleaner);
+  auto journal = journal::make_segmented(*sm, sms_ref, *segment_cleaner);
   auto epm = std::make_unique<ExtentPlacementManager>();
   auto cache = std::make_unique<Cache>(*epm);
   auto lba_manager = lba_manager::create_lba_manager(*cache);
@@ -565,7 +565,7 @@ TransactionManagerRef make_transaction_manager(
     std::move(cache),
     std::move(lba_manager),
     std::move(epm),
-    scanner_ref);
+    sms_ref);
 }
 
 }
index cc54ba2fb1515921661bc78415d1d155598ad7fb..2e5385025b2052f0c71d0cedd871768cbf721158 100644 (file)
@@ -28,6 +28,7 @@
 #include "crimson/os/seastore/journal.h"
 #include "crimson/os/seastore/extent_placement_manager.h"
 #include "crimson/os/seastore/device.h"
+#include "crimson/os/seastore/segment_manager_group.h"
 
 namespace crimson::os::seastore {
 class Journal;
@@ -69,7 +70,7 @@ public:
     CacheRef cache,
     LBAManagerRef lba_manager,
     ExtentPlacementManagerRef&& epm,
-    ExtentReader& scanner);
+    SegmentManagerGroup& sms);
 
   /// Writes initial metadata to disk
   using mkfs_ertr = base_ertr;
@@ -548,7 +549,7 @@ public:
        *segment_cleaner,
        *sm,
        segment_cleaner->get_ool_segment_seq_allocator()));
-    scanner.add_segment_manager(sm);
+    sms.add_segment_manager(sm);
   }
 
   ~TransactionManager();
@@ -561,7 +562,7 @@ private:
   LBAManagerRef lba_manager;
   JournalRef journal;
   ExtentPlacementManagerRef epm;
-  ExtentReader& scanner;
+  SegmentManagerGroup& sms;
 
   WritePipeline write_pipeline;
 
index 63261111b6096c7c11ab2d362547ac76168246dc..66f405006e995672f434887b19fd0ea019b9a2b0 100644 (file)
@@ -28,7 +28,7 @@ struct btree_test_base :
   public seastar_test_suite_t, SegmentProvider {
 
   segment_manager::EphemeralSegmentManagerRef segment_manager;
-  ExtentReaderRef scanner;
+  SegmentManagerGroupRef sms;
   JournalRef journal;
   ExtentPlacementManagerRef epm;
   CacheRef cache;
@@ -98,16 +98,16 @@ struct btree_test_base :
   virtual LBAManager::mkfs_ret test_structure_setup(Transaction &t) = 0;
   seastar::future<> set_up_fut() final {
     segment_manager = segment_manager::create_test_ephemeral();
-    scanner.reset(new ExtentReader());
-    auto& scanner_ref = *scanner.get();
+    sms.reset(new SegmentManagerGroup());
+    auto& sms_ref = *sms.get();
     journal = journal::make_segmented(
-      *segment_manager, scanner_ref, *this);
+      *segment_manager, sms_ref, *this);
     epm.reset(new ExtentPlacementManager());
     cache.reset(new Cache(*epm));
 
     block_size = segment_manager->get_block_size();
     next = segment_id_t{segment_manager->get_device_id(), 0};
-    scanner_ref.add_segment_manager(segment_manager.get());
+    sms_ref.add_segment_manager(segment_manager.get());
     epm->add_device(segment_manager.get(), true);
     journal->set_write_pipeline(&pipeline);
 
@@ -148,7 +148,7 @@ struct btree_test_base :
     }).safe_then([this] {
       test_structure_reset();
       segment_manager.reset();
-      scanner.reset();
+      sms.reset();
       journal.reset();
       epm.reset();
       cache.reset();
index 3599dc16033435fa2db189f22f950cc65b576c74..0daae06f734762bcb38bf75bb7d3f3c55ec0b30a 100644 (file)
@@ -76,7 +76,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
 
   seastore_off_t block_size;
 
-  ExtentReaderRef scanner;
+  SegmentManagerGroupRef sms;
 
   segment_id_t next;
 
@@ -125,11 +125,11 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
   seastar::future<> set_up_fut() final {
     segment_manager = segment_manager::create_test_ephemeral();
     block_size = segment_manager->get_block_size();
-    scanner.reset(new ExtentReader());
+    sms.reset(new SegmentManagerGroup());
     next = segment_id_t(segment_manager->get_device_id(), 0);
-    journal = journal::make_segmented(*segment_manager, *scanner, *this);
+    journal = journal::make_segmented(*segment_manager, *sms, *this);
     journal->set_write_pipeline(&pipeline);
-    scanner->add_segment_manager(segment_manager.get());
+    sms->add_segment_manager(segment_manager.get());
     return segment_manager->init(
     ).safe_then([this] {
       return journal->open_for_write();
@@ -144,7 +144,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
     return journal->close(
     ).safe_then([this] {
       segment_manager.reset();
-      scanner.reset();
+      sms.reset();
       journal.reset();
     }).handle_error(
       crimson::ct_error::all_same_way([](auto e) {
@@ -158,7 +158,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
     return journal->close(
     ).safe_then([this, f=std::move(f)]() mutable {
       journal = journal::make_segmented(
-       *segment_manager, *scanner, *this);
+       *segment_manager, *sms, *this);
       journal->set_write_pipeline(&pipeline);
       return journal->replay(std::forward<T>(std::move(f)));
     }).safe_then([this] {