From e6d10da26ed41b859b19f292d6a75ed037258c4f Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Mon, 4 Oct 2021 14:49:24 +0000 Subject: [PATCH] crimson/osd: implement interruptions in PeeringEvent. Signed-off-by: Radoslaw Zarzynski --- .../compound_peering_request.cc | 3 +- .../osd/osd_operations/peering_event.cc | 69 +++++++++++-------- .../osd/osd_operations/peering_event.h | 4 +- 3 files changed, 43 insertions(+), 33 deletions(-) diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index 762421a7f55bf..42e827600b2fc 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -42,7 +42,8 @@ public: PeeringSubEvent(compound_state_ref state, Args &&... args) : RemotePeeringEvent(std::forward(args)...), state(state) {} - seastar::future<> complete_rctx(Ref pg) final { + PeeringEvent::interruptible_future<> + complete_rctx(Ref pg) final { logger().debug("{}: submitting ctx transaction", *this); state->ctx.accept_buffered_messages(ctx); state = {}; diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index 83e37734f198c..2b8da710e59ad 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -62,34 +62,43 @@ seastar::future<> PeeringEvent::start() return maybe_delay.then([this] { return get_pg(); }).then([this](Ref pg) { - if (!pg) { - logger().warn("{}: pg absent, did not create", *this); - on_pg_absent(); - handle.exit(); - return complete_rctx(pg); - } - logger().debug("{}: pg present", *this); - return with_blocking_future(handle.enter(pp(*pg).await_map) - ).then([this, pg] { - return with_blocking_future( - pg->osdmap_gate.wait_for_map(evt.get_epoch_sent())); - }).then([this, pg](auto) { - return with_blocking_future(handle.enter(pp(*pg).process)); - }).then([this, pg] { - // TODO: likely we should synchronize also with the pg log-based - // recovery. - return with_blocking_future( - handle.enter(BackfillRecovery::bp(*pg).process)); - }).then([this, pg] { - pg->do_peering_event(evt, ctx); - handle.exit(); - return complete_rctx(pg); - }).then([this, pg] { - return pg->get_need_up_thru() ? shard_services.send_alive(pg->get_same_interval_since()) - : seastar::now(); - }); - }).then([this] { - return shard_services.send_pg_temp(); + return interruptor::with_interruption([this, pg] { + if (!pg) { + logger().warn("{}: pg absent, did not create", *this); + on_pg_absent(); + handle.exit(); + return complete_rctx(pg); + } + logger().debug("{}: pg present", *this); + return with_blocking_future(handle.enter(pp(*pg).await_map) + ).then([this, pg] { + return with_blocking_future( + pg->osdmap_gate.wait_for_map(evt.get_epoch_sent())); + }).then([this, pg](auto) { + return with_blocking_future(handle.enter(pp(*pg).process)); + }).then([this, pg] { + // TODO: likely we should synchronize also with the pg log-based + // recovery. + return with_blocking_future( + handle.enter(BackfillRecovery::bp(*pg).process)); + }).then([this, pg] { + pg->do_peering_event(evt, ctx); + handle.exit(); + return complete_rctx(pg); + }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> { + if (!pg->get_need_up_thru()) { + return seastar::now(); + } + return shard_services.send_alive(pg->get_same_interval_since()); + }).then_interruptible([this] { + return shard_services.send_pg_temp(); + }); + }, + [this](std::exception_ptr ep) { + logger().debug("{}: interrupted with {}", *this, ep); + return seastar::now(); + }, + pg); }).finally([ref=std::move(ref)] { logger().debug("{}: complete", *ref); }); @@ -100,7 +109,7 @@ void PeeringEvent::on_pg_absent() logger().debug("{}: pg absent, dropping", *this); } -seastar::future<> PeeringEvent::complete_rctx(Ref pg) +PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref pg) { logger().debug("{}: submitting ctx", *this); return shard_services.dispatch_context( @@ -141,7 +150,7 @@ void RemotePeeringEvent::on_pg_absent() } } -seastar::future<> RemotePeeringEvent::complete_rctx(Ref pg) +PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref pg) { if (pg) { return PeeringEvent::complete_rctx(pg); diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index 9d9478fa53590..3e6b907e9ffed 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -60,7 +60,7 @@ protected: } virtual void on_pg_absent(); - virtual seastar::future<> complete_rctx(Ref); + virtual PeeringEvent::interruptible_future<> complete_rctx(Ref); virtual seastar::future> get_pg() = 0; public: @@ -95,7 +95,7 @@ protected: crimson::net::ConnectionRef conn; void on_pg_absent() final; - seastar::future<> complete_rctx(Ref pg) override; + PeeringEvent::interruptible_future<> complete_rctx(Ref pg) override; seastar::future> get_pg() final; public: -- 2.39.5