]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: osd operations respect interruptor's InterruptCondition.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 12 Oct 2021 09:38:23 +0000 (09:38 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 12 Oct 2021 11:31:33 +0000 (11:31 +0000)
For the sake of DRY.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/common/interruptible_future.h
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/internal_client_request.cc
src/crimson/osd/osd_operations/peering_event.cc

index f41bc4f3ededd5af81760ad21ebaf4b6c62a67a5..b490945f0f76d95adcb4b62f093f972c4e83a58c 100644 (file)
@@ -1034,6 +1034,8 @@ template <typename InterruptCond>
 struct interruptor
 {
 public:
+  using condition = InterruptCond;
+
   template <typename FutureType>
   [[gnu::always_inline]]
   static interruptible_future_detail<InterruptCond, FutureType>
index 7614cc9102a6c7d40aa64d1ba9c60a6f68b7fcc4..50089dbeda68476f7707d04664a43ffcb388a8ac 100644 (file)
@@ -70,7 +70,7 @@ UrgentRecovery::do_recovery()
 {
   logger().debug("{}: {}", __func__, *this);
   if (!pg->has_reset_since(epoch_started)) {
-    return with_blocking_future_interruptible<IOInterruptCondition>(
+    return with_blocking_future_interruptible<interruptor::condition>(
       pg->get_recovery_handler()->recover_missing(soid, need)
     ).then_interruptible([] {
       return seastar::make_ready_future<bool>(false);
@@ -113,7 +113,7 @@ PglogBasedRecovery::do_recovery()
 {
   if (pg->has_reset_since(epoch_started))
     return seastar::make_ready_future<bool>(false);
-  return with_blocking_future_interruptible<IOInterruptCondition>(
+  return with_blocking_future_interruptible<interruptor::condition>(
     pg->get_recovery_handler()->start_recovery_ops(
       crimson::common::local_conf()->osd_recovery_max_single_start));
 }
@@ -134,7 +134,7 @@ BackfillRecovery::do_recovery()
     return seastar::make_ready_future<bool>(false);
   }
   // TODO: limits
-  return with_blocking_future_interruptible<IOInterruptCondition>(
+  return with_blocking_future_interruptible<interruptor::condition>(
     // process_event() of our boost::statechart machine is non-reentrant.
     // with the backfill_pipeline we protect it from a second entry from
     // the implementation of BackfillListener.
index d5d90f4a2061f822ae18095fa02fc3f544705f38..3a5111ce0d4171e2da7d444c84db27fb73e47f17 100644 (file)
@@ -104,16 +104,16 @@ seastar::future<> ClientRequest::start()
                   return interruptor::now();
               });
             }
-            return with_blocking_future_interruptible<IOInterruptCondition>(
+            return with_blocking_future_interruptible<interruptor::condition>(
               handle.enter(pp(pg).await_map)
             ).then_interruptible([this, &pg] {
-              return with_blocking_future_interruptible<IOInterruptCondition>(
+              return with_blocking_future_interruptible<interruptor::condition>(
                 pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
             }).then_interruptible([this, &pg](auto map) {
-              return with_blocking_future_interruptible<IOInterruptCondition>(
+              return with_blocking_future_interruptible<interruptor::condition>(
                 handle.enter(pp(pg).wait_for_active));
             }).then_interruptible([this, &pg]() {
-              return with_blocking_future_interruptible<IOInterruptCondition>(
+              return with_blocking_future_interruptible<interruptor::condition>(
                 pg.wait_for_active_blocker.wait());
             }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
               if (is_pg_op()) {
@@ -157,7 +157,7 @@ ClientRequest::process_pg_op(
 ClientRequest::interruptible_future<>
 ClientRequest::process_op(Ref<PG> &pg)
 {
-  return with_blocking_future_interruptible<IOInterruptCondition>(
+  return with_blocking_future_interruptible<interruptor::condition>(
       handle.enter(pp(*pg).recover_missing))
   .then_interruptible(
     [this, pg]() mutable {
@@ -172,14 +172,14 @@ ClientRequest::process_op(Ref<PG> &pg)
           CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
         return conn->send(std::move(reply));
       } else {
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
             handle.enter(pp(*pg).get_obc)).then_interruptible(
           [this, pg]() mutable -> PG::load_obc_iertr::future<> {
           logger().debug("{}: got obc lock", *this);
           op_info.set_from_op(&*m, *pg->get_osdmap());
           return pg->with_locked_obc(m->get_hobj(), op_info,
                                      [this, pg](auto obc) mutable {
-            return with_blocking_future_interruptible<IOInterruptCondition>(
+            return with_blocking_future_interruptible<interruptor::condition>(
               handle.enter(pp(*pg).process)
             ).then_interruptible([this, pg, obc]() mutable {
               return do_process(pg, obc);
@@ -217,13 +217,13 @@ ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
     [this, pg](auto submitted, auto all_completed) mutable {
     return submitted.then_interruptible(
       [this, pg] {
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
             handle.enter(pp(*pg).wait_repop));
     }).then_interruptible(
       [this, pg, all_completed=std::move(all_completed)]() mutable {
       return all_completed.safe_then_interruptible(
         [this, pg](MURef<MOSDOpReply> reply) {
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
             handle.enter(pp(*pg).send_reply)).then_interruptible(
               [this, reply=std::move(reply)]() mutable{
               return conn->send(std::move(reply));
index 3859b388a8cb2e8e4f494748dda7ac7d2f46a57e..8b15005ef3d20318e302ae957f2950ee9d2c30d0 100644 (file)
@@ -43,18 +43,18 @@ seastar::future<> InternalClientRequest::start()
     return seastar::repeat([this] {
       logger().debug("{}: in repeat", *this);
       return interruptor::with_interruption([this]() mutable {
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
           handle.enter(pp().wait_for_active)
         ).then_interruptible([this] {
-          return with_blocking_future_interruptible<IOInterruptCondition>(
+          return with_blocking_future_interruptible<interruptor::condition>(
             pg->wait_for_active_blocker.wait());
         }).then_interruptible([this] {
-          return with_blocking_future_interruptible<IOInterruptCondition>(
+          return with_blocking_future_interruptible<interruptor::condition>(
             handle.enter(pp().recover_missing)
           ).then_interruptible([this] {
             return do_recover_missing(pg, {});
           }).then_interruptible([this] {
-            return with_blocking_future_interruptible<IOInterruptCondition>(
+            return with_blocking_future_interruptible<interruptor::condition>(
               handle.enter(pp().get_obc)
             ).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
               logger().debug("{}: getting obc lock", *this);
@@ -67,7 +67,7 @@ seastar::future<> InternalClientRequest::start()
                 assert(ret == 0);
                 return pg->with_locked_obc(get_target_oid(), op_info,
                   [&osd_ops, this](auto obc) {
-                  return with_blocking_future_interruptible<IOInterruptCondition>(
+                  return with_blocking_future_interruptible<interruptor::condition>(
                     handle.enter(pp().process)
                   ).then_interruptible(
                     [obc=std::move(obc), &osd_ops, this] {
index 194876b1d5e1364a10c7ed1dc130d530d7a177a6..398fc945a099f9bb0ab4659a68aee084c303bcd1 100644 (file)
@@ -70,18 +70,18 @@ seastar::future<> PeeringEvent::start()
         return complete_rctx(pg);
       }
       logger().debug("{}: pg present", *this);
-      return with_blocking_future_interruptible<IOInterruptCondition>(
+      return with_blocking_future_interruptible<interruptor::condition>(
         handle.enter(pp(*pg).await_map)
       ).then_interruptible([this, pg] {
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
           pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
       }).then_interruptible([this, pg](auto) {
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
           handle.enter(pp(*pg).process));
       }).then_interruptible([this, pg] {
         // TODO: likely we should synchronize also with the pg log-based
         // recovery.
-        return with_blocking_future_interruptible<IOInterruptCondition>(
+        return with_blocking_future_interruptible<interruptor::condition>(
           handle.enter(BackfillRecovery::bp(*pg).process));
       }).then_interruptible([this, pg] {
         pg->do_peering_event(evt, ctx);