From 41ae1c497b11cdc882da818ab2a696a60af2c1d5 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 11 Jan 2021 15:03:31 -0800 Subject: [PATCH] crimson/os/seastore: add basic pipeline phases to TransactionManager We need to ensure that the metadata preparation and completions happen in order. Signed-off-by: Samuel Just --- src/crimson/os/seastore/journal.cc | 22 ++++-- src/crimson/os/seastore/journal.h | 21 ++++-- src/crimson/os/seastore/ordering_handle.h | 69 +++++++++++++++++++ src/crimson/os/seastore/transaction.h | 26 ++++++- .../os/seastore/transaction_manager.cc | 31 ++++++--- src/crimson/os/seastore/transaction_manager.h | 2 + .../seastore/test_btree_lba_manager.cc | 5 +- .../crimson/seastore/test_seastore_journal.cc | 8 ++- 8 files changed, 160 insertions(+), 24 deletions(-) create mode 100644 src/crimson/os/seastore/ordering_handle.h diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index 21b918e5474af..a866058c5a63c 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -178,7 +178,8 @@ Journal::read_validate_data_ret Journal::read_validate_data( Journal::write_record_ret Journal::write_record( record_size_t rsize, - record_t &&record) + record_t &&record, + OrderingHandle &handle) { ceph::bufferlist to_write = encode_record( rsize, std::move(record)); @@ -190,10 +191,20 @@ Journal::write_record_ret Journal::write_record( rsize.mdlength, rsize.dlength, target); - return current_journal_segment->write(target, to_write).handle_error( - write_record_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Journal::write_record" + // Start write under the current exclusive stage, but wait for it + // in the device_submission concurrent stage to permit multiple + // overlapping writes. 
+ auto write_fut = current_journal_segment->write(target, to_write); + return handle.enter(write_pipeline->device_submission + ).then([write_fut = std::move(write_fut)]() mutable { + return std::move(write_fut + ).handle_error( + write_record_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in Journal::write_record" + } + ).safe_then([this, &handle] { + return handle.enter(write_pipeline->finalize); }).safe_then([this, target] { committed_to = target; return write_record_ret( @@ -202,6 +213,7 @@ Journal::write_record_ret Journal::write_record( current_journal_segment->get_segment_id(), target}); }); + }); } Journal::record_size_t Journal::get_encoded_record_length( diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 7424d78b31f14..ecdd64d316fc5 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -14,6 +14,7 @@ #include "include/denc.h" #include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/ordering_handle.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/osd/exceptions.h" @@ -170,7 +171,11 @@ public: using submit_record_ret = submit_record_ertr::future< std::pair >; - submit_record_ret submit_record(record_t &&record) { + submit_record_ret submit_record( + record_t &&record, + OrderingHandle &handle + ) { + assert(write_pipeline); auto rsize = get_encoded_record_length(record); auto total = rsize.mdlength + rsize.dlength; if (total > max_record_length) { @@ -180,8 +185,10 @@ public: ? 
roll_journal_segment().safe_then([](auto){}) : roll_journal_segment_ertr::now(); return roll.safe_then( - [this, rsize, record=std::move(record)]() mutable { - return write_record(rsize, std::move(record) + [this, rsize, record=std::move(record), &handle]() mutable { + return write_record( + rsize, std::move(record), + handle ).safe_then([this, rsize](auto addr) { return std::make_pair( addr.add_offset(rsize.mdlength), @@ -223,6 +230,9 @@ public: extent_len_t bytes_to_read ); + void set_write_pipeline(WritePipeline *_write_pipeline) { + write_pipeline = _write_pipeline; + } private: const extent_len_t block_size; @@ -238,6 +248,8 @@ private: segment_off_t written_to = 0; segment_off_t committed_to = 0; + WritePipeline *write_pipeline = nullptr; + journal_seq_t get_journal_seq(paddr_t addr) { return journal_seq_t{next_journal_segment_seq-1, addr}; } @@ -289,7 +301,8 @@ private: using write_record_ret = write_record_ertr::future; write_record_ret write_record( record_size_t rsize, - record_t &&record); + record_t &&record, + OrderingHandle &handle); /// close current segment and initialize next one using roll_journal_segment_ertr = crimson::errorator< diff --git a/src/crimson/os/seastore/ordering_handle.h b/src/crimson/os/seastore/ordering_handle.h new file mode 100644 index 0000000000000..0f6a77a5ceb63 --- /dev/null +++ b/src/crimson/os/seastore/ordering_handle.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/common/operation.h" + +namespace crimson::os::seastore { + +/** + * PlaceholderOperation + * + * Once seastore is more complete, I expect to update the externally + * facing interfaces to permit passing the osd level operation through. + * Until then (and for tests likely permanently) we'll use this unregistered + * placeholder for the pipeline phases necessary for journal correctness. 
+ */ +class PlaceholderOperation : public Operation { +public: + using IRef = boost::intrusive_ptr; + + unsigned get_type() const final { + return 0; + } + + const char *get_type_name() const final { + return "crimson::os::seastore::PlaceholderOperation"; + } + +private: + void dump_detail(ceph::Formatter *f) const final {} + void print(std::ostream &) const final {} +}; + +struct OrderingHandle { + OperationRef op; + PipelineHandle phase_handle; + + template + seastar::future<> enter(T &t) { + return op->with_blocking_future(phase_handle.enter(t)); + } + + void exit() { + return phase_handle.exit(); + } + + seastar::future<> complete() { + return phase_handle.complete(); + } +}; + +inline OrderingHandle get_dummy_ordering_handle() { + return OrderingHandle{new PlaceholderOperation, {}}; +} + +struct WritePipeline { + OrderedExclusivePhase prepare{ + "TransactionManager::prepare_phase" + }; + OrderedConcurrentPhase device_submission{ + "TransactionManager::journal_phase" + }; + OrderedExclusivePhase finalize{ + "TransactionManager::finalize_phase" + }; +}; + +} diff --git a/src/crimson/os/seastore/transaction.h b/src/crimson/os/seastore/transaction.h index e189d1d32da03..a2ad59825ccac 100644 --- a/src/crimson/os/seastore/transaction.h +++ b/src/crimson/os/seastore/transaction.h @@ -5,6 +5,7 @@ #include +#include "crimson/os/seastore/ordering_handle.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/os/seastore/cached_extent.h" #include "crimson/os/seastore/root_block.h" @@ -18,6 +19,8 @@ namespace crimson::os::seastore { */ class Transaction { public: + OrderingHandle handle; + using Ref = std::unique_ptr; enum class get_extent_ret { PRESENT, @@ -130,16 +133,33 @@ private: ///< if != NULL_SEG_ID, release this segment after completion segment_id_t to_release = NULL_SEG_ID; - Transaction(bool weak) : weak(weak) {} +public: + Transaction( + OrderingHandle &&handle, + bool weak + ) : handle(std::move(handle)), weak(weak) {} + + ~Transaction() { + for 
(auto i = write_set.begin(); + i != write_set.end();) { + i->state = CachedExtent::extent_state_t::INVALID; + write_set.erase(*i++); + } + } }; using TransactionRef = Transaction::Ref; inline TransactionRef make_transaction() { - return std::unique_ptr(new Transaction(false)); + return std::make_unique( + get_dummy_ordering_handle(), + false + ); } inline TransactionRef make_weak_transaction() { - return std::unique_ptr(new Transaction(true)); + return std::make_unique( + get_dummy_ordering_handle(), + true); } } diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index f4cb6da58885f..ae23b4ba0df54 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -29,7 +29,9 @@ TransactionManager::TransactionManager( cache(cache), lba_manager(lba_manager), journal(journal) -{} +{ + journal.set_write_pipeline(&write_pipeline); +} TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs() { @@ -190,32 +192,41 @@ TransactionManager::submit_transaction( TransactionRef t) { logger().debug("TransactionManager::submit_transaction"); - return segment_cleaner.do_immediate_work(*t - ).safe_then([this, t=std::move(t)]() mutable -> submit_transaction_ertr::future<> { - auto record = cache.try_construct_record(*t); + auto &tref = *t; + return tref.handle.enter(write_pipeline.prepare + ).then([this, &tref]() mutable { + return segment_cleaner.do_immediate_work(tref); + }).safe_then([this, &tref]() mutable + -> submit_transaction_ertr::future<> { + logger().debug("TransactionManager::submit_transaction after do_immediate"); + auto record = cache.try_construct_record(tref); if (!record) { return crimson::ct_error::eagain::make(); } - return journal.submit_record(std::move(*record) - ).safe_then([this, t=std::move(t)](auto p) mutable { + return journal.submit_record(std::move(*record), tref.handle + ).safe_then([this, &tref](auto p) mutable { auto [addr, journal_seq] = 
p; segment_cleaner.set_journal_head(journal_seq); - cache.complete_commit(*t, addr, journal_seq, &segment_cleaner); - lba_manager.complete_transaction(*t); - auto to_release = t->get_segment_to_release(); + cache.complete_commit(tref, addr, journal_seq, &segment_cleaner); + lba_manager.complete_transaction(tref); + auto to_release = tref.get_segment_to_release(); if (to_release != NULL_SEG_ID) { segment_cleaner.mark_segment_released(to_release); return segment_manager.release(to_release); } else { return SegmentManager::release_ertr::now(); } + }).safe_then([&tref] { + return tref.handle.complete(); }).handle_error( submit_transaction_ertr::pass_further{}, crimson::ct_error::all_same_way([](auto e) { ceph_assert(0 == "Hit error submitting to journal"); })); - }); + }).finally([t=std::move(t)]() mutable { + t->handle.exit(); + }); } TransactionManager::get_next_dirty_extents_ret diff --git a/src/crimson/os/seastore/transaction_manager.h b/src/crimson/os/seastore/transaction_manager.h index e105b87e1f68b..9e08e66437a58 100644 --- a/src/crimson/os/seastore/transaction_manager.h +++ b/src/crimson/os/seastore/transaction_manager.h @@ -332,6 +332,8 @@ private: Cache &cache; LBAManager &lba_manager; Journal &journal; + + WritePipeline write_pipeline; }; using TransactionManagerRef = std::unique_ptr; diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 60d5c3497ee71..544d074e81644 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -33,6 +33,8 @@ struct btree_lba_manager_test : const size_t block_size; + WritePipeline pipeline; + btree_lba_manager_test() : segment_manager(segment_manager::create_test_ephemeral()), journal(*segment_manager), @@ -41,6 +43,7 @@ struct btree_lba_manager_test : block_size(segment_manager->get_block_size()) { journal.set_segment_provider(this); + journal.set_write_pipeline(&pipeline); } segment_id_t 
next = 0; @@ -60,7 +63,7 @@ struct btree_lba_manager_test : ceph_assert(0 == "cannot fail"); } - return journal.submit_record(std::move(*record)).safe_then( + return journal.submit_record(std::move(*record), t->handle).safe_then( [this, t=std::move(t)](auto p) mutable { auto [addr, seq] = p; cache.complete_commit(*t, addr, seq); diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index 0bed505ffa1d1..fe824061552c7 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -64,6 +64,7 @@ struct record_validator_t { struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider { segment_manager::EphemeralSegmentManagerRef segment_manager; + WritePipeline pipeline; std::unique_ptr journal; std::vector records; @@ -91,6 +92,7 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider { seastar::future<> set_up_fut() final { journal.reset(new Journal(*segment_manager)); journal->set_segment_provider(this); + journal->set_write_pipeline(&pipeline); return segment_manager->init( ).safe_then([this] { return journal->open_for_write(); @@ -107,6 +109,7 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider { ).safe_then([this, f=std::move(f)]() mutable { journal.reset(new Journal(*segment_manager)); journal->set_segment_provider(this); + journal->set_write_pipeline(&pipeline); return journal->replay(std::forward(std::move(f))); }).safe_then([this] { return journal->open_for_write(); @@ -151,7 +154,10 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider { auto submit_record(T&&... 
_record) { auto record{std::forward(_record)...}; records.push_back(record); - auto [addr, _] = journal->submit_record(std::move(record)).unsafe_get0(); + OrderingHandle handle = get_dummy_ordering_handle(); + auto [addr, _] = journal->submit_record( + std::move(record), + handle).unsafe_get0(); records.back().record_final_offset = addr; return addr; } -- 2.39.5