From 3a5765bd2f3b3d733439b7f9d98e10b2c519b89f Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 4 May 2020 11:50:37 -0700 Subject: [PATCH] crimson/os/seastore: add initial cache support with tests Signed-off-by: Samuel Just --- src/crimson/os/seastore/CMakeLists.txt | 3 + src/crimson/os/seastore/cache.cc | 220 +++++++++ src/crimson/os/seastore/cache.h | 382 +++++++++++++++ src/crimson/os/seastore/cached_extent.cc | 68 +++ src/crimson/os/seastore/cached_extent.h | 446 ++++++++++++++++++ src/crimson/os/seastore/root_block.cc | 28 ++ src/crimson/os/seastore/root_block.h | 83 ++++ src/crimson/os/seastore/seastore_types.h | 17 +- src/test/crimson/seastore/CMakeLists.txt | 9 + .../crimson/seastore/test_seastore_cache.cc | 251 ++++++++++ 10 files changed, 1501 insertions(+), 6 deletions(-) create mode 100644 src/crimson/os/seastore/cache.cc create mode 100644 src/crimson/os/seastore/cache.h create mode 100644 src/crimson/os/seastore/cached_extent.cc create mode 100644 src/crimson/os/seastore/cached_extent.h create mode 100644 src/crimson/os/seastore/root_block.cc create mode 100644 src/crimson/os/seastore/root_block.h create mode 100644 src/test/crimson/seastore/test_seastore_cache.cc diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index 32b756a471dba..5df0d94123f5c 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -1,8 +1,11 @@ add_library(crimson-seastore + cached_extent.cc seastore_types.cc segment_manager/ephemeral.cc segment_manager.cc journal.cc + cache.cc + root_block.cc ) target_link_libraries(crimson-seastore crimson) diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc new file mode 100644 index 0000000000000..b6795244655db --- /dev/null +++ b/src/crimson/os/seastore/cache.cc @@ -0,0 +1,220 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/os/seastore/cache.h" +#include "crimson/common/log.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_filestore); + } +} + +namespace crimson::os::seastore { + +Cache::~Cache() +{ + retire_extent(root); + root.reset(); + for (auto i = dirty.begin(); i != dirty.end(); ) { + auto ptr = &*i; + dirty.erase(i++); + intrusive_ptr_release(ptr); + } + for (auto &i: extents) { + logger().error("~Cache: extent {} still alive", i); + } + ceph_assert(extents.empty()); +} + +void Cache::add_extent(CachedExtentRef ref) +{ + assert(ref->is_valid()); + extents.insert(*ref); + + ceph_assert(!ref->primary_ref_list_hook.is_linked()); + if (ref->is_dirty()) { + intrusive_ptr_add_ref(&*ref); + dirty.push_back(*ref); + } + logger().debug("add_extent: {}", *ref); +} + +void Cache::retire_extent(CachedExtentRef ref) +{ + logger().debug("retire_extent: {}", *ref); + assert(ref->is_valid()); + extents.erase(*ref); + + if (ref->is_dirty()) { + ceph_assert(ref->primary_ref_list_hook.is_linked()); + dirty.erase(dirty.s_iterator_to(*ref)); + intrusive_ptr_release(&*ref); + } else { + ceph_assert(!ref->primary_ref_list_hook.is_linked()); + } +} + +CachedExtentRef Cache::duplicate_for_write( + Transaction &t, + CachedExtentRef i) { + if (i->is_pending()) + return i; + + auto ret = i->duplicate_for_write(); + ret->version++; + ret->state = CachedExtent::extent_state_t::MUTATION_PENDING; + + if (ret->get_type() == extent_types_t::ROOT) { + t.root = ret->cast(); + } + + t.add_to_retired_set(i); + t.add_mutated_extent(ret); + + return ret; +} + +std::optional Cache::try_construct_record(Transaction &t) +{ + // First, validate read set + for (auto &i: t.read_set) { + if (i->state == CachedExtent::extent_state_t::INVALID) + return std::nullopt; + } + + record_t record; + + // Transaction is now a go, set up in-memory cache state + // invalidate now invalid blocks + for (auto &i: t.retired_set) { + logger().debug("try_construct_record: retiring {}", *i); + ceph_assert(!i->is_pending()); + ceph_assert(i->is_valid()); + retire_extent(i); + i->state = CachedExtent::extent_state_t::INVALID; + } + + t.write_set.clear(); + + // Add new copy of mutated blocks, set_io_wait to block until written + record.deltas.reserve(t.mutated_block_list.size()); + for (auto &i: t.mutated_block_list) { + logger().debug("try_construct_record: mutating {}", *i); + add_extent(i); + i->prepare_write(); + i->set_io_wait(); + record.deltas.push_back( + delta_info_t{ + i->get_type(), + i->get_paddr(), + (segment_off_t)i->get_length(), + i->get_version(), + i->get_delta() + }); + } + + record.extents.reserve(t.fresh_block_list.size()); + for (auto &i: t.fresh_block_list) { + logger().debug("try_construct_record: fresh block {}", *i); + bufferlist bl; + i->prepare_write(); + bl.append(i->get_bptr()); + if (i->get_type() == extent_types_t::ROOT) { + record.deltas.push_back( + delta_info_t{ + extent_types_t::ROOT_LOCATION, + i->get_paddr(), + 0, + 0, + bufferlist() + }); + } + record.extents.push_back(extent_t{std::move(bl)}); + } + + t.read_set.clear(); + return std::make_optional(std::move(record)); +} + +void Cache::complete_commit( + Transaction &t, + paddr_t final_block_start) +{ + if (t.root) + root = t.root; + + paddr_t cur = final_block_start; + for (auto &i: t.fresh_block_list) { + i->set_paddr(cur); + cur.offset += i->get_length(); + i->state = CachedExtent::extent_state_t::CLEAN; + logger().debug("complete_commit: fresh {}", *i); + i->on_initial_write(); + add_extent(i); + } + + // Add new copy of mutated blocks, set_io_wait to block until written + for (auto &i: t.mutated_block_list) { + i->state = CachedExtent::extent_state_t::DIRTY; + logger().debug("complete_commit: mutated {}", *i); + i->on_delta_write(final_block_start); + } + + for (auto &i: t.mutated_block_list) { + i->complete_io(); + } +} + +Cache::mkfs_ertr::future<> Cache::mkfs(Transaction &t) +{ + t.root = alloc_new_extent(t, RootBlock::SIZE); + return mkfs_ertr::now(); +} + +Cache::close_ertr::future<> Cache::close() +{ + return close_ertr::now(); +} + +Cache::replay_delta_ret +Cache::replay_delta(paddr_t record_base, const delta_info_t &delta) +{ + if (delta.type == extent_types_t::ROOT_LOCATION) { + auto root_location = delta.paddr.is_relative() + ? record_base.add_relative(delta.paddr) + : delta.paddr; + logger().debug("replay_delta: found root addr {}", root_location); + return get_extent( + root_location, + RootBlock::SIZE + ).safe_then([this, root_location](auto ref) { + logger().debug("replay_delta: finished reading root at {}", root_location); + root = ref; + return root->complete_load(); + }).safe_then([this, root_location] { + logger().debug("replay_delta: finished loading root at {}", root_location); + return replay_delta_ret(replay_delta_ertr::ready_future_marker{}); + }); + } + // TODO + return replay_delta_ret(replay_delta_ertr::ready_future_marker{}); +} + +Cache::get_root_ret Cache::get_root(Transaction &t) +{ + if (t.root) { + return get_root_ret( + get_root_ertr::ready_future_marker{}, + t.root); + } else { + auto ret = root; + return ret->wait_io().then([this, &t, ret] { + return get_root_ret( + get_root_ertr::ready_future_marker{}, + ret); + }); + } +} + +} diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h new file mode 100644 index 0000000000000..b8c3fe2d2c0be --- /dev/null +++ b/src/crimson/os/seastore/cache.h @@ -0,0 +1,382 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include "seastar/core/shared_future.hh" + +#include "include/buffer.h" +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/os/seastore/segment_manager.h" +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/cached_extent.h" +#include "crimson/os/seastore/root_block.h" + +namespace crimson::os::seastore { + +/** + * Transaction + * + * Representation of in-progress mutation. Used exclusively through Cache methods. + */ +class Transaction { + friend class Cache; + + RootBlockRef root; ///< ref to root if mutated by transaction + + segment_off_t offset = 0; ///< relative offset of next block + + pextent_set_t read_set; ///< set of extents read by paddr + ExtentIndex write_set; ///< set of extents written by paddr + + std::list fresh_block_list; ///< list of fresh blocks + std::list mutated_block_list; ///< list of mutated blocks + + pextent_set_t retired_set; ///< list of extents mutated by this transaction + + CachedExtentRef get_extent(paddr_t addr) { + if (auto iter = write_set.find_offset(addr); + iter != write_set.end()) { + return CachedExtentRef(&*iter); + } else if ( + auto iter = read_set.find(addr); + iter != read_set.end()) { + return *iter; + } else { + return CachedExtentRef(); + } + } + + void add_to_retired_set(CachedExtentRef ref) { + ceph_assert(retired_set.count(ref->get_paddr()) == 0); + retired_set.insert(ref); + } + + void add_to_read_set(CachedExtentRef ref) { + ceph_assert(read_set.count(ref) == 0); + read_set.insert(ref); + } + + void add_fresh_extent(CachedExtentRef ref) { + fresh_block_list.push_back(ref); + ref->set_paddr(make_relative_paddr(offset)); + offset += ref->get_length(); + write_set.insert(*ref); + } + + void add_mutated_extent(CachedExtentRef ref) { + mutated_block_list.push_back(ref); + write_set.insert(*ref); + } +}; +using TransactionRef = std::unique_ptr; + +/** + * Cache + * + * This component is responsible for buffer management, including + * transaction lifecycle. + * + * Seastore transactions are expressed as an atomic combination of + * 1) newly written blocks + * 2) logical mutations to existing physical blocks + * + * See record_t + * + * As such, any transaction has 3 components: + * 1) read_set: references to extents read during the transaction + * See Transaction::read_set + * 2) write_set: references to extents to be written as: + * a) new physical blocks, see Transaction::fresh_block_list + * b) mutations to existing physical blocks, + * see Transaction::mutated_block_list + * 3) retired_set: extent refs to be retired either due to 2b or + * due to releasing the extent generally. + + * In the case of 2b, the CachedExtent will have been copied into + * a fresh CachedExtentRef such that the source extent ref is present + * in the read set and the newly allocated extent is present in the + * write_set. + * + * A transaction has 3 phases: + * 1) construction: user calls Cache::get_transaction() and populates + * the returned transaction by calling Cache methods + * 2) submission: user calls Cache::try_start_transaction(). If + * succcessful, the user may construct a record and submit the + * transaction to the journal. + * 3) completion: once the transaction is durable, the user must call + * Cache::complete_transaction() with the block offset to complete + * the transaction. + * + * Internally, in phase 1, the fields in Transaction are filled in. + * - reads may block if the referenced extent is being written + * - once a read obtains a particular CachedExtentRef for a paddr_t, + * it'll always get the same one until overwritten + * - once a paddr_t is overwritten or written, subsequent reads of + * that addr will get the new ref + * + * In phase 2, if all extents in the read set are valid (not expired), + * we can commit (otherwise, we fail and the user must retry). + * - Expire all extents in the retired_set (they must all be valid) + * - Remove all extents in the retired_set from Cache::extents + * - Mark all extents in the write_set wait_io(), add promises to + * transaction + * - Merge Transaction::write_set into Cache::extents + * + * After phase 2, the user will submit the record to the journal. + * Once complete, we perform phase 3: + * - For each CachedExtent in block_list, call + * CachedExtent::complete_initial_write(paddr_t) with the block's + * final offset (inferred from the extent's position in the block_list + * and extent lengths). + * - For each block in mutation_list, call + * CachedExtent::delta_written(paddr_t) with the address of the start + * of the record + * - Complete all promises with the final record start paddr_t + */ +class Cache { +public: + Cache(SegmentManager &segment_manager) : segment_manager(segment_manager) {} + ~Cache(); + + TransactionRef get_transaction() { + return std::make_unique(); + } + + /// Declare ref retired in t + void retire_extent(Transaction &t, CachedExtentRef ref) { + t.add_to_retired_set(ref); + } + + /** + * get_root + * + * returns ref to current root or t.root if modified in t + */ + using get_root_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + using get_root_ret = get_root_ertr::future; + get_root_ret get_root(Transaction &t); + + /** + * get_extent + * + * returns ref to extent at offset~length of type T either from + * - extent_set if already in cache + * - disk + */ + using get_extent_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + template + get_extent_ertr::future> get_extent( + paddr_t offset, ///< [in] starting addr + segment_off_t length ///< [in] length + ) { + if (auto iter = extents.find_offset(offset); + iter != extents.end()) { + auto ret = TCachedExtentRef(static_cast(&*iter)); + return ret->wait_io().then([ret=std::move(ret)]() mutable { + return get_extent_ertr::make_ready_future>( + std::move(ret)); + }); + } else { + auto ref = CachedExtent::make_cached_extent_ref( + alloc_cache_buf(length)); + ref->set_io_wait(); + ref->set_paddr(offset); + ref->state = CachedExtent::extent_state_t::CLEAN; + return segment_manager.read( + offset, + length, + ref->get_bptr()).safe_then( + [this, ref=std::move(ref)]() mutable { + ref->complete_io(); + return get_extent_ertr::make_ready_future>( + std::move(ref)); + }, + get_extent_ertr::pass_further{}, + crimson::ct_error::discard_all{}); + } + } + + /** + * get_extent + * + * returns ref to extent at offset~length of type T either from + * - t if modified by t + * - extent_set if already in cache + * - disk + */ + template + get_extent_ertr::future> get_extent( + Transaction &t, ///< [in,out] current transaction + paddr_t offset, ///< [in] starting addr + segment_off_t length ///< [in] length + ) { + if (auto i = t.get_extent(offset)) { + return get_extent_ertr::make_ready_future>( + TCachedExtentRef(static_cast(&*i))); + } else { + return get_extent(offset, length).safe_then( + [this, &t](auto ref) mutable { + t.add_to_read_set(ref); + return get_extent_ertr::make_ready_future>(std::move(ref)); + }); + } + } + + /** + * get_extents + * + * returns refs to extents in extents from: + * - t if modified by t + * - extent_set if already in cache + * - disk + */ + template + get_extent_ertr::future> get_extents( + Transaction &t, ///< [in, out] current transaction + paddr_list_t &&extents ///< [in] extent list for lookup + ) { + auto retref = std::make_unique>(); + auto &ret = *retref; + auto ext = std::make_unique(std::move(extents)); + return crimson::do_for_each( + ext->begin(), + ext->end(), + [this, &t, &ret](auto &p) { + auto &[offset, len] = p; + return get_extent(t, offset, len).safe_then([&ret](auto cext) { + ret.push_back(std::move(cext)); + }); + }).safe_then([retref=std::move(retref), ext=std::move(ext)]() mutable { + return get_extent_ertr::make_ready_future>( + std::move(*retref)); + }); + } + + /** + * alloc_new_extent + * + * Allocates a fresh extent. addr will be relative until commit. + */ + template + TCachedExtentRef alloc_new_extent( + Transaction &t, ///< [in, out] current transaction + segment_off_t length ///< [in] length + ) { + auto ret = CachedExtent::make_cached_extent_ref( + alloc_cache_buf(length)); + t.add_fresh_extent(ret); + ret->state = CachedExtent::extent_state_t::INITIAL_WRITE_PENDING; + return ret; + } + + /** + * Allocates mutable buffer from extent_set on offset~len + * + * TODO: Note, currently all implementations literally copy the + * buffer. This needn't be true, CachedExtent implementations could + * choose to refer to the same buffer unmodified until commit and just + * buffer the mutations in an ancillary data structure. + * + * @param current transaction + * @param extent to duplicate + * @return mutable extent + */ + CachedExtentRef duplicate_for_write( + Transaction &t, ///< [in, out] current transaction + CachedExtentRef i ///< [in] ref to existing extent + ); + + /** + * try_construct_record + * + * First checks for conflicts. If a racing write has mutated/retired + * an extent mutated by this transaction, nullopt will be returned. + * + * Otherwise, a record will be returned valid for use with Journal. + */ + std::optional try_construct_record( + Transaction &t ///< [in, out] current transaction + ); + + /** + * complete_commit + * + * Must be called upon completion of write. Releases blocks on mutating + * extents, fills in addresses, and calls relevant callbacks on fresh + * and mutated exents. + */ + void complete_commit( + Transaction &t, ///< [in, out] current transaction + paddr_t final_block_start ///< [in] offset of initial block + ); + + /** + * mkfs + * + * Alloc initial root node and add to t. The intention is for other + * components to use t to adjust the resulting root ref prior to commit. + */ + using mkfs_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + mkfs_ertr::future<> mkfs(Transaction &t); + + /** + * close + * + * TODO: currently a noop -- probably should be used to flush dirty blocks + */ + using close_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + close_ertr::future<> close(); + + /** + * replay_delta + * + * Intended for use in Journal::delta. For each delta, should decode delta, + * read relevant block from disk or cache (using correct type), and call + * CachedExtent::apply_delta marking the extent dirty. + * + * TODO: currently only handles the ROOT_LOCATION delta. + */ + using replay_delta_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + using replay_delta_ret = replay_delta_ertr::future<>; + replay_delta_ret replay_delta(paddr_t record_base, const delta_info_t &delta); + + /** + * print + * + * Dump summary of contents (TODO) + */ + std::ostream &print( + std::ostream &out) const { + return out; + } + +private: + SegmentManager &segment_manager; ///< ref to segment_manager + RootBlockRef root; ///< ref to current root + ExtentIndex extents; ///< set of live extents + CachedExtent::list dirty; ///< holds refs to dirty extents + + /// alloc buffer for cached extent + bufferptr alloc_cache_buf(size_t size) { + // TODO: memory pooling etc + auto bp = ceph::bufferptr(size); + bp.zero(); + return bp; + } + + /// Add extent to extents handling dirty and refcounting + void add_extent(CachedExtentRef ref); + + /// Remove extent from extents handling dirty and refcounting + void retire_extent(CachedExtentRef ref); +}; + +} diff --git a/src/crimson/os/seastore/cached_extent.cc b/src/crimson/os/seastore/cached_extent.cc new file mode 100644 index 0000000000000..e901913975a15 --- /dev/null +++ b/src/crimson/os/seastore/cached_extent.cc @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/os/seastore/cached_extent.h" + +#include "crimson/common/log.h" + +namespace { + [[maybe_unused]] seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_filestore); + } +} + +namespace crimson::os::seastore { + +#ifdef DEBUG_CACHED_EXTENT_REF + +void intrusive_ptr_add_ref(CachedExtent *ptr) +{ + intrusive_ptr_add_ref( + static_cast*>(ptr)); + logger().debug("intrusive_ptr_add_ref: {}", *ptr); +} + +void intrusive_ptr_release(CachedExtent *ptr) +{ + logger().debug("intrusive_ptr_release: {}", *ptr); + intrusive_ptr_release( + static_cast*>(ptr)); +} + +#endif + +std::ostream &operator<<(std::ostream &out, CachedExtent::extent_state_t state) +{ + switch (state) { + case CachedExtent::extent_state_t::INITIAL_WRITE_PENDING: + return out << "INITIAL_WRITE_PENDING"; + case CachedExtent::extent_state_t::MUTATION_PENDING: + return out << "MUTATION_PENDING"; + case CachedExtent::extent_state_t::CLEAN: + return out << "CLEAN"; + case CachedExtent::extent_state_t::DIRTY: + return out << "DIRTY"; + case CachedExtent::extent_state_t::INVALID: + return out << "INVALID"; + default: + return out << "UNKNOWN"; + } +} + +std::ostream &operator<<(std::ostream &out, const CachedExtent &ext) +{ + return ext.print(out); +} + +CachedExtent::~CachedExtent() +{ + if (parent_index) { + parent_index->erase(*this); + } +} + +} diff --git a/src/crimson/os/seastore/cached_extent.h b/src/crimson/os/seastore/cached_extent.h new file mode 100644 index 0000000000000..0e7bfa4bdd4cc --- /dev/null +++ b/src/crimson/os/seastore/cached_extent.h @@ -0,0 +1,446 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include +#include +#include + +#include "seastar/core/shared_future.hh" + +#include "include/buffer.h" +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/seastore_types.h" + +namespace crimson::os::seastore { + +class CachedExtent; +using CachedExtentRef = boost::intrusive_ptr; + +// #define DEBUG_CACHED_EXTENT_REF +#ifdef DEBUG_CACHED_EXTENT_REF + +void intrusive_ptr_add_ref(CachedExtent *); +void intrusive_ptr_release(CachedExtent *); + +#endif + +template +using TCachedExtentRef = boost::intrusive_ptr; + +/** + * CachedExtent + */ +class ExtentIndex; +class CachedExtent : public boost::intrusive_ref_counter< + CachedExtent, boost::thread_unsafe_counter> { + enum class extent_state_t : uint8_t { + INITIAL_WRITE_PENDING, // In Transaction::write_set and fresh_block_list + MUTATION_PENDING, // In Transaction::write_set and mutated_block_list + CLEAN, // In Cache::extent_index, Transaction::read_set + // during write, contents match disk, version == 0 + DIRTY, // Same as CLEAN, but contents do not match disk, + // version > 0 + INVALID // Part of no ExtentIndex set + } state = extent_state_t::INVALID; + friend std::ostream &operator<<(std::ostream &, extent_state_t); + +public: + /** + * duplicate_for_write + * + * Implementation should return a fresh CachedExtentRef + * which represents a copy of *this until on_delta_write() + * is complete, at which point the user may assume *this + * will be in state INVALID. As such, the implementation + * may involve a copy of get_bptr(), or an ancillary + * structure which defers updating the actual buffer until + * on_delta_write(). + */ + virtual CachedExtentRef duplicate_for_write() = 0; + + /** + * prepare_write + * + * Called prior to reading buffer. + * Implemenation may use this callback to fully write out + * updates to the buffer. + */ + virtual void prepare_write() {} + + /** + * on_initial_write + * + * Called after commit of extent. State will be CLEAN. + * Implentation may use this call to fixup the buffer + * with the newly available absolute get_paddr(). + */ + virtual void on_initial_write() {} + + /** + * on_delta_write + * + * Called after commit of delta. State will be DIRTY. + * Implentation may use this call to fixup any relative + * references in the the buffer with the passed + * record_block_offset record location. + */ + virtual void on_delta_write(paddr_t record_block_offset) {} + + /** + * get_type + * + * Returns concrete type. + */ + virtual extent_types_t get_type() const = 0; + + friend std::ostream &operator<<(std::ostream &, extent_state_t); + std::ostream &print(std::ostream &out) const { + return out << "CachedExtent(addr=" << this + << ", type=" << get_type() + << ", version=" << version + << ", paddr=" << get_paddr() + << ", state=" << state + << ", refcount=" << use_count() + << ")"; + } + + /** + * get_delta + * + * Must return a valid delta usable in apply_delta() in submit_transaction + * if state == MUTATION_PENDING. + */ + virtual ceph::bufferlist get_delta() = 0; + + /** + * bl is a delta obtained previously from get_delta. The versions will + * match. Implementation should mutate buffer based on bl. base matches + * the address passed on_delta_write. + */ + virtual void apply_delta(paddr_t base, ceph::bufferlist &bl) = 0; + + /** + * Called on dirty CachedExtent implementation after replay. + * Implementation should perform any reads/in-memory-setup + * necessary. (for instance, the lba implementation will use this + * to load in lba_manager blocks) + */ + using complete_load_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + virtual complete_load_ertr::future<> complete_load() { + return complete_load_ertr::now(); + } + + /** + * cast + * + * Returns a TCachedExtentRef of the specified type. + * TODO: add dynamic check that the requested type is actually correct. + */ + template + TCachedExtentRef cast() { + return TCachedExtentRef(static_cast(this)); + } + + /// Returns true if extent is part of an open transaction + bool is_pending() const { + return state == extent_state_t::INITIAL_WRITE_PENDING || + state == extent_state_t::MUTATION_PENDING; + } + + /// Returns true if extent has a pending delta + bool is_mutation_pending() const { + return state == extent_state_t::MUTATION_PENDING; + } + + /// Returns true if extent is a fresh extent + bool is_initial_pending() const { + return state == extent_state_t::INITIAL_WRITE_PENDING; + } + + /// Returns true if extent is clean (does not have deltas on disk) + bool is_clean() const { + ceph_assert(is_valid()); + return state == extent_state_t::INITIAL_WRITE_PENDING || + state == extent_state_t::CLEAN; + } + + /// Returns true if extent is dirty (has deltas on disk) + bool is_dirty() const { + ceph_assert(is_valid()); + return !is_clean(); + } + + /// Returns true if extent has not been superceded or retired + bool is_valid() const { + return state != extent_state_t::INVALID; + } + + /** + * get_paddr + * + * Returns current address of extent. If is_initial_pending(), address will + * be relative, otherwise address will be absolute. + */ + paddr_t get_paddr() const { return poffset; } + + /// Returns length of extent + extent_len_t get_length() const { return ptr.length(); } + + /// Returns version, get_version() == 0 iff is_clean() + extent_version_t get_version() const { + return version; + } + + /// Get ref to raw buffer + bufferptr &get_bptr() { return ptr; } + const bufferptr &get_bptr() const { return ptr; } + + /// Compare by paddr + friend bool operator< (const CachedExtent &a, const CachedExtent &b) { + return a.poffset < b.poffset; + } + friend bool operator> (const CachedExtent &a, const CachedExtent &b) { + return a.poffset > b.poffset; + } + friend bool operator== (const CachedExtent &a, const CachedExtent &b) { + return a.poffset == b.poffset; + } + + virtual ~CachedExtent(); + +private: + friend struct paddr_cmp; + friend struct ref_paddr_cmp; + friend class ExtentIndex; + + /// Pointer to containing index (or null) + ExtentIndex *parent_index = nullptr; + + /// hook for intrusive extent_index + boost::intrusive::set_member_hook<> extent_index_hook; + using index_member_options = boost::intrusive::member_hook< + CachedExtent, + boost::intrusive::set_member_hook<>, + &CachedExtent::extent_index_hook>; + using index = boost::intrusive::set; + friend class ExtentIndex; + friend class Transaction; + + /// hook for intrusive ref list (mainly dirty or lru list) + boost::intrusive::list_member_hook<> primary_ref_list_hook; + using primary_ref_list_member_options = boost::intrusive::member_hook< + CachedExtent, + boost::intrusive::list_member_hook<>, + &CachedExtent::primary_ref_list_hook>; + using list = boost::intrusive::list< + CachedExtent, + primary_ref_list_member_options>; + + /// Actual data contents + ceph::bufferptr ptr; + + /// number of deltas since initial write + extent_version_t version = EXTENT_VERSION_NULL; + + /// address of original block -- relative iff is_pending() and is_clean() + paddr_t poffset; + + /// used to wait while in-progress commit completes + std::optional> io_wait_promise; + void set_io_wait() { + ceph_assert(!io_wait_promise); + io_wait_promise = seastar::shared_promise<>(); + } + void complete_io() { + ceph_assert(io_wait_promise); + io_wait_promise->set_value(); + io_wait_promise = std::nullopt; + } + seastar::future<> wait_io() { + if (!io_wait_promise) { + return seastar::now(); + } else { + return io_wait_promise->get_shared_future(); + } + } + +protected: + CachedExtent(ceph::bufferptr &&ptr) : ptr(std::move(ptr)) {} + CachedExtent(const CachedExtent &other) + : state(other.state), + ptr(other.ptr.c_str(), other.ptr.length()), + version(other.version), + poffset(other.poffset) {} + + friend class Cache; + template + static TCachedExtentRef make_cached_extent_ref(Args&&... args) { + return new T(std::forward(args)...); + } + + void set_paddr(paddr_t offset) { poffset = offset; } + + /** + * maybe_generate_relative + * + * There are three kinds of addresses one might want to + * store within an extent: + * - addr for a block within the same transaction relative to the + * physical location of this extent in the + * event that we will read it in the initial read of the extent + * - addr relative to the physical location of the next record to a + * block within that record to contain a delta for this extent in + * the event that we'll read it from a delta and overlay it onto a + * dirty representation of the extent. + * - absolute addr to a block already written outside of the current + * transaction. + * + * This helper checks addr and the current state to create the correct + * reference. + */ + paddr_t maybe_generate_relative(paddr_t addr) { + if (!addr.is_relative()) { + return addr; + } else if (is_mutation_pending()) { + return addr; + } else { + ceph_assert(get_paddr().is_relative()); + return addr - get_paddr(); + } + } + +}; + +std::ostream &operator<<(std::ostream &, CachedExtent::extent_state_t); +std::ostream &operator<<(std::ostream &, const CachedExtent&); + +/// Compare extents by paddr +struct paddr_cmp { + bool operator()(paddr_t lhs, const CachedExtent &rhs) const { + return lhs < rhs.poffset; + } + bool operator()(const CachedExtent &lhs, paddr_t rhs) const { + return lhs.poffset < rhs; + } +}; + +/// Compare extent refs by paddr +struct ref_paddr_cmp { + using is_transparent = paddr_t; + bool operator()(const CachedExtentRef &lhs, const CachedExtentRef &rhs) const { + return lhs->poffset < rhs->poffset; + } + bool operator()(const paddr_t &lhs, const CachedExtentRef &rhs) const { + return lhs < rhs->poffset; + } + bool operator()(const CachedExtentRef &lhs, const paddr_t &rhs) const { + return lhs->poffset < rhs; + } +}; + +template +class addr_extent_list_base_t + : public std::list> {}; + +using pextent_list_t = addr_extent_list_base_t; + +template +class addr_extent_set_base_t + : public std::set {}; + +using pextent_set_t = addr_extent_set_base_t< + paddr_t, + CachedExtentRef, + ref_paddr_cmp + >; + +template +using t_pextent_list_t = addr_extent_list_base_t>; + +/** + * ExtentIndex + * + * Index of CachedExtent & by poffset, does not hold a reference, + * user must ensure each extent is removed prior to deletion + */ +class ExtentIndex { + friend class Cache; + CachedExtent::index extent_index; +public: + auto get_overlap(paddr_t addr, segment_off_t len) { + auto bottom = extent_index.upper_bound(addr, paddr_cmp()); + if (bottom != extent_index.begin()) + --bottom; + if (bottom != extent_index.end() && + bottom->get_paddr().add_offset(bottom->get_length()) <= addr) + ++bottom; + + auto top = extent_index.upper_bound(addr.add_offset(len), paddr_cmp()); + return std::make_pair( + bottom, + top + ); + } + + void clear() { + extent_index.clear(); + } + + void insert(CachedExtent &extent) { + // sanity check + auto [a, b] = get_overlap( + extent.get_paddr(), + extent.get_length()); + ceph_assert(a == b); + + extent_index.insert(extent); + extent.parent_index = this; + } + + void erase(CachedExtent &extent) { + extent_index.erase(extent); + extent.parent_index = nullptr; + } + + bool empty() const { + return extent_index.empty(); + } + + auto find_offset(paddr_t offset) { + return extent_index.find(offset, paddr_cmp()); + } + + auto begin() { + return extent_index.begin(); + } + + auto end() { + return extent_index.end(); + } + + void merge(ExtentIndex &&other) { + for (auto it = other.extent_index.begin(); + it != other.extent_index.end(); + ) { + auto &ext = *it; + ++it; + other.extent_index.erase(ext); + extent_index.insert(ext); + } + } + + template + void remove(T &l) { + for (auto &ext : l) { + extent_index.erase(l); + } + } +}; + + +} diff --git a/src/crimson/os/seastore/root_block.cc b/src/crimson/os/seastore/root_block.cc new file mode 100644 index 0000000000000..cd7150310af00 --- /dev/null +++ b/src/crimson/os/seastore/root_block.cc @@ -0,0 +1,28 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/os/seastore/root_block.h" + +namespace crimson::os::seastore { + +void RootBlock::prepare_write() +{ + bufferlist tmp; + ::encode(root, tmp); + auto bpiter = tmp.begin(); + bpiter.copy(tmp.length(), get_bptr().c_str()); +} + +CachedExtent::complete_load_ertr::future<> RootBlock::complete_load() +{ + auto biter = get_bptr().cbegin(); + root.decode(biter); + return complete_load_ertr::now(); +} + +void RootBlock::set_lba_root(btree_lba_root_t lba_root) +{ + root.lba_root = lba_root; +} + +} diff --git a/src/crimson/os/seastore/root_block.h b/src/crimson/os/seastore/root_block.h new file mode 100644 index 0000000000000..96fc14dfb6d33 --- /dev/null +++ b/src/crimson/os/seastore/root_block.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/os/seastore/cached_extent.h" + +namespace crimson::os::seastore { + +using depth_t = uint32_t; + +/* Belongs in lba_manager/btree, TODO: generalize this to + * permit more than one lba_manager implementation + */ +struct btree_lba_root_t { + depth_t lba_depth; + depth_t segment_depth; + paddr_t lba_root_addr; + paddr_t segment_root; + + DENC(btree_lba_root_t, v, p) { + DENC_START(1, 1, p); + denc(v.lba_depth, p); + denc(v.segment_depth, p); + denc(v.lba_root_addr, p); + denc(v.segment_root, p); + DENC_FINISH(p); + } +}; + +struct root_block_t { + btree_lba_root_t lba_root; + + DENC(root_block_t, v, p) { + DENC_START(1, 1, p); + denc(v.lba_root, p); + DENC_FINISH(p); + } +}; + +struct RootBlock : CachedExtent { + constexpr static segment_off_t SIZE = 4<<10; + using Ref = TCachedExtentRef; + + root_block_t root; + + template + RootBlock(T&&... t) : CachedExtent(std::forward(t)...) {} + + CachedExtentRef duplicate_for_write() final { + return CachedExtentRef(new RootBlock(*this)); + }; + + void prepare_write() final; + + static constexpr extent_types_t TYPE = extent_types_t::ROOT; + extent_types_t get_type() const final { + return extent_types_t::ROOT; + } + + ceph::bufferlist get_delta() final { + ceph_assert(0 == "TODO"); + return ceph::bufferlist(); + } + + void apply_delta(paddr_t base, ceph::bufferlist &bl) final { + ceph_assert(0 == "TODO"); + } + + complete_load_ertr::future<> complete_load() final; + + void set_lba_root(btree_lba_root_t lba_root); + btree_lba_root_t &get_lba_root() { + return root.lba_root; + } + +}; +using RootBlockRef = RootBlock::Ref; + +} + +WRITE_CLASS_DENC(crimson::os::seastore::btree_lba_root_t) +WRITE_CLASS_DENC(crimson::os::seastore::root_block_t) diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index ea94722d917c4..8cd4c4a631e29 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -52,6 +52,10 @@ struct paddr_t { return segment == REL_SEG_ID; } + paddr_t add_offset(segment_off_t o) const { + return paddr_t{segment, offset + o}; + } + paddr_t add_relative(paddr_t o) const { assert(o.is_relative()); assert(!is_relative()); @@ -98,18 +102,19 @@ constexpr laddr_t L_ADDR_ROOT = std::numeric_limits::max() - 1; constexpr laddr_t L_ADDR_LBAT = std::numeric_limits::max() - 2; // logical offset, see LBAManager, TransactionManager -using loff_t = uint32_t; -constexpr loff_t L_OFF_NULL = std::numeric_limits::max(); +using extent_len_t = uint32_t; +constexpr extent_len_t EXTENT_LEN_MAX = + std::numeric_limits::max(); -struct laddr_list_t : std::list> { +struct laddr_list_t : std::list> { template laddr_list_t(T&&... args) - : std::list>(std::forward(args)...) {} + : std::list>(std::forward(args)...) {} }; -struct paddr_list_t : std::list> { +struct paddr_list_t : std::list> { template paddr_list_t(T&&... args) - : std::list>(std::forward(args)...) {} + : std::list>(std::forward(args)...) {} }; std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs); diff --git a/src/test/crimson/seastore/CMakeLists.txt b/src/test/crimson/seastore/CMakeLists.txt index 1e447a314200d..8784f85ce29a5 100644 --- a/src/test/crimson/seastore/CMakeLists.txt +++ b/src/test/crimson/seastore/CMakeLists.txt @@ -7,3 +7,12 @@ target_link_libraries( unittest_seastore_journal ${CMAKE_DL_LIBS} crimson-seastore) + +add_executable(unittest_seastore_cache + test_seastore_cache.cc + ../gtest_seastar.cc) +add_ceph_unittest(unittest_seastore_cache) +target_link_libraries( + unittest_seastore_cache + ${CMAKE_DL_LIBS} + crimson-seastore) diff --git a/src/test/crimson/seastore/test_seastore_cache.cc b/src/test/crimson/seastore/test_seastore_cache.cc new file mode 100644 index 0000000000000..d6ecd83435e12 --- /dev/null +++ b/src/test/crimson/seastore/test_seastore_cache.cc @@ -0,0 +1,251 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/crimson/gtest_seastar.h" + +#include "crimson/common/log.h" +#include "crimson/os/seastore/cache.h" +#include "crimson/os/seastore/segment_manager/ephemeral.h" + +using namespace crimson; +using namespace crimson::os; +using namespace crimson::os::seastore; + +namespace { + [[maybe_unused]] seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); + } +} + +struct CacheTestBlock : CachedExtent { + constexpr static segment_off_t SIZE = 4<<10; + using Ref = TCachedExtentRef; + + template + CacheTestBlock(T&&... t) : CachedExtent(std::forward(t)...) {} + + CachedExtentRef duplicate_for_write() final { + return CachedExtentRef(new CacheTestBlock(*this)); + }; + + static constexpr extent_types_t TYPE = extent_types_t::TEST_BLOCK; + extent_types_t get_type() const final { + return TYPE; + } + + ceph::bufferlist get_delta() final { + return ceph::bufferlist(); + } + + void apply_delta(paddr_t delta_base, ceph::bufferlist &bl) final { + ceph_assert(0 == "TODO"); + } + + void set_contents(char c) { + ::memset(get_bptr().c_str(), c, get_length()); + } + + int checksum() { + return ceph_crc32c( + 1, + (const unsigned char *)get_bptr().c_str(), + get_length()); + } +}; + +struct cache_test_t : public seastar_test_suite_t { + segment_manager::EphemeralSegmentManager segment_manager; + Cache cache; + paddr_t current{0, 0}; + + cache_test_t() + : segment_manager(segment_manager::DEFAULT_TEST_EPHEMERAL), + cache(segment_manager) {} + + seastar::future> submit_transaction( + TransactionRef t) { + auto record = cache.try_construct_record(*t); + if (!record) { + return seastar::make_ready_future>( + std::nullopt); + } + + bufferlist bl; + for (auto &&block : record->extents) { + bl.append(block.bl); + } + + ceph_assert((segment_off_t)bl.length() < + segment_manager.get_segment_size()); + if (current.offset + (segment_off_t)bl.length() > + segment_manager.get_segment_size()) + current = paddr_t{current.segment + 1, 0}; + + auto prev = current; + current.offset += bl.length(); + return segment_manager.segment_write( + prev, + std::move(bl), + true + ).safe_then( + [this, prev, t=std::move(t)] { + cache.complete_commit(*t, prev); + return seastar::make_ready_future>(prev); + }, + crimson::ct_error::all_same_way([](auto e) { + ASSERT_FALSE("failed to submit"); + }) + ); + } + + auto get_transaction() { + return TransactionRef(new Transaction); + } + + seastar::future<> set_up_fut() final { + return segment_manager.init().safe_then( + [this] { + return seastar::do_with( + TransactionRef(new Transaction()), + [this](auto &transaction) { + return cache.mkfs(*transaction).safe_then( + [this, &transaction] { + return submit_transaction(std::move(transaction)).then( + [](auto p) { + ASSERT_TRUE(p); + }); + }); + }); + }).handle_error( + crimson::ct_error::all_same_way([](auto e) { + ASSERT_FALSE("failed to submit"); + }) + ); + } + + seastar::future<> tear_down_fut() final { + return seastar::now(); + } +}; + +TEST_F(cache_test_t, test_addr_fixup) +{ + run_async([this] { + paddr_t addr; + int csum = 0; + { + auto t = get_transaction(); + auto extent = cache.alloc_new_extent( + *t, + CacheTestBlock::SIZE); + extent->set_contents('c'); + csum = extent->checksum(); + auto ret = submit_transaction(std::move(t)).get0(); + ASSERT_TRUE(ret); + addr = extent->get_paddr(); + } + { + auto t = get_transaction(); + auto extent = cache.get_extent( + *t, + addr, + CacheTestBlock::SIZE).unsafe_get0(); + ASSERT_EQ(extent->get_paddr(), addr); + ASSERT_EQ(extent->checksum(), csum); + } + }); +} + +TEST_F(cache_test_t, test_dirty_extent) +{ + run_async([this] { + paddr_t addr; + int csum = 0; + int csum2 = 0; + { + // write out initial test block + auto t = get_transaction(); + auto extent = cache.alloc_new_extent( + *t, + CacheTestBlock::SIZE); + extent->set_contents('c'); + csum = extent->checksum(); + auto reladdr = extent->get_paddr(); + ASSERT_TRUE(reladdr.is_relative()); + { + // test that read with same transaction sees new block though + // uncommitted + auto extent = cache.get_extent( + *t, + reladdr, + CacheTestBlock::SIZE).unsafe_get0(); + ASSERT_TRUE(extent->is_clean()); + ASSERT_TRUE(extent->is_pending()); + ASSERT_TRUE(extent->get_paddr().is_relative()); + ASSERT_EQ(extent->get_version(), 0); + ASSERT_EQ(csum, extent->checksum()); + } + auto ret = submit_transaction(std::move(t)).get0(); + ASSERT_TRUE(ret); + addr = extent->get_paddr(); + } + { + // read back test block + auto t = get_transaction(); + auto extent = cache.get_extent( + *t, + addr, + CacheTestBlock::SIZE).unsafe_get0(); + // duplicate and reset contents + extent = cache.duplicate_for_write(*t, extent)->cast(); + extent->set_contents('c'); + csum2 = extent->checksum(); + ASSERT_EQ(extent->get_paddr(), addr); + { + // test that concurrent read with fresh transaction sees old + // block + auto t2 = get_transaction(); + auto extent = cache.get_extent( + *t2, + addr, + CacheTestBlock::SIZE).unsafe_get0(); + ASSERT_TRUE(extent->is_clean()); + ASSERT_FALSE(extent->is_pending()); + ASSERT_EQ(addr, extent->get_paddr()); + ASSERT_EQ(extent->get_version(), 0); + ASSERT_EQ(csum, extent->checksum()); + } + { + // test that read with same transaction sees new block + auto extent = cache.get_extent( + *t, + addr, + CacheTestBlock::SIZE).unsafe_get0(); + ASSERT_TRUE(extent->is_dirty()); + ASSERT_TRUE(extent->is_pending()); + ASSERT_EQ(addr, extent->get_paddr()); + ASSERT_EQ(extent->get_version(), 1); + ASSERT_EQ(csum2, extent->checksum()); + } + // submit transaction + auto ret = submit_transaction(std::move(t)).get0(); + ASSERT_TRUE(ret); + ASSERT_TRUE(extent->is_dirty()); + ASSERT_EQ(addr, extent->get_paddr()); + ASSERT_EQ(extent->get_version(), 1); + ASSERT_EQ(extent->checksum(), csum2); + } + { + // test that fresh transaction now sees newly dirty block + auto t = get_transaction(); + auto extent = cache.get_extent( + *t, + addr, + CacheTestBlock::SIZE).unsafe_get0(); + ASSERT_TRUE(extent->is_dirty()); + ASSERT_EQ(addr, extent->get_paddr()); + ASSERT_EQ(extent->get_version(), 1); + ASSERT_EQ(csum2, extent->checksum()); + } + }); +} -- 2.39.5