From 4ae2099628b67006c358b90aeb291fd9b1f78964 Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Sun, 7 Feb 2021 17:19:12 +0800 Subject: [PATCH] crimson/osd: add I/O sequencer to preserve client_requests' order across PG interval change Signed-off-by: Xuehan Xu --- src/crimson/osd/osd_connection_priv.h | 1 + src/crimson/osd/osd_operation.h | 8 +- src/crimson/osd/osd_operation_sequencer.h | 80 +++++++++++++++++++ .../osd/osd_operations/client_request.cc | 67 ++++++++++------ .../osd/osd_operations/client_request.h | 2 + src/crimson/osd/pg.h | 7 +- 6 files changed, 136 insertions(+), 29 deletions(-) create mode 100644 src/crimson/osd/osd_operation_sequencer.h diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h index a265bb43268c..018b16eebdb6 100644 --- a/src/crimson/osd/osd_connection_priv.h +++ b/src/crimson/osd/osd_connection_priv.h @@ -12,6 +12,7 @@ namespace crimson::osd { struct OSDConnectionPriv : public crimson::net::Connection::user_private_t { + OperationRepeatSequencer opSequencer; ClientRequest::ConnectionPipeline client_request_conn_pipeline; RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline; RepRequest::ConnectionPipeline replicated_request_conn_pipeline; diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 85f5be2edf03..455d55414249 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -3,7 +3,7 @@ #pragma once -#include "crimson/common/operation.h" +#include "crimson/osd/osd_operation_sequencer.h" #include "crimson/osd/scheduler/scheduler.h" namespace crimson::osd { @@ -53,7 +53,13 @@ public: virtual ~OperationT() = default; private: + epoch_t interval_start_epoch = 0; + using ops_seq_iter_t = + typename OperationRepeatSequencer::ops_sequence_t::iterator; + std::optional pos; virtual void dump_detail(ceph::Formatter *f) const = 0; + template + friend class OperationRepeatSequencer; }; /** diff --git 
a/src/crimson/osd/osd_operation_sequencer.h b/src/crimson/osd/osd_operation_sequencer.h new file mode 100644 index 000000000000..3ce9d82827ee --- /dev/null +++ b/src/crimson/osd/osd_operation_sequencer.h @@ -0,0 +1,80 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include "crimson/common/operation.h" +#include "osd/osd_types.h" + +namespace crimson::osd { + +template +struct OperationComparator; + +template +class OperationRepeatSequencer { +public: + using OpRef = boost::intrusive_ptr; + using ops_sequence_t = std::map>; + + template > + seastar::futurize_t start_op( + HandleT& handle, + epoch_t same_interval_since, + OpRef& op, + const spg_t& pgid, + Func&& func) { + auto& ops = pg_ops[pgid]; + if (!op->pos) { + [[maybe_unused]] auto [it, inserted] = ops.emplace(op, seastar::promise<>()); + assert(inserted); + op->pos = it; + } + + auto curr_op_pos = *(op->pos); + const bool first = (curr_op_pos == ops.begin()); + auto prev_op_pos = first ? 
curr_op_pos : std::prev(curr_op_pos); + auto prev_ops_drained = seastar::now(); + if (epoch_t prev_interval = prev_op_pos->first->interval_start_epoch; + !first && same_interval_since > prev_interval) { + // need to wait for previous operations, + // release the current pipeline stage + handle.exit(); + auto& [prev_op, prev_op_complete] = *prev_op_pos; + ::crimson::get_logger(ceph_subsys_osd).debug( + "{}, same_interval_since: {}, previous op: {}, last_interval_start: {}", + *op, same_interval_since, prev_op, prev_interval); + prev_ops_drained = prev_op_complete.get_future(); + } else { + assert(same_interval_since == prev_interval || first); + } + return prev_ops_drained.then( + [op, same_interval_since, func=std::forward(func)]() mutable { + op->interval_start_epoch = same_interval_since; + auto fut = seastar::futurize_invoke(func); + auto curr_op_pos = *(op->pos); + curr_op_pos->second.set_value(); + curr_op_pos->second = seastar::promise<>(); + return fut; + }); + } + + void finish_op(OpRef& op, const spg_t& pgid) { + assert(op->pos); + pg_ops.at(pgid).erase(*(op->pos)); + } +private: + std::map, OperationComparator>> pg_ops; +}; +template +struct OperationComparator { + bool operator()( + const typename OperationRepeatSequencer::OpRef& left, + const typename OperationRepeatSequencer::OpRef& right) const { + return left->get_id() < right->get_id(); + } +}; + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 357ad513f769..899a065fc69b 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -23,9 +23,14 @@ namespace crimson::osd { ClientRequest::ClientRequest( OSD &osd, crimson::net::ConnectionRef conn, Ref &&m) - : osd(osd), conn(conn), m(m) + : osd(osd), conn(conn), m(m), ors(get_osd_priv(conn.get()).opSequencer) {} +ClientRequest::~ClientRequest() +{ + logger().debug("{}: destroying", *this); +} + 
void ClientRequest::print(std::ostream &lhs) const { lhs << *m; @@ -60,6 +65,7 @@ seastar::future<> ClientRequest::start() return crimson::common::handle_system_shutdown( [this, opref=std::move(opref)]() mutable { return seastar::repeat([this, opref]() mutable { + logger().debug("{}: in repeat", *this); return with_blocking_future(handle.enter(cp().await_map)) .then([this]() { return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); @@ -67,33 +73,42 @@ seastar::future<> ClientRequest::start() return with_blocking_future(handle.enter(cp().get_pg)); }).then([this] { return with_blocking_future(osd.wait_for_pg(m->get_spg())); - }).then([this, opref](Ref pgref) { - PG &pg = *pgref; - if (pg.can_discard_op(*m)) { - return osd.send_incremental_map(conn, m->get_map_epoch()); - } - return with_blocking_future( - handle.enter(pp(pg).await_map) - ).then([this, &pg]() mutable { - return with_blocking_future( - pg.osdmap_gate.wait_for_map(m->get_min_epoch())); - }).then([this, &pg](auto map) mutable { - return with_blocking_future( - handle.enter(pp(pg).wait_for_active)); - }).then([this, &pg]() mutable { - return with_blocking_future(pg.wait_for_active_blocker.wait()); - }).then([this, pgref=std::move(pgref)]() mutable { - if (m->finish_decode()) { - m->clear_payload(); - } - if (is_pg_op()) { - return process_pg_op(pgref); - } else { - return process_op(pgref); + }).then([this, opref](Ref pgref) mutable { + epoch_t same_interval_since = pgref->get_interval_start_epoch(); + logger().debug("{} same_interval_since: {}", *this, same_interval_since); + return ors.start_op( + handle, same_interval_since, opref, pgref->get_pgid(), + [this, opref, pgref] { + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + logger().debug("{} op discarded, {}, same_primary_since: {}", + *this, pg, pg.get_info().history.same_primary_since); + return osd.send_incremental_map(conn, m->get_map_epoch()); } + return with_blocking_future( + handle.enter(pp(pg).await_map) + 
).then([this, &pg]() mutable { + return with_blocking_future( + pg.osdmap_gate.wait_for_map(m->get_min_epoch())); + }).then([this, &pg](auto map) mutable { + return with_blocking_future( + handle.enter(pp(pg).wait_for_active)); + }).then([this, &pg]() mutable { + return with_blocking_future(pg.wait_for_active_blocker.wait()); + }).then([this, pgref=std::move(pgref)]() mutable { + if (m->finish_decode()) { + m->clear_payload(); + } + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op(pgref); + } + }); + }).then([this, opref, pgref]() mutable { + ors.finish_op(opref, pgref->get_pgid()); + return seastar::stop_iteration::yes; }); - }).then([] { - return seastar::stop_iteration::yes; }).handle_exception_type([](crimson::common::actingset_changed& e) { if (e.is_primary()) { logger().debug("operation restart, acting set changed"); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index b72195850834..33bb35ee38e4 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -53,6 +53,7 @@ public: static constexpr OperationTypeCode type = OperationTypeCode::client_request; ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref &&m); + ~ClientRequest(); void print(std::ostream &) const final; void dump_detail(Formatter *f) const final; @@ -73,6 +74,7 @@ private: ConnectionPipeline &cp(); PGPipeline &pp(PG &pg); + OperationRepeatSequencer& ors; private: bool is_misdirected(const PG& pg) const; }; diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 6a9ab301fe58..ac220f841731 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -1,5 +1,5 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab #pragma once @@ -618,6 +618,9 @@ public: const map 
&get_shard_missing() const { return peering_state.get_peer_missing(); } + epoch_t get_interval_start_epoch() const { + return get_info().history.same_interval_since; + } const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const { if (shard == pg_whoami) return &get_local_missing(); -- 2.47.3