From c391e8677c14032349af03b6d6be82e85ae44b9b Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 26 May 2020 18:15:09 -0700 Subject: [PATCH] crimson/os/seastore: add transaction_manager Signed-off-by: Samuel Just --- src/crimson/os/seastore/CMakeLists.txt | 1 + .../os/seastore/transaction_manager.cc | 144 +++++++ src/crimson/os/seastore/transaction_manager.h | 274 +++++++++++++ src/test/crimson/seastore/CMakeLists.txt | 9 + .../seastore/test_transaction_manager.cc | 365 ++++++++++++++++++ 5 files changed, 793 insertions(+) create mode 100644 src/crimson/os/seastore/transaction_manager.cc create mode 100644 src/crimson/os/seastore/transaction_manager.h create mode 100644 src/test/crimson/seastore/test_transaction_manager.cc diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index eda36b4f2721b..92edd040e6c13 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(crimson-seastore seastore_types.cc segment_manager/ephemeral.cc segment_manager.cc + transaction_manager.cc journal.cc cache.cc lba_manager.cc diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc new file mode 100644 index 0000000000000..89281c9550d4f --- /dev/null +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -0,0 +1,144 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/denc.h" +#include "include/intarith.h" + +#include "crimson/common/log.h" + +#include "crimson/os/seastore/transaction_manager.h" +#include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/journal.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_filestore); + } +} + +namespace crimson::os::seastore { + +TransactionManager::TransactionManager( + SegmentManager &segment_manager, + Journal &journal, + Cache &cache, + LBAManager &lba_manager) + : segment_manager(segment_manager), + cache(cache), + lba_manager(lba_manager), + journal(journal) +{ + journal.set_segment_provider(this); +} + +TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs() +{ + return journal.open_for_write().safe_then([this] { + logger().debug("TransactionManager::mkfs: about to do_with"); + return seastar::do_with( + lba_manager.create_transaction(), + [this](auto &transaction) { + logger().debug("TransactionManager::mkfs: about to cache.mkfs"); + return cache.mkfs(*transaction + ).safe_then([this, &transaction] { + return lba_manager.mkfs(*transaction); + }).safe_then([this, &transaction] { + logger().debug("TransactionManager::mkfs: about to submit_transaction"); + return submit_transaction(std::move(transaction)).handle_error( + crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "eagain impossible"); + return mkfs_ertr::now(); + }), + mkfs_ertr::pass_further{} + ); + }); + }); + }).safe_then([this] { + return journal.close(); + }); +} + +TransactionManager::mount_ertr::future<> TransactionManager::mount() +{ + return journal.replay([this](auto paddr, const auto &e) { + return cache.replay_delta(paddr, e); + }).safe_then([this] { + return journal.open_for_write(); + }).handle_error( + mount_ertr::pass_further{}, + crimson::ct_error::all_same_way([] { + ceph_assert(0 == "unhandled error"); + return mount_ertr::now(); + })); +} + +TransactionManager::close_ertr::future<> TransactionManager::close() { + return cache.close( + ).safe_then([this] { + return journal.close(); + }); +} + +TransactionManager::ref_ret TransactionManager::inc_ref( + Transaction &t, + LogicalCachedExtentRef &ref) +{ + return lba_manager.incref_extent(t, ref->get_laddr() + ).handle_error( + ref_ertr::pass_further{}, + ct_error::all_same_way([](auto e) { + ceph_assert(0 == "unhandled error, TODO"); + })); +} + +TransactionManager::ref_ret TransactionManager::inc_ref( + Transaction &t, + laddr_t offset) +{ + return lba_manager.incref_extent(t, offset); +} + +TransactionManager::ref_ret TransactionManager::dec_ref( + Transaction &t, + LogicalCachedExtentRef &ref) +{ + return dec_ref(t, ref->get_laddr() + ).handle_error( + ref_ertr::pass_further{}, + ct_error::all_same_way([](auto e) { + ceph_assert(0 == "unhandled error, TODO"); + })); +} + +TransactionManager::ref_ret TransactionManager::dec_ref( + Transaction &t, + laddr_t offset) +{ + // TODO: need to retire the extent (only) if it's live, will need cache call + return lba_manager.decref_extent(t, offset); +} + +TransactionManager::submit_transaction_ertr::future<> +TransactionManager::submit_transaction( + TransactionRef t) +{ + auto record = cache.try_construct_record(*t); + if (!record) { + return crimson::ct_error::eagain::make(); + } + + logger().debug("TransactionManager::submit_transaction"); + + return journal.submit_record(std::move(*record)).safe_then( + [this, t=std::move(t)](paddr_t addr) { + cache.complete_commit(*t, addr); + }, + submit_transaction_ertr::pass_further{}, + crimson::ct_error::all_same_way([](auto e) { + ceph_assert(0 == "Hit error submitting to journal"); + })); +} + +TransactionManager::~TransactionManager() {} + +} diff --git a/src/crimson/os/seastore/transaction_manager.h b/src/crimson/os/seastore/transaction_manager.h new file mode 100644 index 0000000000000..4e10f272df7c8 --- /dev/null +++ b/src/crimson/os/seastore/transaction_manager.h @@ -0,0 +1,274 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "include/ceph_assert.h" +#include "include/buffer.h" + +#include "crimson/osd/exceptions.h" + +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/os/seastore/cache.h" +#include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/lba_manager.h" +#include "crimson/os/seastore/journal.h" + +namespace crimson::os::seastore { +class Journal; + +/** + * LogicalCachedExtent + * + * CachedExtent with associated lba mapping. + * + * Users of TransactionManager should be using extents derived from + * LogicalCachedExtent. + */ +class LogicalCachedExtent : public CachedExtent { +public: + template + LogicalCachedExtent(T&&... t) : CachedExtent(std::forward(t)...) {} + + void set_pin(LBAPinRef &&pin) { this->pin = std::move(pin); } + + LBAPin &get_pin() { + assert(pin); + return *pin; + } + + laddr_t get_laddr() const { + assert(pin); + return pin->get_laddr(); + } + +private: + LBAPinRef pin; +}; + +using LogicalCachedExtentRef = TCachedExtentRef; +struct ref_laddr_cmp { + using is_transparent = laddr_t; + bool operator()(const LogicalCachedExtentRef &lhs, + const LogicalCachedExtentRef &rhs) const { + return lhs->get_laddr() < rhs->get_laddr(); + } + bool operator()(const laddr_t &lhs, + const LogicalCachedExtentRef &rhs) const { + return lhs < rhs->get_laddr(); + } + bool operator()(const LogicalCachedExtentRef &lhs, + const laddr_t &rhs) const { + return lhs->get_laddr() < rhs; + } +}; + +using lextent_set_t = addr_extent_set_base_t< + laddr_t, + LogicalCachedExtentRef, + ref_laddr_cmp + >; + +template +using lextent_list_t = addr_extent_list_base_t< + laddr_t, TCachedExtentRef>; + +/** + * TransactionManager + * + * Abstraction hiding reading and writing to persistence. + * Exposes transaction based interface with read isolation. + */ +class TransactionManager : public JournalSegmentProvider { +public: + TransactionManager( + SegmentManager &segment_manager, + Journal &journal, + Cache &cache, + LBAManager &lba_manager); + + segment_id_t next = 0; + get_segment_ret get_segment() final { + // TODO -- part of gc + return get_segment_ret( + get_segment_ertr::ready_future_marker{}, + next++); + } + + void put_segment(segment_id_t segment) final { + // TODO -- part of gc + return; + } + + /// Writes initial metadata to disk + using mkfs_ertr = crimson::errorator< + crimson::ct_error::input_output_error + >; + mkfs_ertr::future<> mkfs(); + + /// Reads initial metadata from disk + using mount_ertr = crimson::errorator< + crimson::ct_error::input_output_error + >; + mount_ertr::future<> mount(); + + /// Closes transaction_manager + using close_ertr = crimson::errorator< + crimson::ct_error::input_output_error + >; + close_ertr::future<> close(); + + /// Creates empty transaction + TransactionRef create_transaction() { + return lba_manager.create_transaction(); + } + + /** + * Read extents corresponding to specified lba range + */ + using read_extent_ertr = SegmentManager::read_ertr; + template + using read_extent_ret = read_extent_ertr::future>; + template + read_extent_ret read_extents( + Transaction &t, + laddr_t offset, + extent_len_t length) + { + std::unique_ptr> ret = + std::make_unique>(); + auto &ret_ref = *ret; + std::unique_ptr pin_list = + std::make_unique(); + auto &pin_list_ref = *pin_list; + return lba_manager.get_mapping( + t, offset, length + ).safe_then([this, &t, &pin_list_ref, &ret_ref](auto pins) { + crimson::get_logger(ceph_subsys_filestore).debug( + "read_extents: mappings {}", + pins); + pins.swap(pin_list_ref); + return crimson::do_for_each( + pin_list_ref.begin(), + pin_list_ref.end(), + [this, &t, &ret_ref](auto &pin) { + crimson::get_logger(ceph_subsys_filestore).debug( + "read_extents: get_extent {}~{}", + pin->get_paddr(), + pin->get_length()); + return cache.get_extent( + t, + pin->get_paddr(), + pin->get_length() + ).safe_then([&pin, &ret_ref](auto ref) mutable { + ref->set_pin(std::move(pin)); + ret_ref.push_back(std::make_pair(ref->get_laddr(), ref)); + crimson::get_logger(ceph_subsys_filestore).debug( + "read_extents: got extent {}", + *ref); + return read_extent_ertr::now(); + }); + }); + }).safe_then([ret=std::move(ret), pin_list=std::move(pin_list)]() mutable { + return read_extent_ret( + read_extent_ertr::ready_future_marker{}, + std::move(*ret)); + }); + } + + /// Obtain mutable copy of extent + LogicalCachedExtentRef get_mutable_extent(Transaction &t, LogicalCachedExtentRef ref) { + auto ret = cache.duplicate_for_write( + t, + ref)->cast(); + ret->set_pin(ref->get_pin().duplicate()); + return ret; + } + + using ref_ertr = LBAManager::ref_ertr; + using ref_ret = LBAManager::ref_ret; + + /// Add refcount for ref + ref_ret inc_ref( + Transaction &t, + LogicalCachedExtentRef &ref); + + /// Add refcount for offset + ref_ret inc_ref( + Transaction &t, + laddr_t offset); + + /// Remove refcount for ref + ref_ret dec_ref( + Transaction &t, + LogicalCachedExtentRef &ref); + + /// Remove refcount for offset + ref_ret dec_ref( + Transaction &t, + laddr_t offset); + + /** + * alloc_extent + * + * Allocates a new block of type T with the minimum lba range of size len + * greater than hint. + */ + using alloc_extent_ertr = SegmentManager::read_ertr; + template + using alloc_extent_ret = alloc_extent_ertr::future>; + template + alloc_extent_ret alloc_extent( + Transaction &t, + laddr_t hint, + extent_len_t len) { + auto ext = cache.alloc_new_extent( + t, + len); + return lba_manager.alloc_extent( + t, + hint, + len, + ext->get_paddr() + ).safe_then([ext=std::move(ext)](auto &&ref) mutable { + ext->set_pin(std::move(ref)); + return alloc_extent_ertr::make_ready_future>( + std::move(ext)); + }); + } + + /** + * submit_transaction + * + * Atomically submits transaction to persistence + */ + using submit_transaction_ertr = crimson::errorator< + crimson::ct_error::eagain, // Caller should retry transaction from beginning + crimson::ct_error::input_output_error // Media error + >; + submit_transaction_ertr::future<> submit_transaction(TransactionRef); + + ~TransactionManager(); + +private: + friend class Transaction; + + SegmentManager &segment_manager; + Cache &cache; + LBAManager &lba_manager; + Journal &journal; +}; +using TransactionManagerRef = std::unique_ptr; + +} diff --git a/src/test/crimson/seastore/CMakeLists.txt b/src/test/crimson/seastore/CMakeLists.txt index 0a0863c66332a..9e801cd37119a 100644 --- a/src/test/crimson/seastore/CMakeLists.txt +++ b/src/test/crimson/seastore/CMakeLists.txt @@ -1,3 +1,12 @@ +add_executable(unittest_transaction_manager + test_transaction_manager.cc + ../gtest_seastar.cc) +add_ceph_unittest(unittest_transaction_manager) +target_link_libraries( + unittest_transaction_manager + ${CMAKE_DL_LIBS} + crimson-seastore) + add_executable(unittest_btree_lba_manager test_btree_lba_manager.cc ../gtest_seastar.cc) diff --git a/src/test/crimson/seastore/test_transaction_manager.cc b/src/test/crimson/seastore/test_transaction_manager.cc new file mode 100644 index 0000000000000..e0d8ef7f694c5 --- /dev/null +++ b/src/test/crimson/seastore/test_transaction_manager.cc @@ -0,0 +1,365 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/crimson/gtest_seastar.h" + +#include "crimson/os/seastore/cache.h" +#include "crimson/os/seastore/transaction_manager.h" +#include "crimson/os/seastore/segment_manager.h" + +using namespace crimson; +using namespace crimson::os; +using namespace crimson::os::seastore; + +namespace { + [[maybe_unused]] seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); + } +} + +struct test_extent_desc_t { + size_t len = 0; + unsigned checksum = 0; + + bool operator==(const test_extent_desc_t &rhs) const { + return (len == rhs.len && + checksum == rhs.checksum); + } + bool operator!=(const test_extent_desc_t &rhs) const { + return !(*this == rhs); + } +}; + +std::ostream &operator<<(std::ostream &lhs, const test_extent_desc_t &rhs) { + return lhs << "test_extent_desc_t(len=" << rhs.len + << ", checksum=" << rhs.checksum << ")"; +} + +struct test_extent_record_t { + test_extent_desc_t desc; + unsigned refcount = 0; + test_extent_record_t() = default; + test_extent_record_t( + const test_extent_desc_t &desc, + unsigned refcount) : desc(desc), refcount(refcount) {} + + void update(const test_extent_desc_t &to) { + desc = to; + } + + bool operator==(const test_extent_desc_t &rhs) const { + return desc == rhs; + } + bool operator!=(const test_extent_desc_t &rhs) const { + return desc != rhs; + } +}; + +std::ostream &operator<<(std::ostream &lhs, const test_extent_record_t &rhs) { + return lhs << "test_extent_record_t(" << rhs.desc + << ", refcount=" << rhs.refcount << ")"; +} + + +struct TestBlock : LogicalCachedExtent { + using Ref = TCachedExtentRef; + + TestBlock(ceph::bufferptr &&ptr) : LogicalCachedExtent(std::move(ptr)) {} + TestBlock(const TestBlock &other) : LogicalCachedExtent(other) {} + + CachedExtentRef duplicate_for_write() final { + return CachedExtentRef(new TestBlock(*this)); + }; + + static constexpr extent_types_t TYPE = extent_types_t::TEST_BLOCK; + extent_types_t get_type() const final { + return TYPE; + } + + ceph::bufferlist get_delta() final { + return ceph::bufferlist(); + } + + void set_contents(char c) { + ::memset(get_bptr().c_str(), c, get_length()); + } + + test_extent_desc_t get_desc() { + return { get_length(), get_crc32c(1) }; + } + + void apply_delta(paddr_t delta_base, ceph::bufferlist &bl) final { + ceph_assert(0 == "TODO"); + } +}; +using TestBlockRef = TCachedExtentRef; + +struct transaction_manager_test_t : public seastar_test_suite_t { + std::unique_ptr segment_manager; + Journal journal; + Cache cache; + LBAManagerRef lba_manager; + TransactionManager tm; + + transaction_manager_test_t() + : segment_manager(create_ephemeral(segment_manager::DEFAULT_TEST_EPHEMERAL)), + journal(*segment_manager), + cache(*segment_manager), + lba_manager( + lba_manager::create_lba_manager(*segment_manager, cache)), + tm(*segment_manager, journal, cache, *lba_manager) {} + + seastar::future<> set_up_fut() final { + return segment_manager->init().safe_then([this] { + return tm.mkfs(); + }).safe_then([this] { + return tm.mount(); + }).handle_error( + crimson::ct_error::all_same_way([] { + ASSERT_FALSE("Unable to mount"); + }) + ); + } + + seastar::future<> tear_down_fut() final { + return tm.close( + ).handle_error( + crimson::ct_error::all_same_way([] { + ASSERT_FALSE("Unable to close"); + }) + ); + } + struct test_extents_t : std::map { + private: + void check_available(laddr_t addr, extent_len_t len) { + auto iter = upper_bound(addr); + if (iter != begin()) { + auto liter = iter; + liter--; + EXPECT_FALSE(liter->first + liter->second.desc.len > addr); + } + if (iter != end()) { + EXPECT_FALSE(iter->first < addr + len); + } + } + void check_hint(laddr_t hint, laddr_t addr, extent_len_t len) { + auto iter = lower_bound(hint); + laddr_t last = hint; + while (true) { + if (iter == end() || iter->first > addr) { + EXPECT_EQ(addr, last); + break; + } + EXPECT_FALSE(iter->first - last > len); + last = iter->first + iter->second.desc.len; + ++iter; + } + } + public: + void insert(TestBlock &extent) { + check_available(extent.get_laddr(), extent.get_length()); + emplace( + std::make_pair( + extent.get_laddr(), + test_extent_record_t{extent.get_desc(), 1} + )); + } + void alloced(laddr_t hint, TestBlock &extent) { + check_hint(hint, extent.get_laddr(), extent.get_length()); + insert(extent); + } + } test_mappings; + + struct test_transaction_t { + TransactionRef t; + test_extents_t mappings; + }; + + test_transaction_t create_transaction() { + return { tm.create_transaction(), test_mappings }; + } + + TestBlockRef alloc_extent( + test_transaction_t &t, + laddr_t hint, + extent_len_t len, + char contents) { + auto extent = tm.alloc_extent( + *(t.t), + hint, + len).unsafe_get0(); + extent->set_contents(contents); + EXPECT_FALSE(t.mappings.count(extent->get_laddr())); + EXPECT_EQ(len, extent->get_length()); + t.mappings.alloced(hint, *extent); + return extent; + } + + void check_mappings() { + auto t = create_transaction(); + check_mappings(t); + } + + TestBlockRef get_extent( + test_transaction_t &t, + laddr_t addr, + extent_len_t len) { + ceph_assert(t.mappings.count(addr)); + ceph_assert(t.mappings[addr].desc.len == len); + + auto ret_list = tm.read_extents( + *t.t, addr, len + ).unsafe_get0(); + EXPECT_EQ(ret_list.size(), 1); + auto &ext = ret_list.begin()->second; + auto &laddr = ret_list.begin()->first; + EXPECT_EQ(addr, laddr); + EXPECT_EQ(addr, ext->get_laddr()); + return ext; + } + + TestBlockRef mutate_extent( + test_transaction_t &t, + TestBlockRef ref, + char contents) { + ceph_assert(t.mappings.count(ref->get_laddr())); + ceph_assert(t.mappings[ref->get_laddr()].desc.len == ref->get_length()); + auto ext = tm.get_mutable_extent(*t.t, ref)->cast(); + EXPECT_EQ(ext->get_laddr(), ref->get_laddr()); + EXPECT_EQ(ext->get_desc(), ref->get_desc()); + ext->set_contents(contents); + t.mappings[ext->get_laddr()].update(ext->get_desc()); + return ext; + } + + void inc_ref(test_transaction_t &t, laddr_t offset) { + ceph_assert(t.mappings.count(offset)); + ceph_assert(t.mappings[offset].refcount > 0); + auto refcnt = tm.inc_ref(*t.t, offset).unsafe_get0(); + t.mappings[offset].refcount++; + EXPECT_EQ(refcnt, t.mappings[offset].refcount); + } + + void dec_ref(test_transaction_t &t, laddr_t offset) { + ceph_assert(t.mappings.count(offset)); + ceph_assert(t.mappings[offset].refcount > 0); + auto refcnt = tm.dec_ref(*t.t, offset).unsafe_get0(); + t.mappings[offset].refcount--; + EXPECT_EQ(refcnt, t.mappings[offset].refcount); + if (t.mappings[offset].refcount == 0) { + t.mappings.erase(offset); + } + } + + void check_mappings(test_transaction_t &t) { + for (auto &i: t.mappings) { + logger().debug("check_mappings: {}->{}", i.first, i.second); + auto ext = get_extent(t, i.first, i.second.desc.len); + EXPECT_EQ(i.second, ext->get_desc()); + } + } + + void submit_transaction(test_transaction_t t) { + tm.submit_transaction(std::move(t.t)).unsafe_get(); + test_mappings = t.mappings; + } +}; + +TEST_F(transaction_manager_test_t, basic) +{ + constexpr laddr_t SIZE = 4096; + run_async([this] { + constexpr laddr_t ADDR = 0xFF * SIZE; + { + auto t = create_transaction(); + auto extent = alloc_extent( + t, + ADDR, + SIZE, + 'a'); + ASSERT_EQ(ADDR, extent->get_laddr()); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + }); +} + +TEST_F(transaction_manager_test_t, mutate) +{ + constexpr laddr_t SIZE = 4096; + run_async([this] { + constexpr laddr_t ADDR = 0xFF * SIZE; + { + auto t = create_transaction(); + auto extent = alloc_extent( + t, + ADDR, + SIZE, + 'a'); + ASSERT_EQ(ADDR, extent->get_laddr()); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + { + auto t = create_transaction(); + auto ext = get_extent( + t, + ADDR, + SIZE); + auto mut = mutate_extent(t, ext, 'c'); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + }); +} + +TEST_F(transaction_manager_test_t, inc_dec_ref) +{ + constexpr laddr_t SIZE = 4096; + run_async([this] { + constexpr laddr_t ADDR = 0xFF * SIZE; + { + auto t = create_transaction(); + auto extent = alloc_extent( + t, + ADDR, + SIZE, + 'a'); + ASSERT_EQ(ADDR, extent->get_laddr()); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + { + auto t = create_transaction(); + inc_ref(t, ADDR); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + { + auto t = create_transaction(); + dec_ref(t, ADDR); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + { + auto t = create_transaction(); + dec_ref(t, ADDR); + check_mappings(t); + check_mappings(); + submit_transaction(std::move(t)); + check_mappings(); + } + }); +} -- 2.39.5