]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../internal_client_request: convert with_interruption to coroutine
authorSamuel Just <sjust@redhat.com>
Wed, 11 Sep 2024 21:16:51 +0000 (21:16 +0000)
committerSamuel Just <sjust@redhat.com>
Tue, 15 Oct 2024 03:37:26 +0000 (20:37 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operations/internal_client_request.cc

index d4213928a3e462842039d288c35b739d10e08346..d0aa0822f8030cff59e4da4a58b163f1800705e0 100644 (file)
@@ -53,72 +53,71 @@ CommonPGPipeline& InternalClientRequest::client_pp()
 InternalClientRequest::interruptible_future<>
 InternalClientRequest::with_interruption()
 {
-  return enter_stage<interruptor>(
+  LOG_PREFIX(InternalClientRequest::with_interruption);
+  co_await enter_stage<interruptor>(
     client_pp().wait_for_active
-  ).then_interruptible([this] {
-    return with_blocking_event<PGActivationBlocker::BlockingEvent,
-      interruptor>([this] (auto&& trigger) {
-       return pg->wait_for_active_blocker.wait(std::move(trigger));
-      });
-  }).then_interruptible([this] {
-    return enter_stage<interruptor>(
-      client_pp().recover_missing);
-  }).then_interruptible([this] {
-    return do_recover_missing(pg, get_target_oid(), osd_reqid_t());
-  }).then_interruptible([this](bool unfound) {
-    if (unfound) {
-      throw std::system_error(
-       std::make_error_code(std::errc::operation_canceled),
-       fmt::format("{} is unfound, drop it!", get_target_oid()));
-    }
-    return enter_stage<interruptor>(
-      client_pp().get_obc);
-  }).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
-      LOG_PREFIX(InternalClientRequest::with_interruption);
-      DEBUGI("{}: getting obc lock", *this);
-      return seastar::do_with(
-       create_osd_ops(),
-       [this](auto& osd_ops) mutable {
-         LOG_PREFIX(InternalClientRequest::with_interruption);
-         DEBUGI("InternalClientRequest: got {} OSDOps to execute",
-                std::size(osd_ops));
-         [[maybe_unused]] const int ret = op_info.set_from_op(
-           std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
-         assert(ret == 0);
-         // call with_locked_obc() in order, but wait concurrently for loading.
-         enter_stage_sync(client_pp().lock_obc);
-         return pg->with_locked_obc(
-           get_target_oid(), op_info,
-           [&osd_ops, this](auto, auto obc) {
-             return enter_stage<interruptor>(client_pp().process
-             ).then_interruptible(
-               [obc=std::move(obc), &osd_ops, this] {
-                 return pg->do_osd_ops(
-                   std::move(obc),
-                   osd_ops,
-                   std::as_const(op_info),
-                   get_do_osd_ops_params()
-                 ).safe_then_unpack_interruptible(
-                   [](auto submitted, auto all_completed) {
-                     return all_completed.handle_error_interruptible(
-                       crimson::ct_error::eagain::handle([] {
-                           return seastar::now();
-                       }));
-                   }, crimson::ct_error::eagain::handle([] {
-                     return interruptor::now();
-                   })
-                 );
-               });
-           });
+  );
+
+  co_await with_blocking_event<PGActivationBlocker::BlockingEvent,
+                              interruptor>([this] (auto&& trigger) {
+    return pg->wait_for_active_blocker.wait(std::move(trigger));
+  });
+
+  co_await enter_stage<interruptor>(client_pp().recover_missing);
+
+  bool unfound = co_await do_recover_missing(
+    pg, get_target_oid(), osd_reqid_t());
+
+  if (unfound) {
+    throw std::system_error(
+      std::make_error_code(std::errc::operation_canceled),
+      fmt::format("{} is unfound, drop it!", get_target_oid()));
+  }
+  co_await enter_stage<interruptor>(
+    client_pp().get_obc);
+
+  DEBUGI("{}: getting obc lock", *this);
+
+  auto osd_ops = create_osd_ops();
+
+  DEBUGI("InternalClientRequest: got {} OSDOps to execute",
+        std::size(osd_ops));
+  [[maybe_unused]] const int ret = op_info.set_from_op(
+    std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
+  assert(ret == 0);
+  // call with_locked_obc() in order, but wait concurrently for loading.
+  enter_stage_sync(client_pp().lock_obc);
+
+  auto fut = pg->with_locked_obc(
+    get_target_oid(), op_info,
+    [&osd_ops, this](auto, auto obc) {
+      return enter_stage<interruptor>(client_pp().process
+      ).then_interruptible(
+       [obc=std::move(obc), &osd_ops, this] {
+         return pg->do_osd_ops(
+           std::move(obc),
+           osd_ops,
+           std::as_const(op_info),
+           get_do_osd_ops_params()
+         ).safe_then_unpack_interruptible(
+           [](auto submitted, auto all_completed) {
+             return all_completed.handle_error_interruptible(
+               crimson::ct_error::eagain::handle([] {
+                 return seastar::now();
+               }));
+           }, crimson::ct_error::eagain::handle([] {
+             return interruptor::now();
+           })
+         );
        });
-    }).si_then([this] {
-      logger().debug("{}: complete", *this);
-      return handle.complete();
     }).handle_error_interruptible(
-      PG::load_obc_ertr::all_same_way([] {
-       return seastar::now();
-      })
+      crimson::ct_error::assert_all("unexpected error")
     );
+  co_await std::move(fut);
+
+  logger().debug("{}: complete", *this);
+  co_await interruptor::make_interruptible(handle.complete());
+  co_return;
 }
 
 seastar::future<> InternalClientRequest::start()