]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../internal_client_request: factor out with_interruption
authorSamuel Just <sjust@redhat.com>
Wed, 11 Sep 2024 01:31:57 +0000 (01:31 +0000)
committerSamuel Just <sjust@redhat.com>
Tue, 15 Oct 2024 03:37:26 +0000 (20:37 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operations/internal_client_request.cc
src/crimson/osd/osd_operations/internal_client_request.h

index b1224f6e25942c8aaf554f11a00d0422305e25b3..d4213928a3e462842039d288c35b739d10e08346 100644 (file)
@@ -50,6 +50,77 @@ CommonPGPipeline& InternalClientRequest::client_pp()
   return pg->request_pg_pipeline;
 }
 
+InternalClientRequest::interruptible_future<>
+InternalClientRequest::with_interruption()
+{
+  return enter_stage<interruptor>(
+    client_pp().wait_for_active
+  ).then_interruptible([this] {
+    return with_blocking_event<PGActivationBlocker::BlockingEvent,
+      interruptor>([this] (auto&& trigger) {
+       return pg->wait_for_active_blocker.wait(std::move(trigger));
+      });
+  }).then_interruptible([this] {
+    return enter_stage<interruptor>(
+      client_pp().recover_missing);
+  }).then_interruptible([this] {
+    return do_recover_missing(pg, get_target_oid(), osd_reqid_t());
+  }).then_interruptible([this](bool unfound) {
+    if (unfound) {
+      throw std::system_error(
+       std::make_error_code(std::errc::operation_canceled),
+       fmt::format("{} is unfound, drop it!", get_target_oid()));
+    }
+    return enter_stage<interruptor>(
+      client_pp().get_obc);
+  }).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
+      LOG_PREFIX(InternalClientRequest::with_interruption);
+      DEBUGI("{}: getting obc lock", *this);
+      return seastar::do_with(
+       create_osd_ops(),
+       [this](auto& osd_ops) mutable {
+         LOG_PREFIX(InternalClientRequest::with_interruption);
+         DEBUGI("InternalClientRequest: got {} OSDOps to execute",
+                std::size(osd_ops));
+         [[maybe_unused]] const int ret = op_info.set_from_op(
+           std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
+         assert(ret == 0);
+         // call with_locked_obc() in order, but wait concurrently for loading.
+         enter_stage_sync(client_pp().lock_obc);
+         return pg->with_locked_obc(
+           get_target_oid(), op_info,
+           [&osd_ops, this](auto, auto obc) {
+             return enter_stage<interruptor>(client_pp().process
+             ).then_interruptible(
+               [obc=std::move(obc), &osd_ops, this] {
+                 return pg->do_osd_ops(
+                   std::move(obc),
+                   osd_ops,
+                   std::as_const(op_info),
+                   get_do_osd_ops_params()
+                 ).safe_then_unpack_interruptible(
+                   [](auto submitted, auto all_completed) {
+                     return all_completed.handle_error_interruptible(
+                       crimson::ct_error::eagain::handle([] {
+                           return seastar::now();
+                       }));
+                   }, crimson::ct_error::eagain::handle([] {
+                     return interruptor::now();
+                   })
+                 );
+               });
+           });
+       });
+    }).si_then([this] {
+      logger().debug("{}: complete", *this);
+      return handle.complete();
+    }).handle_error_interruptible(
+      PG::load_obc_ertr::all_same_way([] {
+       return seastar::now();
+      })
+    );
+}
+
 seastar::future<> InternalClientRequest::start()
 {
   track_event<StartEvent>();
@@ -57,72 +128,7 @@ seastar::future<> InternalClientRequest::start()
   DEBUGI("{}: in repeat", *this);
 
   return interruptor::with_interruption([this]() mutable {
-    return enter_stage<interruptor>(
-      client_pp().wait_for_active
-    ).then_interruptible([this] {
-      return with_blocking_event<PGActivationBlocker::BlockingEvent,
-       interruptor>([this] (auto&& trigger) {
-         return pg->wait_for_active_blocker.wait(std::move(trigger));
-       });
-    }).then_interruptible([this] {
-      return enter_stage<interruptor>(
-       client_pp().recover_missing);
-    }).then_interruptible([this] {
-      return do_recover_missing(pg, get_target_oid(), osd_reqid_t());
-    }).then_interruptible([this](bool unfound) {
-      if (unfound) {
-       throw std::system_error(
-         std::make_error_code(std::errc::operation_canceled),
-         fmt::format("{} is unfound, drop it!", get_target_oid()));
-      }
-      return enter_stage<interruptor>(
-       client_pp().get_obc);
-    }).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
-       LOG_PREFIX(InternalClientRequest::start);
-       DEBUGI("{}: getting obc lock", *this);
-       return seastar::do_with(
-         create_osd_ops(),
-         [this](auto& osd_ops) mutable {
-           LOG_PREFIX(InternalClientRequest::start);
-           DEBUGI("InternalClientRequest: got {} OSDOps to execute",
-                  std::size(osd_ops));
-            [[maybe_unused]] const int ret = op_info.set_from_op(
-              std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
-            assert(ret == 0);
-            // call with_locked_obc() in order, but wait concurrently for loading.
-            enter_stage_sync(client_pp().lock_obc);
-            return pg->with_locked_obc(
-             get_target_oid(), op_info,
-             [&osd_ops, this](auto, auto obc) {
-               return enter_stage<interruptor>(client_pp().process
-               ).then_interruptible(
-                 [obc=std::move(obc), &osd_ops, this] {
-                   return pg->do_osd_ops(
-                     std::move(obc),
-                     osd_ops,
-                     std::as_const(op_info),
-                     get_do_osd_ops_params()
-                   ).safe_then_unpack_interruptible(
-                     [](auto submitted, auto all_completed) {
-                       return all_completed.handle_error_interruptible(
-                         crimson::ct_error::eagain::handle([] {
-                           return seastar::now();
-                         }));
-                     }, crimson::ct_error::eagain::handle([] {
-                       return interruptor::now();
-                     })
-                   );
-                 });
-             });
-          });
-      }).si_then([this] {
-       logger().debug("{}: complete", *this);
-       return handle.complete();
-      }).handle_error_interruptible(
-       PG::load_obc_ertr::all_same_way([] {
-         return seastar::now();
-       })
-      );
+    return with_interruption();
   }, [](std::exception_ptr eptr) {
     return seastar::now();
   }, pg, start_epoch).then([this] {
index f198e5846433801c875b4b8e37e9b3bd7c291e62..2f3585013344d271ae1b2323f5aae0dccf842570 100644 (file)
@@ -41,6 +41,8 @@ private:
 
   CommonPGPipeline& client_pp();
 
+  InternalClientRequest::interruptible_future<> with_interruption();
+
   seastar::future<> do_process();
 
   Ref<PG> pg;