]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: implement interruptions in PeeringEvent.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 4 Oct 2021 14:49:24 +0000 (14:49 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 7 Oct 2021 11:33:18 +0000 (11:33 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/osd_operations/compound_peering_request.cc
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/peering_event.h

index 762421a7f55bfa788d866ae309eac70512720647..42e827600b2fc71ccfc36b5ee1b51e6a35f77c91 100644 (file)
@@ -42,7 +42,8 @@ public:
   PeeringSubEvent(compound_state_ref state, Args &&... args) :
     RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
 
-  seastar::future<> complete_rctx(Ref<crimson::osd::PG> pg) final {
+  PeeringEvent::interruptible_future<>
+  complete_rctx(Ref<crimson::osd::PG> pg) final {
     logger().debug("{}: submitting ctx transaction", *this);
     state->ctx.accept_buffered_messages(ctx);
     state = {};
index 83e37734f198c3ff927e1965c83006751c51bcf1..2b8da710e59ad815ec0afc0bf3d45e8647f82425 100644 (file)
@@ -62,34 +62,43 @@ seastar::future<> PeeringEvent::start()
   return maybe_delay.then([this] {
     return get_pg();
   }).then([this](Ref<PG> pg) {
-    if (!pg) {
-      logger().warn("{}: pg absent, did not create", *this);
-      on_pg_absent();
-      handle.exit();
-      return complete_rctx(pg);
-    }
-    logger().debug("{}: pg present", *this);
-    return with_blocking_future(handle.enter(pp(*pg).await_map)
-    ).then([this, pg] {
-      return with_blocking_future(
-        pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
-    }).then([this, pg](auto) {
-      return with_blocking_future(handle.enter(pp(*pg).process));
-    }).then([this, pg] {
-      // TODO: likely we should synchronize also with the pg log-based
-      // recovery.
-      return with_blocking_future(
-        handle.enter(BackfillRecovery::bp(*pg).process));
-    }).then([this, pg] {
-      pg->do_peering_event(evt, ctx);
-      handle.exit();
-      return complete_rctx(pg);
-    }).then([this, pg] {
-      return pg->get_need_up_thru() ? shard_services.send_alive(pg->get_same_interval_since())
-                             : seastar::now();
-    });
-  }).then([this] {
-    return shard_services.send_pg_temp();
+    return interruptor::with_interruption([this, pg] {
+      if (!pg) {
+        logger().warn("{}: pg absent, did not create", *this);
+        on_pg_absent();
+        handle.exit();
+        return complete_rctx(pg);
+      }
+      logger().debug("{}: pg present", *this);
+      return with_blocking_future(handle.enter(pp(*pg).await_map)
+      ).then([this, pg] {
+        return with_blocking_future(
+          pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+      }).then([this, pg](auto) {
+        return with_blocking_future(handle.enter(pp(*pg).process));
+      }).then([this, pg] {
+        // TODO: likely we should synchronize also with the pg log-based
+        // recovery.
+        return with_blocking_future(
+          handle.enter(BackfillRecovery::bp(*pg).process));
+      }).then([this, pg] {
+        pg->do_peering_event(evt, ctx);
+        handle.exit();
+        return complete_rctx(pg);
+      }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
+        if (!pg->get_need_up_thru()) {
+          return seastar::now();
+        }
+        return shard_services.send_alive(pg->get_same_interval_since());
+      }).then_interruptible([this] {
+        return shard_services.send_pg_temp();
+      });
+    },
+    [this](std::exception_ptr ep) {
+      logger().debug("{}: interrupted with {}", *this, ep);
+      return seastar::now();
+    },
+    pg);
   }).finally([ref=std::move(ref)] {
     logger().debug("{}: complete", *ref);
   });
@@ -100,7 +109,7 @@ void PeeringEvent::on_pg_absent()
   logger().debug("{}: pg absent, dropping", *this);
 }
 
-seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
 {
   logger().debug("{}: submitting ctx", *this);
   return shard_services.dispatch_context(
@@ -141,7 +150,7 @@ void RemotePeeringEvent::on_pg_absent()
   }
 }
 
-seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
+PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
 {
   if (pg) {
     return PeeringEvent::complete_rctx(pg);
index 9d9478fa5359029c463130e80497f0689bde35f3..3e6b907e9ffedf9eeaf6e3899b4025cf7d4785cb 100644 (file)
@@ -60,7 +60,7 @@ protected:
   }
 
   virtual void on_pg_absent();
-  virtual seastar::future<> complete_rctx(Ref<PG>);
+  virtual PeeringEvent::interruptible_future<> complete_rctx(Ref<PG>);
   virtual seastar::future<Ref<PG>> get_pg() = 0;
 
 public:
@@ -95,7 +95,7 @@ protected:
   crimson::net::ConnectionRef conn;
 
   void on_pg_absent() final;
-  seastar::future<> complete_rctx(Ref<PG> pg) override;
+  PeeringEvent::interruptible_future<> complete_rctx(Ref<PG> pg) override;
   seastar::future<Ref<PG>> get_pg() final;
 
 public: