]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: introduce initial journal implementation and tests
author: Samuel Just <sjust@redhat.com>
Mon, 4 May 2020 18:47:13 +0000 (11:47 -0700)
committer: Samuel Just <sjust@redhat.com>
Fri, 15 May 2020 06:45:02 +0000 (23:45 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/journal.cc [new file with mode: 0644]
src/crimson/os/seastore/journal.h [new file with mode: 0644]
src/crimson/os/seastore/seastore_types.cc [new file with mode: 0644]
src/crimson/os/seastore/seastore_types.h [new file with mode: 0644]
src/crimson/os/seastore/segment_manager.cc [new file with mode: 0644]
src/crimson/os/seastore/segment_manager.h [new file with mode: 0644]
src/crimson/os/seastore/segment_manager/ephemeral.cc [new file with mode: 0644]
src/crimson/os/seastore/segment_manager/ephemeral.h [new file with mode: 0644]
src/test/crimson/seastore/CMakeLists.txt
src/test/crimson/seastore/test_seastore_journal.cc [new file with mode: 0644]

index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..32b756a471dba0e31489902babe2909862556e2a 100644 (file)
@@ -0,0 +1,8 @@
+add_library(crimson-seastore
+  seastore_types.cc
+  segment_manager/ephemeral.cc
+  segment_manager.cc
+  journal.cc
+       )
+target_link_libraries(crimson-seastore
+  crimson)
diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc
new file mode 100644 (file)
index 0000000..6d041f7
--- /dev/null
@@ -0,0 +1,370 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "crimson/os/seastore/journal.h"
+
+#include "include/intarith.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace {
+  // File-local accessor for the seastar logger used by every function in
+  // this translation unit.
+  // NOTE(review): this logs under ceph_subsys_filestore -- presumably a
+  // placeholder until a seastore-specific subsystem exists; confirm.
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_filestore);
+  }
+}
+
+namespace crimson::os::seastore {
+
+/**
+ * Construct a Journal on top of segment_manager.
+ *
+ * Caches the block size and derives max_record_length as the segment size
+ * minus the space reserved for the segment header.  initialize_segment()
+ * writes the header at offset 0 and starts records at the next block
+ * boundary, so the reservation must be rounded *up* to whole blocks;
+ * rounding down would yield 0 for any header smaller than one block and
+ * let submit_record() accept records that can never fit in a segment.
+ */
+Journal::Journal(SegmentManager &segment_manager)
+  : block_size(segment_manager.get_block_size()),
+    max_record_length(
+      segment_manager.get_segment_size() -
+      p2roundup(ceph::encoded_sizeof_bounded<segment_header_t>(),
+               size_t(block_size))),
+    segment_manager(segment_manager) {}
+
+
+/**
+ * Prepare a freshly opened segment for journal writes.
+ *
+ * Encodes a segment_header_t (next journal segment sequence number, the
+ * segment's id and the current replay point) and writes it at offset 0.
+ * written_to is set to one block so that the first record lands on the
+ * block following the header.
+ *
+ * @param segment segment to initialize; its write pointer must be 0
+ * @return future resolving once the header write completes;
+ *         input_output_error is passed to the caller, any other write
+ *         error currently aborts (TODO)
+ */
+Journal::initialize_segment_ertr::future<> Journal::initialize_segment(
+  Segment &segment)
+{
+  logger().debug("initialize_segment {}", segment.get_segment_id());
+  // write out header
+  ceph_assert(segment.get_write_ptr() == 0);
+  bufferlist bl;
+  auto header = segment_header_t{
+    current_journal_segment_seq++,
+    segment.get_segment_id(),
+    current_replay_point};
+  ::encode(header, bl);
+
+  // Records begin immediately after the header block.
+  written_to = segment_manager.get_block_size();
+  return segment.write(0, bl).handle_error(
+    // forward only the errors declared by this function's own errorator
+    initialize_segment_ertr::pass_further{},
+    crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }));
+}
+
+/**
+ * Build the on-disk image of a record.
+ *
+ * Layout: encoded record_header_t, followed by each encoded delta, zero
+ * padded up to a multiple of block_size (the metadata region), followed by
+ * the concatenated extent payloads (the data region).
+ *
+ * @param rsize  lengths previously computed by get_encoded_record_length()
+ *               for this same record; asserted against the actual encoding
+ * @param record record to encode; extent buffers are claimed (moved out)
+ * @return bufferlist of length rsize.mdlength + rsize.dlength
+ */
+ceph::bufferlist Journal::encode_record(
+  record_size_t rsize,
+  record_t &&record)
+{
+  bufferlist metadatabl;
+  record_header_t header{
+    rsize.mdlength,
+    rsize.dlength,
+    current_journal_seq,
+    0 /* checksum, TODO */,
+    record.deltas.size(),
+    record.extents.size()
+  };
+  ::encode(header, metadatabl);
+  for (const auto &i: record.deltas) {
+    ::encode(i, metadatabl);
+  }
+  bufferlist databl;
+  for (auto &i: record.extents) {
+    databl.claim_append(i.bl);
+  }
+  if (metadatabl.length() % block_size != 0) {
+    // Pad with explicit zeroes: a bare bufferptr is uninitialized memory,
+    // which would leak heap contents to disk and make the record bytes
+    // non-deterministic.
+    metadatabl.append_zero(
+      block_size - (metadatabl.length() % block_size));
+  }
+
+  ceph_assert(metadatabl.length() == rsize.mdlength);
+  ceph_assert(databl.length() == rsize.dlength);
+  metadatabl.claim_append(databl);
+  ceph_assert(metadatabl.length() == (rsize.mdlength + rsize.dlength));
+  return metadatabl;
+}
+
+/**
+ * Encode record and write it to the current journal segment.
+ *
+ * The write lands at the current value of written_to, which is then
+ * advanced by the block-rounded encoded length before the write is issued.
+ * input_output_error is passed to the caller; other write errors abort
+ * (TODO).
+ */
+Journal::write_record_ertr::future<> Journal::write_record(
+  record_size_t rsize,
+  record_t &&record)
+{
+  ceph::bufferlist encoded = encode_record(rsize, std::move(record));
+
+  // Claim the on-disk range for this record before issuing the write.
+  auto write_offset = written_to;
+  written_to += p2roundup(encoded.length(), (unsigned)block_size);
+
+  logger().debug(
+    "write_record, mdlength {}, dlength {}",
+    rsize.mdlength,
+    rsize.dlength);
+
+  return current_journal_segment->write(write_offset, encoded).handle_error(
+    write_record_ertr::pass_further{},
+    crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }));
+}
+
+/**
+ * Compute the encoded size of record without encoding it.
+ *
+ * mdlength is the bounded header size plus the encoded size of every
+ * delta, rounded up to block_size; dlength is the sum of the extent
+ * payload lengths.
+ */
+Journal::record_size_t Journal::get_encoded_record_length(
+  const record_t &record) const {
+  // data region: extent payloads, back to back
+  extent_len_t dlen = 0;
+  for (const auto &extent: record.extents) {
+    dlen += extent.bl.length();
+  }
+
+  // metadata region: header followed by the deltas
+  extent_len_t mdlen =
+    (extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
+  for (const auto &delta: record.deltas) {
+    mdlen += ceph::encoded_sizeof(delta);
+  }
+  mdlen = p2roundup(mdlen, block_size);
+
+  return record_size_t{mdlen, dlen};
+}
+
+/// True iff appending length bytes at written_to would exceed the write
+/// capacity of the current journal segment.
+bool Journal::needs_roll(segment_off_t length) const
+{
+  const auto capacity = current_journal_segment->get_write_capacity();
+  return length + written_to > capacity;
+}
+
+/// Address (current segment id, written_to) at which the next record will
+/// be written; also logged at debug level.
+paddr_t Journal::next_record_addr() const
+{
+  paddr_t addr{current_journal_segment->get_segment_id(), written_to};
+  logger().debug("next_record_addr: {}", addr);
+  return addr;
+}
+
+/**
+ * Close the current journal segment (if any) and start writing to a new
+ * one.
+ *
+ * Steps, in order: close the current segment; hand its id back to the
+ * segment provider via put_segment(); obtain a new segment id from the
+ * provider; open it through the segment manager; reset written_to and
+ * write the segment header via initialize_segment().
+ *
+ * input_output_error is passed to the caller; any other error currently
+ * aborts (TODO).
+ */
+Journal::roll_journal_segment_ertr::future<>
+Journal::roll_journal_segment()
+{
+  // Remember the outgoing segment id now: current_journal_segment is
+  // replaced further down the chain.
+  auto old_segment_id = current_journal_segment ?
+    current_journal_segment->get_segment_id() :
+    NULL_SEG_ID;
+
+  // On the very first roll there is no segment to close.
+  return (current_journal_segment ?
+         current_journal_segment->close() :
+         Segment::close_ertr::now()).safe_then(
+    [this, old_segment_id] {
+      // TODO: pretty sure this needs to be atomic in some sense with
+      // making use of the new segment, maybe this bit needs to take
+      // the first transaction of the new segment?  Or the segment
+      // header should include deltas?
+      if (old_segment_id != NULL_SEG_ID) {
+       segment_provider->put_segment(old_segment_id);
+      }
+      return segment_provider->get_segment();
+    }).safe_then([this](auto segment) {
+      return segment_manager.open(segment);
+    }).safe_then([this](auto sref) {
+      current_journal_segment = sref;
+      // initialize_segment() advances written_to past the header block
+      written_to = 0;
+      return initialize_segment(*current_journal_segment);
+    }).handle_error(
+      roll_journal_segment_ertr::pass_further{},
+      crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
+    );
+}
+
+/// Opens the journal for writing by rolling onto a fresh segment; see
+/// roll_journal_segment().
+Journal::init_ertr::future<> Journal::open_for_write()
+{
+  return roll_journal_segment();
+}
+
+/**
+ * Determine which segments must be replayed, and from where.
+ *
+ * Reads the first block of every segment and tries to decode a
+ * segment_header_t from it; segments whose header fails to decode are
+ * skipped, and read errors other than input_output_error are discarded.
+ * The decoded headers are sorted by journal_segment_seq.  The replay point
+ * is the journal_replay_lb of the newest segment, or -- if that is
+ * P_ADDR_NULL -- the first record block of the oldest segment.
+ *
+ * @return one paddr_t per segment to replay, oldest first: the first entry
+ *         is the replay point, every following entry is the first record
+ *         block (offset block_size) of its segment.  Fails with
+ *         input_output_error if no segment header could be decoded or if
+ *         the replay point refers to a segment without a decodable header.
+ */
+Journal::find_replay_segments_fut Journal::find_replay_segments()
+{
+  return seastar::do_with(
+    std::vector<std::pair<segment_id_t, segment_header_t>>(),
+    [this](auto &&segments) mutable {
+      return crimson::do_for_each(
+       boost::make_counting_iterator(segment_id_t{0}),
+       boost::make_counting_iterator(segment_manager.get_num_segments()),
+       [this, &segments](auto i) {
+         return segment_manager.read(paddr_t{i, 0}, block_size
+         ).safe_then([this, &segments, i](bufferptr bptr) mutable {
+           logger().debug("segment {} bptr size {}", i, bptr.length());
+           segment_header_t header;
+           bufferlist bl;
+           bl.push_back(bptr);
+
+           logger().debug(
+             "find_replay_segments: segment {} block crc {}",
+             i,
+             bl.begin().crc32c(block_size, 0));
+
+           auto bp = bl.cbegin();
+           try {
+             ::decode(header, bp);
+           } catch (ceph::buffer::error &e) {
+             // not a journal segment (or never written): ignore it
+             logger().debug(
+               "find_replay_segments: segment {} unable to decode "
+               "header, skipping",
+               i);
+             return find_replay_segments_ertr::now();
+           }
+           segments.emplace_back(i, std::move(header));
+           return find_replay_segments_ertr::now();
+         }).handle_error(
+           find_replay_segments_ertr::pass_further{},
+           crimson::ct_error::discard_all{}
+         );
+       }).safe_then([this, &segments]() mutable -> find_replay_segments_fut {
+         logger().debug(
+           "find_replay_segments: have {} segments",
+           segments.size());
+         if (segments.empty()) {
+           return crimson::ct_error::input_output_error::make();
+         }
+         // oldest segment first
+         std::sort(
+           segments.begin(),
+           segments.end(),
+           [](const auto &lt, const auto &rt) {
+             return lt.second.journal_segment_seq <
+               rt.second.journal_segment_seq;
+           });
+
+         // the newest header carries the most recent replay lower bound
+         auto replay_from = segments.rbegin()->second.journal_replay_lb;
+         auto from = segments.begin();
+         if (replay_from != P_ADDR_NULL) {
+           from = std::find_if(
+             segments.begin(),
+             segments.end(),
+             [&replay_from](const auto &seg) -> bool {
+               return seg.first == replay_from.segment;
+             });
+           if (from == segments.end()) {
+             // The replay point names a segment we could not decode a
+             // header for; without this check ret would be empty and the
+             // ret[0] assignment below would write out of bounds.
+             logger().error(
+               "find_replay_segments: replay point {} refers to a segment "
+               "without a valid header",
+               replay_from);
+             return crimson::ct_error::input_output_error::make();
+           }
+         } else {
+           replay_from = paddr_t{from->first, (segment_off_t)block_size};
+         }
+         auto ret = std::vector<paddr_t>(segments.end() - from);
+         std::transform(
+           from, segments.end(), ret.begin(),
+           [this](const auto &p) {
+             return paddr_t{p.first, (segment_off_t)block_size};
+           });
+         // the first segment is replayed from the replay point rather
+         // than from its first record block
+         ret[0] = replay_from;
+         return find_replay_segments_fut(
+           find_replay_segments_ertr::ready_future_marker{},
+           std::move(ret));
+       });
+    });
+}
+
+/**
+ * Read the metadata region of the record starting at start.
+ *
+ * Reads one block and decodes a record_header_t from it.  If the header
+ * reports an mdlength larger than one block, the remainder of the metadata
+ * region is read and appended.
+ *
+ * @param start address of the first block of the record
+ * @return the decoded header together with a bufferlist holding the
+ *         metadata bytes read, or std::nullopt if the header does not
+ *         decode
+ */
+Journal::read_record_metadata_ret Journal::read_record_metadata(
+  paddr_t start)
+{
+  return segment_manager.read(start, block_size
+  ).safe_then(
+    [this, start](bufferptr bptr) mutable
+    -> read_record_metadata_ret {
+      logger().debug("read_record_metadata: reading {}", start);
+      bufferlist bl;
+      bl.append(bptr);
+      auto bp = bl.cbegin();
+      record_header_t header;
+      try {
+       ::decode(header, bp);
+      } catch (ceph::buffer::error &e) {
+       // undecodable header: report "no record here" rather than an error
+       return read_record_metadata_ret(
+         read_record_metadata_ertr::ready_future_marker{},
+         std::nullopt);
+      }
+      if (header.mdlength > block_size) {
+       // metadata spills past the first block: fetch the tail
+       return segment_manager.read(
+         {start.segment, start.offset + (segment_off_t)block_size},
+         header.mdlength - block_size).safe_then(
+           [this, header=std::move(header), bl=std::move(bl)](
+             auto &&bptail) mutable {
+             bl.push_back(bptail);
+             return read_record_metadata_ret(
+               read_record_metadata_ertr::ready_future_marker{},
+               std::make_pair(std::move(header), std::move(bl)));
+           });
+      } else {
+         return read_record_metadata_ret(
+           read_record_metadata_ertr::ready_future_marker{},
+           std::make_pair(std::move(header), std::move(bl))
+         );
+      }
+    });
+}
+
+/**
+ * Decode the deltas of a record from its metadata bytes.
+ *
+ * Skips the bounded encoded size of record_header_t at the front of bl and
+ * then decodes header.deltas entries.
+ *
+ * @return the decoded deltas, or std::nullopt if any of them fails to
+ *         decode
+ */
+std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
+  record_header_t header,
+  bufferlist &bl)
+{
+  auto cursor = bl.cbegin();
+  cursor += ceph::encoded_sizeof_bounded<record_header_t>();
+
+  std::vector<delta_info_t> decoded(header.deltas);
+  try {
+    for (auto &delta : decoded) {
+      ::decode(delta, cursor);
+    }
+  } catch (const ceph::buffer::error &) {
+    return std::nullopt;
+  }
+  return decoded;
+}
+
+/**
+ * Replay the records of one segment, beginning at start.
+ *
+ * Repeatedly reads a record's metadata at the current position, advances
+ * the position by mdlength + dlength, decodes the record's deltas and
+ * passes each one to delta_handler.  The loop stops at the first position
+ * where no record header can be decoded or where the deltas fail to
+ * decode.
+ *
+ * @param start         address of the first record to replay
+ * @param delta_handler invoked once per delta, in order, with the record's
+ *                      start address advanced by one block
+ */
+Journal::replay_ertr::future<>
+Journal::replay_segment(
+  paddr_t start,
+  delta_handler_t &delta_handler)
+{
+  logger().debug("replay_segment: starting at {}", start);
+  return seastar::do_with(
+    std::move(start),
+    [this, &delta_handler](auto &current) {
+      // the loop body resolves to true when replay of this segment is done
+      return crimson::do_until(
+       [this, &current, &delta_handler]() -> replay_ertr::future<bool> {
+         return read_record_metadata(current).safe_then
+           ([this, &current, &delta_handler](auto p)
+            -> replay_ertr::future<bool> {
+             if (!p.has_value()) {
+               // no decodable record header here: end of the journal
+               return replay_ertr::make_ready_future<bool>(true);
+             }
+
+             auto &[header, bl] = *p;
+
+             logger().debug(
+               "replay_segment: next record offset {} mdlength {} dlength {}",
+               current,
+               header.mdlength,
+               header.dlength);
+
+             // remember where this record began, then step to the next one
+             auto record_start = current;
+             current.offset += header.mdlength + header.dlength;
+
+             auto deltas = try_decode_deltas(
+               header,
+               bl);
+             if (!deltas) {
+               return replay_ertr::make_ready_future<bool>(true);
+             }
+
+             // keep the decoded deltas alive while the handler runs
+             return seastar::do_with(
+               std::move(*deltas),
+               [this, &delta_handler, record_start](auto &deltas) {
+                 return crimson::do_for_each(
+                   deltas,
+                   [this, &delta_handler, record_start](auto &info) {
+                     return delta_handler(
+                       record_start.add_relative(
+                         make_relative_paddr(block_size)),
+                       info);
+                   });
+               }).safe_then([] {
+                 return replay_ertr::make_ready_future<bool>(false);
+               });
+           });
+       });
+    });
+}
+
+/**
+ * Replay the journal, feeding every delta to delta_handler.
+ *
+ * Locates the segments to replay with find_replay_segments() and replays
+ * them one after another, in the order returned, via replay_segment().
+ * The handler and the segment list are kept alive for the duration of the
+ * replay.
+ */
+Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+{
+  return seastar::do_with(
+    std::make_pair(std::move(delta_handler), std::vector<paddr_t>()),
+    [this](auto &&item) mutable -> replay_ret {
+      auto &[handler, segments] = item;
+      return find_replay_segments(
+      ).safe_then([this, &handler, &segments](auto osegments) {
+       // Take ownership of the result first: logging before the swap
+       // would always report the still-empty placeholder vector.
+       segments.swap(osegments);
+       logger().debug("replay: found {} segments", segments.size());
+       return crimson::do_for_each(
+         segments,
+         [this, &handler](auto i) {
+           return replay_segment(i, handler);
+         });
+      });
+    });
+}
+
+}
diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h
new file mode 100644 (file)
index 0000000..aed11b5
--- /dev/null
@@ -0,0 +1,262 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/log.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/denc.h"
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/osd/exceptions.h"
+
+namespace crimson::os::seastore {
+
+using journal_seq_t = uint64_t;
+static constexpr journal_seq_t NO_DELTAS =
+  std::numeric_limits<journal_seq_t>::max();
+
+/**
+ * Segment header
+ *
+ * Every segment contains an encoded segment_header_t in its first block.
+ * Our strategy for finding the journal replay point is:
+ * 1) Find the segment with the highest journal_segment_seq
+ * 2) Scan forward from committed_journal_lb to find the most recent
+ *    journal_commit_lb record
+ * 3) Replay starting at the most recent found journal_commit_lb record
+ *
+ * NOTE(review): committed_journal_lb / journal_commit_lb are not members
+ * of this struct; they presumably refer to journal_replay_lb -- confirm.
+ */
+struct segment_header_t {
+  segment_seq_t journal_segment_seq; // sequence number of this segment
+  segment_id_t physical_segment_id; // debugging
+
+  paddr_t journal_replay_lb;        // replay lower bound recorded at open
+
+  DENC(segment_header_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.journal_segment_seq, p);
+    denc(v.physical_segment_id, p);
+    denc(v.journal_replay_lb, p);
+    DENC_FINISH(p);
+  }
+};
+
+/**
+ * Header encoded at the start of every journal record.  It is followed by
+ * the record's encoded deltas (together forming the metadata region of
+ * mdlength bytes) and then by dlength bytes of extent data.
+ *
+ * NOTE(review): deltas/extents are size_t, whose width is platform
+ * dependent -- confirm this is acceptable for an on-disk format.
+ */
+struct record_header_t {
+  // Fixed portion
+  extent_len_t  mdlength;       // block aligned, length of metadata
+  extent_len_t  dlength;        // block aligned, length of data
+  journal_seq_t seq;            // current journal seqid
+  checksum_t    full_checksum;  // checksum for full record (TODO)
+  size_t deltas;                // number of deltas
+  size_t extents;               // number of extents
+
+  DENC(record_header_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.mdlength, p);
+    denc(v.dlength, p);
+    denc(v.seq, p);
+    denc(v.full_checksum, p);
+    denc(v.deltas, p);
+    denc(v.extents, p);
+    DENC_FINISH(p);
+  }
+};
+
+/**
+ * Callback interface for managing available segments.
+ *
+ * The Journal asks the provider for the next segment to write to and hands
+ * back segments it has finished writing.
+ */
+class JournalSegmentProvider {
+public:
+  using get_segment_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using get_segment_ret = get_segment_ertr::future<segment_id_t>;
+  /// Returns the id of the segment the journal should write to next.
+  virtual get_segment_ret get_segment() = 0;
+
+  /* TODO: we'll want to use this to propagate information about segment contents */
+  /// Called with the id of a journal segment that has been closed.
+  virtual void put_segment(segment_id_t segment) = 0;
+
+  virtual ~JournalSegmentProvider() {}
+};
+
+/**
+ * Manages stream of atomically written records to a SegmentManager.
+ *
+ * Typical use: set_segment_provider(), optionally replay(), then
+ * open_for_write() followed by any number of submit_record() calls.
+ */
+class Journal {
+public:
+  Journal(SegmentManager &segment_manager);
+
+  /**
+   * Sets the JournalSegmentProvider.
+   *
+   * Not provided in constructor to allow the provider to not own
+   * or construct the Journal (TransactionManager).
+   *
+   * Note, Journal does not own this ptr, user must ensure that
+   * *provider outlives Journal.
+   */
+  void set_segment_provider(JournalSegmentProvider *provider) {
+    segment_provider = provider;
+  }
+
+  /**
+   * initializes journal for new writes -- must run prior to calls
+   * to submit_record.  Should be called after replay if not a new
+   * Journal.
+   */
+  using init_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  init_ertr::future<> open_for_write();
+
+  /**
+   * close journal
+   *
+   * TODO: should probably flush and disallow further writes
+   */
+  using close_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  close_ertr::future<> close() { return close_ertr::now(); }
+
+  /**
+   * submit_record
+   *
+   * Writes record to the journal, rolling to a new segment first if the
+   * current one lacks space.
+   *
+   * @param record record to write
+   * @return address one block past the start of the written record;
+   *         erange if the encoded record exceeds max_record_length
+   */
+  using submit_record_ertr = crimson::errorator<
+    crimson::ct_error::erange,
+    crimson::ct_error::input_output_error
+    >;
+  using submit_record_ret = submit_record_ertr::future<paddr_t>;
+  submit_record_ret submit_record(record_t &&record) {
+    auto rsize = get_encoded_record_length(record);
+    auto total = rsize.mdlength + rsize.dlength;
+    if (total > max_record_length) {
+      return crimson::ct_error::erange::make();
+    }
+    // roll first so that next_record_addr() below refers to the segment
+    // the record will actually be written to
+    auto roll = needs_roll(total)
+      ? roll_journal_segment()
+      : roll_journal_segment_ertr::now();
+    return roll.safe_then(
+      [this, rsize, record=std::move(record)]() mutable {
+       auto ret = next_record_addr();
+       return write_record(rsize, std::move(record)
+       ).safe_then([this, ret] {
+         return ret.add_relative(make_relative_paddr(block_size));
+       });
+      });
+  }
+
+  /**
+   * Read deltas and pass to delta_handler
+   *
+   * record_start (argument to delta_handler) is the address of the
+   * containing record advanced by one block.
+   */
+  using replay_ertr = SegmentManager::read_ertr;
+  using replay_ret = replay_ertr::future<>;
+  using delta_handler_t = std::function<
+    replay_ret(paddr_t record_start, const delta_info_t&)>;
+  replay_ret replay(delta_handler_t &&delta_handler);
+
+private:
+  /// block size of the underlying SegmentManager, cached at construction
+  const extent_len_t block_size;
+  /// largest mdlength + dlength accepted by submit_record
+  const extent_len_t max_record_length;
+
+  /// not owned, see set_segment_provider
+  JournalSegmentProvider *segment_provider = nullptr;
+  SegmentManager &segment_manager;
+
+  /// written into each new segment header as journal_replay_lb
+  paddr_t current_replay_point;
+
+  /// sequence number assigned to the next segment header written
+  segment_seq_t current_journal_segment_seq = 0;
+
+  /// segment currently being written, null until open_for_write
+  SegmentRef current_journal_segment;
+  /// offset within current_journal_segment of the next record
+  segment_off_t written_to = 0;
+
+  // NOTE(review): not referenced by the visible implementation -- confirm
+  // whether it is still needed.
+  segment_id_t next_journal_segment_seq = NULL_SEG_ID;
+  /// value stored in record_header_t::seq
+  journal_seq_t current_journal_seq = 0;
+
+  /// prepare segment for writes, writes out segment header
+  using initialize_segment_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  initialize_segment_ertr::future<> initialize_segment(
+    Segment &segment);
+
+  /// encoded lengths of a record's metadata and data regions
+  struct record_size_t {
+    extent_len_t mdlength = 0;
+    extent_len_t dlength = 0;
+
+    record_size_t(
+      extent_len_t mdlength,
+      extent_len_t dlength)
+      : mdlength(mdlength), dlength(dlength) {}
+  };
+
+  /**
+   * Return <mdlength, dlength> pair denoting length of
+   * metadata and blocks respectively.
+   */
+  record_size_t get_encoded_record_length(
+    const record_t &record) const;
+
+  /// create encoded record bl
+  ceph::bufferlist encode_record(
+    record_size_t rsize,
+    record_t &&record);
+
+  /// do record write
+  using write_record_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  write_record_ertr::future<> write_record(
+    record_size_t rsize,
+    record_t &&record);
+
+  /// close current segment and initialize next one
+  using roll_journal_segment_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  roll_journal_segment_ertr::future<> roll_journal_segment();
+
+  /// returns true iff current segment has insufficient space
+  bool needs_roll(segment_off_t length) const;
+
+  /// returns next record addr
+  paddr_t next_record_addr() const;
+
+  /// return ordered vector of segments to replay
+  using find_replay_segments_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  using find_replay_segments_fut =
+    find_replay_segments_ertr::future<std::vector<paddr_t>>;
+  find_replay_segments_fut find_replay_segments();
+
+  /// read record metadata for record starting at start
+  using read_record_metadata_ertr = replay_ertr;
+  using read_record_metadata_ret = read_record_metadata_ertr::future<
+    std::optional<std::pair<record_header_t, bufferlist>>
+    >;
+  read_record_metadata_ret read_record_metadata(
+    paddr_t start);
+
+  /// attempts to decode deltas from bl, return nullopt if unsuccessful
+  std::optional<std::vector<delta_info_t>> try_decode_deltas(
+    record_header_t header,
+    bufferlist &bl);
+
+  /// replays records starting at start through end of segment
+  replay_ertr::future<>
+  replay_segment(
+    paddr_t start,                 ///< [in] starting addr
+    delta_handler_t &delta_handler ///< [in] processes deltas in order
+  );
+};
+
+}
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc
new file mode 100644 (file)
index 0000000..af8516e
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/os/seastore/seastore_types.h"
+
+namespace crimson::os::seastore {
+
+/// Prints a segment id, using symbolic names for the NULL_SEG_ID and
+/// REL_SEG_ID sentinels.
+std::ostream &segment_to_stream(std::ostream &out, const segment_id_t &t)
+{
+  switch (t) {
+  case NULL_SEG_ID:
+    return out << "NULL_SEG";
+  case REL_SEG_ID:
+    return out << "REL_SEG";
+  default:
+    return out << t;
+  }
+}
+
+/// Prints a segment offset, using a symbolic name for NULL_SEG_OFF.
+std::ostream &offset_to_stream(std::ostream &out, const segment_off_t &t)
+{
+  const bool is_null = (t == NULL_SEG_OFF);
+  return is_null ? (out << "NULL_OFF") : (out << t);
+}
+
+/// Prints a paddr_t as "paddr_t<SEGMENT, OFFSET>".
+std::ostream &operator<<(std::ostream &out, const paddr_t &rhs)
+{
+  segment_to_stream(out << "paddr_t<", rhs.segment);
+  offset_to_stream(out << ", ", rhs.offset);
+  out << ">";
+  return out;
+}
+
+/// Prints the symbolic name of an extent type; values that are not
+/// enumerators of extent_types_t print as "UNKNOWN".
+std::ostream &operator<<(std::ostream &out, extent_types_t t)
+{
+  switch (t) {
+  case extent_types_t::ROOT_LOCATION:
+    return out << "ROOT_LOCATION";
+  case extent_types_t::ROOT:
+    return out << "ROOT";
+  case extent_types_t::LADDR_INTERNAL:
+    return out << "LADDR_INTERNAL";
+  case extent_types_t::LADDR_LEAF:
+    return out << "LADDR_LEAF";
+  case extent_types_t::LBA_BLOCK:
+    // previously missing, so a valid enumerator printed as UNKNOWN
+    return out << "LBA_BLOCK";
+  case extent_types_t::TEST_BLOCK:
+    return out << "TEST_BLOCK";
+  case extent_types_t::NONE:
+    return out << "NONE";
+  default:
+    return out << "UNKNOWN";
+  }
+}
+
+/// Prints the list as "[(addr,len),(addr,len),...]"; an empty list prints
+/// as "[]".
+std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs)
+{
+  out << '[';
+  bool first = true;
+  for (const auto &i: rhs) {
+    // separator goes before every element except the first
+    if (!first) {
+      out << ',';
+    }
+    out << '(' << i.first << ',' << i.second << ')';
+    first = false;
+  }
+  return out << ']';
+}
+/// Prints the list as "[(addr,len),(addr,len),...]"; an empty list prints
+/// as "[]".
+std::ostream &operator<<(std::ostream &out, const paddr_list_t &rhs)
+{
+  out << '[';
+  bool first = true;
+  for (const auto &i: rhs) {
+    // separator goes before every element except the first
+    if (!first) {
+      out << ',';
+    }
+    out << '(' << i.first << ',' << i.second << ')';
+    first = false;
+  }
+  return out << ']';
+}
+
+}
diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h
new file mode 100644 (file)
index 0000000..ea94722
--- /dev/null
@@ -0,0 +1,184 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <limits>
+#include <iostream>
+
+#include "include/denc.h"
+#include "include/buffer.h"
+#include "include/cmp.h"
+
+namespace crimson::os::seastore {
+
+using checksum_t = uint32_t;
+
+// Identifies segment location on disk, see SegmentManager,
+using segment_id_t = uint32_t;
+constexpr segment_id_t NULL_SEG_ID =
+  std::numeric_limits<segment_id_t>::max() - 1;
+/* Used to denote relative paddr_t */
+constexpr segment_id_t REL_SEG_ID =
+  std::numeric_limits<segment_id_t>::max() - 2;
+
+std::ostream &segment_to_stream(std::ostream &, const segment_id_t &t);
+
+// Offset within a segment on disk, see SegmentManager
+// may be negative for relative offsets
+using segment_off_t = int32_t;
+// Sentinel for "no offset".  Taken from segment_off_t's own limits: the
+// maximum of the unsigned segment_id_t does not fit in int32_t and would
+// wrap to -1, which is a legitimate relative offset.
+constexpr segment_off_t NULL_SEG_OFF =
+  std::numeric_limits<segment_off_t>::max();
+
+std::ostream &offset_to_stream(std::ostream &, const segment_off_t &t);
+
+/* Monotonically increasing segment seq, uniquely identifies
+ * the incarnation of a segment */
+using segment_seq_t = uint32_t;
+static constexpr segment_seq_t NULL_SEG_SEQ =
+  std::numeric_limits<segment_seq_t>::max();
+
+// Offset of delta within a record
+using record_delta_idx_t = uint32_t;
+constexpr record_delta_idx_t NULL_DELTA_IDX =
+  std::numeric_limits<record_delta_idx_t>::max();
+
+// <segment, offset> offset on disk, see SegmentManager
+//
+// A paddr_t whose segment is REL_SEG_ID is "relative": its offset is a
+// displacement to be applied to some absolute address rather than a
+// location of its own.  A default constructed paddr_t is the null address.
+struct paddr_t {
+  segment_id_t segment = NULL_SEG_ID;
+  segment_off_t offset = NULL_SEG_OFF;
+
+  /// true iff this address is a relative displacement
+  bool is_relative() const {
+    return segment == REL_SEG_ID;
+  }
+
+  /// Returns this (absolute) address displaced by the relative address o.
+  paddr_t add_relative(paddr_t o) const {
+    assert(o.is_relative());
+    assert(!is_relative());
+    return paddr_t{segment, offset + o.offset};
+  }
+
+  /// Difference of two relative addresses, itself a relative address.
+  paddr_t operator-(paddr_t rhs) const {
+    assert(rhs.is_relative() && is_relative());
+    return paddr_t{
+      REL_SEG_ID,
+      offset - rhs.offset
+    };
+  }
+
+  /// If relative, resolves this address against base; otherwise returns
+  /// it unchanged.
+  paddr_t maybe_relative_to(paddr_t base) const {
+    if (is_relative())
+      return base.add_relative(*this);
+    else
+      return *this;
+  }
+
+  DENC(paddr_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.segment, p);
+    denc(v.offset, p);
+    DENC_FINISH(p);
+  }
+};
+WRITE_CMP_OPERATORS_2(paddr_t, segment, offset)
+WRITE_EQ_OPERATORS_2(paddr_t, segment, offset)
+constexpr paddr_t P_ADDR_NULL = paddr_t{};
+/// Builds a relative paddr_t (segment == REL_SEG_ID) with displacement off.
+constexpr paddr_t make_relative_paddr(segment_off_t off) {
+  return paddr_t{REL_SEG_ID, off};
+}
+
+std::ostream &operator<<(std::ostream &out, const paddr_t &rhs);
+
+// logical addr, see LBAManager, TransactionManager
+using laddr_t = uint64_t;
+constexpr laddr_t L_ADDR_MIN = std::numeric_limits<laddr_t>::min();
+constexpr laddr_t L_ADDR_MAX = std::numeric_limits<laddr_t>::max();
+constexpr laddr_t L_ADDR_NULL = std::numeric_limits<laddr_t>::max();
+constexpr laddr_t L_ADDR_ROOT = std::numeric_limits<laddr_t>::max() - 1;
+constexpr laddr_t L_ADDR_LBAT = std::numeric_limits<laddr_t>::max() - 2;
+
+// logical offset, see LBAManager, TransactionManager
+using loff_t = uint32_t;
+constexpr loff_t L_OFF_NULL = std::numeric_limits<loff_t>::max();
+
+/// List of <laddr_t, loff_t> pairs; a distinct type (rather than an alias)
+/// so that it can have its own operator<<.  Constructor arguments are
+/// forwarded to std::list.
+struct laddr_list_t : std::list<std::pair<laddr_t, loff_t>> {
+  template <typename... T>
+  laddr_list_t(T&&... args)
+    : std::list<std::pair<laddr_t, loff_t>>(std::forward<T>(args)...) {}
+};
+/// List of <paddr_t, loff_t> pairs; a distinct type (rather than an alias)
+/// so that it can have its own operator<<.  Constructor arguments are
+/// forwarded to std::list.
+struct paddr_list_t : std::list<std::pair<paddr_t, loff_t>> {
+  template <typename... T>
+  paddr_list_t(T&&... args)
+    : std::list<std::pair<paddr_t, loff_t>>(std::forward<T>(args)...) {}
+};
+
+std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs);
+std::ostream &operator<<(std::ostream &out, const paddr_list_t &rhs);
+
+/* identifies type of extent, used for interpreting deltas, managing
+ * writeback */
+enum class extent_types_t : uint8_t {
+  ROOT_LOCATION = 0, // delta only
+  ROOT = 1,
+  LADDR_INTERNAL = 2,
+  LADDR_LEAF = 3,
+  LBA_BLOCK = 4,
+
+  // Test Block Types
+  TEST_BLOCK = 0xF0,
+
+  // None
+  NONE = 0xFF
+};
+
+std::ostream &operator<<(std::ostream &out, extent_types_t t);
+
+/* description of a new physical extent; the payloads of a record's
+ * extents are written back to back after the record metadata */
+struct extent_t {
+  ceph::bufferlist bl;  ///< payload, bl.length() == length, aligned
+};
+
+using extent_version_t = uint32_t;
+constexpr extent_version_t EXTENT_VERSION_NULL = 0;
+
+/* description of a mutation to a physical extent */
+struct delta_info_t {
+  extent_types_t type = extent_types_t::NONE;  ///< delta type
+  paddr_t paddr;                               ///< physical address
+  /* logical address -- needed for repopulating cache -- TODO don't actually need */
+  // laddr_t laddr = L_ADDR_NULL;
+  segment_off_t length = NULL_SEG_OFF;         ///< extent length
+  /// prior version; initialized like the other members so that a default
+  /// constructed delta never encodes or compares an indeterminate value
+  extent_version_t pversion = EXTENT_VERSION_NULL;
+  ceph::bufferlist bl;                         ///< payload
+
+  DENC(delta_info_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.type, p);
+    denc(v.paddr, p);
+    //denc(v.laddr, p);
+    denc(v.length, p);
+    denc(v.pversion, p);
+    denc(v.bl, p);
+    DENC_FINISH(p);
+  }
+
+  /// member-wise equality, including the payload
+  bool operator==(const delta_info_t &rhs) const {
+    return (
+      type == rhs.type &&
+      paddr == rhs.paddr &&
+      length == rhs.length &&
+      pversion == rhs.pversion &&
+      bl == rhs.bl
+    );
+  }
+};
+
+/// Unit submitted to the journal: a set of new extents together with a set
+/// of deltas against existing extents.
+struct record_t {
+  std::vector<extent_t> extents;    ///< new extents
+  std::vector<delta_info_t> deltas; ///< mutations to existing extents
+};
+
+}
+
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
diff --git a/src/crimson/os/seastore/segment_manager.cc b/src/crimson/os/seastore/segment_manager.cc
new file mode 100644 (file)
index 0000000..b20bc69
--- /dev/null
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include "crimson/os/seastore/segment_manager/ephemeral.h"
+
+namespace crimson::os::seastore::segment_manager {
+
+/// Factory: returns a new EphemeralSegmentManager built from config,
+/// wrapped in a SegmentManagerRef.
+SegmentManagerRef create_ephemeral(ephemeral_config_t config) {
+  return SegmentManagerRef{new EphemeralSegmentManager(config)};
+}
+
+/// Prints the configuration as
+/// "ephemeral_config_t(size=..., block_size=..., segment_size=...)".
+std::ostream &operator<<(std::ostream &lhs, const ephemeral_config_t &c) {
+  lhs << "ephemeral_config_t(size=" << c.size;
+  lhs << ", block_size=" << c.block_size;
+  lhs << ", segment_size=" << c.segment_size;
+  return lhs << ")";
+}
+
+}
diff --git a/src/crimson/os/seastore/segment_manager.h b/src/crimson/os/seastore/segment_manager.h
new file mode 100644 (file)
index 0000000..6304586
--- /dev/null
@@ -0,0 +1,145 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iosfwd>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "include/buffer_fwd.h"
+#include "crimson/osd/exceptions.h"
+
+namespace crimson::os::seastore {
+
+/**
+ * Segment
+ *
+ * Abstract handle to a single segment that has been opened for writing.
+ * Instances are reference counted (non-atomically) and are handed out by
+ * SegmentManager::open() as SegmentRef.
+ */
+class Segment : public boost::intrusive_ref_counter<
+  Segment,
+  boost::thread_unsafe_counter>{
+public:
+
+  /**
+   * get_segment_id
+   *
+   * @return id of the segment this handle refers to
+   */
+  virtual segment_id_t get_segment_id() const = 0;
+
+  /**
+   * get_write_ptr
+   *
+   * @return min next write location
+   */
+  virtual segment_off_t get_write_ptr() const = 0;
+
+  /**
+   * get_write_capacity
+   *
+   * @return max capacity
+   */
+  virtual segment_off_t get_write_capacity() const = 0;
+
+  /**
+   * close
+   *
+   * Closes segment for writes.  Won't complete until
+   * outstanding writes to this segment are complete.
+   */
+  using close_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent>;
+  virtual close_ertr::future<> close() = 0;
+
+
+  /**
+   * write
+   *
+   * @param offset offset of write, must be block aligned (EphemeralSegment
+   *               checks against the manager's block_size) and >= write
+   *               pointer, advances write pointer
+   * @param bl     buffer to write, will be padded if not aligned
+   */
+  using write_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error, // media error or corruption
+    crimson::ct_error::invarg,             // if offset is < write pointer or misaligned
+    crimson::ct_error::ebadf,              // segment closed
+    crimson::ct_error::enospc              // write exceeds segment size
+    >;
+  virtual write_ertr::future<> write(
+    segment_off_t offset, ceph::bufferlist bl) = 0;
+
+  virtual ~Segment() {}
+};
+using SegmentRef = boost::intrusive_ptr<Segment>;
+
+/// In-memory size of a paddr_t, in bytes.
+constexpr size_t PADDR_SIZE = sizeof(paddr_t);
+
+/**
+ * SegmentManager
+ *
+ * Abstract interface to a device divided into fixed size segments.
+ * Segments are opened for writing through open(), returned to the manager
+ * through release(), and read back by physical address through read().
+ */
+class SegmentManager {
+public:
+  /// Prepares the manager for use; must complete before other calls.
+  using init_ertr = crimson::errorator<
+    crimson::ct_error::enospc,
+    crimson::ct_error::invarg,
+    crimson::ct_error::erange>;
+  virtual init_ertr::future<> init() = 0;
+
+  /// Opens segment id for writing and returns a handle to it.
+  using open_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent>;
+  virtual open_ertr::future<SegmentRef> open(segment_id_t id) = 0;
+
+  /// Releases segment id back to the manager.
+  using release_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent>;
+  virtual release_ertr::future<> release(segment_id_t id) = 0;
+
+  /**
+   * read
+   *
+   * Reads len bytes starting at addr into the caller supplied buffer.
+   *
+   * @param addr physical address to read from
+   * @param len  number of bytes to read
+   * @param out  destination; the caller keeps it alive until the returned
+   *             future resolves
+   */
+  using read_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+  virtual read_ertr::future<> read(
+    paddr_t addr,
+    size_t len,
+    ceph::bufferptr &out) = 0;
+  /**
+   * read (allocating overload)
+   *
+   * Allocates a bufferptr of len bytes, reads into it through the virtual
+   * read() above and resolves to that buffer.  The buffer is heap allocated
+   * and owned by the continuation so that it outlives the read.
+   */
+  read_ertr::future<ceph::bufferptr> read(
+    paddr_t addr,
+    size_t len) {
+    auto ptrref = std::make_unique<ceph::bufferptr>(len);
+    return read(addr, len, *ptrref).safe_then(
+      [ptrref=std::move(ptrref)]() mutable {
+       return read_ertr::make_ready_future<bufferptr>(std::move(*ptrref));
+      });
+  }
+
+  /* Methods for discovering device geometry, segmentid set, etc */
+  virtual size_t get_size() const = 0;
+  virtual segment_off_t get_block_size() const = 0;
+  virtual segment_off_t get_segment_size() const = 0;
+  /// Number of segments: total size divided by segment size.  Asserts that
+  /// the total size is an exact multiple of the segment size.
+  virtual segment_id_t get_num_segments() const {
+    ceph_assert(get_size() % get_segment_size() == 0);
+    return ((segment_id_t)(get_size() / get_segment_size()));
+  }
+
+
+  virtual ~SegmentManager() {}
+};
+using SegmentManagerRef = std::unique_ptr<SegmentManager>;
+
+namespace segment_manager {
+
+/// Geometry of an ephemeral (memory backed) segment manager.
+struct ephemeral_config_t {
+  size_t size;          ///< total size of the backing store, in bytes
+  size_t block_size;    ///< block size, in bytes
+  size_t segment_size;  ///< size of each segment, in bytes
+};
+/// Default geometry used by the unit tests.
+constexpr ephemeral_config_t DEFAULT_TEST_EPHEMERAL = {
+  1 << 30,  // size: 1 GiB
+  4 << 10,  // block_size: 4 KiB
+  32 << 20  // segment_size: 32 MiB
+};
+
+std::ostream &operator<<(std::ostream &, const ephemeral_config_t &);
+SegmentManagerRef create_ephemeral(ephemeral_config_t config);
+
+}
+
+}
diff --git a/src/crimson/os/seastore/segment_manager/ephemeral.cc b/src/crimson/os/seastore/segment_manager/ephemeral.cc
new file mode 100644 (file)
index 0000000..c2b32eb
--- /dev/null
@@ -0,0 +1,172 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <sys/mman.h>
+#include <string.h>
+
+#include "crimson/common/log.h"
+
+#include "include/buffer.h"
+#include "crimson/os/seastore/segment_manager/ephemeral.h"
+
+namespace {
+  /// File-local accessor for the logger used throughout this file
+  /// (currently registered under the filestore subsystem).
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_filestore);
+  }
+}
+
+namespace crimson::os::seastore::segment_manager {
+
+/// Binds a segment handle to its owning manager and to the segment id.
+EphemeralSegment::EphemeralSegment(
+  EphemeralSegmentManager &manager,
+  segment_id_t id)
+  : manager(manager),
+    id(id)
+{}
+
+/// Capacity of this segment, i.e. the manager's configured segment size.
+segment_off_t EphemeralSegment::get_write_capacity() const
+{
+  const segment_off_t capacity = manager.get_segment_size();
+  return capacity;
+}
+
+/**
+ * close
+ *
+ * Marks this segment closed in the owning manager.
+ *
+ * @return the manager's result; invarg if the segment is not currently
+ *         open, so that the failure reaches the caller instead of being
+ *         dropped.
+ */
+Segment::close_ertr::future<> EphemeralSegment::close()
+{
+  // Forward the manager's future: discarding it would report success even
+  // when the state transition was refused.
+  return manager.segment_close(id);
+}
+
+/**
+ * write
+ *
+ * Validates the target range and hands the buffer to the manager.
+ *
+ * @param offset offset within the segment; must be a multiple of the
+ *               configured block size and not below write_pointer
+ * @param bl     data to write
+ * @return invarg if offset is misaligned or below write_pointer,
+ *         enospc if the write would run past the end of the segment,
+ *         otherwise the result of the manager's segment_write()
+ *
+ * NOTE(review): write_pointer is not advanced here, so the
+ * "offset < write_pointer" check only ever compares against its initial
+ * value -- confirm whether that is intended.
+ */
+Segment::write_ertr::future<> EphemeralSegment::write(
+  segment_off_t offset, ceph::bufferlist bl)
+{
+  if (offset < write_pointer || offset % manager.config.block_size != 0)
+    return crimson::ct_error::invarg::make();
+
+  // Strictly greater: a write ending exactly at the segment boundary still
+  // fits and must not be rejected.
+  if (offset + bl.length() > manager.config.segment_size)
+    return crimson::ct_error::enospc::make();
+
+  return manager.segment_write({id, offset}, bl);
+}
+
+/// Records the configuration only; the backing memory is mapped by init().
+EphemeralSegmentManager::EphemeralSegmentManager(
+  ephemeral_config_t config)
+  : config(config)
+{}
+
+/**
+ * segment_close
+ *
+ * Transitions segment id from OPEN to CLOSED.
+ *
+ * @param id segment to close
+ * @return invarg if id is out of range or the segment is not OPEN
+ */
+Segment::close_ertr::future<> EphemeralSegmentManager::segment_close(segment_id_t id)
+{
+  // Bounds check against the state table itself so that an out of range id
+  // (or a manager that was never initialised) cannot index past the end.
+  if (id >= segment_state.size())
+    return crimson::ct_error::invarg::make();
+
+  if (segment_state[id] != segment_state_t::OPEN)
+    return crimson::ct_error::invarg::make();
+
+  segment_state[id] = segment_state_t::CLOSED;
+  return Segment::close_ertr::now();
+}
+
+/**
+ * segment_write
+ *
+ * Copies bl into the backing buffer at addr.
+ *
+ * @param addr         destination segment and offset
+ * @param bl           data to copy
+ * @param ignore_check when true, skips the "segment must be OPEN" check
+ *                     (the range checks below always apply)
+ * @return invarg if the segment id or offset is out of range, or if the
+ *         segment is not OPEN and ignore_check is false;
+ *         enospc if the data would run past the end of the segment
+ */
+Segment::write_ertr::future<> EphemeralSegmentManager::segment_write(
+  paddr_t addr,
+  ceph::bufferlist bl,
+  bool ignore_check)
+{
+  logger().debug(
+    "segment_write to segment {} at offset {}, physical offset {}, len {}, crc {}",
+    addr.segment,
+    addr.offset,
+    get_offset(addr),
+    bl.length(),
+    bl.crc32c(0));
+
+  // Range checks are unconditional: this method is public and writes
+  // straight into the mapped buffer, so a bad address would corrupt memory.
+  if (addr.segment >= segment_state.size() || addr.offset < 0)
+    return crimson::ct_error::invarg::make();
+
+  if (addr.offset + bl.length() > config.segment_size)
+    return crimson::ct_error::enospc::make();
+
+  if (!ignore_check && segment_state[addr.segment] != segment_state_t::OPEN)
+    return crimson::ct_error::invarg::make();
+
+  bl.begin().copy(bl.length(), buffer + get_offset(addr));
+  return Segment::write_ertr::now();
+}
+
+EphemeralSegmentManager::init_ertr::future<> EphemeralSegmentManager::init()
+{
+  logger().debug(
+    "Initing ephemeral segment manager with config {}",
+    config);
+
+  if (config.block_size % (4<<10) != 0) {
+    return crimson::ct_error::invarg::make();
+  }
+  if (config.segment_size % config.block_size != 0) {
+    return crimson::ct_error::invarg::make();
+  }
+  if (config.size % config.segment_size != 0) {
+    return crimson::ct_error::invarg::make();
+  }
+
+  auto addr = ::mmap(
+    nullptr,
+    config.size,
+    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
+    -1,
+    0);
+
+  segment_state.resize(config.size / config.segment_size, segment_state_t::EMPTY);
+
+  if (addr == MAP_FAILED)
+    return crimson::ct_error::enospc::make();
+
+  buffer = (char*)addr;
+
+  ::memset(buffer, 0, config.size);
+  return init_ertr::now();
+}
+
+/// Unmaps the backing store, if init() ever created one.
+EphemeralSegmentManager::~EphemeralSegmentManager()
+{
+  if (buffer == nullptr) {
+    return;
+  }
+  ::munmap(buffer, config.size);
+}
+
+/**
+ * open
+ *
+ * Moves segment id from EMPTY to OPEN and returns a handle for it.
+ * Yields invarg if id is out of range or the segment is not EMPTY.
+ */
+SegmentManager::open_ertr::future<SegmentRef> EphemeralSegmentManager::open(
+  segment_id_t id)
+{
+  // Short-circuit keeps the range test ahead of the table lookup.
+  const bool openable =
+    id < get_num_segments() &&
+    segment_state[id] == segment_state_t::EMPTY;
+  if (!openable) {
+    return crimson::ct_error::invarg::make();
+  }
+
+  segment_state[id] = segment_state_t::OPEN;
+  return open_ertr::make_ready_future<SegmentRef>(new EphemeralSegment(*this, id));
+}
+
+/**
+ * release
+ *
+ * Zeroes the contents of segment id and moves it from CLOSED to EMPTY.
+ * Yields invarg if id is out of range or the segment is not CLOSED.
+ */
+SegmentManager::release_ertr::future<> EphemeralSegmentManager::release(
+  segment_id_t id)
+{
+  // Short-circuit keeps the range test ahead of the table lookup.
+  const bool releasable =
+    id < get_num_segments() &&
+    segment_state[id] == segment_state_t::CLOSED;
+  if (!releasable) {
+    return crimson::ct_error::invarg::make();
+  }
+
+  char *segment_start = buffer + get_offset({id, 0});
+  ::memset(segment_start, 0, config.segment_size);
+  segment_state[id] = segment_state_t::EMPTY;
+  return release_ertr::now();
+}
+
+/**
+ * read
+ *
+ * Copies len bytes starting at addr out of the backing buffer into out.
+ *
+ * @param addr segment and offset to read from
+ * @param len  number of bytes to copy
+ * @param out  destination buffer; copy_in() is asked for len bytes at
+ *             offset 0, so it must be at least len bytes long
+ * @return invarg if the segment id is out of range or the requested range
+ *         runs past the end of the segment
+ */
+SegmentManager::read_ertr::future<> EphemeralSegmentManager::read(
+  paddr_t addr,
+  size_t len,
+  ceph::bufferptr &out)
+{
+  if (addr.segment >= get_num_segments())
+    return crimson::ct_error::invarg::make();
+
+  // Strictly greater: a read ending exactly at the segment boundary is
+  // still inside the segment.
+  if (addr.offset + len > config.segment_size)
+    return crimson::ct_error::invarg::make();
+
+  out.copy_in(0, len, buffer + get_offset(addr));
+
+  bufferlist bl;
+  bl.push_back(out);
+  // crc over the bytes actually read, so that it can be matched against the
+  // crc logged by segment_write for the same range.
+  logger().debug(
+    "segment_read to segment {} at offset {}, physical offset {}, length {}, crc {}",
+    addr.segment,
+    addr.offset,
+    get_offset(addr),
+    len,
+    bl.begin().crc32c(len, 0));
+
+  return read_ertr::now();
+}
+
+}
diff --git a/src/crimson/os/seastore/segment_manager/ephemeral.h b/src/crimson/os/seastore/segment_manager/ephemeral.h
new file mode 100644 (file)
index 0000000..60b78d6
--- /dev/null
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "crimson/os/seastore/segment_manager.h"
+
+#include "crimson/os/seastore/segment_manager/ephemeral.h"
+
+namespace crimson::os::seastore::segment_manager {
+
+class EphemeralSegmentManager;
+/**
+ * EphemeralSegment
+ *
+ * Segment implementation handed out by EphemeralSegmentManager::open().
+ * It keeps a reference to the manager that created it and forwards
+ * close() and write() to that manager.
+ */
+class EphemeralSegment final : public Segment {
+  friend class EphemeralSegmentManager;
+
+  EphemeralSegmentManager &manager;  ///< manager that created this handle
+  const segment_id_t id;             ///< segment this handle refers to
+  segment_off_t write_pointer = 0;   ///< value reported by get_write_ptr()
+
+public:
+  EphemeralSegment(EphemeralSegmentManager &manager, segment_id_t id);
+  ~EphemeralSegment() = default;
+
+  segment_id_t get_segment_id() const final { return id; }
+  segment_off_t get_write_ptr() const final { return write_pointer; }
+  segment_off_t get_write_capacity() const final;
+
+  close_ertr::future<> close() final;
+  write_ertr::future<> write(segment_off_t offset, ceph::bufferlist bl) final;
+};
+
+class EphemeralSegmentManager final : public SegmentManager {
+  friend class EphemeralSegment;
+
+  const ephemeral_config_t config;
+
+  size_t get_offset(paddr_t addr) {
+    return (addr.segment * config.segment_size) + addr.offset;
+  }
+
+  enum class segment_state_t {
+    EMPTY,
+    OPEN,
+    CLOSED
+  };
+  std::vector<segment_state_t> segment_state;
+
+  char *buffer = nullptr;
+
+  Segment::close_ertr::future<> segment_close(segment_id_t id);
+
+public:
+  EphemeralSegmentManager(ephemeral_config_t config);
+  ~EphemeralSegmentManager();
+
+  init_ertr::future<> init() final;
+
+  open_ertr::future<SegmentRef> open(segment_id_t id) final;
+
+  release_ertr::future<> release(segment_id_t id) final;
+
+  read_ertr::future<> read(
+    paddr_t addr,
+    size_t len,
+    ceph::bufferptr &out) final;
+
+  size_t get_size() const final {
+    return config.size;
+  }
+  segment_off_t get_block_size() const {
+    return config.block_size;
+  }
+  segment_off_t get_segment_size() const {
+    return config.segment_size;
+  }
+
+  // public so tests can bypass segment interface when simpler
+  Segment::write_ertr::future<> segment_write(
+    paddr_t addr,
+    ceph::bufferlist bl,
+    bool ignore_check=false);
+};
+
+}
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..1e447a314200db00ef34fcca291c782e944c0dab 100644 (file)
@@ -0,0 +1,9 @@
+
+add_executable(unittest_seastore_journal
+  test_seastore_journal.cc
+  ../gtest_seastar.cc)
+add_ceph_unittest(unittest_seastore_journal)
+target_link_libraries(
+  unittest_seastore_journal
+  ${CMAKE_DL_LIBS}
+  crimson-seastore)
diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc
new file mode 100644 (file)
index 0000000..309d5a2
--- /dev/null
@@ -0,0 +1,260 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/crimson/gtest_seastar.h"
+
+#include <random>
+
+#include "crimson/common/log.h"
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+using namespace crimson;
+using namespace crimson::os;
+using namespace crimson::os::seastore;
+
+namespace {
+  /// File-local accessor for the test subsystem logger; marked
+  /// maybe_unused because nothing in this file is required to log.
+  [[maybe_unused]] seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_test);
+  }
+}
+
+struct record_validator_t {
+  record_t record;
+  paddr_t record_final_offset;
+
+  template <typename... T>
+  record_validator_t(T&&... record) : record(std::forward<T>(record)...) {}
+
+  void validate(SegmentManager &manager) {
+    paddr_t addr = make_relative_paddr(0);
+    for (auto &&block : record.extents) {
+      auto test = manager.read(
+       record_final_offset.add_relative(addr),
+       block.bl.length()).unsafe_get0();
+      addr.offset += block.bl.length();
+      bufferlist bl;
+      bl.push_back(test);
+      ASSERT_EQ(
+       bl.length(),
+       block.bl.length());
+      ASSERT_EQ(
+       bl.begin().crc32c(bl.length(), 1),
+       block.bl.begin().crc32c(block.bl.length(), 1));
+    }
+  }
+
+  auto get_replay_handler() {
+    auto checker = [this, iter=record.deltas.begin()] (
+      paddr_t base,
+      const delta_info_t &di) mutable {
+      EXPECT_EQ(base, record_final_offset);
+      ceph_assert(iter != record.deltas.end());
+      EXPECT_EQ(di, *iter++);
+      EXPECT_EQ(base, record_final_offset);
+      return iter != record.deltas.end();
+    };
+    if (record.deltas.size()) {
+      return std::make_optional(std::move(checker));
+    } else {
+      return std::optional<decltype(checker)>();
+    }
+  }
+};
+
+struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
+  std::unique_ptr<SegmentManager> segment_manager;
+  std::unique_ptr<Journal> journal;
+
+  std::vector<record_validator_t> records;
+
+  std::default_random_engine generator;
+
+  const segment_off_t block_size;
+
+  journal_test_t()
+    : segment_manager(create_ephemeral(segment_manager::DEFAULT_TEST_EPHEMERAL)),
+      block_size(segment_manager->get_block_size())
+  {
+  }
+
+  segment_id_t next = 0;
+  get_segment_ret get_segment() final {
+    return get_segment_ret(
+      get_segment_ertr::ready_future_marker{},
+      next++);
+  }
+
+  void put_segment(segment_id_t segment) final {
+    return;
+  }
+
+  seastar::future<> set_up_fut() final {
+    journal.reset(new Journal(*segment_manager));
+    journal->set_segment_provider(this);
+    return segment_manager->init(
+    ).safe_then([this] {
+      return journal->open_for_write();
+    }).handle_error(
+      crimson::ct_error::all_same_way([] {
+       ASSERT_FALSE("Unable to mount");
+      }));
+  }
+
+  template <typename T>
+  auto replay(T &&f) {
+    return journal->close(
+    ).safe_then([this, f=std::move(f)]() mutable {
+      journal.reset(new Journal(*segment_manager));
+      journal->set_segment_provider(this);
+      return journal->replay(std::forward<T>(std::move(f)));
+    }).safe_then([this] {
+      return journal->open_for_write();
+    });
+  }
+
+  auto replay_and_check() {
+    auto record_iter = records.begin();
+    decltype(record_iter->get_replay_handler()) delta_checker = std::nullopt;
+    auto advance = [this, &record_iter, &delta_checker] {
+      ceph_assert(!delta_checker);
+      while (record_iter != records.end()) {
+       auto checker = record_iter->get_replay_handler();
+       record_iter++;
+       if (checker) {
+         delta_checker.emplace(std::move(*checker));
+         break;
+       }
+      }
+    };
+    advance();
+    replay(
+      [this,
+       &advance,
+       &record_iter,
+       &delta_checker]
+      (auto base, const auto &di) mutable {
+       if (!delta_checker) {
+         EXPECT_FALSE("No Deltas Left");
+       }
+       if (!(*delta_checker)(base, di)) {
+         delta_checker = std::nullopt;
+         advance();
+       }
+       return Journal::replay_ertr::now();
+      }).unsafe_get0();
+    ASSERT_EQ(record_iter, records.end());
+    for (auto &i : records) {
+      i.validate(*segment_manager);
+    }
+  }
+
+  template <typename... T>
+  auto submit_record(T&&... _record) {
+    auto record{std::forward<T>(_record)...};
+    records.push_back(record);
+    auto addr = journal->submit_record(std::move(record)).unsafe_get0();
+    records.back().record_final_offset = addr;
+    return addr;
+  }
+
+  seastar::future<> tear_down_fut() final {
+    return seastar::now();
+  }
+
+  extent_t generate_extent(size_t blocks) {
+    std::uniform_int_distribution<char> distribution(
+      std::numeric_limits<char>::min(),
+      std::numeric_limits<char>::max()
+    );
+    char contents = distribution(generator);
+    bufferlist bl;
+    bl.append(buffer::ptr(buffer::create(blocks * block_size, contents)));
+    return extent_t{bl};
+  }
+
+  delta_info_t generate_delta(size_t bytes) {
+    std::uniform_int_distribution<char> distribution(
+      std::numeric_limits<char>::min(),
+      std::numeric_limits<char>::max()
+    );
+    char contents = distribution(generator);
+    bufferlist bl;
+    bl.append(buffer::ptr(buffer::create(bytes, contents)));
+    return delta_info_t{
+      extent_types_t::TEST_BLOCK,
+      paddr_t{},
+      block_size,
+      1,
+      bl
+    };
+  }
+};
+
+// Submits a single record (two extents, two deltas), then replays the
+// journal and checks that the record comes back intact.
+TEST_F(journal_test_t, replay_one_journal_segment)
+{
+ run_async([this] {
+   submit_record(record_t{
+     { generate_extent(1), generate_extent(2) },
+     { generate_delta(23), generate_delta(30) }
+     });
+   replay_and_check();
+ });
+}
+
+// Submits two records back to back, then replays and checks both.
+TEST_F(journal_test_t, replay_two_records)
+{
+ run_async([this] {
+   submit_record(record_t{
+     { generate_extent(1), generate_extent(2) },
+     { generate_delta(23), generate_delta(30) }
+     });
+   submit_record(record_t{
+     { generate_extent(4), generate_extent(1) },
+     { generate_delta(23), generate_delta(400) }
+     });
+   replay_and_check();
+ });
+}
+
+// Replays after two records, submits a third, and replays again; the second
+// replay checks all three records.
+TEST_F(journal_test_t, replay_twice)
+{
+ run_async([this] {
+   submit_record(record_t{
+     { generate_extent(1), generate_extent(2) },
+     { generate_delta(23), generate_delta(30) }
+     });
+   submit_record(record_t{
+     { generate_extent(4), generate_extent(1) },
+     { generate_delta(23), generate_delta(400) }
+     });
+   replay_and_check();
+   submit_record(record_t{
+     { generate_extent(2), generate_extent(5) },
+     { generate_delta(230), generate_delta(40) }
+     });
+   replay_and_check();
+ });
+}
+
+// Keeps submitting large records until the journal reports an address in a
+// different segment, then replays across the segment boundary.
+TEST_F(journal_test_t, roll_journal_and_replay)
+{
+ run_async([this] {
+   paddr_t current = submit_record(
+     record_t{
+       { generate_extent(1), generate_extent(2) },
+       { generate_delta(23), generate_delta(30) }
+     });
+   auto starting_segment = current.segment;
+   unsigned so_far = 0;
+   while (current.segment == starting_segment) {
+     // Two 512-block extents per record: with the default test geometry
+     // (4 KiB blocks, 32 MiB segments) that is 4 MiB of extent data each.
+     current = submit_record(record_t{
+        { generate_extent(512), generate_extent(512) },
+        { generate_delta(23), generate_delta(400) }
+       });
+     ++so_far;
+     // Guard against looping forever if the journal never rolls over.
+     ASSERT_FALSE(so_far > 10);
+   }
+   replay_and_check();
+ });
+}