From 95ace7e7000950a4c97feac009bdb082aa30d21f Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 29 Jun 2021 10:06:45 -0700 Subject: [PATCH] crimson/os/seastore: add collection ordering Adds a mutex to SeastoreCollection which ensures that transactions are ordered on a per-collection basis. Future optimizations could enable the transaction construction phase to be pipelined for a single collection, but would require correctly handling the case where a more recently submitted transaction encounters a conflict. Fixes: https://tracker.ceph.com/issues/51358 Signed-off-by: Samuel Just --- src/crimson/os/seastore/ordering_handle.h | 32 ++++++++++++++--- src/crimson/os/seastore/seastore.cc | 19 ++-------- src/crimson/os/seastore/seastore.h | 36 +++++++++++++------ .../os/seastore/transaction_manager.cc | 9 +++-- 4 files changed, 60 insertions(+), 36 deletions(-) diff --git a/src/crimson/os/seastore/ordering_handle.h b/src/crimson/os/seastore/ordering_handle.h index 7afb22bb7d5..6e519114ca6 100644 --- a/src/crimson/os/seastore/ordering_handle.h +++ b/src/crimson/os/seastore/ordering_handle.h @@ -3,6 +3,8 @@ #pragma once +#include <seastar/core/shared_mutex.hh> + #include "crimson/common/operation.h" namespace crimson::os::seastore { @@ -35,6 +37,27 @@ private: struct OrderingHandle { OperationRef op; PipelineHandle phase_handle; + seastar::shared_mutex *collection_ordering_lock = nullptr; + + OrderingHandle(OperationRef &&op) : op(std::move(op)) {} + OrderingHandle(OrderingHandle &&other) + : op(std::move(other.op)), phase_handle(std::move(other.phase_handle)), + collection_ordering_lock(other.collection_ordering_lock) { + other.collection_ordering_lock = nullptr; + } + + seastar::future<> take_collection_lock(seastar::shared_mutex &mutex) { + ceph_assert(!collection_ordering_lock); + collection_ordering_lock = &mutex; + return collection_ordering_lock->lock(); + } + + void maybe_release_collection_lock() { + if (collection_ordering_lock) { + collection_ordering_lock->unlock(); +
collection_ordering_lock = nullptr; + } + } template <typename T> seastar::future<> enter(T &t) { @@ -48,16 +71,17 @@ struct OrderingHandle { seastar::future<> complete() { return phase_handle.complete(); } + + ~OrderingHandle() { + maybe_release_collection_lock(); + } }; inline OrderingHandle get_dummy_ordering_handle() { - return OrderingHandle{new PlaceholderOperation, {}}; + return OrderingHandle{new PlaceholderOperation}; } struct WritePipeline { - OrderedExclusivePhase wait_throttle{ - "WritePipeline::wait_throttle_phase" - }; OrderedExclusivePhase prepare{ "WritePipeline::prepare_phase" }; diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index 20f36667dde..8fad148b59a 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -9,6 +9,8 @@ #include #include +#include + #include "common/safe_io.h" #include "os/Transaction.h" @@ -42,13 +44,6 @@ SeaStore::SeaStore( SeaStore::~SeaStore() = default; -class SeastoreCollection final : public FuturizedCollection { -public: - template <typename... T> - SeastoreCollection(T&&... args) : - FuturizedCollection(std::forward<T>(args)...) {} -}; - seastar::future<> SeaStore::stop() { return seastar::now(); @@ -599,15 +594,7 @@ seastar::future<> SeaStore::do_transaction( CollectionRef _ch, ceph::os::Transaction&& _t) { - /* TODO: add ordering to Collection - * - * TransactionManager::submit_transction will ensure that - * beginning at that point operations remain ordered through - * to the jorunal. We still need a pipeline stage associated - * with each collection to ensure that this portion in - * SeaStore::do_transaction remains correctly ordered for operations - * submitted on the same collection. 
TODO - */ + // repeat_with_internal_context ensures ordering via collection lock return repeat_with_internal_context( _ch, std::move(_t), diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h index c745daf13b3..682c5626f76 100644 --- a/src/crimson/os/seastore/seastore.h +++ b/src/crimson/os/seastore/seastore.h @@ -25,11 +25,19 @@ namespace crimson::os::seastore { -class SeastoreCollection; class Onode; using OnodeRef = boost::intrusive_ptr<Onode>; class TransactionManager; +class SeastoreCollection final : public FuturizedCollection { +public: + template <typename... T> + SeastoreCollection(T&&... args) : + FuturizedCollection(std::forward<T>(args)...) {} + + seastar::shared_mutex ordering_lock; +}; + class SeaStore final : public FuturizedStore { public: @@ -128,8 +136,10 @@ private: internal_context_t( CollectionRef ch, - ceph::os::Transaction &&_ext_transaction) + ceph::os::Transaction &&_ext_transaction, + TransactionRef &&transaction) : ch(ch), ext_transaction(std::move(_ext_transaction)), + transaction(std::move(transaction)), iter(ext_transaction.begin()) {} TransactionRef transaction; @@ -158,15 +168,19 @@ private: transaction_manager->create_transaction()), std::forward(f), [this](auto &ctx, auto &f) { - return repeat_eagain([&]() { - ctx.reset_preserve_handle(transaction_manager); - return std::invoke(f, ctx); - }).handle_error( - crimson::ct_error::eagain::pass_further{}, - crimson::ct_error::all_same_way([&ctx](auto e) { - on_error(ctx.ext_transaction); - }) - ); + return ctx.transaction->get_handle().take_collection_lock( + static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock + ).then([&, this] { + return repeat_eagain([&, this] { + ctx.reset_preserve_handle(transaction_manager); + return std::invoke(f, ctx); + }).handle_error( + crimson::ct_error::eagain::pass_further{}, + crimson::ct_error::all_same_way([&ctx](auto e) { + on_error(ctx.ext_transaction); + }) + ); + }); }); } diff --git a/src/crimson/os/seastore/transaction_manager.cc 
b/src/crimson/os/seastore/transaction_manager.cc index 5cdb498bc9f..8f34581eb44 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -223,11 +223,8 @@ TransactionManager::submit_transaction( { LOG_PREFIX(TransactionManager::submit_transaction); DEBUGT("about to await throttle", t); - return trans_intr::make_interruptible( - t.get_handle().enter(write_pipeline.wait_throttle) - ).then_interruptible([this] { - return trans_intr::make_interruptible(segment_cleaner->await_hard_limits()); - }).then_interruptible([this, &t]() { + return trans_intr::make_interruptible(segment_cleaner->await_hard_limits() + ).then_interruptible([this, &t]() { return submit_transaction_direct(t); }); } @@ -245,6 +242,8 @@ TransactionManager::submit_transaction_direct( auto record = cache->try_construct_record(tref); assert(record); // interruptible future would have already failed + tref.get_handle().maybe_release_collection_lock(); + DEBUGT("about to submit to journal", tref); return journal->submit_record(std::move(*record), tref.get_handle() -- 2.39.5