return maybe_delay.then([this] {
return get_pg();
}).then([this](Ref<PG> pg) {
- if (!pg) {
- logger().warn("{}: pg absent, did not create", *this);
- on_pg_absent();
- handle.exit();
- return complete_rctx(pg);
- }
- logger().debug("{}: pg present", *this);
- return with_blocking_future(handle.enter(pp(*pg).await_map)
- ).then([this, pg] {
- return with_blocking_future(
- pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
- }).then([this, pg](auto) {
- return with_blocking_future(handle.enter(pp(*pg).process));
- }).then([this, pg] {
- // TODO: likely we should synchronize also with the pg log-based
- // recovery.
- return with_blocking_future(
- handle.enter(BackfillRecovery::bp(*pg).process));
- }).then([this, pg] {
- pg->do_peering_event(evt, ctx);
- handle.exit();
- return complete_rctx(pg);
- }).then([this, pg] {
- return pg->get_need_up_thru() ? shard_services.send_alive(pg->get_same_interval_since())
- : seastar::now();
- });
- }).then([this] {
- return shard_services.send_pg_temp();
+ return interruptor::with_interruption([this, pg] {
+ if (!pg) {
+ logger().warn("{}: pg absent, did not create", *this);
+ on_pg_absent();
+ handle.exit();
+ return complete_rctx(pg);
+ }
+ logger().debug("{}: pg present", *this);
+ return with_blocking_future(handle.enter(pp(*pg).await_map)
+ ).then([this, pg] {
+ return with_blocking_future(
+ pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+ }).then([this, pg](auto) {
+ return with_blocking_future(handle.enter(pp(*pg).process));
+ }).then([this, pg] {
+ // TODO: likely we should synchronize also with the pg log-based
+ // recovery.
+ return with_blocking_future(
+ handle.enter(BackfillRecovery::bp(*pg).process));
+ }).then([this, pg] {
+ pg->do_peering_event(evt, ctx);
+ handle.exit();
+ return complete_rctx(pg);
+ }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
+ if (!pg->get_need_up_thru()) {
+ return seastar::now();
+ }
+ return shard_services.send_alive(pg->get_same_interval_since());
+ }).then_interruptible([this] {
+ return shard_services.send_pg_temp();
+ });
+ },
+ [this](std::exception_ptr ep) {
+ logger().debug("{}: interrupted with {}", *this, ep);
+ return seastar::now();
+ },
+ pg);
}).finally([ref=std::move(ref)] {
logger().debug("{}: complete", *ref);
});
logger().debug("{}: pg absent, dropping", *this);
}
-seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
{
logger().debug("{}: submitting ctx", *this);
return shard_services.dispatch_context(
}
}
-seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
+PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
{
if (pg) {
return PeeringEvent::complete_rctx(pg);