From: Samuel Just Date: Mon, 4 May 2020 18:47:13 +0000 (-0700) Subject: crimson/os/seastore: introduce initial journal implementation and tests X-Git-Tag: wip-pdonnell-testing-20200918.022351~1267^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=01a1a8c1584f81764a1d0196364681865fc9068b;p=ceph-ci.git crimson/os/seastore: introduce initial journal implementation and tests Signed-off-by: Samuel Just --- diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index e69de29bb2d..32b756a471d 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -0,0 +1,8 @@ +add_library(crimson-seastore + seastore_types.cc + segment_manager/ephemeral.cc + segment_manager.cc + journal.cc + ) +target_link_libraries(crimson-seastore + crimson) diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc new file mode 100644 index 00000000000..6d041f7b215 --- /dev/null +++ b/src/crimson/os/seastore/journal.cc @@ -0,0 +1,370 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "crimson/os/seastore/journal.h" + +#include "include/intarith.h" +#include "crimson/os/seastore/segment_manager.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_filestore); + } +} + +namespace crimson::os::seastore { + +Journal::Journal(SegmentManager &segment_manager) + : block_size(segment_manager.get_block_size()), + max_record_length( + segment_manager.get_segment_size() - + p2align(ceph::encoded_sizeof_bounded(), + size_t(block_size))), + segment_manager(segment_manager) {} + + +Journal::initialize_segment_ertr::future<> Journal::initialize_segment( + Segment &segment) +{ + logger().debug("initialize_segment {}", segment.get_segment_id()); + // write out header + ceph_assert(segment.get_write_ptr() == 0); + bufferlist bl; + auto header = segment_header_t{ + current_journal_segment_seq++, + segment.get_segment_id(), + current_replay_point}; + ::encode(header, bl); + + written_to = segment_manager.get_block_size(); + return segment.write(0, bl).handle_error( + init_ertr::pass_further{}, + crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })); +} + +ceph::bufferlist Journal::encode_record( + record_size_t rsize, + record_t &&record) +{ + bufferlist metadatabl; + record_header_t header{ + rsize.mdlength, + rsize.dlength, + current_journal_seq, + 0 /* checksum, TODO */, + record.deltas.size(), + record.extents.size() + }; + ::encode(header, metadatabl); + for (const auto &i: record.deltas) { + ::encode(i, metadatabl); + } + bufferlist databl; + for (auto &i: record.extents) { + databl.claim_append(i.bl); + } + if (metadatabl.length() % block_size != 0) { + metadatabl.append( + ceph::bufferptr( + block_size - (metadatabl.length() % block_size))); + } + + ceph_assert(metadatabl.length() == rsize.mdlength); + ceph_assert(databl.length() == rsize.dlength); + metadatabl.claim_append(databl); + ceph_assert(metadatabl.length() == (rsize.mdlength + rsize.dlength)); + return metadatabl; +} + +Journal::write_record_ertr::future<> Journal::write_record( + record_size_t rsize, + record_t &&record) +{ + ceph::bufferlist to_write = encode_record( + rsize, std::move(record)); + auto target = written_to; + written_to += p2roundup(to_write.length(), (unsigned)block_size); + logger().debug( + "write_record, mdlength {}, dlength {}", + rsize.mdlength, + rsize.dlength); + return current_journal_segment->write(target, to_write).handle_error( + write_record_ertr::pass_further{}, + crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })); +} + +Journal::record_size_t Journal::get_encoded_record_length( + const record_t &record) const { + extent_len_t metadata = + (extent_len_t)ceph::encoded_sizeof_bounded(); + extent_len_t data = 0; + for (const auto &i: record.deltas) { + metadata += ceph::encoded_sizeof(i); + } + for (const auto &i: record.extents) { + data += i.bl.length(); + } + metadata = p2roundup(metadata, block_size); + return record_size_t{metadata, data}; +} + +bool Journal::needs_roll(segment_off_t length) const +{ + return length + written_to > + current_journal_segment->get_write_capacity(); +} + +paddr_t Journal::next_record_addr() const +{ + paddr_t ret{current_journal_segment->get_segment_id(), written_to}; + logger().debug("next_record_addr: {}", ret); + return ret; +} + +Journal::roll_journal_segment_ertr::future<> +Journal::roll_journal_segment() +{ + auto old_segment_id = current_journal_segment ? + current_journal_segment->get_segment_id() : + NULL_SEG_ID; + + return (current_journal_segment ? + current_journal_segment->close() : + Segment::close_ertr::now()).safe_then( + [this, old_segment_id] { + // TODO: pretty sure this needs to be atomic in some sense with + // making use of the new segment, maybe this bit needs to take + // the first transaction of the new segment? Or the segment + // header should include deltas? + if (old_segment_id != NULL_SEG_ID) { + segment_provider->put_segment(old_segment_id); + } + return segment_provider->get_segment(); + }).safe_then([this](auto segment) { + return segment_manager.open(segment); + }).safe_then([this](auto sref) { + current_journal_segment = sref; + written_to = 0; + return initialize_segment(*current_journal_segment); + }).handle_error( + roll_journal_segment_ertr::pass_further{}, + crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }) + ); +} + +Journal::init_ertr::future<> Journal::open_for_write() +{ + return roll_journal_segment(); +} + +Journal::find_replay_segments_fut Journal::find_replay_segments() +{ + return seastar::do_with( + std::vector>(), + [this](auto &&segments) mutable { + return crimson::do_for_each( + boost::make_counting_iterator(segment_id_t{0}), + boost::make_counting_iterator(segment_manager.get_num_segments()), + [this, &segments](auto i) { + return segment_manager.read(paddr_t{i, 0}, block_size + ).safe_then([this, &segments, i](bufferptr bptr) mutable { + logger().debug("segment {} bptr size {}", i, bptr.length()); + segment_header_t header; + bufferlist bl; + bl.push_back(bptr); + + logger().debug( + "find_replay_segments: segment {} block crc {}", + i, + bl.begin().crc32c(block_size, 0)); + + auto bp = bl.cbegin(); + try { + ::decode(header, bp); + } catch (ceph::buffer::error &e) { + logger().debug( + "find_replay_segments: segment {} unable to decode " + "header, skipping", + i); + return find_replay_segments_ertr::now(); + } + segments.emplace_back(i, std::move(header)); + return find_replay_segments_ertr::now(); + }).handle_error( + find_replay_segments_ertr::pass_further{}, + crimson::ct_error::discard_all{} + ); + }).safe_then([this, &segments]() mutable -> find_replay_segments_fut { + logger().debug( + "find_replay_segments: have {} segments", + segments.size()); + if (segments.empty()) { + return crimson::ct_error::input_output_error::make(); + } + std::sort( + segments.begin(), + segments.end(), + [](const auto <, const auto &rt) { + return lt.second.journal_segment_seq < + rt.second.journal_segment_seq; + }); + + auto replay_from = segments.rbegin()->second.journal_replay_lb; + auto from = segments.begin(); + if (replay_from != P_ADDR_NULL) { + from = std::find_if( + segments.begin(), + segments.end(), + [&replay_from](const auto &seg) -> bool { + return seg.first == replay_from.segment; + }); + } else { + replay_from = paddr_t{from->first, (segment_off_t)block_size}; + } + auto ret = std::vector(segments.end() - from); + std::transform( + from, segments.end(), ret.begin(), + [this](const auto &p) { + return paddr_t{p.first, (segment_off_t)block_size}; + }); + ret[0] = replay_from; + return find_replay_segments_fut( + find_replay_segments_ertr::ready_future_marker{}, + std::move(ret)); + }); + }); +} + +Journal::read_record_metadata_ret Journal::read_record_metadata( + paddr_t start) +{ + return segment_manager.read(start, block_size + ).safe_then( + [this, start](bufferptr bptr) mutable + -> read_record_metadata_ret { + logger().debug("read_record_metadata: reading {}", start); + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + record_header_t header; + try { + ::decode(header, bp); + } catch (ceph::buffer::error &e) { + return read_record_metadata_ret( + read_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + if (header.mdlength > block_size) { + return segment_manager.read( + {start.segment, start.offset + (segment_off_t)block_size}, + header.mdlength - block_size).safe_then( + [this, header=std::move(header), bl=std::move(bl)]( + auto &&bptail) mutable { + bl.push_back(bptail); + return read_record_metadata_ret( + read_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl))); + }); + } else { + return read_record_metadata_ret( + read_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl)) + ); + } + }); +} + +std::optional> Journal::try_decode_deltas( + record_header_t header, + bufferlist &bl) +{ + auto bliter = bl.cbegin(); + bliter += ceph::encoded_sizeof_bounded(); + std::vector deltas(header.deltas); + for (auto &&i : deltas) { + try { + ::decode(i, bliter); + } catch (ceph::buffer::error &e) { + return std::nullopt; + } + } + return deltas; +} + +Journal::replay_ertr::future<> +Journal::replay_segment( + paddr_t start, + delta_handler_t &delta_handler) +{ + logger().debug("replay_segment: starting at {}", start); + return seastar::do_with( + std::move(start), + [this, &delta_handler](auto ¤t) { + return crimson::do_until( + [this, ¤t, &delta_handler]() -> replay_ertr::future { + return read_record_metadata(current).safe_then + ([this, ¤t, &delta_handler](auto p) + -> replay_ertr::future { + if (!p.has_value()) { + return replay_ertr::make_ready_future(true); + } + + auto &[header, bl] = *p; + + logger().debug( + "replay_segment: next record offset {} mdlength {} dlength {}", + current, + header.mdlength, + header.dlength); + + auto record_start = current; + current.offset += header.mdlength + header.dlength; + + auto deltas = try_decode_deltas( + header, + bl); + if (!deltas) { + return replay_ertr::make_ready_future(true); + } + + return seastar::do_with( + std::move(*deltas), + [this, &delta_handler, record_start](auto &deltas) { + return crimson::do_for_each( + deltas, + [this, &delta_handler, record_start](auto &info) { + return delta_handler( + record_start.add_relative( + make_relative_paddr(block_size)), + info); + }); + }).safe_then([] { + return replay_ertr::make_ready_future(false); + }); + }); + }); + }); +} + +Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler) +{ + return seastar::do_with( + std::make_pair(std::move(delta_handler), std::vector()), + [this](auto &&item) mutable -> replay_ret { + auto &[handler, segments] = item; + return find_replay_segments( + ).safe_then([this, &handler, &segments](auto osegments) { + logger().debug("replay: found {} segments", segments.size()); + segments.swap(osegments); + return crimson::do_for_each( + segments, + [this, &handler](auto i) { + return replay_segment(i, handler); + }); + }); + }); +} + +} diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h new file mode 100644 index 00000000000..aed11b54d1d --- /dev/null +++ b/src/crimson/os/seastore/journal.h @@ -0,0 +1,262 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/common/log.h" + +#include + +#include + +#include "include/ceph_assert.h" +#include "include/buffer.h" +#include "include/denc.h" + +#include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/osd/exceptions.h" + +namespace crimson::os::seastore { + +using journal_seq_t = uint64_t; +static constexpr journal_seq_t NO_DELTAS = + std::numeric_limits::max(); + +/** + * Segment header + * + * Every segment contains and encode segment_header_t in the first block. + * Our strategy for finding the journal replay point is: + * 1) Find the segment with the highest journal_segment_seq + * 2) Scan forward from committed_journal_lb to find the most recent + * journal_commit_lb record + * 3) Replay starting at the most recent found journal_commit_lb record + */ +struct segment_header_t { + segment_seq_t journal_segment_seq; + segment_id_t physical_segment_id; // debugging + + paddr_t journal_replay_lb; + + DENC(segment_header_t, v, p) { + DENC_START(1, 1, p); + denc(v.journal_segment_seq, p); + denc(v.physical_segment_id, p); + denc(v.journal_replay_lb, p); + DENC_FINISH(p); + } +}; + +struct record_header_t { + // Fixed portion + extent_len_t mdlength; // block aligned, length of metadata + extent_len_t dlength; // block aligned, length of data + journal_seq_t seq; // current journal seqid + checksum_t full_checksum; // checksum for full record (TODO) + size_t deltas; // number of deltas + size_t extents; // number of extents + + DENC(record_header_t, v, p) { + DENC_START(1, 1, p); + denc(v.mdlength, p); + denc(v.dlength, p); + denc(v.seq, p); + denc(v.full_checksum, p); + denc(v.deltas, p); + denc(v.extents, p); + DENC_FINISH(p); + } +}; + +/** + * Callback interface for managing available segments + */ +class JournalSegmentProvider { +public: + using get_segment_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + using get_segment_ret = get_segment_ertr::future; + virtual get_segment_ret get_segment() = 0; + + /* TODO: we'll want to use this to propogate information about segment contents */ + virtual void put_segment(segment_id_t segment) = 0; + + virtual ~JournalSegmentProvider() {} +}; + +/** + * Manages stream of atomically written records to a SegmentManager. + */ +class Journal { +public: + Journal(SegmentManager &segment_manager); + + /** + * Sets the JournalSegmentProvider. + * + * Not provided in constructor to allow the provider to not own + * or construct the Journal (TransactionManager). + * + * Note, Journal does not own this ptr, user must ensure that + * *provider outlives Journal. + */ + void set_segment_provider(JournalSegmentProvider *provider) { + segment_provider = provider; + } + + /** + * initializes journal for new writes -- must run prior to calls + * to submit_record. Should be called after replay if not a new + * Journal. + */ + using init_ertr = crimson::errorator< + crimson::ct_error::input_output_error + >; + init_ertr::future<> open_for_write(); + + /** + * close journal + * + * TODO: should probably flush and disallow further writes + */ + using close_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + close_ertr::future<> close() { return close_ertr::now(); } + + /** + * write_record + * + * @param write record and returns offset of first block + */ + using submit_record_ertr = crimson::errorator< + crimson::ct_error::erange, + crimson::ct_error::input_output_error + >; + using submit_record_ret = submit_record_ertr::future; + submit_record_ret submit_record(record_t &&record) { + auto rsize = get_encoded_record_length(record); + auto total = rsize.mdlength + rsize.dlength; + if (total > max_record_length) { + return crimson::ct_error::erange::make(); + } + auto roll = needs_roll(total) + ? roll_journal_segment() + : roll_journal_segment_ertr::now(); + return roll.safe_then( + [this, rsize, record=std::move(record)]() mutable { + auto ret = next_record_addr(); + return write_record(rsize, std::move(record) + ).safe_then([this, ret] { + return ret.add_relative(make_relative_paddr(block_size)); + }); + }); + } + + /** + * Read deltas and pass to delta_handler + * + * record_block_start (argument to delta_handler) is the start of the + * of the first block in the record + */ + using replay_ertr = SegmentManager::read_ertr; + using replay_ret = replay_ertr::future<>; + using delta_handler_t = std::function< + replay_ret(paddr_t record_start, const delta_info_t&)>; + replay_ret replay(delta_handler_t &&delta_handler); + +private: + const extent_len_t block_size; + const extent_len_t max_record_length; + + JournalSegmentProvider *segment_provider = nullptr; + SegmentManager &segment_manager; + + paddr_t current_replay_point; + + segment_seq_t current_journal_segment_seq = 0; + + SegmentRef current_journal_segment; + segment_off_t written_to = 0; + + segment_id_t next_journal_segment_seq = NULL_SEG_ID; + journal_seq_t current_journal_seq = 0; + + /// prepare segment for writes, writes out segment header + using initialize_segment_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + initialize_segment_ertr::future<> initialize_segment( + Segment &segment); + + struct record_size_t { + extent_len_t mdlength = 0; + extent_len_t dlength = 0; + + record_size_t( + extent_len_t mdlength, + extent_len_t dlength) + : mdlength(mdlength), dlength(dlength) {} + }; + + /** + * Return pair denoting length of + * metadata and blocks respectively. + */ + record_size_t get_encoded_record_length( + const record_t &record) const; + + /// create encoded record bl + ceph::bufferlist encode_record( + record_size_t rsize, + record_t &&record); + + /// do record write + using write_record_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + write_record_ertr::future<> write_record( + record_size_t rsize, + record_t &&record); + + /// close current segment and initialize next one + using roll_journal_segment_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + roll_journal_segment_ertr::future<> roll_journal_segment(); + + /// returns true iff current segment has insufficient space + bool needs_roll(segment_off_t length) const; + + /// returns next record addr + paddr_t next_record_addr() const; + + /// return ordered vector of segments to replay + using find_replay_segments_ertr = crimson::errorator< + crimson::ct_error::input_output_error + >; + using find_replay_segments_fut = + find_replay_segments_ertr::future>; + find_replay_segments_fut find_replay_segments(); + + /// read record metadata for record starting at start + using read_record_metadata_ertr = replay_ertr; + using read_record_metadata_ret = read_record_metadata_ertr::future< + std::optional> + >; + read_record_metadata_ret read_record_metadata( + paddr_t start); + + /// attempts to decode deltas from bl, return nullopt if unsuccessful + std::optional> try_decode_deltas( + record_header_t header, + bufferlist &bl); + + /// replays records starting at start through end of segment + replay_ertr::future<> + replay_segment( + paddr_t start, ///< [in] starting addr + delta_handler_t &delta_handler ///< [in] processes deltas in order + ); +}; + +} +WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t) +WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t) diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc new file mode 100644 index 00000000000..af8516e2eac --- /dev/null +++ b/src/crimson/os/seastore/seastore_types.cc @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/os/seastore/seastore_types.h" + +namespace crimson::os::seastore { + +std::ostream &segment_to_stream(std::ostream &out, const segment_id_t &t) +{ + if (t == NULL_SEG_ID) + return out << "NULL_SEG"; + else if (t == REL_SEG_ID) + return out << "REL_SEG"; + else + return out << t; +} + +std::ostream &offset_to_stream(std::ostream &out, const segment_off_t &t) +{ + if (t == NULL_SEG_OFF) + return out << "NULL_OFF"; + else + return out << t; +} + +std::ostream &operator<<(std::ostream &out, const paddr_t &rhs) +{ + out << "paddr_t<"; + segment_to_stream(out, rhs.segment); + out << ", "; + offset_to_stream(out, rhs.offset); + return out << ">"; +} + +std::ostream &operator<<(std::ostream &out, extent_types_t t) +{ + switch (t) { + case extent_types_t::ROOT_LOCATION: + return out << "ROOT_LOCATION"; + case extent_types_t::ROOT: + return out << "ROOT"; + case extent_types_t::LADDR_INTERNAL: + return out << "LADDR_INTERNAL"; + case extent_types_t::LADDR_LEAF: + return out << "LADDR_LEAF"; + case extent_types_t::TEST_BLOCK: + return out << "TEST_BLOCK"; + case extent_types_t::NONE: + return out << "NONE"; + default: + return out << "UNKNOWN"; + } +} + +std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs) +{ + bool first = false; + for (auto &i: rhs) { + out << (first ? '[' : ',') << '(' << i.first << ',' << i.second << ')'; + first = true; + } + return out << ']'; +} +std::ostream &operator<<(std::ostream &out, const paddr_list_t &rhs) +{ + bool first = false; + for (auto &i: rhs) { + out << (first ? '[' : ',') << '(' << i.first << ',' << i.second << ')'; + first = true; + } + return out << ']'; +} + +} diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h new file mode 100644 index 00000000000..ea94722d917 --- /dev/null +++ b/src/crimson/os/seastore/seastore_types.h @@ -0,0 +1,184 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "include/denc.h" +#include "include/buffer.h" +#include "include/cmp.h" + +namespace crimson::os::seastore { + +using checksum_t = uint32_t; + +// Identifies segment location on disk, see SegmentManager, +using segment_id_t = uint32_t; +constexpr segment_id_t NULL_SEG_ID = + std::numeric_limits::max() - 1; +/* Used to denote relative paddr_t */ +constexpr segment_id_t REL_SEG_ID = + std::numeric_limits::max() - 2; + +std::ostream &segment_to_stream(std::ostream &, const segment_id_t &t); + +// Offset within a segment on disk, see SegmentManager +// may be negative for relative offsets +using segment_off_t = int32_t; +constexpr segment_off_t NULL_SEG_OFF = + std::numeric_limits::max(); + +std::ostream &offset_to_stream(std::ostream &, const segment_off_t &t); + +/* Monotonically increasing segment seq, uniquely identifies + * the incarnation of a segment */ +using segment_seq_t = uint32_t; +static constexpr segment_seq_t NULL_SEG_SEQ = + std::numeric_limits::max(); + +// Offset of delta within a record +using record_delta_idx_t = uint32_t; +constexpr record_delta_idx_t NULL_DELTA_IDX = + std::numeric_limits::max(); + +// offset on disk, see SegmentManager +struct paddr_t { + segment_id_t segment = NULL_SEG_ID; + segment_off_t offset = NULL_SEG_OFF; + + bool is_relative() const { + return segment == REL_SEG_ID; + } + + paddr_t add_relative(paddr_t o) const { + assert(o.is_relative()); + assert(!is_relative()); + return paddr_t{segment, offset + o.offset}; + } + + paddr_t operator-(paddr_t rhs) const { + assert(rhs.is_relative() && is_relative()); + return paddr_t{ + REL_SEG_ID, + offset - rhs.offset + }; + } + + paddr_t maybe_relative_to(paddr_t base) const { + if (is_relative()) + return base.add_relative(*this); + else + return *this; + } + + DENC(paddr_t, v, p) { + DENC_START(1, 1, p); + denc(v.segment, p); + denc(v.offset, p); + DENC_FINISH(p); + } +}; +WRITE_CMP_OPERATORS_2(paddr_t, segment, offset) +WRITE_EQ_OPERATORS_2(paddr_t, segment, offset) +constexpr paddr_t P_ADDR_NULL = paddr_t{}; +constexpr paddr_t make_relative_paddr(segment_off_t off) { + return paddr_t{REL_SEG_ID, off}; +} + +std::ostream &operator<<(std::ostream &out, const paddr_t &rhs); + +// logical addr, see LBAManager, TransactionManager +using laddr_t = uint64_t; +constexpr laddr_t L_ADDR_MIN = std::numeric_limits::min(); +constexpr laddr_t L_ADDR_MAX = std::numeric_limits::max(); +constexpr laddr_t L_ADDR_NULL = std::numeric_limits::max(); +constexpr laddr_t L_ADDR_ROOT = std::numeric_limits::max() - 1; +constexpr laddr_t L_ADDR_LBAT = std::numeric_limits::max() - 2; + +// logical offset, see LBAManager, TransactionManager +using loff_t = uint32_t; +constexpr loff_t L_OFF_NULL = std::numeric_limits::max(); + +struct laddr_list_t : std::list> { + template + laddr_list_t(T&&... args) + : std::list>(std::forward(args)...) {} +}; +struct paddr_list_t : std::list> { + template + paddr_list_t(T&&... args) + : std::list>(std::forward(args)...) {} +}; + +std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs); +std::ostream &operator<<(std::ostream &out, const paddr_list_t &rhs); + +/* identifies type of extent, used for interpretting deltas, managing + * writeback */ +enum class extent_types_t : uint8_t { + ROOT_LOCATION = 0, // delta only + ROOT = 1, + LADDR_INTERNAL = 2, + LADDR_LEAF = 3, + LBA_BLOCK = 4, + + // Test Block Types + TEST_BLOCK = 0xF0, + + // None + NONE = 0xFF +}; + +std::ostream &operator<<(std::ostream &out, extent_types_t t); + +/* description of a new physical extent */ +struct extent_t { + ceph::bufferlist bl; ///< payload, bl.length() == length, aligned +}; + +using extent_version_t = uint32_t; +constexpr extent_version_t EXTENT_VERSION_NULL = 0; + +/* description of a mutation to a physical extent */ +struct delta_info_t { + extent_types_t type = extent_types_t::NONE; ///< delta type + paddr_t paddr; ///< physical address + /* logical address -- needed for repopulating cache -- TODO don't actually need */ + // laddr_t laddr = L_ADDR_NULL; + segment_off_t length = NULL_SEG_OFF; ///< extent length + extent_version_t pversion; ///< prior version + ceph::bufferlist bl; ///< payload + + DENC(delta_info_t, v, p) { + DENC_START(1, 1, p); + denc(v.type, p); + denc(v.paddr, p); + //denc(v.laddr, p); + denc(v.length, p); + denc(v.pversion, p); + denc(v.bl, p); + DENC_FINISH(p); + } + + bool operator==(const delta_info_t &rhs) const { + return ( + type == rhs.type && + paddr == rhs.paddr && + length == rhs.length && + pversion == rhs.pversion && + bl == rhs.bl + ); + } +}; + +struct record_t { + std::vector extents; + std::vector deltas; +}; + +} + +WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t) +WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t) diff --git a/src/crimson/os/seastore/segment_manager.cc b/src/crimson/os/seastore/segment_manager.cc new file mode 100644 index 00000000000..b20bc699260 --- /dev/null +++ b/src/crimson/os/seastore/segment_manager.cc @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "crimson/os/seastore/segment_manager/ephemeral.h" + +namespace crimson::os::seastore::segment_manager { + +SegmentManagerRef create_ephemeral(ephemeral_config_t config) { + return SegmentManagerRef{new EphemeralSegmentManager(config)}; +} + +std::ostream &operator<<(std::ostream &lhs, const ephemeral_config_t &c) { + return lhs << "ephemeral_config_t(size=" << c.size << ", block_size=" << c.block_size + << ", segment_size=" << c.segment_size << ")"; +} + +} diff --git a/src/crimson/os/seastore/segment_manager.h b/src/crimson/os/seastore/segment_manager.h new file mode 100644 index 00000000000..6304586083e --- /dev/null +++ b/src/crimson/os/seastore/segment_manager.h @@ -0,0 +1,145 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include +#include +#include + +#include "include/ceph_assert.h" +#include "crimson/os/seastore/seastore_types.h" +#include "include/buffer_fwd.h" +#include "crimson/osd/exceptions.h" + +namespace crimson::os::seastore { + +class Segment : public boost::intrusive_ref_counter< + Segment, + boost::thread_unsafe_counter>{ +public: + + /** + * get_segment_id + */ + virtual segment_id_t get_segment_id() const = 0; + + /** + * min next write location + */ + virtual segment_off_t get_write_ptr() const = 0; + + /** + * max capacity + */ + virtual segment_off_t get_write_capacity() const = 0; + + /** + * close + * + * Closes segment for writes. Won't complete until + * outstanding writes to this segment are complete. + */ + using close_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent>; + virtual close_ertr::future<> close() = 0; + + + /** + * write + * + * @param offset offset of write, must be aligned to <> and >= write pointer, advances + * write pointer + * @param bl buffer to write, will be padded if not aligned + */ + using write_ertr = crimson::errorator< + crimson::ct_error::input_output_error, // media error or corruption + crimson::ct_error::invarg, // if offset is < write pointer or misaligned + crimson::ct_error::ebadf, // segment closed + crimson::ct_error::enospc // write exceeds segment size + >; + virtual write_ertr::future<> write( + segment_off_t offset, ceph::bufferlist bl) = 0; + + virtual ~Segment() {} +}; +using SegmentRef = boost::intrusive_ptr; + +constexpr size_t PADDR_SIZE = sizeof(paddr_t); + +class SegmentManager { +public: + using init_ertr = crimson::errorator< + crimson::ct_error::enospc, + crimson::ct_error::invarg, + crimson::ct_error::erange>; + virtual init_ertr::future<> init() = 0; + + using open_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent>; + virtual open_ertr::future open(segment_id_t id) = 0; + + using release_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent>; + virtual release_ertr::future<> release(segment_id_t id) = 0; + + using read_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent, + crimson::ct_error::erange>; + virtual read_ertr::future<> read( + paddr_t addr, + size_t len, + ceph::bufferptr &out) = 0; + read_ertr::future read( + paddr_t addr, + size_t len) { + auto ptrref = std::make_unique(len); + return read(addr, len, *ptrref).safe_then( + [ptrref=std::move(ptrref)]() mutable { + return read_ertr::make_ready_future(std::move(*ptrref)); + }); + } + + /* Methods for discovering device geometry, segmentid set, etc */ + virtual size_t get_size() const = 0; + virtual segment_off_t get_block_size() const = 0; + virtual segment_off_t get_segment_size() const = 0; + virtual segment_id_t get_num_segments() const { + ceph_assert(get_size() % get_segment_size() == 0); + return ((segment_id_t)(get_size() / get_segment_size())); + } + + + virtual ~SegmentManager() {} +}; +using SegmentManagerRef = std::unique_ptr; + +namespace segment_manager { + +struct ephemeral_config_t { + size_t size; + size_t block_size; + size_t segment_size; +}; +constexpr ephemeral_config_t DEFAULT_TEST_EPHEMERAL = { + 1 << 30, + 4 << 10, + 32 << 20 +}; + +std::ostream &operator<<(std::ostream &, const ephemeral_config_t &); +SegmentManagerRef create_ephemeral(ephemeral_config_t config); + +} + +} diff --git a/src/crimson/os/seastore/segment_manager/ephemeral.cc b/src/crimson/os/seastore/segment_manager/ephemeral.cc new file mode 100644 index 00000000000..c2b32eb00c9 --- /dev/null +++ b/src/crimson/os/seastore/segment_manager/ephemeral.cc @@ -0,0 +1,172 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include "crimson/common/log.h" + +#include "include/buffer.h" +#include "crimson/os/seastore/segment_manager/ephemeral.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_filestore); + } +} + +namespace crimson::os::seastore::segment_manager { + +EphemeralSegment::EphemeralSegment( + EphemeralSegmentManager &manager, segment_id_t id) + : manager(manager), id(id) {} + +segment_off_t EphemeralSegment::get_write_capacity() const +{ + return manager.get_segment_size(); +} + +Segment::close_ertr::future<> EphemeralSegment::close() +{ + manager.segment_close(id); + return close_ertr::now(); +} + +Segment::write_ertr::future<> EphemeralSegment::write( + segment_off_t offset, ceph::bufferlist bl) +{ + if (offset < write_pointer || offset % manager.config.block_size != 0) + return crimson::ct_error::invarg::make(); + + if (offset + bl.length() >= manager.config.segment_size) + return crimson::ct_error::enospc::make(); + + return manager.segment_write({id, offset}, bl); +} + +EphemeralSegmentManager::EphemeralSegmentManager(ephemeral_config_t config) + : config(config) {} + +Segment::close_ertr::future<> EphemeralSegmentManager::segment_close(segment_id_t id) +{ + if (segment_state[id] != segment_state_t::OPEN) + return crimson::ct_error::invarg::make(); + + segment_state[id] = segment_state_t::CLOSED; + return Segment::close_ertr::now(); +} + +Segment::write_ertr::future<> EphemeralSegmentManager::segment_write( + paddr_t addr, + ceph::bufferlist bl, + bool ignore_check) +{ + logger().debug( + "segment_write to segment {} at offset {}, physical offset {}, len {}, crc {}", + addr.segment, + addr.offset, + get_offset(addr), + bl.length(), + bl.crc32c(0)); + if (!ignore_check && segment_state[addr.segment] != segment_state_t::OPEN) + return crimson::ct_error::invarg::make(); + + bl.begin().copy(bl.length(), buffer + get_offset(addr)); + return Segment::write_ertr::now(); +} + +EphemeralSegmentManager::init_ertr::future<> EphemeralSegmentManager::init() +{ + logger().debug( + "Initing ephemeral segment manager with config {}", + config); + + if (config.block_size % (4<<10) != 0) { + return crimson::ct_error::invarg::make(); + } + if (config.segment_size % config.block_size != 0) { + return crimson::ct_error::invarg::make(); + } + if (config.size % config.segment_size != 0) { + return crimson::ct_error::invarg::make(); + } + + auto addr = ::mmap( + nullptr, + config.size, + PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, + -1, + 0); + + segment_state.resize(config.size / config.segment_size, segment_state_t::EMPTY); + + if (addr == MAP_FAILED) + return crimson::ct_error::enospc::make(); + + buffer = (char*)addr; + + ::memset(buffer, 0, config.size); + return init_ertr::now(); +} + +EphemeralSegmentManager::~EphemeralSegmentManager() +{ + if (buffer) { + ::munmap(buffer, config.size); + } +} + +SegmentManager::open_ertr::future EphemeralSegmentManager::open( + segment_id_t id) +{ + if (id >= get_num_segments()) + return crimson::ct_error::invarg::make(); + + if (segment_state[id] != segment_state_t::EMPTY) + return crimson::ct_error::invarg::make(); + + segment_state[id] = segment_state_t::OPEN; + return open_ertr::make_ready_future(new EphemeralSegment(*this, id)); +} + +SegmentManager::release_ertr::future<> EphemeralSegmentManager::release( + segment_id_t id) +{ + if (id >= get_num_segments()) + return crimson::ct_error::invarg::make(); + + if (segment_state[id] != segment_state_t::CLOSED) + return crimson::ct_error::invarg::make(); + + ::memset(buffer + get_offset({id, 0}), 0, config.segment_size); + segment_state[id] = segment_state_t::EMPTY; + return release_ertr::now(); +} + +SegmentManager::read_ertr::future<> EphemeralSegmentManager::read( + paddr_t addr, + size_t len, + ceph::bufferptr &out) +{ + if (addr.segment >= get_num_segments()) + return crimson::ct_error::invarg::make(); + + if (addr.offset + len >= config.segment_size) + return crimson::ct_error::invarg::make(); + + out.copy_in(0, len, buffer + get_offset(addr)); + + bufferlist bl; + bl.push_back(out); + logger().debug( + "segment_read to segment {} at offset {}, physical offset {}, length {}, crc {}", + addr.segment, + addr.offset, + get_offset(addr), + len, + bl.begin().crc32c(config.block_size, 0)); + + return read_ertr::now(); +} + +} diff --git a/src/crimson/os/seastore/segment_manager/ephemeral.h b/src/crimson/os/seastore/segment_manager/ephemeral.h new file mode 100644 index 00000000000..60b78d66f3d --- /dev/null +++ b/src/crimson/os/seastore/segment_manager/ephemeral.h @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include + +#include "crimson/os/seastore/segment_manager.h" + +#include "crimson/os/seastore/segment_manager/ephemeral.h" + +namespace crimson::os::seastore::segment_manager { + +class EphemeralSegmentManager; +class EphemeralSegment final : public Segment { + friend class EphemeralSegmentManager; + EphemeralSegmentManager &manager; + const segment_id_t id; + segment_off_t write_pointer = 0; +public: + EphemeralSegment(EphemeralSegmentManager &manager, segment_id_t id); + + segment_id_t get_segment_id() const final { return id; } + segment_off_t get_write_capacity() const final; + segment_off_t get_write_ptr() const final { return write_pointer; } + close_ertr::future<> close() final; + write_ertr::future<> write(segment_off_t offset, ceph::bufferlist bl) final; + + ~EphemeralSegment() {} +}; + +class EphemeralSegmentManager final : public SegmentManager { + friend class EphemeralSegment; + + const ephemeral_config_t config; + + size_t get_offset(paddr_t addr) { + return (addr.segment * config.segment_size) + addr.offset; + } + + enum class segment_state_t { + EMPTY, + OPEN, + CLOSED + }; + std::vector segment_state; + + char *buffer = nullptr; + + Segment::close_ertr::future<> segment_close(segment_id_t id); + +public: + EphemeralSegmentManager(ephemeral_config_t config); + ~EphemeralSegmentManager(); + + init_ertr::future<> init() final; + + open_ertr::future open(segment_id_t id) final; + + release_ertr::future<> release(segment_id_t id) final; + + read_ertr::future<> read( + paddr_t addr, + size_t len, + ceph::bufferptr &out) final; + + size_t get_size() const final { + return config.size; + } + segment_off_t get_block_size() const { + return config.block_size; + } + segment_off_t get_segment_size() const { + return config.segment_size; + } + + // public so tests can bypass segment interface when simpler + Segment::write_ertr::future<> segment_write( + paddr_t addr, + ceph::bufferlist bl, + bool ignore_check=false); +}; + +} diff --git a/src/test/crimson/seastore/CMakeLists.txt b/src/test/crimson/seastore/CMakeLists.txt index e69de29bb2d..1e447a31420 100644 --- a/src/test/crimson/seastore/CMakeLists.txt +++ b/src/test/crimson/seastore/CMakeLists.txt @@ -0,0 +1,9 @@ + +add_executable(unittest_seastore_journal + test_seastore_journal.cc + ../gtest_seastar.cc) +add_ceph_unittest(unittest_seastore_journal) +target_link_libraries( + unittest_seastore_journal + ${CMAKE_DL_LIBS} + crimson-seastore) diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc new file mode 100644 index 00000000000..309d5a2d9bd --- /dev/null +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -0,0 +1,260 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/crimson/gtest_seastar.h" + +#include + +#include "crimson/common/log.h" +#include "crimson/os/seastore/journal.h" +#include "crimson/os/seastore/segment_manager.h" + +using namespace crimson; +using namespace crimson::os; +using namespace crimson::os::seastore; + +namespace { + [[maybe_unused]] seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); + } +} + +struct record_validator_t { + record_t record; + paddr_t record_final_offset; + + template + record_validator_t(T&&... record) : record(std::forward(record)...) {} + + void validate(SegmentManager &manager) { + paddr_t addr = make_relative_paddr(0); + for (auto &&block : record.extents) { + auto test = manager.read( + record_final_offset.add_relative(addr), + block.bl.length()).unsafe_get0(); + addr.offset += block.bl.length(); + bufferlist bl; + bl.push_back(test); + ASSERT_EQ( + bl.length(), + block.bl.length()); + ASSERT_EQ( + bl.begin().crc32c(bl.length(), 1), + block.bl.begin().crc32c(block.bl.length(), 1)); + } + } + + auto get_replay_handler() { + auto checker = [this, iter=record.deltas.begin()] ( + paddr_t base, + const delta_info_t &di) mutable { + EXPECT_EQ(base, record_final_offset); + ceph_assert(iter != record.deltas.end()); + EXPECT_EQ(di, *iter++); + EXPECT_EQ(base, record_final_offset); + return iter != record.deltas.end(); + }; + if (record.deltas.size()) { + return std::make_optional(std::move(checker)); + } else { + return std::optional(); + } + } +}; + +struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider { + std::unique_ptr segment_manager; + std::unique_ptr journal; + + std::vector records; + + std::default_random_engine generator; + + const segment_off_t block_size; + + journal_test_t() + : segment_manager(create_ephemeral(segment_manager::DEFAULT_TEST_EPHEMERAL)), + block_size(segment_manager->get_block_size()) + { + } + + segment_id_t next = 0; + get_segment_ret get_segment() final { + return get_segment_ret( + get_segment_ertr::ready_future_marker{}, + next++); + } + + void put_segment(segment_id_t segment) final { + return; + } + + seastar::future<> set_up_fut() final { + journal.reset(new Journal(*segment_manager)); + journal->set_segment_provider(this); + return segment_manager->init( + ).safe_then([this] { + return journal->open_for_write(); + }).handle_error( + crimson::ct_error::all_same_way([] { + ASSERT_FALSE("Unable to mount"); + })); + } + + template + auto replay(T &&f) { + return journal->close( + ).safe_then([this, f=std::move(f)]() mutable { + journal.reset(new Journal(*segment_manager)); + journal->set_segment_provider(this); + return journal->replay(std::forward(std::move(f))); + }).safe_then([this] { + return journal->open_for_write(); + }); + } + + auto replay_and_check() { + auto record_iter = records.begin(); + decltype(record_iter->get_replay_handler()) delta_checker = std::nullopt; + auto advance = [this, &record_iter, &delta_checker] { + ceph_assert(!delta_checker); + while (record_iter != records.end()) { + auto checker = record_iter->get_replay_handler(); + record_iter++; + if (checker) { + delta_checker.emplace(std::move(*checker)); + break; + } + } + }; + advance(); + replay( + [this, + &advance, + &record_iter, + &delta_checker] + (auto base, const auto &di) mutable { + if (!delta_checker) { + EXPECT_FALSE("No Deltas Left"); + } + if (!(*delta_checker)(base, di)) { + delta_checker = std::nullopt; + advance(); + } + return Journal::replay_ertr::now(); + }).unsafe_get0(); + ASSERT_EQ(record_iter, records.end()); + for (auto &i : records) { + i.validate(*segment_manager); + } + } + + template + auto submit_record(T&&... _record) { + auto record{std::forward(_record)...}; + records.push_back(record); + auto addr = journal->submit_record(std::move(record)).unsafe_get0(); + records.back().record_final_offset = addr; + return addr; + } + + seastar::future<> tear_down_fut() final { + return seastar::now(); + } + + extent_t generate_extent(size_t blocks) { + std::uniform_int_distribution distribution( + std::numeric_limits::min(), + std::numeric_limits::max() + ); + char contents = distribution(generator); + bufferlist bl; + bl.append(buffer::ptr(buffer::create(blocks * block_size, contents))); + return extent_t{bl}; + } + + delta_info_t generate_delta(size_t bytes) { + std::uniform_int_distribution distribution( + std::numeric_limits::min(), + std::numeric_limits::max() + ); + char contents = distribution(generator); + bufferlist bl; + bl.append(buffer::ptr(buffer::create(bytes, contents))); + return delta_info_t{ + extent_types_t::TEST_BLOCK, + paddr_t{}, + block_size, + 1, + bl + }; + } +}; + +TEST_F(journal_test_t, replay_one_journal_segment) +{ + run_async([this] { + submit_record(record_t{ + { generate_extent(1), generate_extent(2) }, + { generate_delta(23), generate_delta(30) } + }); + replay_and_check(); + }); +} + +TEST_F(journal_test_t, replay_two_records) +{ + run_async([this] { + submit_record(record_t{ + { generate_extent(1), generate_extent(2) }, + { generate_delta(23), generate_delta(30) } + }); + submit_record(record_t{ + { generate_extent(4), generate_extent(1) }, + { generate_delta(23), generate_delta(400) } + }); + replay_and_check(); + }); +} + +TEST_F(journal_test_t, replay_twice) +{ + run_async([this] { + submit_record(record_t{ + { generate_extent(1), generate_extent(2) }, + { generate_delta(23), generate_delta(30) } + }); + submit_record(record_t{ + { generate_extent(4), generate_extent(1) }, + { generate_delta(23), generate_delta(400) } + }); + replay_and_check(); + submit_record(record_t{ + { generate_extent(2), generate_extent(5) }, + { generate_delta(230), generate_delta(40) } + }); + replay_and_check(); + }); +} + +TEST_F(journal_test_t, roll_journal_and_replay) +{ + run_async([this] { + paddr_t current = submit_record( + record_t{ + { generate_extent(1), generate_extent(2) }, + { generate_delta(23), generate_delta(30) } + }); + auto starting_segment = current.segment; + unsigned so_far = 0; + while (current.segment == starting_segment) { + current = submit_record(record_t{ + { generate_extent(512), generate_extent(512) }, + { generate_delta(23), generate_delta(400) } + }); + ++so_far; + ASSERT_FALSE(so_far > 10); + } + replay_and_check(); + }); +}