From abaa12ff2d8629d20b9c1409f5c72a4ca07fcfef Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 27 Jan 2022 22:55:11 +0000 Subject: [PATCH] crimson/osd/osd_operations/peering_event: refactor to use OSD::start_pg_operation Signed-off-by: Samuel Just --- src/crimson/osd/osd.cc | 6 +- .../compound_peering_request.cc | 8 +- .../osd/osd_operations/peering_event.cc | 142 +++++++----------- .../osd/osd_operations/peering_event.h | 24 +-- 4 files changed, 70 insertions(+), 110 deletions(-) diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 17339eaddd8..c73920c2ce7 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -1267,8 +1267,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn, pg_shard_t from_shard{static_cast(m->get_source().num()), pgid.shard}; PeeringState::RequestScrub scrub_request{m->deep, m->repair}; - return shard_services.start_operation( - *this, + return start_pg_operation( conn, shard_services, from_shard, @@ -1381,8 +1380,7 @@ seastar::future<> OSD::handle_peering_op( const int from = m->get_source().num(); logger().debug("handle_peering_op on {} from {}", m->get_spg(), from); std::unique_ptr evt(m->get_event()); - (void) shard_services.start_operation( - *this, + (void) start_pg_operation( conn, shard_services, pg_shard_t{from, m->get_spg().shard}, diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index 86da1a47006..ec6487dfa21 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -52,7 +52,7 @@ public: ceph_assert(ctx.transaction.empty()); return seastar::now(); } else { - return osd.get_shard_services().dispatch_context_transaction( + return shard_services.dispatch_context_transaction( pg->get_collection_ref(), ctx); } } @@ -83,9 +83,8 @@ std::vector handle_pg_create( pgid, m->epoch, pi, history); } else { - auto op = osd.get_shard_services().start_operation( + auto op = osd.start_pg_operation( state, - osd, conn, osd.get_shard_services(), pg_shard_t(), @@ -106,7 +105,8 @@ std::vector handle_pg_create( namespace crimson::osd { CompoundPeeringRequest::CompoundPeeringRequest( - OSD &osd, crimson::net::ConnectionRef conn, Ref m) + OSD &osd, + crimson::net::ConnectionRef conn, Ref m) : osd(osd), conn(conn), m(m) diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index 19cf992031c..0b26833fd75 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -53,63 +53,50 @@ PGPeeringPipeline &PeeringEvent::pp(PG &pg) } template -seastar::future<> PeeringEvent::start() +seastar::future<> PeeringEvent::with_pg( + ShardServices &shard_services, Ref pg) { - logger().debug("{}: start", *this); - - typename T::IRef ref = static_cast(this); - auto maybe_delay = seastar::now(); - if (delay) { - maybe_delay = seastar::sleep( - std::chrono::milliseconds(std::lround(delay * 1000))); + if (!pg) { + logger().warn("{}: pg absent, did not create", *this); + on_pg_absent(); + handle.exit(); + return complete_rctx_no_pg(); } - return maybe_delay.then([this] { - return get_pg(); - }).then([this](Ref pg) { - if (!pg) { - logger().warn("{}: pg absent, did not create", *this); - on_pg_absent(); - handle.exit(); - return complete_rctx_no_pg(); - } - using interruptor = typename T::interruptor; - return interruptor::with_interruption([this, pg] { - logger().debug("{}: pg present", *this); - return this->template enter_stage( - pp(*pg).await_map - ).then_interruptible([this, pg] { - return this->template with_blocking_event( - [this, pg] (auto&& trigger) { - return pg->osdmap_gate.wait_for_map(std::move(trigger), - evt.get_epoch_sent()); + + using interruptor = typename T::interruptor; + return interruptor::with_interruption([this, pg, &shard_services] { + logger().debug("{}: pg present", *this); + return this->template enter_stage(pp(*pg).await_map + ).then_interruptible([this, pg] { + return this->template with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, pg](auto &&trigger) { + return pg->osdmap_gate.wait_for_map( + std::move(trigger), evt.get_epoch_sent()); }); - }).then_interruptible([this, pg](auto) { - return this->template enter_stage(pp(*pg).process); - }).then_interruptible([this, pg] { - // TODO: likely we should synchronize also with the pg log-based - // recovery. - return this->template enter_stage(BackfillRecovery::bp(*pg).process); - }).then_interruptible([this, pg] { - pg->do_peering_event(evt, ctx); - handle.exit(); - return complete_rctx(pg); - }).then_interruptible([this, pg] () -> typename T::template interruptible_future<> { + }).then_interruptible([this, pg](auto) { + return this->template enter_stage(pp(*pg).process); + }).then_interruptible([this, pg] { + // TODO: likely we should synchronize also with the pg log-based + // recovery. + return this->template enter_stage( + BackfillRecovery::bp(*pg).process); + }).then_interruptible([this, pg] { + pg->do_peering_event(evt, ctx); + handle.exit(); + return complete_rctx(pg); + }).then_interruptible([pg, &shard_services]() + -> typename T::template interruptible_future<> { if (!pg->get_need_up_thru()) { return seastar::now(); } return shard_services.send_alive(pg->get_same_interval_since()); - }).then_interruptible([this] { + }).then_interruptible([&shard_services] { return shard_services.send_pg_temp(); }); - }, - [this](std::exception_ptr ep) { - logger().debug("{}: interrupted with {}", *this, ep); - return seastar::now(); - }, - pg); - }).finally([ref=std::move(ref)] { - logger().debug("{}: complete", *ref); - }); + }, [this](std::exception_ptr ep) { + logger().debug("{}: interrupted with {}", *this, ep); + }, pg); } template @@ -128,16 +115,11 @@ PeeringEvent::complete_rctx(Ref pg) std::move(ctx)); } -ConnectionPipeline &RemotePeeringEvent::cp() +ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline() { return get_osd_priv(conn.get()).peering_request_conn_pipeline; } -RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op() -{ - return osd.peering_request_osd_pipeline; -} - void RemotePeeringEvent::on_pg_absent() { if (auto& e = get_event().get_event(); @@ -166,54 +148,32 @@ RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref if (pg) { return PeeringEvent::complete_rctx(pg); } else { - logger().debug("{}: OSDState is {}", *this, osd.state); - return osd.state.when_active().then([this] { - assert(osd.state.is_active()); - return shard_services.dispatch_context_messages(std::move(ctx)); - }); + return shard_services.dispatch_context_messages(std::move(ctx)); } } seastar::future<> RemotePeeringEvent::complete_rctx_no_pg() { - logger().debug("{}: OSDState is {}", *this, osd.state); - return osd.state.when_active().then([this] { - assert(osd.state.is_active()); - return shard_services.dispatch_context_messages(std::move(ctx)); - }); + return shard_services.dispatch_context_messages(std::move(ctx)); } -seastar::future> RemotePeeringEvent::get_pg() +seastar::future<> LocalPeeringEvent::start() { - return enter_stage<>(op().await_active).then([this] { - return osd.state.when_active(); - }).then([this] { - return enter_stage<>(cp().await_map); - }).then([this] { - using OSDMapBlockingEvent = - OSD_OSDMapGate::OSDMapBlocker::BlockingEvent; - return with_blocking_event( - [this] (auto&& trigger) { - return osd.osdmap_gate.wait_for_map(std::move(trigger), - evt.get_epoch_sent()); - }); - }).then([this](auto epoch) { - logger().debug("{}: got map {}", *this, epoch); - return enter_stage<>(cp().get_pg); - }).then([this] { - return with_blocking_event( - [this] (auto&& trigger) { - return osd.get_or_create_pg(std::move(trigger), - pgid, - evt.get_epoch_sent(), - std::move(evt.create_info)); - }); + logger().debug("{}: start", *this); + + IRef ref = this; + auto maybe_delay = seastar::now(); + if (delay) { + maybe_delay = seastar::sleep( + std::chrono::milliseconds(std::lround(delay * 1000))); + } + return maybe_delay.then([this] { + return with_pg(shard_services, pg); + }).finally([ref=std::move(ref)] { + logger().debug("{}: complete", *ref); }); } -seastar::future> LocalPeeringEvent::get_pg() { - return seastar::make_ready_future>(pg); -} LocalPeeringEvent::~LocalPeeringEvent() {} diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index ac81345a76d..5c1b707c8b6 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -71,7 +71,6 @@ protected: complete_rctx(Ref); virtual seastar::future<> complete_rctx_no_pg() { return seastar::now();} - virtual seastar::future> get_pg() = 0; public: template @@ -96,18 +95,17 @@ public: void print(std::ostream &) const final; void dump_detail(ceph::Formatter* f) const final; - seastar::future<> start(); + seastar::future<> with_pg( + ShardServices &shard_services, Ref pg); }; class RemotePeeringEvent : public PeeringEvent { protected: - OSD &osd; crimson::net::ConnectionRef conn; void on_pg_absent() final; PeeringEvent::interruptible_future<> complete_rctx(Ref pg) override; seastar::future<> complete_rctx_no_pg() override; - seastar::future> get_pg() final; public: class OSDPipeline { @@ -119,9 +117,8 @@ public: }; template - RemotePeeringEvent(OSD &osd, crimson::net::ConnectionRef conn, Args&&... args) : + RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) : PeeringEvent(std::forward(args)...), - osd(osd), conn(conn) {} @@ -152,15 +149,19 @@ public: #endif CompletionEvent > tracking_events; -private: - ConnectionPipeline &cp(); - OSDPipeline &op(); + + static constexpr bool can_create() { return true; } + auto get_create_info() { return std::move(evt.create_info); } + spg_t get_pgid() const { + return pgid; + } + ConnectionPipeline &get_connection_pipeline(); + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return evt.get_epoch_sent(); } }; class LocalPeeringEvent final : public PeeringEvent { protected: - seastar::future> get_pg() final; - Ref pg; public: @@ -170,6 +171,7 @@ public: pg(pg) {} + seastar::future<> start(); virtual ~LocalPeeringEvent(); std::tuple< -- 2.39.5