]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../client_request: convert process_op to coroutine
authorSamuel Just <sjust@redhat.com>
Wed, 7 Feb 2024 01:19:37 +0000 (17:19 -0800)
committerMatan Breizman <mbreizma@redhat.com>
Thu, 16 May 2024 11:39:18 +0000 (14:39 +0300)
Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit a4905fb3e2f2400656f0b436275e9fa745f14cc1)

src/crimson/osd/osd_operations/client_request.cc

index 2f71cdacd3a87790a37478ed549900fe107d73b0..2514c08993ff53dc605293fd2e7c61a1821edf99 100644 (file)
@@ -231,74 +231,73 @@ ClientRequest::process_op(
   instance_handle_t &ihref, Ref<PG> pg, unsigned this_instance_id)
 {
   LOG_PREFIX(ClientRequest::process_op);
-  return ihref.enter_stage<interruptor>(
+  co_await ihref.enter_stage<interruptor>(
     client_pp(*pg).recover_missing, *this
-  ).then_interruptible([pg, this]() mutable {
-    return recover_missings(pg, m->get_hobj(), snaps_need_to_recover());
-  }).then_interruptible([FNAME, this, pg, this_instance_id, &ihref]() mutable {
-    DEBUGDPP("{}.{}: checking already_complete",
+  );
+  co_await recover_missings(pg, m->get_hobj(), snaps_need_to_recover());
+
+  DEBUGDPP("{}.{}: checking already_complete",
+          *pg, *this, this_instance_id);
+  auto completed = co_await pg->already_complete(m->get_reqid());
+
+  if (completed) {
+    DEBUGDPP("{}.{}: already completed, sending reply",
             *pg, *this, this_instance_id);
-    return pg->already_complete(m->get_reqid()).then_interruptible(
-      [FNAME, this, pg, this_instance_id, &ihref](auto completed) mutable
-      -> PG::load_obc_iertr::future<> {
-      if (completed) {
-       DEBUGDPP("{}.{}: already completed, sending reply",
-                *pg, *this, this_instance_id);
-        auto reply = crimson::make_message<MOSDOpReply>(
-          m.get(), completed->err, pg->get_osdmap_epoch(),
-          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
-       reply->set_reply_versions(completed->version, completed->user_version);
-        // TODO: gate the crosscore sending
-        return get_foreign_connection().send_with_throttling(std::move(reply));
-      } else {
-       DEBUGDPP("{}.{}: not completed, entering get_obc stage",
-                *pg, *this, this_instance_id);
-        return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
-       ).then_interruptible(
-          [FNAME, this, pg, this_instance_id, &ihref]() mutable
-         -> PG::load_obc_iertr::future<> {
-         DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
+    auto reply = crimson::make_message<MOSDOpReply>(
+      m.get(), completed->err, pg->get_osdmap_epoch(),
+      CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
+    reply->set_reply_versions(completed->version, completed->user_version);
+    // TODO: gate the crosscore sending
+    co_await interruptor::make_interruptible(
+      get_foreign_connection().send_with_throttling(std::move(reply))
+    );
+    co_return;
+  }
+
+  DEBUGDPP("{}.{}: not completed, entering get_obc stage",
+          *pg, *this, this_instance_id);
+  co_await ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this);
+
+  DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
+          *pg, *this, this_instance_id);
+  if (int res = op_info.set_from_op(&*m, *pg->get_osdmap());
+      res != 0) {
+    co_await reply_op_error(pg, res);
+    co_return;
+  }
+  co_await ihref.enter_blocker(
+    *this, pg->scrubber, &decltype(pg->scrubber)::wait_scrub,
+    m->get_hobj());
+
+  DEBUGDPP("{}.{}: past scrub blocker, getting obc",
+          *pg, *this, this_instance_id);
+  co_await pg->with_locked_obc(
+    m->get_hobj(), op_info,
+    [FNAME, this, pg, this_instance_id, &ihref] (
+      auto head, auto obc
+    ) -> interruptible_future<> {
+      DEBUGDPP("{}.{}: got obc {}, entering process stage",
+              *pg, *this, this_instance_id, obc->obs);
+      return ihref.enter_stage<interruptor>(
+       client_pp(*pg).process, *this
+      ).then_interruptible(
+       [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
+         DEBUGDPP("{}.{}: in process stage, calling do_process",
                   *pg, *this, this_instance_id);
-          if (int res = op_info.set_from_op(&*m, *pg->get_osdmap());
-              res != 0) {
-           return reply_op_error(pg, res);
-          }
-         return ihref.enter_blocker(
-           *this,
-           pg->scrubber,
-           &decltype(pg->scrubber)::wait_scrub,
-           m->get_hobj()
-         ).then_interruptible(
-           [FNAME, this, pg, this_instance_id, &ihref]() mutable {
-             DEBUGDPP("{}.{}: past scrub blocker, getting obc",
-                      *pg, *this, this_instance_id);
-           return pg->with_locked_obc(
-             m->get_hobj(), op_info,
-             [FNAME, this, pg, this_instance_id, &ihref](
-               auto head, auto obc) mutable {
-               DEBUGDPP("{}.{}: got obc {}, entering process stage",
-                        *pg, *this, this_instance_id, obc->obs);
-               return ihref.enter_stage<interruptor>(
-                 client_pp(*pg).process, *this
-               ).then_interruptible(
-                 [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
-                   DEBUGDPP("{}.{}: in process stage, calling do_process",
-                            *pg, *this, this_instance_id);
-                 return do_process(ihref, pg, obc, this_instance_id);
-               });
-             });
-         });
-        });
-      }
-    });
-  }).handle_error_interruptible(
+         return do_process(ihref, pg, obc, this_instance_id);
+       });
+    }
+  ).handle_error_interruptible(
     PG::load_obc_ertr::all_same_way(
-      [FNAME, this, pg=std::move(pg), this_instance_id](const auto &code) {
-      DEBUGDPP("{}.{}: saw error code {}",
-              *pg, *this, this_instance_id, code);
-      assert(code.value() > 0);
-      return reply_op_error(pg, -code.value());
-  }));
+      [FNAME, this, pg=std::move(pg), this_instance_id](
+       const auto &code
+      ) -> interruptible_future<> {
+       DEBUGDPP("{}.{}: saw error code {}",
+                *pg, *this, this_instance_id, code);
+       assert(code.value() > 0);
+       return reply_op_error(pg, -code.value());
+      })
+  );
 }
 
 ClientRequest::interruptible_future<>