From 9dcb612b93b296b16deea1afcc12a889863873bf Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Wed, 1 Sep 2021 12:57:54 +0000 Subject: [PATCH] crimson/osd: rework OpSequencer for out-of-order op execution. Signed-off-by: Radoslaw Zarzynski --- src/crimson/common/operation.h | 28 ++-- src/crimson/osd/osd_operation_sequencer.h | 137 +++++++++++++----- .../osd/osd_operations/client_request.cc | 59 ++++---- .../osd/osd_operations/client_request.h | 11 +- 4 files changed, 155 insertions(+), 80 deletions(-) diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h index 3106da5c05c9e..3559f6591555d 100644 --- a/src/crimson/common/operation.h +++ b/src/crimson/common/operation.h @@ -314,6 +314,12 @@ std::ostream &operator<<(std::ostream &, const Operation &op); * Maintains a set of lists of all active ops. */ class OperationRegistryI { + using op_list_member_option = boost::intrusive::member_hook< + Operation, + registry_hook_t, + &Operation::registry_hook + >; + friend class Operation; seastar::timer shutdown_timer; seastar::promise<> shutdown; @@ -323,6 +329,11 @@ protected: virtual bool registries_empty() const = 0; public: + using op_list = boost::intrusive::list< + Operation, + op_list_member_option, + boost::intrusive::constant_time_size>; + template typename T::IRef create_operation(Args&&... 
args) { typename T::IRef op = new T(std::forward(args)...); @@ -346,16 +357,6 @@ public: template class OperationRegistryT : public OperationRegistryI { - using op_list_member_option = boost::intrusive::member_hook< - Operation, - registry_hook_t, - &Operation::registry_hook - >; - using op_list = boost::intrusive::list< - Operation, - op_list_member_option, - boost::intrusive::constant_time_size>; - std::array< op_list, NUM_REGISTRIES @@ -379,6 +380,13 @@ protected: return opl.empty(); }); } +public: + template + const op_list& get_registry() const { + static_assert( + REGISTRY_INDEX < std::tuple_size::value); + return registries[REGISTRY_INDEX]; + } }; class PipelineExitBarrierI { diff --git a/src/crimson/osd/osd_operation_sequencer.h b/src/crimson/osd/osd_operation_sequencer.h index 4a80a92307f5d..dd89f2d4223fc 100644 --- a/src/crimson/osd/osd_operation_sequencer.h +++ b/src/crimson/osd/osd_operation_sequencer.h @@ -25,6 +25,42 @@ namespace crimson::osd { // their id which is monotonically increasing and unique on per PG basis, so we // can keep an op waiting in the case explained above. class OpSequencer { + bool resequencing{false}; + + static bool is_unfinished(const ClientRequest& this_op) { + // TODO: kill the tombstone; reuse op status tracking. + return !this_op.finished; + } + + std::uint64_t get_prev_id(const ClientRequest& this_op, + const OSDOperationRegistry& registry) { + // an alternative to iterating till the registry's beginning could be + // holding a pointer to next(last_completed). + constexpr auto type_idx = static_cast(ClientRequest::type); + for (auto it = OSDOperationRegistry::op_list::s_iterator_to(this_op); + it != std::begin(registry.get_registry()); + --it) { + // we're iterating over the operation registry of all ClientRequests. + // this list aggregates every single instance in the system, and thus + // we need to skip operations coming from different client's session + // or targeting different PGs.
+ // as this is supposed to happen on cold paths only, the overhead is + // a thing we can live with. + auto* maybe_prev_op = std::addressof(static_cast(*it)); + if (maybe_prev_op->same_session_and_pg(this_op)) { + if (is_unfinished(*maybe_prev_op)) { + return maybe_prev_op->get_id(); + } else { + // an early exited one + } + } + } + // the prev op of this session targeting the same PG as this_op must have + // been completed and hence already has been removed from the list, that's + // the only way we got here + return last_completed_id; + } + public: template start_op(const ClientRequest& op, HandleT& handle, + const OSDOperationRegistry& registry, FuncT&& do_op) { - const uint64_t prev_op = op.get_prev_id(); - const uint64_t this_op = op.get_id(); + ::crimson::get_logger(ceph_subsys_osd).debug( + "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", + __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); auto have_green_light = seastar::make_ready_future<>(); - assert(prev_op < this_op); - if (last_issued == prev_op) { - // starting a new op, let's advance the last_issued! - last_issued = this_op; + if (last_started_id < op.get_id()) { + // starting a new op, let's advance the last_started! + last_started_id = op.get_id(); } - if (prev_op != last_unblocked) { - // this implies that there are some blocked ops before me, so i have to - // wait until they are unblocked. + if (__builtin_expect(resequencing, false)) { + // this implies that there was a reset condition and there may be some + // older ops before me, so i have to wait until they are unblocked. // // i should leave the current pipeline stage when waiting for the blocked // ones, so that the following ops can be queued up here. we cannot let // the seastar scheduler to determine the order of performing these ops, // once they are unblocked after the first op of the same pg interval is // scheduled.
- assert(prev_op > last_unblocked); + const auto prev_id = get_prev_id(op, registry); + assert(prev_id >= last_unblocked_id); handle.exit(); ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::start_op: {} waiting ({} > {})", - op, prev_op, last_unblocked); - have_green_light = unblocked.wait([prev_op, this] { + "OpSequencer::start_op: {} resequencing ({} >= {})", + op, prev_id, last_unblocked_id); + have_green_light = unblocked.wait([&op, &registry, this] { // wait until the previous op is unblocked - return last_unblocked == prev_op; + const bool unblocking = + get_prev_id(op, registry) == last_unblocked_id; + if (unblocking) { + // stop resequencing if everything is handled which means there is no + // operation after us. the range could be minimized by snapshotting + // `last_started` on `maybe_reset()`. + // `<=` is to handle the situation when `last_started` has finished out- + // of-the-order. + resequencing = !(last_started_id <= op.get_id()); + } + return unblocking; }); } - return have_green_light.then([this_op, do_op=std::move(do_op), this]() mutable { + return have_green_light.then([&op, do_op=std::move(do_op), this]() mutable { auto result = seastar::futurize_invoke(std::move(do_op)); // unblock the next one - last_unblocked = this_op; + last_unblocked_id = op.get_id(); unblocked.broadcast(); return result; }); } - uint64_t get_last_issued() const { - return last_issued; + void finish_op_in_order(ClientRequest& op) { + ::crimson::get_logger(ceph_subsys_osd).debug( + "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", + __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); + assert(op.get_id() > last_completed_id); + last_completed_id = op.get_id(); + op.finished = true; } - void finish_op(const ClientRequest& op) { - assert(op.get_id() > last_completed); - last_completed = op.get_id(); + void finish_op_out_of_order(ClientRequest& op, + const OSDOperationRegistry& registry) { +
::crimson::get_logger(ceph_subsys_osd).debug( + "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", + __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); + op.finished = true; + // fix the `last_unblocked_id`. otherwise we wouldn't be able to leave + // the wait loop in `start_op()` as any concrete value of `last_unblocked_id` + // can wake at most one blocked operation (the successor of `op` if there is any) + // and above we lowered this number to 0 (`get_prev_id()` there would never return + // a value matching `last_unblocked_id`). + if (last_unblocked_id == op.get_id()) { + last_unblocked_id = get_prev_id(op, registry); + } } void maybe_reset(const ClientRequest& op) { + ::crimson::get_logger(ceph_subsys_osd).debug( + "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", + __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); const auto op_id = op.get_id(); // pg interval changes, so we need to reenqueue the previously unblocked - // ops by rewinding the "last_unblock" pointer - if (op_id <= last_unblocked) { + // ops by rewinding the "last_unblock" ID. + if (op_id <= last_unblocked_id) { ::crimson::get_logger(ceph_subsys_osd).debug( "OpSequencer::maybe_reset:{} {} <= {}, resetting to {}", - op, op_id, last_unblocked, last_completed); - last_unblocked = last_completed; + op, op_id, last_unblocked_id, last_completed_id); + last_unblocked_id = last_completed_id; + resequencing = true; } } void abort() { + ::crimson::get_logger(ceph_subsys_osd).debug( + "OpSequencer::{}: last_started={}, last_unblocked={}, last_completed={}", + __func__, last_started_id, last_unblocked_id, last_completed_id); // all blocked ops should be canceled, likely due to the osd is not primary // anymore. 
unblocked.broken(); @@ -95,18 +166,18 @@ private: // /--- unblocked (in pg pipeline) // | /--- blocked // V V - // |////|.....|.......| <--- last_issued + // |////|.....|.......| <--- last_started // ^ ^ ^ // | | \- prev_op // | \--- last_unblocked // last_completed // // the id of last op which is issued - uint64_t last_issued = 0; + std::uint64_t last_started_id = 0; // the id of last op which is unblocked - uint64_t last_unblocked = 0; + std::uint64_t last_unblocked_id = 0; // the id of last op which is completed - uint64_t last_completed = 0; + std::uint64_t last_completed_id = 0; seastar::condition_variable unblocked; friend fmt::formatter; @@ -137,9 +208,9 @@ struct fmt::formatter { FormatContext& ctx) { return fmt::format_to(ctx.out(), - "(last_completed={},last_unblocked={},last_issued={})", - sequencer.last_completed, - sequencer.last_unblocked, - sequencer.last_issued); + "(last_completed={},last_unblocked={},last_started={})", + sequencer.last_completed_id, + sequencer.last_unblocked_id, + sequencer.last_started_id); } }; diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 68d64ab5ce1c5..d5d90f4a2061f 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -37,7 +37,7 @@ ClientRequest::~ClientRequest() void ClientRequest::print(std::ostream &lhs) const { - lhs << "m=[" << *m << "], prev_op_id=" << prev_op_id; + lhs << "m=[" << *m << "]"; } void ClientRequest::dump_detail(Formatter *f) const @@ -54,6 +54,12 @@ ClientRequest::PGPipeline &ClientRequest::pp(PG &pg) return pg.client_request_pg_pipeline; } +bool ClientRequest::same_session_and_pg(const ClientRequest& other_op) const +{ + return &get_osd_priv(conn.get()) == &get_osd_priv(other_op.conn.get()) && + m->get_spg() == other_op.m->get_spg(); +} + bool ClientRequest::is_pg_op() const { return std::any_of( @@ -61,22 +67,10 @@ bool ClientRequest::is_pg_op() const [](auto& op) 
{ return ceph_osd_op_type_pg(op.op.op); }); } -void ClientRequest::may_set_prev_op() -{ - // set prev_op_id if it's not set yet - if (__builtin_expect(!prev_op_id.has_value(), true)) { - prev_op_id.emplace(sequencer.get_last_issued()); - } -} - template ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func) { - may_set_prev_op(); - return sequencer.start_op(*this, handle, std::forward(func)) - .then_interruptible([this] { - sequencer.finish_op(*this); - }); + return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward(func)); } seastar::future<> ClientRequest::start() @@ -101,12 +95,14 @@ seastar::future<> ClientRequest::start() if (m->finish_decode()) { m->clear_payload(); } - const bool has_pg_op = is_pg_op(); - auto interruptible_do_op = interruptor::wrap_function([=] { + return with_sequencer(interruptor::wrap_function([pgref, this] { PG &pg = *pgref; if (pg.can_discard_op(*m)) { - return interruptible_future<>( - osd.send_incremental_map(conn, m->get_map_epoch())); + return osd.send_incremental_map( + conn, m->get_map_epoch()).then([this] { + sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry); + return interruptor::now(); + }); } return with_blocking_future_interruptible( handle.enter(pp(pg).await_map) @@ -119,21 +115,22 @@ seastar::future<> ClientRequest::start() }).then_interruptible([this, &pg]() { return with_blocking_future_interruptible( pg.wait_for_active_blocker.wait()); - }).then_interruptible([this, - has_pg_op, - pgref=std::move(pgref)]() mutable { - return (has_pg_op ? 
- process_pg_op(pgref) : - process_op(pgref)); + }).then_interruptible([this, pgref=std::move(pgref)]() mutable { + if (is_pg_op()) { + return process_pg_op(pgref).then_interruptible([this] { + sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry); + }); + } else { + return process_op(pgref).then_interruptible([this] { + // NOTE: this assumes process_op() handles everything + // in-order which I'm not sure about. + sequencer.finish_op_in_order(*this); + }); + } }); + })).then_interruptible([pgref] { + return seastar::stop_iteration::yes; }); - // keep the ordering of non-pg ops when across pg internvals - return (has_pg_op ? - interruptible_do_op() : - with_sequencer(std::move(interruptible_do_op))) - .then_interruptible([pgref]() { - return seastar::stop_iteration::yes; - }); }, [this, pgref](std::exception_ptr eptr) { if (should_abort_request(*this, std::move(eptr))) { sequencer.abort(); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 3f14acd1cd7af..9563c10226060 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -57,10 +57,7 @@ public: public: seastar::future<> start(); - uint64_t get_prev_id() const { - assert(prev_op_id.has_value()); - return *prev_op_id; - } + bool same_session_and_pg(const ClientRequest& other_op) const; private: template @@ -80,7 +77,10 @@ private: PGPipeline &pp(PG &pg); class OpSequencer& sequencer; - std::optional prev_op_id; + // a tombstone used currently by OpSequencer. In the future it's supposed + // to be replaced with a reusage of OpTracking facilities. + bool finished = false; + friend class OpSequencer; template using interruptible_errorator = @@ -89,7 +89,6 @@ private: Errorator>; private: bool is_misdirected(const PG& pg) const; - void may_set_prev_op(); }; } -- 2.39.5