+add_library(crimson-seastore
+ seastore_types.cc
+ segment_manager/ephemeral.cc
+ segment_manager.cc
+ journal.cc
+ )
+target_link_libraries(crimson-seastore
+ crimson)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "crimson/os/seastore/journal.h"
+
+#include "include/intarith.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_filestore);
+ }
+}
+
+namespace crimson::os::seastore {
+
+Journal::Journal(SegmentManager &segment_manager)
+ : block_size(segment_manager.get_block_size()),
+ max_record_length(
+ segment_manager.get_segment_size() -
+ p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+ size_t(block_size))),
+ segment_manager(segment_manager) {}
+
+
+Journal::initialize_segment_ertr::future<> Journal::initialize_segment(
+ Segment &segment)
+{
+ logger().debug("initialize_segment {}", segment.get_segment_id());
+ // write out header
+ ceph_assert(segment.get_write_ptr() == 0);
+ bufferlist bl;
+ auto header = segment_header_t{
+ current_journal_segment_seq++,
+ segment.get_segment_id(),
+ current_replay_point};
+ ::encode(header, bl);
+
+ written_to = segment_manager.get_block_size();
+ return segment.write(0, bl).handle_error(
+ init_ertr::pass_further{},
+ crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }));
+}
+
+ceph::bufferlist Journal::encode_record(
+ record_size_t rsize,
+ record_t &&record)
+{
+ bufferlist metadatabl;
+ record_header_t header{
+ rsize.mdlength,
+ rsize.dlength,
+ current_journal_seq,
+ 0 /* checksum, TODO */,
+ record.deltas.size(),
+ record.extents.size()
+ };
+ ::encode(header, metadatabl);
+ for (const auto &i: record.deltas) {
+ ::encode(i, metadatabl);
+ }
+ bufferlist databl;
+ for (auto &i: record.extents) {
+ databl.claim_append(i.bl);
+ }
+ if (metadatabl.length() % block_size != 0) {
+ metadatabl.append(
+ ceph::bufferptr(
+ block_size - (metadatabl.length() % block_size)));
+ }
+
+ ceph_assert(metadatabl.length() == rsize.mdlength);
+ ceph_assert(databl.length() == rsize.dlength);
+ metadatabl.claim_append(databl);
+ ceph_assert(metadatabl.length() == (rsize.mdlength + rsize.dlength));
+ return metadatabl;
+}
+
+Journal::write_record_ertr::future<> Journal::write_record(
+ record_size_t rsize,
+ record_t &&record)
+{
+ ceph::bufferlist to_write = encode_record(
+ rsize, std::move(record));
+ auto target = written_to;
+ written_to += p2roundup(to_write.length(), (unsigned)block_size);
+ logger().debug(
+ "write_record, mdlength {}, dlength {}",
+ rsize.mdlength,
+ rsize.dlength);
+ return current_journal_segment->write(target, to_write).handle_error(
+ write_record_ertr::pass_further{},
+ crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }));
+}
+
+Journal::record_size_t Journal::get_encoded_record_length(
+ const record_t &record) const {
+ extent_len_t metadata =
+ (extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
+ extent_len_t data = 0;
+ for (const auto &i: record.deltas) {
+ metadata += ceph::encoded_sizeof(i);
+ }
+ for (const auto &i: record.extents) {
+ data += i.bl.length();
+ }
+ metadata = p2roundup(metadata, block_size);
+ return record_size_t{metadata, data};
+}
+
+bool Journal::needs_roll(segment_off_t length) const
+{
+ return length + written_to >
+ current_journal_segment->get_write_capacity();
+}
+
+paddr_t Journal::next_record_addr() const
+{
+ paddr_t ret{current_journal_segment->get_segment_id(), written_to};
+ logger().debug("next_record_addr: {}", ret);
+ return ret;
+}
+
+Journal::roll_journal_segment_ertr::future<>
+Journal::roll_journal_segment()
+{
+ auto old_segment_id = current_journal_segment ?
+ current_journal_segment->get_segment_id() :
+ NULL_SEG_ID;
+
+ return (current_journal_segment ?
+ current_journal_segment->close() :
+ Segment::close_ertr::now()).safe_then(
+ [this, old_segment_id] {
+ // TODO: pretty sure this needs to be atomic in some sense with
+ // making use of the new segment, maybe this bit needs to take
+ // the first transaction of the new segment? Or the segment
+ // header should include deltas?
+ if (old_segment_id != NULL_SEG_ID) {
+ segment_provider->put_segment(old_segment_id);
+ }
+ return segment_provider->get_segment();
+ }).safe_then([this](auto segment) {
+ return segment_manager.open(segment);
+ }).safe_then([this](auto sref) {
+ current_journal_segment = sref;
+ written_to = 0;
+ return initialize_segment(*current_journal_segment);
+ }).handle_error(
+ roll_journal_segment_ertr::pass_further{},
+ crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
+ );
+}
+
+Journal::init_ertr::future<> Journal::open_for_write()
+{
+ return roll_journal_segment();
+}
+
+Journal::find_replay_segments_fut Journal::find_replay_segments()
+{
+ return seastar::do_with(
+ std::vector<std::pair<segment_id_t, segment_header_t>>(),
+ [this](auto &&segments) mutable {
+ return crimson::do_for_each(
+ boost::make_counting_iterator(segment_id_t{0}),
+ boost::make_counting_iterator(segment_manager.get_num_segments()),
+ [this, &segments](auto i) {
+ return segment_manager.read(paddr_t{i, 0}, block_size
+ ).safe_then([this, &segments, i](bufferptr bptr) mutable {
+ logger().debug("segment {} bptr size {}", i, bptr.length());
+ segment_header_t header;
+ bufferlist bl;
+ bl.push_back(bptr);
+
+ logger().debug(
+ "find_replay_segments: segment {} block crc {}",
+ i,
+ bl.begin().crc32c(block_size, 0));
+
+ auto bp = bl.cbegin();
+ try {
+ ::decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ logger().debug(
+ "find_replay_segments: segment {} unable to decode "
+ "header, skipping",
+ i);
+ return find_replay_segments_ertr::now();
+ }
+ segments.emplace_back(i, std::move(header));
+ return find_replay_segments_ertr::now();
+ }).handle_error(
+ find_replay_segments_ertr::pass_further{},
+ crimson::ct_error::discard_all{}
+ );
+ }).safe_then([this, &segments]() mutable -> find_replay_segments_fut {
+ logger().debug(
+ "find_replay_segments: have {} segments",
+ segments.size());
+ if (segments.empty()) {
+ return crimson::ct_error::input_output_error::make();
+ }
+ std::sort(
+ segments.begin(),
+ segments.end(),
+ [](const auto <, const auto &rt) {
+ return lt.second.journal_segment_seq <
+ rt.second.journal_segment_seq;
+ });
+
+ auto replay_from = segments.rbegin()->second.journal_replay_lb;
+ auto from = segments.begin();
+ if (replay_from != P_ADDR_NULL) {
+ from = std::find_if(
+ segments.begin(),
+ segments.end(),
+ [&replay_from](const auto &seg) -> bool {
+ return seg.first == replay_from.segment;
+ });
+ } else {
+ replay_from = paddr_t{from->first, (segment_off_t)block_size};
+ }
+ auto ret = std::vector<paddr_t>(segments.end() - from);
+ std::transform(
+ from, segments.end(), ret.begin(),
+ [this](const auto &p) {
+ return paddr_t{p.first, (segment_off_t)block_size};
+ });
+ ret[0] = replay_from;
+ return find_replay_segments_fut(
+ find_replay_segments_ertr::ready_future_marker{},
+ std::move(ret));
+ });
+ });
+}
+
+Journal::read_record_metadata_ret Journal::read_record_metadata(
+ paddr_t start)
+{
+ return segment_manager.read(start, block_size
+ ).safe_then(
+ [this, start](bufferptr bptr) mutable
+ -> read_record_metadata_ret {
+ logger().debug("read_record_metadata: reading {}", start);
+ bufferlist bl;
+ bl.append(bptr);
+ auto bp = bl.cbegin();
+ record_header_t header;
+ try {
+ ::decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ return read_record_metadata_ret(
+ read_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ if (header.mdlength > block_size) {
+ return segment_manager.read(
+ {start.segment, start.offset + (segment_off_t)block_size},
+ header.mdlength - block_size).safe_then(
+ [this, header=std::move(header), bl=std::move(bl)](
+ auto &&bptail) mutable {
+ bl.push_back(bptail);
+ return read_record_metadata_ret(
+ read_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl)));
+ });
+ } else {
+ return read_record_metadata_ret(
+ read_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl))
+ );
+ }
+ });
+}
+
+std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
+ record_header_t header,
+ bufferlist &bl)
+{
+ auto bliter = bl.cbegin();
+ bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ std::vector<delta_info_t> deltas(header.deltas);
+ for (auto &&i : deltas) {
+ try {
+ ::decode(i, bliter);
+ } catch (ceph::buffer::error &e) {
+ return std::nullopt;
+ }
+ }
+ return deltas;
+}
+
+Journal::replay_ertr::future<>
+Journal::replay_segment(
+ paddr_t start,
+ delta_handler_t &delta_handler)
+{
+ logger().debug("replay_segment: starting at {}", start);
+ return seastar::do_with(
+ std::move(start),
+ [this, &delta_handler](auto ¤t) {
+ return crimson::do_until(
+ [this, ¤t, &delta_handler]() -> replay_ertr::future<bool> {
+ return read_record_metadata(current).safe_then
+ ([this, ¤t, &delta_handler](auto p)
+ -> replay_ertr::future<bool> {
+ if (!p.has_value()) {
+ return replay_ertr::make_ready_future<bool>(true);
+ }
+
+ auto &[header, bl] = *p;
+
+ logger().debug(
+ "replay_segment: next record offset {} mdlength {} dlength {}",
+ current,
+ header.mdlength,
+ header.dlength);
+
+ auto record_start = current;
+ current.offset += header.mdlength + header.dlength;
+
+ auto deltas = try_decode_deltas(
+ header,
+ bl);
+ if (!deltas) {
+ return replay_ertr::make_ready_future<bool>(true);
+ }
+
+ return seastar::do_with(
+ std::move(*deltas),
+ [this, &delta_handler, record_start](auto &deltas) {
+ return crimson::do_for_each(
+ deltas,
+ [this, &delta_handler, record_start](auto &info) {
+ return delta_handler(
+ record_start.add_relative(
+ make_relative_paddr(block_size)),
+ info);
+ });
+ }).safe_then([] {
+ return replay_ertr::make_ready_future<bool>(false);
+ });
+ });
+ });
+ });
+}
+
+Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+{
+ return seastar::do_with(
+ std::make_pair(std::move(delta_handler), std::vector<paddr_t>()),
+ [this](auto &&item) mutable -> replay_ret {
+ auto &[handler, segments] = item;
+ return find_replay_segments(
+ ).safe_then([this, &handler, &segments](auto osegments) {
+ logger().debug("replay: found {} segments", segments.size());
+ segments.swap(osegments);
+ return crimson::do_for_each(
+ segments,
+ [this, &handler](auto i) {
+ return replay_segment(i, handler);
+ });
+ });
+ });
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/log.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/denc.h"
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/osd/exceptions.h"
+
+namespace crimson::os::seastore {
+
+using journal_seq_t = uint64_t;
+static constexpr journal_seq_t NO_DELTAS =
+ std::numeric_limits<journal_seq_t>::max();
+
+/**
+ * Segment header
+ *
+ * Every segment contains and encode segment_header_t in the first block.
+ * Our strategy for finding the journal replay point is:
+ * 1) Find the segment with the highest journal_segment_seq
+ * 2) Scan forward from committed_journal_lb to find the most recent
+ * journal_commit_lb record
+ * 3) Replay starting at the most recent found journal_commit_lb record
+ */
+struct segment_header_t {
+ segment_seq_t journal_segment_seq;
+ segment_id_t physical_segment_id; // debugging
+
+ paddr_t journal_replay_lb;
+
+ DENC(segment_header_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.journal_segment_seq, p);
+ denc(v.physical_segment_id, p);
+ denc(v.journal_replay_lb, p);
+ DENC_FINISH(p);
+ }
+};
+
+struct record_header_t {
+ // Fixed portion
+ extent_len_t mdlength; // block aligned, length of metadata
+ extent_len_t dlength; // block aligned, length of data
+ journal_seq_t seq; // current journal seqid
+ checksum_t full_checksum; // checksum for full record (TODO)
+ size_t deltas; // number of deltas
+ size_t extents; // number of extents
+
+ DENC(record_header_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.mdlength, p);
+ denc(v.dlength, p);
+ denc(v.seq, p);
+ denc(v.full_checksum, p);
+ denc(v.deltas, p);
+ denc(v.extents, p);
+ DENC_FINISH(p);
+ }
+};
+
+/**
+ * Callback interface for managing available segments
+ */
+class JournalSegmentProvider {
+public:
+ using get_segment_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ using get_segment_ret = get_segment_ertr::future<segment_id_t>;
+ virtual get_segment_ret get_segment() = 0;
+
+ /* TODO: we'll want to use this to propogate information about segment contents */
+ virtual void put_segment(segment_id_t segment) = 0;
+
+ virtual ~JournalSegmentProvider() {}
+};
+
+/**
+ * Manages stream of atomically written records to a SegmentManager.
+ */
+class Journal {
+public:
+ Journal(SegmentManager &segment_manager);
+
+ /**
+ * Sets the JournalSegmentProvider.
+ *
+ * Not provided in constructor to allow the provider to not own
+ * or construct the Journal (TransactionManager).
+ *
+ * Note, Journal does not own this ptr, user must ensure that
+ * *provider outlives Journal.
+ */
+ void set_segment_provider(JournalSegmentProvider *provider) {
+ segment_provider = provider;
+ }
+
+ /**
+ * initializes journal for new writes -- must run prior to calls
+ * to submit_record. Should be called after replay if not a new
+ * Journal.
+ */
+ using init_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error
+ >;
+ init_ertr::future<> open_for_write();
+
+ /**
+ * close journal
+ *
+ * TODO: should probably flush and disallow further writes
+ */
+ using close_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ close_ertr::future<> close() { return close_ertr::now(); }
+
+ /**
+ * write_record
+ *
+ * @param write record and returns offset of first block
+ */
+ using submit_record_ertr = crimson::errorator<
+ crimson::ct_error::erange,
+ crimson::ct_error::input_output_error
+ >;
+ using submit_record_ret = submit_record_ertr::future<paddr_t>;
+ submit_record_ret submit_record(record_t &&record) {
+ auto rsize = get_encoded_record_length(record);
+ auto total = rsize.mdlength + rsize.dlength;
+ if (total > max_record_length) {
+ return crimson::ct_error::erange::make();
+ }
+ auto roll = needs_roll(total)
+ ? roll_journal_segment()
+ : roll_journal_segment_ertr::now();
+ return roll.safe_then(
+ [this, rsize, record=std::move(record)]() mutable {
+ auto ret = next_record_addr();
+ return write_record(rsize, std::move(record)
+ ).safe_then([this, ret] {
+ return ret.add_relative(make_relative_paddr(block_size));
+ });
+ });
+ }
+
+ /**
+ * Read deltas and pass to delta_handler
+ *
+ * record_block_start (argument to delta_handler) is the start of the
+ * of the first block in the record
+ */
+ using replay_ertr = SegmentManager::read_ertr;
+ using replay_ret = replay_ertr::future<>;
+ using delta_handler_t = std::function<
+ replay_ret(paddr_t record_start, const delta_info_t&)>;
+ replay_ret replay(delta_handler_t &&delta_handler);
+
+private:
+ const extent_len_t block_size;
+ const extent_len_t max_record_length;
+
+ JournalSegmentProvider *segment_provider = nullptr;
+ SegmentManager &segment_manager;
+
+ paddr_t current_replay_point;
+
+ segment_seq_t current_journal_segment_seq = 0;
+
+ SegmentRef current_journal_segment;
+ segment_off_t written_to = 0;
+
+ segment_id_t next_journal_segment_seq = NULL_SEG_ID;
+ journal_seq_t current_journal_seq = 0;
+
+ /// prepare segment for writes, writes out segment header
+ using initialize_segment_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ initialize_segment_ertr::future<> initialize_segment(
+ Segment &segment);
+
+ struct record_size_t {
+ extent_len_t mdlength = 0;
+ extent_len_t dlength = 0;
+
+ record_size_t(
+ extent_len_t mdlength,
+ extent_len_t dlength)
+ : mdlength(mdlength), dlength(dlength) {}
+ };
+
+ /**
+ * Return <mdlength, dlength> pair denoting length of
+ * metadata and blocks respectively.
+ */
+ record_size_t get_encoded_record_length(
+ const record_t &record) const;
+
+ /// create encoded record bl
+ ceph::bufferlist encode_record(
+ record_size_t rsize,
+ record_t &&record);
+
+ /// do record write
+ using write_record_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ write_record_ertr::future<> write_record(
+ record_size_t rsize,
+ record_t &&record);
+
+ /// close current segment and initialize next one
+ using roll_journal_segment_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ roll_journal_segment_ertr::future<> roll_journal_segment();
+
+ /// returns true iff current segment has insufficient space
+ bool needs_roll(segment_off_t length) const;
+
+ /// returns next record addr
+ paddr_t next_record_addr() const;
+
+ /// return ordered vector of segments to replay
+ using find_replay_segments_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error
+ >;
+ using find_replay_segments_fut =
+ find_replay_segments_ertr::future<std::vector<paddr_t>>;
+ find_replay_segments_fut find_replay_segments();
+
+ /// read record metadata for record starting at start
+ using read_record_metadata_ertr = replay_ertr;
+ using read_record_metadata_ret = read_record_metadata_ertr::future<
+ std::optional<std::pair<record_header_t, bufferlist>>
+ >;
+ read_record_metadata_ret read_record_metadata(
+ paddr_t start);
+
+ /// attempts to decode deltas from bl, return nullopt if unsuccessful
+ std::optional<std::vector<delta_info_t>> try_decode_deltas(
+ record_header_t header,
+ bufferlist &bl);
+
+ /// replays records starting at start through end of segment
+ replay_ertr::future<>
+ replay_segment(
+ paddr_t start, ///< [in] starting addr
+ delta_handler_t &delta_handler ///< [in] processes deltas in order
+ );
+};
+
+}
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/os/seastore/seastore_types.h"
+
+namespace crimson::os::seastore {
+
+std::ostream &segment_to_stream(std::ostream &out, const segment_id_t &t)
+{
+ if (t == NULL_SEG_ID)
+ return out << "NULL_SEG";
+ else if (t == REL_SEG_ID)
+ return out << "REL_SEG";
+ else
+ return out << t;
+}
+
+std::ostream &offset_to_stream(std::ostream &out, const segment_off_t &t)
+{
+ if (t == NULL_SEG_OFF)
+ return out << "NULL_OFF";
+ else
+ return out << t;
+}
+
+std::ostream &operator<<(std::ostream &out, const paddr_t &rhs)
+{
+ out << "paddr_t<";
+ segment_to_stream(out, rhs.segment);
+ out << ", ";
+ offset_to_stream(out, rhs.offset);
+ return out << ">";
+}
+
+std::ostream &operator<<(std::ostream &out, extent_types_t t)
+{
+ switch (t) {
+ case extent_types_t::ROOT_LOCATION:
+ return out << "ROOT_LOCATION";
+ case extent_types_t::ROOT:
+ return out << "ROOT";
+ case extent_types_t::LADDR_INTERNAL:
+ return out << "LADDR_INTERNAL";
+ case extent_types_t::LADDR_LEAF:
+ return out << "LADDR_LEAF";
+ case extent_types_t::TEST_BLOCK:
+ return out << "TEST_BLOCK";
+ case extent_types_t::NONE:
+ return out << "NONE";
+ default:
+ return out << "UNKNOWN";
+ }
+}
+
+std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs)
+{
+ bool first = false;
+ for (auto &i: rhs) {
+ out << (first ? '[' : ',') << '(' << i.first << ',' << i.second << ')';
+ first = true;
+ }
+ return out << ']';
+}
+std::ostream &operator<<(std::ostream &out, const paddr_list_t &rhs)
+{
+ bool first = false;
+ for (auto &i: rhs) {
+ out << (first ? '[' : ',') << '(' << i.first << ',' << i.second << ')';
+ first = true;
+ }
+ return out << ']';
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <limits>
+#include <iostream>
+
+#include "include/denc.h"
+#include "include/buffer.h"
+#include "include/cmp.h"
+
+namespace crimson::os::seastore {
+
+using checksum_t = uint32_t;
+
+// Identifies segment location on disk, see SegmentManager,
+using segment_id_t = uint32_t;
+constexpr segment_id_t NULL_SEG_ID =
+ std::numeric_limits<segment_id_t>::max() - 1;
+/* Used to denote relative paddr_t */
+constexpr segment_id_t REL_SEG_ID =
+ std::numeric_limits<segment_id_t>::max() - 2;
+
+std::ostream &segment_to_stream(std::ostream &, const segment_id_t &t);
+
+// Offset within a segment on disk, see SegmentManager
+// may be negative for relative offsets
+using segment_off_t = int32_t;
+constexpr segment_off_t NULL_SEG_OFF =
+ std::numeric_limits<segment_id_t>::max();
+
+std::ostream &offset_to_stream(std::ostream &, const segment_off_t &t);
+
+/* Monotonically increasing segment seq, uniquely identifies
+ * the incarnation of a segment */
+using segment_seq_t = uint32_t;
+static constexpr segment_seq_t NULL_SEG_SEQ =
+ std::numeric_limits<segment_seq_t>::max();
+
+// Offset of delta within a record
+using record_delta_idx_t = uint32_t;
+constexpr record_delta_idx_t NULL_DELTA_IDX =
+ std::numeric_limits<record_delta_idx_t>::max();
+
+// <segment, offset> offset on disk, see SegmentManager
+struct paddr_t {
+ segment_id_t segment = NULL_SEG_ID;
+ segment_off_t offset = NULL_SEG_OFF;
+
+ bool is_relative() const {
+ return segment == REL_SEG_ID;
+ }
+
+ paddr_t add_relative(paddr_t o) const {
+ assert(o.is_relative());
+ assert(!is_relative());
+ return paddr_t{segment, offset + o.offset};
+ }
+
+ paddr_t operator-(paddr_t rhs) const {
+ assert(rhs.is_relative() && is_relative());
+ return paddr_t{
+ REL_SEG_ID,
+ offset - rhs.offset
+ };
+ }
+
+ paddr_t maybe_relative_to(paddr_t base) const {
+ if (is_relative())
+ return base.add_relative(*this);
+ else
+ return *this;
+ }
+
+ DENC(paddr_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.segment, p);
+ denc(v.offset, p);
+ DENC_FINISH(p);
+ }
+};
+WRITE_CMP_OPERATORS_2(paddr_t, segment, offset)
+WRITE_EQ_OPERATORS_2(paddr_t, segment, offset)
+constexpr paddr_t P_ADDR_NULL = paddr_t{};
+constexpr paddr_t make_relative_paddr(segment_off_t off) {
+ return paddr_t{REL_SEG_ID, off};
+}
+
+std::ostream &operator<<(std::ostream &out, const paddr_t &rhs);
+
+// logical addr, see LBAManager, TransactionManager
+using laddr_t = uint64_t;
+constexpr laddr_t L_ADDR_MIN = std::numeric_limits<laddr_t>::min();
+constexpr laddr_t L_ADDR_MAX = std::numeric_limits<laddr_t>::max();
+constexpr laddr_t L_ADDR_NULL = std::numeric_limits<laddr_t>::max();
+constexpr laddr_t L_ADDR_ROOT = std::numeric_limits<laddr_t>::max() - 1;
+constexpr laddr_t L_ADDR_LBAT = std::numeric_limits<laddr_t>::max() - 2;
+
+// logical offset, see LBAManager, TransactionManager
+using loff_t = uint32_t;
+constexpr loff_t L_OFF_NULL = std::numeric_limits<loff_t>::max();
+
+struct laddr_list_t : std::list<std::pair<laddr_t, loff_t>> {
+ template <typename... T>
+ laddr_list_t(T&&... args)
+ : std::list<std::pair<laddr_t, loff_t>>(std::forward<T>(args)...) {}
+};
+struct paddr_list_t : std::list<std::pair<paddr_t, loff_t>> {
+ template <typename... T>
+ paddr_list_t(T&&... args)
+ : std::list<std::pair<paddr_t, loff_t>>(std::forward<T>(args)...) {}
+};
+
+std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs);
+std::ostream &operator<<(std::ostream &out, const paddr_list_t &rhs);
+
+/* identifies type of extent, used for interpretting deltas, managing
+ * writeback */
+enum class extent_types_t : uint8_t {
+ ROOT_LOCATION = 0, // delta only
+ ROOT = 1,
+ LADDR_INTERNAL = 2,
+ LADDR_LEAF = 3,
+ LBA_BLOCK = 4,
+
+ // Test Block Types
+ TEST_BLOCK = 0xF0,
+
+ // None
+ NONE = 0xFF
+};
+
+std::ostream &operator<<(std::ostream &out, extent_types_t t);
+
+/* description of a new physical extent */
+struct extent_t {
+ ceph::bufferlist bl; ///< payload, bl.length() == length, aligned
+};
+
+using extent_version_t = uint32_t;
+constexpr extent_version_t EXTENT_VERSION_NULL = 0;
+
+/* description of a mutation to a physical extent */
+struct delta_info_t {
+ extent_types_t type = extent_types_t::NONE; ///< delta type
+ paddr_t paddr; ///< physical address
+ /* logical address -- needed for repopulating cache -- TODO don't actually need */
+ // laddr_t laddr = L_ADDR_NULL;
+ segment_off_t length = NULL_SEG_OFF; ///< extent length
+ extent_version_t pversion; ///< prior version
+ ceph::bufferlist bl; ///< payload
+
+ DENC(delta_info_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.type, p);
+ denc(v.paddr, p);
+ //denc(v.laddr, p);
+ denc(v.length, p);
+ denc(v.pversion, p);
+ denc(v.bl, p);
+ DENC_FINISH(p);
+ }
+
+ bool operator==(const delta_info_t &rhs) const {
+ return (
+ type == rhs.type &&
+ paddr == rhs.paddr &&
+ length == rhs.length &&
+ pversion == rhs.pversion &&
+ bl == rhs.bl
+ );
+ }
+};
+
+struct record_t {
+ std::vector<extent_t> extents;
+ std::vector<delta_info_t> deltas;
+};
+
+}
+
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include "crimson/os/seastore/segment_manager/ephemeral.h"
+
+namespace crimson::os::seastore::segment_manager {
+
+SegmentManagerRef create_ephemeral(ephemeral_config_t config) {
+ return SegmentManagerRef{new EphemeralSegmentManager(config)};
+}
+
+std::ostream &operator<<(std::ostream &lhs, const ephemeral_config_t &c) {
+ return lhs << "ephemeral_config_t(size=" << c.size << ", block_size=" << c.block_size
+ << ", segment_size=" << c.segment_size << ")";
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iosfwd>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "include/buffer_fwd.h"
+#include "crimson/osd/exceptions.h"
+
+namespace crimson::os::seastore {
+
+class Segment : public boost::intrusive_ref_counter<
+ Segment,
+ boost::thread_unsafe_counter>{
+public:
+
+ /**
+ * get_segment_id
+ */
+ virtual segment_id_t get_segment_id() const = 0;
+
+ /**
+ * min next write location
+ */
+ virtual segment_off_t get_write_ptr() const = 0;
+
+ /**
+ * max capacity
+ */
+ virtual segment_off_t get_write_capacity() const = 0;
+
+ /**
+ * close
+ *
+ * Closes segment for writes. Won't complete until
+ * outstanding writes to this segment are complete.
+ */
+ using close_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg,
+ crimson::ct_error::enoent>;
+ virtual close_ertr::future<> close() = 0;
+
+
+ /**
+ * write
+ *
+ * @param offset offset of write, must be aligned to <> and >= write pointer, advances
+ * write pointer
+ * @param bl buffer to write, will be padded if not aligned
+ */
+ using write_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error, // media error or corruption
+ crimson::ct_error::invarg, // if offset is < write pointer or misaligned
+ crimson::ct_error::ebadf, // segment closed
+ crimson::ct_error::enospc // write exceeds segment size
+ >;
+ virtual write_ertr::future<> write(
+ segment_off_t offset, ceph::bufferlist bl) = 0;
+
+ virtual ~Segment() {}
+};
+using SegmentRef = boost::intrusive_ptr<Segment>;
+
+constexpr size_t PADDR_SIZE = sizeof(paddr_t);
+
+class SegmentManager {
+public:
+ using init_ertr = crimson::errorator<
+ crimson::ct_error::enospc,
+ crimson::ct_error::invarg,
+ crimson::ct_error::erange>;
+ virtual init_ertr::future<> init() = 0;
+
+ using open_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg,
+ crimson::ct_error::enoent>;
+ virtual open_ertr::future<SegmentRef> open(segment_id_t id) = 0;
+
+ using release_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg,
+ crimson::ct_error::enoent>;
+ virtual release_ertr::future<> release(segment_id_t id) = 0;
+
+ using read_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg,
+ crimson::ct_error::enoent,
+ crimson::ct_error::erange>;
+ virtual read_ertr::future<> read(
+ paddr_t addr,
+ size_t len,
+ ceph::bufferptr &out) = 0;
+ read_ertr::future<ceph::bufferptr> read(
+ paddr_t addr,
+ size_t len) {
+ auto ptrref = std::make_unique<ceph::bufferptr>(len);
+ return read(addr, len, *ptrref).safe_then(
+ [ptrref=std::move(ptrref)]() mutable {
+ return read_ertr::make_ready_future<bufferptr>(std::move(*ptrref));
+ });
+ }
+
+ /* Methods for discovering device geometry, segmentid set, etc */
+ virtual size_t get_size() const = 0;
+ virtual segment_off_t get_block_size() const = 0;
+ virtual segment_off_t get_segment_size() const = 0;
+ virtual segment_id_t get_num_segments() const {
+ ceph_assert(get_size() % get_segment_size() == 0);
+ return ((segment_id_t)(get_size() / get_segment_size()));
+ }
+
+
+ virtual ~SegmentManager() {}
+};
+using SegmentManagerRef = std::unique_ptr<SegmentManager>;
+
+namespace segment_manager {
+
+struct ephemeral_config_t {
+ size_t size;
+ size_t block_size;
+ size_t segment_size;
+};
+constexpr ephemeral_config_t DEFAULT_TEST_EPHEMERAL = {
+ 1 << 30,
+ 4 << 10,
+ 32 << 20
+};
+
+std::ostream &operator<<(std::ostream &, const ephemeral_config_t &);
+SegmentManagerRef create_ephemeral(ephemeral_config_t config);
+
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <sys/mman.h>
+#include <string.h>
+
+#include "crimson/common/log.h"
+
+#include "include/buffer.h"
+#include "crimson/os/seastore/segment_manager/ephemeral.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_filestore);
+ }
+}
+
+namespace crimson::os::seastore::segment_manager {
+
+EphemeralSegment::EphemeralSegment(
+ EphemeralSegmentManager &manager, segment_id_t id)
+ : manager(manager), id(id) {}
+
+segment_off_t EphemeralSegment::get_write_capacity() const
+{
+ return manager.get_segment_size();
+}
+
+Segment::close_ertr::future<> EphemeralSegment::close()
+{
+ manager.segment_close(id);
+ return close_ertr::now();
+}
+
+Segment::write_ertr::future<> EphemeralSegment::write(
+ segment_off_t offset, ceph::bufferlist bl)
+{
+ if (offset < write_pointer || offset % manager.config.block_size != 0)
+ return crimson::ct_error::invarg::make();
+
+ if (offset + bl.length() >= manager.config.segment_size)
+ return crimson::ct_error::enospc::make();
+
+ return manager.segment_write({id, offset}, bl);
+}
+
+EphemeralSegmentManager::EphemeralSegmentManager(ephemeral_config_t config)
+ : config(config) {}
+
+Segment::close_ertr::future<> EphemeralSegmentManager::segment_close(segment_id_t id)
+{
+ if (segment_state[id] != segment_state_t::OPEN)
+ return crimson::ct_error::invarg::make();
+
+ segment_state[id] = segment_state_t::CLOSED;
+ return Segment::close_ertr::now();
+}
+
+Segment::write_ertr::future<> EphemeralSegmentManager::segment_write(
+ paddr_t addr,
+ ceph::bufferlist bl,
+ bool ignore_check)
+{
+ logger().debug(
+ "segment_write to segment {} at offset {}, physical offset {}, len {}, crc {}",
+ addr.segment,
+ addr.offset,
+ get_offset(addr),
+ bl.length(),
+ bl.crc32c(0));
+ if (!ignore_check && segment_state[addr.segment] != segment_state_t::OPEN)
+ return crimson::ct_error::invarg::make();
+
+ bl.begin().copy(bl.length(), buffer + get_offset(addr));
+ return Segment::write_ertr::now();
+}
+
+EphemeralSegmentManager::init_ertr::future<> EphemeralSegmentManager::init()
+{
+ logger().debug(
+ "Initing ephemeral segment manager with config {}",
+ config);
+
+ if (config.block_size % (4<<10) != 0) {
+ return crimson::ct_error::invarg::make();
+ }
+ if (config.segment_size % config.block_size != 0) {
+ return crimson::ct_error::invarg::make();
+ }
+ if (config.size % config.segment_size != 0) {
+ return crimson::ct_error::invarg::make();
+ }
+
+ auto addr = ::mmap(
+ nullptr,
+ config.size,
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
+ -1,
+ 0);
+
+ segment_state.resize(config.size / config.segment_size, segment_state_t::EMPTY);
+
+ if (addr == MAP_FAILED)
+ return crimson::ct_error::enospc::make();
+
+ buffer = (char*)addr;
+
+ ::memset(buffer, 0, config.size);
+ return init_ertr::now();
+}
+
+EphemeralSegmentManager::~EphemeralSegmentManager()
+{
+ if (buffer) {
+ ::munmap(buffer, config.size);
+ }
+}
+
+SegmentManager::open_ertr::future<SegmentRef> EphemeralSegmentManager::open(
+ segment_id_t id)
+{
+ if (id >= get_num_segments())
+ return crimson::ct_error::invarg::make();
+
+ if (segment_state[id] != segment_state_t::EMPTY)
+ return crimson::ct_error::invarg::make();
+
+ segment_state[id] = segment_state_t::OPEN;
+ return open_ertr::make_ready_future<SegmentRef>(new EphemeralSegment(*this, id));
+}
+
+SegmentManager::release_ertr::future<> EphemeralSegmentManager::release(
+ segment_id_t id)
+{
+ if (id >= get_num_segments())
+ return crimson::ct_error::invarg::make();
+
+ if (segment_state[id] != segment_state_t::CLOSED)
+ return crimson::ct_error::invarg::make();
+
+ ::memset(buffer + get_offset({id, 0}), 0, config.segment_size);
+ segment_state[id] = segment_state_t::EMPTY;
+ return release_ertr::now();
+}
+
+SegmentManager::read_ertr::future<> EphemeralSegmentManager::read(
+ paddr_t addr,
+ size_t len,
+ ceph::bufferptr &out)
+{
+ if (addr.segment >= get_num_segments())
+ return crimson::ct_error::invarg::make();
+
+ if (addr.offset + len >= config.segment_size)
+ return crimson::ct_error::invarg::make();
+
+ out.copy_in(0, len, buffer + get_offset(addr));
+
+ bufferlist bl;
+ bl.push_back(out);
+ logger().debug(
+ "segment_read to segment {} at offset {}, physical offset {}, length {}, crc {}",
+ addr.segment,
+ addr.offset,
+ get_offset(addr),
+ len,
+ bl.begin().crc32c(config.block_size, 0));
+
+ return read_ertr::now();
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "crimson/os/seastore/segment_manager.h"
+
+#include "crimson/os/seastore/segment_manager/ephemeral.h"
+
+namespace crimson::os::seastore::segment_manager {
+
+class EphemeralSegmentManager;
+class EphemeralSegment final : public Segment {
+ friend class EphemeralSegmentManager;
+ EphemeralSegmentManager &manager;
+ const segment_id_t id;
+ segment_off_t write_pointer = 0;
+public:
+ EphemeralSegment(EphemeralSegmentManager &manager, segment_id_t id);
+
+ segment_id_t get_segment_id() const final { return id; }
+ segment_off_t get_write_capacity() const final;
+ segment_off_t get_write_ptr() const final { return write_pointer; }
+ close_ertr::future<> close() final;
+ write_ertr::future<> write(segment_off_t offset, ceph::bufferlist bl) final;
+
+ ~EphemeralSegment() {}
+};
+
+class EphemeralSegmentManager final : public SegmentManager {
+ friend class EphemeralSegment;
+
+ const ephemeral_config_t config;
+
+ size_t get_offset(paddr_t addr) {
+ return (addr.segment * config.segment_size) + addr.offset;
+ }
+
+ enum class segment_state_t {
+ EMPTY,
+ OPEN,
+ CLOSED
+ };
+ std::vector<segment_state_t> segment_state;
+
+ char *buffer = nullptr;
+
+ Segment::close_ertr::future<> segment_close(segment_id_t id);
+
+public:
+ EphemeralSegmentManager(ephemeral_config_t config);
+ ~EphemeralSegmentManager();
+
+ init_ertr::future<> init() final;
+
+ open_ertr::future<SegmentRef> open(segment_id_t id) final;
+
+ release_ertr::future<> release(segment_id_t id) final;
+
+ read_ertr::future<> read(
+ paddr_t addr,
+ size_t len,
+ ceph::bufferptr &out) final;
+
+ size_t get_size() const final {
+ return config.size;
+ }
+ segment_off_t get_block_size() const {
+ return config.block_size;
+ }
+ segment_off_t get_segment_size() const {
+ return config.segment_size;
+ }
+
+ // public so tests can bypass segment interface when simpler
+ Segment::write_ertr::future<> segment_write(
+ paddr_t addr,
+ ceph::bufferlist bl,
+ bool ignore_check=false);
+};
+
+}
+
+add_executable(unittest_seastore_journal
+ test_seastore_journal.cc
+ ../gtest_seastar.cc)
+add_ceph_unittest(unittest_seastore_journal)
+target_link_libraries(
+ unittest_seastore_journal
+ ${CMAKE_DL_LIBS}
+ crimson-seastore)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/crimson/gtest_seastar.h"
+
+#include <random>
+
+#include "crimson/common/log.h"
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+using namespace crimson;
+using namespace crimson::os;
+using namespace crimson::os::seastore;
+
+namespace {
+ [[maybe_unused]] seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_test);
+ }
+}
+
+struct record_validator_t {
+ record_t record;
+ paddr_t record_final_offset;
+
+ template <typename... T>
+ record_validator_t(T&&... record) : record(std::forward<T>(record)...) {}
+
+ void validate(SegmentManager &manager) {
+ paddr_t addr = make_relative_paddr(0);
+ for (auto &&block : record.extents) {
+ auto test = manager.read(
+ record_final_offset.add_relative(addr),
+ block.bl.length()).unsafe_get0();
+ addr.offset += block.bl.length();
+ bufferlist bl;
+ bl.push_back(test);
+ ASSERT_EQ(
+ bl.length(),
+ block.bl.length());
+ ASSERT_EQ(
+ bl.begin().crc32c(bl.length(), 1),
+ block.bl.begin().crc32c(block.bl.length(), 1));
+ }
+ }
+
+ auto get_replay_handler() {
+ auto checker = [this, iter=record.deltas.begin()] (
+ paddr_t base,
+ const delta_info_t &di) mutable {
+ EXPECT_EQ(base, record_final_offset);
+ ceph_assert(iter != record.deltas.end());
+ EXPECT_EQ(di, *iter++);
+ EXPECT_EQ(base, record_final_offset);
+ return iter != record.deltas.end();
+ };
+ if (record.deltas.size()) {
+ return std::make_optional(std::move(checker));
+ } else {
+ return std::optional<decltype(checker)>();
+ }
+ }
+};
+
+struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
+ std::unique_ptr<SegmentManager> segment_manager;
+ std::unique_ptr<Journal> journal;
+
+ std::vector<record_validator_t> records;
+
+ std::default_random_engine generator;
+
+ const segment_off_t block_size;
+
+ journal_test_t()
+ : segment_manager(create_ephemeral(segment_manager::DEFAULT_TEST_EPHEMERAL)),
+ block_size(segment_manager->get_block_size())
+ {
+ }
+
+ segment_id_t next = 0;
+ get_segment_ret get_segment() final {
+ return get_segment_ret(
+ get_segment_ertr::ready_future_marker{},
+ next++);
+ }
+
+ void put_segment(segment_id_t segment) final {
+ return;
+ }
+
+ seastar::future<> set_up_fut() final {
+ journal.reset(new Journal(*segment_manager));
+ journal->set_segment_provider(this);
+ return segment_manager->init(
+ ).safe_then([this] {
+ return journal->open_for_write();
+ }).handle_error(
+ crimson::ct_error::all_same_way([] {
+ ASSERT_FALSE("Unable to mount");
+ }));
+ }
+
+ template <typename T>
+ auto replay(T &&f) {
+ return journal->close(
+ ).safe_then([this, f=std::move(f)]() mutable {
+ journal.reset(new Journal(*segment_manager));
+ journal->set_segment_provider(this);
+ return journal->replay(std::forward<T>(std::move(f)));
+ }).safe_then([this] {
+ return journal->open_for_write();
+ });
+ }
+
+ auto replay_and_check() {
+ auto record_iter = records.begin();
+ decltype(record_iter->get_replay_handler()) delta_checker = std::nullopt;
+ auto advance = [this, &record_iter, &delta_checker] {
+ ceph_assert(!delta_checker);
+ while (record_iter != records.end()) {
+ auto checker = record_iter->get_replay_handler();
+ record_iter++;
+ if (checker) {
+ delta_checker.emplace(std::move(*checker));
+ break;
+ }
+ }
+ };
+ advance();
+ replay(
+ [this,
+ &advance,
+ &record_iter,
+ &delta_checker]
+ (auto base, const auto &di) mutable {
+ if (!delta_checker) {
+ EXPECT_FALSE("No Deltas Left");
+ }
+ if (!(*delta_checker)(base, di)) {
+ delta_checker = std::nullopt;
+ advance();
+ }
+ return Journal::replay_ertr::now();
+ }).unsafe_get0();
+ ASSERT_EQ(record_iter, records.end());
+ for (auto &i : records) {
+ i.validate(*segment_manager);
+ }
+ }
+
+ template <typename... T>
+ auto submit_record(T&&... _record) {
+ auto record{std::forward<T>(_record)...};
+ records.push_back(record);
+ auto addr = journal->submit_record(std::move(record)).unsafe_get0();
+ records.back().record_final_offset = addr;
+ return addr;
+ }
+
+ seastar::future<> tear_down_fut() final {
+ return seastar::now();
+ }
+
+ extent_t generate_extent(size_t blocks) {
+ std::uniform_int_distribution<char> distribution(
+ std::numeric_limits<char>::min(),
+ std::numeric_limits<char>::max()
+ );
+ char contents = distribution(generator);
+ bufferlist bl;
+ bl.append(buffer::ptr(buffer::create(blocks * block_size, contents)));
+ return extent_t{bl};
+ }
+
+ delta_info_t generate_delta(size_t bytes) {
+ std::uniform_int_distribution<char> distribution(
+ std::numeric_limits<char>::min(),
+ std::numeric_limits<char>::max()
+ );
+ char contents = distribution(generator);
+ bufferlist bl;
+ bl.append(buffer::ptr(buffer::create(bytes, contents)));
+ return delta_info_t{
+ extent_types_t::TEST_BLOCK,
+ paddr_t{},
+ block_size,
+ 1,
+ bl
+ };
+ }
+};
+
+TEST_F(journal_test_t, replay_one_journal_segment)
+{
+ run_async([this] {
+ submit_record(record_t{
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(23), generate_delta(30) }
+ });
+ replay_and_check();
+ });
+}
+
+TEST_F(journal_test_t, replay_two_records)
+{
+ run_async([this] {
+ submit_record(record_t{
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(23), generate_delta(30) }
+ });
+ submit_record(record_t{
+ { generate_extent(4), generate_extent(1) },
+ { generate_delta(23), generate_delta(400) }
+ });
+ replay_and_check();
+ });
+}
+
+TEST_F(journal_test_t, replay_twice)
+{
+ run_async([this] {
+ submit_record(record_t{
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(23), generate_delta(30) }
+ });
+ submit_record(record_t{
+ { generate_extent(4), generate_extent(1) },
+ { generate_delta(23), generate_delta(400) }
+ });
+ replay_and_check();
+ submit_record(record_t{
+ { generate_extent(2), generate_extent(5) },
+ { generate_delta(230), generate_delta(40) }
+ });
+ replay_and_check();
+ });
+}
+
+TEST_F(journal_test_t, roll_journal_and_replay)
+{
+ run_async([this] {
+ paddr_t current = submit_record(
+ record_t{
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(23), generate_delta(30) }
+ });
+ auto starting_segment = current.segment;
+ unsigned so_far = 0;
+ while (current.segment == starting_segment) {
+ current = submit_record(record_t{
+ { generate_extent(512), generate_extent(512) },
+ { generate_delta(23), generate_delta(400) }
+ });
+ ++so_far;
+ ASSERT_FALSE(so_far > 10);
+ }
+ replay_and_check();
+ });
+}