From e3fca0ea7c63e45615d8d73f5786706a0a9be6cb Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 22 Feb 2021 10:17:23 +0800 Subject: [PATCH] crimson/osd: refactor OperationRepeatSequencer * extract the OpSequencer out from OperationRepeatSequencer * refactor OpSequencer so we don't need to track the ops using a map, only track the last op and last pg interval for better performance and smaller memory footprint. Signed-off-by: Kefu Chai --- src/crimson/osd/osd_connection_priv.h | 2 +- src/crimson/osd/osd_operation.h | 6 - src/crimson/osd/osd_operation_sequencer.h | 156 +++++++++++------- .../osd/osd_operations/client_request.cc | 24 ++- .../osd/osd_operations/client_request.h | 4 +- 5 files changed, 116 insertions(+), 76 deletions(-) diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h index 10410ec7206..a0482077b57 100644 --- a/src/crimson/osd/osd_connection_priv.h +++ b/src/crimson/osd/osd_connection_priv.h @@ -12,7 +12,7 @@ namespace crimson::osd { struct OSDConnectionPriv : public crimson::net::Connection::user_private_t { - OperationRepeatSequencer op_sequencer; + OperationRepeatSequencer op_sequencer; ClientRequest::ConnectionPipeline client_request_conn_pipeline; RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline; RepRequest::ConnectionPipeline replicated_request_conn_pipeline; diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 455d5541424..1e438f90dba 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -53,13 +53,7 @@ public: virtual ~OperationT() = default; private: - epoch_t interval_start_epoch = 0; - using ops_seq_iter_t = - typename OperationRepeatSequencer::ops_sequence_t::iterator; - std::optional pos; virtual void dump_detail(ceph::Formatter *f) const = 0; - template - friend class OperationRepeatSequencer; }; /** diff --git a/src/crimson/osd/osd_operation_sequencer.h b/src/crimson/osd/osd_operation_sequencer.h index 86662e5108b..7e3ac7bd920 100644 --- a/src/crimson/osd/osd_operation_sequencer.h +++ b/src/crimson/osd/osd_operation_sequencer.h @@ -1,84 +1,120 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- // vim: ts=8 sw=2 smarttab #pragma once #include - +#include #include "crimson/common/operation.h" #include "osd/osd_types.h" namespace crimson::osd { -template -struct OperationComparator; - -template -class OperationRepeatSequencer { +// when PG interval changes, we are supposed to interrupt all in-flight ops. +// but in the order in which the ops are interrupted are not determined +// because they are scheduled by the seastar scheduler, if we just interrupt +// them at seeing a different interval when moving to a new continuation. but +// we are supposed to replay the ops from the same client targeting the same +// PG in the exact order that they are received. +// +// the way how we address this problem is to set up a blocker which blocks an +// op until the preceding op is unblocked if the blocked one is issued in a new +// pg interval. +// +// here, the ops from the same client are grouped by PG, and then ordered by +// their id which is monotonically increasing and unique on per PG basis, so we +// can keep an op waiting in the case explained above. +class OpSequencer { public: - using OpRef = boost::intrusive_ptr; - using ops_sequence_t = std::map>; - - template > - seastar::futurize_t start_op( - HandleT& handle, - epoch_t same_interval_since, - OpRef& op, - const spg_t& pgid, - Func&& func) { - auto& ops = pg_ops[pgid]; - if (!op->pos) { - [[maybe_unused]] auto [it, inserted] = ops.emplace(op, seastar::promise<>()); - assert(inserted); - op->pos = it; + template > + seastar::futurize_t + start_op(HandleT& handle, + uint64_t prev_op, + uint64_t this_op, + FuncT&& do_op) { + auto have_green_light = seastar::make_ready_future<>(); + assert(prev_op < this_op); + if (last_issued == prev_op) { + // starting a new op, let's advance the last_issued! + last_issued = this_op; } - - auto curr_op_pos = *(op->pos); - const bool first = (curr_op_pos == ops.begin()); - auto prev_op_pos = first ? curr_op_pos : std::prev(curr_op_pos); - auto prev_ops_drained = seastar::now(); - if (epoch_t prev_interval = prev_op_pos->first->interval_start_epoch; - !first && same_interval_since > prev_interval) { - // need to wait for previous operations, - // release the current pipepline stage + if (prev_op != last_unblocked) { + // this implies that there are some blocked ops before me, so i have to + // wait until they are unblocked. + // + // i should leave the current pipeline stage when waiting for the blocked + // ones, so that the following ops can be queued up here. we cannot let + // the seastar scheduler to determine the order of performing these ops, + // once they are unblocked after the first op of the same pg interval is + // scheduled. + assert(prev_op > last_unblocked); handle.exit(); - auto& [prev_op, prev_op_complete] = *prev_op_pos; ::crimson::get_logger(ceph_subsys_osd).debug( - "{}, same_interval_since: {}, previous op: {}, last_interval_start: {}", - *op, same_interval_since, prev_op, prev_interval); - prev_ops_drained = prev_op_complete.get_future(); - } else { - assert(same_interval_since == prev_interval || first); + "OpSequencer::start_op: {} waiting ({} > {})", + this_op, prev_op, last_unblocked); + have_green_light = unblocked.wait([prev_op, this] { + // wait until the previous op is unblocked + return last_unblocked == prev_op; + }); } - return prev_ops_drained.then( - [op, same_interval_since, func=std::forward(func)]() mutable { - op->interval_start_epoch = same_interval_since; - auto fut = seastar::futurize_invoke(func); - auto curr_op_pos = *(op->pos); - curr_op_pos->second.set_value(); - curr_op_pos->second = seastar::promise<>(); - return fut; + return have_green_light.then([this_op, do_op=std::move(do_op), this] { + auto result = seastar::futurize_invoke(do_op); + // unblock the next one + last_unblocked = this_op; + unblocked.broadcast(); + return result; }); } - - void finish_op(OpRef& op, const spg_t& pgid, bool interrutped) { - assert(op->pos); - auto curr_op_pos = *(op->pos); - if (interrutped) { - curr_op_pos->second.set_value(); + uint64_t get_last_issued() const { + return last_issued; + } + void finish_op(uint64_t op_id) { + assert(op_id > last_completed); + last_completed = op_id; + } + void maybe_reset(uint64_t op_id) { + // pg interval changes, so we need to reenqueue the previously unblocked + // ops by rewinding the "last_unblock" pointer + if (op_id <= last_unblocked) { + last_unblocked = last_completed; } - pg_ops.at(pgid).erase(curr_op_pos); + } + void abort() { + // all blocked ops should be canceled, likely due to the osd is not primary + // anymore. + unblocked.broken(); } private: - std::map, OperationComparator>> pg_ops; + // /--- unblocked (in pg pipeline) + // | /--- blocked + // V V + // |////|.....|.......| <--- last_issued + // ^ ^ ^ + // | | \- prev_op + // | \--- last_unblocked + // last_completed + // + // the id of last op which is issued + uint64_t last_issued = 0; + // the id of last op which is unblocked + uint64_t last_unblocked = 0; + // the id of last op which is completed + uint64_t last_completed = 0; + seastar::condition_variable unblocked; }; -template -struct OperationComparator { - bool operator()( - const typename OperationRepeatSequencer::OpRef& left, - const typename OperationRepeatSequencer::OpRef& right) const { - return left->get_id() < right->get_id(); + +class OperationRepeatSequencer { +public: + OpSequencer& get(const spg_t& pgid) { + return pg_ops.at(pgid); } + OpSequencer& operator[](const spg_t& pgid) { + // TODO: trim pg_ops if there are too many empty sequencers + return pg_ops[pgid]; + } +private: + std::map pg_ops; }; - } // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index ffdfe205914..1f59a907080 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -23,7 +23,11 @@ namespace crimson::osd { ClientRequest::ClientRequest( OSD &osd, crimson::net::ConnectionRef conn, Ref &&m) - : osd(osd), conn(conn), m(m), ors(get_osd_priv(conn.get()).op_sequencer) + : osd(osd), + conn(conn), + m(m), + sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()]), + prev_op_id(sequencer.get_last_issued()) {} ClientRequest::~ClientRequest() @@ -76,8 +80,8 @@ seastar::future<> ClientRequest::start() }).then([this, opref](Ref pgref) mutable { epoch_t same_interval_since = pgref->get_interval_start_epoch(); logger().debug("{} same_interval_since: {}", *this, same_interval_since); - return ors.start_op( - handle, same_interval_since, opref, pgref->get_pgid(), + return sequencer.start_op( + handle, prev_op_id, opref->get_id(), [this, opref, pgref] { PG &pg = *pgref; if (pg.can_discard_op(*m)) { @@ -105,19 +109,23 @@ seastar::future<> ClientRequest::start() return process_op(pgref); } }); - }).then([this, opref, pgref]() mutable { - ors.finish_op(opref, pgref->get_pgid(), false); - return seastar::stop_iteration::yes; + }).then([this] { + sequencer.finish_op(get_id()); + return seastar::stop_iteration::yes; }).handle_exception_type( - [opref, pgref, this](crimson::common::actingset_changed& e) mutable { + [this](crimson::common::actingset_changed& e) mutable { if (e.is_primary()) { logger().debug("operation restart, acting set changed"); + sequencer.maybe_reset(get_id()); return seastar::stop_iteration::no; } else { - ors.finish_op(opref, pgref->get_pgid(), true); + sequencer.abort(); logger().debug("operation abort, up primary changed"); return seastar::stop_iteration::yes; } + }).handle_exception_type( + [](seastar::broken_condition_variable&) { + return seastar::stop_iteration::yes; }); }); }); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 33bb35ee38e..a88c3b331c2 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -74,7 +74,9 @@ private: ConnectionPipeline &cp(); PGPipeline &pp(PG &pg); - OperationRepeatSequencer& ors; + OpSequencer& sequencer; + const uint64_t prev_op_id; + private: bool is_misdirected(const PG& pg) const; }; -- 2.39.5