From: Samuel Just Date: Fri, 29 Apr 2022 01:16:32 +0000 (+0000) Subject: crimson/osd: replace OpSequencer with simpler intrusive_list based implementation X-Git-Tag: v18.0.0~914^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5aa342c9f08df251917020d2a0970f94d9d66ec0;p=ceph-ci.git crimson/osd: replace OpSequencer with simpler intrusive_list based implementation Signed-off-by: Samuel Just --- diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h index 32d677d9652..69edf94b88f 100644 --- a/src/crimson/osd/osd_connection_priv.h +++ b/src/crimson/osd/osd_connection_priv.h @@ -5,7 +5,6 @@ #include "crimson/net/Connection.h" #include "crimson/osd/osd_operation.h" -#include "crimson/osd/osd_operation_sequencer.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/replicated_request.h" @@ -13,7 +12,6 @@ namespace crimson::osd { struct OSDConnectionPriv : public crimson::net::Connection::user_private_t { - OpSequencers op_sequencer; ConnectionPipeline client_request_conn_pipeline; ConnectionPipeline peering_request_conn_pipeline; ConnectionPipeline replicated_request_conn_pipeline; diff --git a/src/crimson/osd/osd_operation_sequencer.h b/src/crimson/osd/osd_operation_sequencer.h deleted file mode 100644 index dd89f2d4223..00000000000 --- a/src/crimson/osd/osd_operation_sequencer.h +++ /dev/null @@ -1,216 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- -// vim: ts=8 sw=2 smarttab - -#pragma once - -#include -#include -#include -#include "crimson/osd/osd_operations/client_request.h" - -namespace crimson::osd { - -// when PG interval changes, we are supposed to interrupt all in-flight ops. -// but in the order in which the ops are interrupted are not determined -// because they are scheduled by the seastar scheduler, if we just interrupt -// them at seeing a different interval when moving to a new continuation. but -// we are supposed to replay the ops from the same client targeting the same -// PG in the exact order that they are received. -// -// the way how we address this problem is to set up a blocker which blocks an -// op until the preceding op is unblocked if the blocked one is issued in a new -// pg interval. -// -// here, the ops from the same client are grouped by PG, and then ordered by -// their id which is monotonically increasing and unique on per PG basis, so we -// can keep an op waiting in the case explained above. -class OpSequencer { - bool resequencing{false}; - - static bool is_unfinished(const ClientRequest& this_op) { - // TODO: kill the tombstone; reuse op status tracking. - return !this_op.finished; - } - - std::uint64_t get_prev_id(const ClientRequest& this_op, - const OSDOperationRegistry& registry) { - // an alternative to iterating till the registy's beginning could be - // holding a pointer to next(last_completed). - constexpr auto type_idx = static_cast(ClientRequest::type); - for (auto it = OSDOperationRegistry::op_list::s_iterator_to(this_op); - it != std::begin(registry.get_registry()); - --it) { - // we're iterating over the operation registry of all ClientRequests. - // this list aggrates every single instance in the system, and thus - // we need to skip operations coming from different client's session - // or targeting different PGs. - // as this is supposed to happen on cold paths only, the overhead is - // a thing we can live with. - auto* maybe_prev_op = std::addressof(static_cast(*it)); - if (maybe_prev_op->same_session_and_pg(this_op)) { - if (is_unfinished(*maybe_prev_op)) { - return maybe_prev_op->get_id(); - } else { - // an early exited one - } - } - } - // the prev op of this session targeting the same PG as this_op must has - // been completed and hence already has been removed from the list, that's - // the only way we got here - return last_completed_id; - } - -public: - template > - seastar::futurize_t - start_op(const ClientRequest& op, - HandleT& handle, - const OSDOperationRegistry& registry, - FuncT&& do_op) { - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", - __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); - auto have_green_light = seastar::make_ready_future<>(); - if (last_started_id < op.get_id()) { - // starting a new op, let's advance the last_started! - last_started_id = op.get_id(); - } - if (__builtin_expect(resequencing, false)) { - // this implies that there was a reset condition and there may me some - // older ops before me, so i have to wait until they are unblocked. - // - // i should leave the current pipeline stage when waiting for the blocked - // ones, so that the following ops can be queued up here. we cannot let - // the seastar scheduler to determine the order of performing these ops, - // once they are unblocked after the first op of the same pg interval is - // scheduled. - const auto prev_id = get_prev_id(op, registry); - assert(prev_id >= last_unblocked_id); - handle.exit(); - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::start_op: {} resequencing ({} >= {})", - op, prev_id, last_unblocked_id); - have_green_light = unblocked.wait([&op, ®istry, this] { - // wait until the previous op is unblocked - const bool unblocking = - get_prev_id(op, registry) == last_unblocked_id; - if (unblocking) { - // stop resequencing if everything is handled which means there is no - // operation after us. the range could be minimized by snapshotting - // `last_started` on `maybe_reset()`. - // `<=` is to handle the situation when `last_started` has finished out- - // of-the-order. - resequencing = !(last_started_id <= op.get_id()); - } - return unblocking; - }); - } - return have_green_light.then([&op, do_op=std::move(do_op), this]() mutable { - auto result = seastar::futurize_invoke(std::move(do_op)); - // unblock the next one - last_unblocked_id = op.get_id(); - unblocked.broadcast(); - return result; - }); - } - void finish_op_in_order(ClientRequest& op) { - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", - __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); - assert(op.get_id() > last_completed_id); - last_completed_id = op.get_id(); - op.finished = true; - } - void finish_op_out_of_order(ClientRequest& op, - const OSDOperationRegistry& registry) { - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", - __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); - op.finished = true; - // fix the `last_unblocked_id`. otherwise we wouldn't be able to leave - // the wait loop in `start_op()` as any concrete value of `last_unblocked_id` - // can wake at most one blocked operation (the successor of `op` if there is any) - // and above we lowered this number to 0 (`get_prev_id()` there would never return - // a value matching `last_unblocked_id`). - if (last_unblocked_id == op.get_id()) { - last_unblocked_id = get_prev_id(op, registry); - } - } - void maybe_reset(const ClientRequest& op) { - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}", - __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id); - const auto op_id = op.get_id(); - // pg interval changes, so we need to reenqueue the previously unblocked - // ops by rewinding the "last_unblock" ID. - if (op_id <= last_unblocked_id) { - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::maybe_reset:{} {} <= {}, resetting to {}", - op, op_id, last_unblocked_id, last_completed_id); - last_unblocked_id = last_completed_id; - resequencing = true; - } - } - void abort() { - ::crimson::get_logger(ceph_subsys_osd).debug( - "OpSequencer::{}: last_started={}, last_unblocked={}, last_completed={}", - __func__, last_started_id, last_unblocked_id, last_completed_id); - // all blocked ops should be canceled, likely due to the osd is not primary - // anymore. - unblocked.broken(); - } -private: - // /--- unblocked (in pg pipeline) - // | /--- blocked - // V V - // |////|.....|.......| <--- last_started - // ^ ^ ^ - // | | \- prev_op - // | \--- last_unblocked - // last_completed - // - // the id of last op which is issued - std::uint64_t last_started_id = 0; - // the id of last op which is unblocked - std::uint64_t last_unblocked_id = 0; - // the id of last op which is completed - std::uint64_t last_completed_id = 0; - seastar::condition_variable unblocked; - - friend fmt::formatter; -}; - - -class OpSequencers { -public: - OpSequencer& get(const spg_t& pgid) { - return pg_ops.at(pgid); - } - OpSequencer& operator[](const spg_t& pgid) { - // TODO: trim pg_ops if there are too many empty sequencers - return pg_ops[pgid]; - } -private: - std::map pg_ops; -}; -} // namespace crimson::osd - -template <> -struct fmt::formatter { - // ignore the format string - constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } - - template - auto format(const crimson::osd::OpSequencer& sequencer, - FormatContext& ctx) - { - return fmt::format_to(ctx.out(), - "(last_completed={},last_unblocked={},last_started={})", - sequencer.last_completed_id, - sequencer.last_unblocked_id, - sequencer.last_started_id); - } -}; diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index bc0c3d94397..b35dbff1609 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -1,8 +1,6 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- // vim: ts=8 sw=2 smarttab expandtab -#include - #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -10,7 +8,6 @@ #include "crimson/osd/pg.h" #include "crimson/osd/osd.h" #include "common/Formatter.h" -#include "crimson/osd/osd_operation_sequencer.h" #include "crimson/osd/osd_operation_external_tracking.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_connection_priv.h" @@ -23,12 +20,42 @@ namespace { namespace crimson::osd { + +void ClientRequest::Orderer::requeue( + ShardServices &shard_services, Ref pg) +{ + for (auto &req: list) { + req.handle.exit(); + } + for (auto &req: list) { + logger().debug("{}: {} requeueing {}", __func__, *pg, req); + std::ignore = req.with_pg_int(shard_services, pg); + } +} + +void ClientRequest::Orderer::clear_and_cancel() +{ + for (auto i = list.begin(); i != list.end(); ) { + logger().debug( + "{}: ClientRequest::Orderer::clear_and_cancel {}", + *i); + i->complete_request(); + remove_request(*(i++)); + } +} + +void ClientRequest::complete_request() +{ + track_event(); + handle.exit(); + on_complete.set_value(); +} + ClientRequest::ClientRequest( OSD &osd, crimson::net::ConnectionRef conn, Ref &&m) : osd(osd), conn(conn), - m(m), - sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()]) + m(m) {} ClientRequest::~ClientRequest() @@ -77,7 +104,7 @@ bool ClientRequest::is_pg_op() const [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); } -seastar::future ClientRequest::with_pg_int( +seastar::future<> ClientRequest::with_pg_int( ShardServices &shard_services, Ref pgref) { epoch_t same_interval_since = pgref->get_interval_start_epoch(); @@ -85,79 +112,72 @@ seastar::future ClientRequest::with_pg_int( if (m->finish_decode()) { m->clear_payload(); } + const auto this_instance_id = instance_id++; + OperationRef opref{this}; return interruptor::with_interruption( - [this, &shard_services, pgref]() mutable { - return sequencer.start_op( - *this, handle, shard_services.registry, - interruptor::wrap_function([pgref, this, &shard_services] { - PG &pg = *pgref; - if (pg.can_discard_op(*m)) { - return osd.send_incremental_map( - conn, m->get_map_epoch() - ).then([this, &shard_services] { - sequencer.finish_op_out_of_order(*this, shard_services.registry); - return interruptor::now(); - }); - } - return enter_stage(pp(pg).await_map - ).then_interruptible([this, &pg] { - return with_blocking_event< - PG_OSDMapGate::OSDMapBlocker::BlockingEvent - >([this, &pg] (auto&& trigger) { - return pg.osdmap_gate.wait_for_map(std::move(trigger), - m->get_min_epoch()); - }); - }).then_interruptible([this, &pg](auto map) { - return enter_stage(pp(pg).wait_for_active); - }).then_interruptible([this, &pg]() { - return with_blocking_event< - PGActivationBlocker::BlockingEvent - >([&pg] (auto&& trigger) { - return pg.wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible( - [this, &shard_services, pgref=std::move(pgref)]() mutable { - if (is_pg_op()) { - return process_pg_op( - pgref - ).then_interruptible([this, &shard_services] { - sequencer.finish_op_out_of_order(*this, shard_services.registry); - }); - } else { - return process_op( - pgref - ).then_interruptible([this, &shard_services](const seq_mode_t mode) { - if (mode == seq_mode_t::IN_ORDER) { - sequencer.finish_op_in_order(*this); - } else { - assert(mode == seq_mode_t::OUT_OF_ORDER); - sequencer.finish_op_out_of_order(*this, shard_services.registry); - } - }); - } - }); - })).then_interruptible([] { - return seastar::stop_iteration::yes; + [this, pgref, this_instance_id]() mutable { + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + return osd.send_incremental_map( + conn, m->get_map_epoch() + ).then([this, this_instance_id, pgref] { + logger().debug("{}.{}: discarding", *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + return interruptor::now(); }); - }, [this, pgref](std::exception_ptr eptr) { - if (should_abort_request(*this, std::move(eptr))) { - sequencer.abort(); - return seastar::stop_iteration::yes; - } else { - sequencer.maybe_reset(*this); - return seastar::stop_iteration::no; } - }, pgref); + return enter_stage(pp(pg).await_map + ).then_interruptible([this, this_instance_id, &pg] { + logger().debug("{}.{}: after await_map stage", *this, this_instance_id); + return with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, &pg] (auto&& trigger) { + return pg.osdmap_gate.wait_for_map(std::move(trigger), + m->get_min_epoch()); + }); + }).then_interruptible([this, this_instance_id, &pg](auto map) { + logger().debug("{}.{}: after wait_for_map", *this, this_instance_id); + return enter_stage(pp(pg).wait_for_active); + }).then_interruptible([this, this_instance_id, &pg]() { + logger().debug( + "{}.{}: after wait_for_active stage", *this, this_instance_id); + return with_blocking_event< + PGActivationBlocker::BlockingEvent + >([&pg] (auto&& trigger) { + return pg.wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this, pgref, this_instance_id]() mutable + -> interruptible_future<> { + logger().debug( + "{}.{}: after wait_for_active", *this, this_instance_id); + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op( + pgref + ).then_interruptible([](auto){}); + } + }).then_interruptible([this, this_instance_id, pgref] { + logger().debug("{}.{}: after process*", *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + }); + }, [this, this_instance_id, pgref](std::exception_ptr eptr) { + // TODO: better debug output + logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr); + }, pgref).finally([opref=std::move(opref), pgref=std::move(pgref)] {}); } seastar::future<> ClientRequest::with_pg( ShardServices &shard_services, Ref pgref) { - return seastar::repeat([this, &shard_services, pgref]() mutable { - return with_pg_int(shard_services, pgref); - }).then([this] { - track_event(); - }); + pgref->client_request_orderer.add_request(*this); + auto ret = on_complete.get_future(); + std::ignore = with_pg_int( + shard_services, std::move(pgref) + ); + return ret; } ClientRequest::interruptible_future<> diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 21e09bc57cc..a222ea6a03a 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -3,6 +3,11 @@ #pragma once +#include + +#include +#include + #include "osd/osd_op_util.h" #include "crimson/net/Connection.h" #include "crimson/osd/object_context.h" @@ -26,6 +31,8 @@ class ClientRequest final : public PhasedOperationT, crimson::net::ConnectionRef conn; Ref m; OpInfo op_info; + seastar::promise<> on_complete; + unsigned instance_id = 0; public: class PGPipeline : public CommonPGPipeline { @@ -42,6 +49,34 @@ public: friend class LttngBackend; }; + using ordering_hook_t = boost::intrusive::list_member_hook<>; + ordering_hook_t ordering_hook; + class Orderer { + using list_t = boost::intrusive::list< + ClientRequest, + boost::intrusive::member_hook< + ClientRequest, + typename ClientRequest::ordering_hook_t, + &ClientRequest::ordering_hook> + >; + list_t list; + + public: + void add_request(ClientRequest &request) { + assert(!request.ordering_hook.is_linked()); + intrusive_ptr_add_ref(&request); + list.push_back(request); + } + void remove_request(ClientRequest &request) { + assert(request.ordering_hook.is_linked()); + list.erase(list_t::s_iterator_to(request)); + intrusive_ptr_release(&request); + } + void requeue(ShardServices &shard_services, Ref pg); + void clear_and_cancel(); + }; + void complete_request(); + static constexpr OperationTypeCode type = OperationTypeCode::client_request; ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref &&m); @@ -58,7 +93,7 @@ public: PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return m->get_min_epoch(); } - seastar::future with_pg_int( + seastar::future<> with_pg_int( ShardServices &shard_services, Ref pg); public: bool same_session_and_pg(const ClientRequest& other_op) const; @@ -89,12 +124,6 @@ private: ConnectionPipeline &cp(); PGPipeline &pp(PG &pg); - class OpSequencer& sequencer; - // a tombstone used currently by OpSequencer. In the future it's supposed - // to be replaced with a reusage of OpTracking facilities. - bool finished = false; - friend class OpSequencer; - template using interruptible_errorator = ::crimson::interruptible::interruptible_errorator< diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h index f07bc86105a..39c6d6a2473 100644 --- a/src/crimson/osd/osd_operations/replicated_request.h +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -39,10 +39,11 @@ public: seastar::future<> start(); std::tuple< + ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, - OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, ConnectionPipeline::GetPG::BlockingEvent, - PGMap::PGCreationBlockingEvent + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent > tracking_events; private: diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 5aded130e55..910ab6c1903 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1149,12 +1149,20 @@ seastar::future<> PG::stop() } void PG::on_change(ceph::os::Transaction &t) { - logger().debug("{}, {}", __func__, *this); + logger().debug("{} {}:", *this, __func__); for (auto& obc : obc_set_accessing) { obc.interrupt(::crimson::common::actingset_changed(is_primary())); } recovery_backend->on_peering_interval_change(t); backend->on_actingset_changed({ is_primary() }); + wait_for_active_blocker.unblock(); + if (is_primary()) { + logger().debug("{} {}: requeueing", *this, __func__); + client_request_orderer.requeue(shard_services, this); + } else { + logger().debug("{} {}: dropping requests", *this, __func__); + client_request_orderer.clear_and_cancel(); + } } bool PG::can_discard_op(const MOSDOp& m) const { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index b44aac4a952..f1fa1d32b42 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -55,7 +55,6 @@ namespace crimson::os { } namespace crimson::osd { -class ClientRequest; class OpsExecuter; class PG : public boost::intrusive_ref_counter< @@ -72,6 +71,8 @@ class PG : public boost::intrusive_ref_counter< PGPeeringPipeline peering_request_pg_pipeline; RepRequest::PGPipeline replicated_request_pg_pipeline; + ClientRequest::Orderer client_request_orderer; + spg_t pgid; pg_shard_t pg_whoami; crimson::os::CollectionRef coll_ref;