]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../client_request: convert with_pg_process_interruptible coroutine
authorSamuel Just <sjust@redhat.com>
Mon, 5 Feb 2024 20:46:19 +0000 (20:46 +0000)
committerSamuel Just <sjust@redhat.com>
Mon, 1 Apr 2024 23:11:32 +0000 (16:11 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operations/client_request.cc

index 9b9de50c149d3176512e77eeb8258c1178d9a226..a3c895d9654107e89265e741ec81641774bfbe52 100644 (file)
@@ -4,6 +4,7 @@
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 
+#include "crimson/common/coroutine.h"
 #include "crimson/common/exception.h"
 #include "crimson/common/log.h"
 #include "crimson/osd/pg.h"
@@ -109,57 +110,52 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib
   DEBUGDPP("{} start", *pgref, *this);
   PG &pg = *pgref;
   if (pg.can_discard_op(*m)) {
-    return shard_services->send_incremental_map(
-      std::ref(get_foreign_connection()), m->get_map_epoch()
-    ).then([FNAME, this, this_instance_id, pgref] {
-      DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
-      pgref->client_request_orderer.remove_request(*this);
-      complete_request();
-      return interruptor::now();
-    });
+    co_await interruptor::make_interruptible(
+      shard_services->send_incremental_map(
+       std::ref(get_foreign_connection()), m->get_map_epoch()
+      ));
+    DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
+    pgref->client_request_orderer.remove_request(*this);
+    complete_request();
+    co_return;
   }
   DEBUGDPP("{}.{}: entering await_map stage",
           *pgref, *this, this_instance_id);
-  return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
-  ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
-    DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
-            pg, *this, this_instance_id);
-    return ihref.enter_blocker(
+  co_await ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this);
+  DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
+          pg, *this, this_instance_id);
+  auto map_epoch = co_await interruptor::make_interruptible(
+    ihref.enter_blocker(
       *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
-      m->get_min_epoch(), nullptr);
-  }).then_interruptible(
-    [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
-    DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
-            pg, *this, this_instance_id, map_epoch);
-    return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
-  }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
-    DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
-            pg, *this, this_instance_id);
-    return ihref.enter_blocker(
+      m->get_min_epoch(), nullptr));
+
+  DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+          pg, *this, this_instance_id, map_epoch);
+  co_await ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
+
+  DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
+          pg, *this, this_instance_id);
+  co_await interruptor::make_interruptible(
+    ihref.enter_blocker(
       *this,
       pg.wait_for_active_blocker,
-      &decltype(pg.wait_for_active_blocker)::wait);
-  }).then_interruptible(
-    [FNAME, this, pgref, this_instance_id, &ihref]() mutable
-    -> interruptible_future<> {
-    DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
-            *pgref, *this, this_instance_id);
-    if (is_pg_op()) {
-      return process_pg_op(pgref);
-    } else {
-      return process_op(ihref, pgref, this_instance_id);
-    }
-  }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
-    DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
-            *pgref, *this, this_instance_id);
-    return ihref.handle.complete();
-  }).then_interruptible([FNAME, this, this_instance_id, pgref] {
-    DEBUGDPP("{}.{}: process[_pg]_op complete,"
-            "removing request from orderer",
-            *pgref, *this, this_instance_id);
-    pgref->client_request_orderer.remove_request(*this);
-    complete_request();
-  });
+      &decltype(pg.wait_for_active_blocker)::wait));
+
+  DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
+          *pgref, *this, this_instance_id);
+
+  co_await (is_pg_op() ? process_pg_op(pgref) :
+           process_op(ihref, pgref, this_instance_id));
+
+  DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
+          *pgref, *this, this_instance_id);
+  co_await interruptor::make_interruptible(ihref.handle.complete());
+
+  DEBUGDPP("{}.{}: process[_pg]_op complete,"
+          "removing request from orderer",
+          *pgref, *this, this_instance_id);
+  pgref->client_request_orderer.remove_request(*this);
+  complete_request();
 }
 
 seastar::future<> ClientRequest::with_pg_process(