]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg: convert PG::handle_rep_op to coroutine
authorSamuel Just <sjust@redhat.com>
Wed, 24 Jul 2024 04:56:39 +0000 (21:56 -0700)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 4 Sep 2024 09:48:34 +0000 (09:48 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/pg.cc

index 21b1e5db66a57237b270a8a16feb3beb08c198d3..291385de64991f4f159e875fe8a8b78a0f8e09eb 100644 (file)
@@ -27,6 +27,7 @@
 
 #include "os/Transaction.h"
 
+#include "crimson/common/coroutine.h"
 #include "crimson/common/exception.h"
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
@@ -1442,7 +1443,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   LOG_PREFIX(PG::handle_rep_op);
   DEBUGDPP("{}", *this, *req);
   if (can_discard_replica_op(*req)) {
-    return seastar::now();
+    co_return;
   }
 
   ceph::os::Transaction txn;
@@ -1460,17 +1461,22 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
                 txn,
                 false);
   DEBUGDPP("{} do_transaction", *this, *req);
-  return interruptor::make_interruptible(shard_services.get_store().do_transaction(
-       coll_ref, std::move(txn))).then_interruptible(
-      [req, lcod=peering_state.get_info().last_complete, this] {
-      peering_state.update_last_complete_ondisk(lcod);
-      const auto map_epoch = get_osdmap_epoch();
-      auto reply = crimson::make_message<MOSDRepOpReply>(
-        req.get(), pg_whoami, 0,
-       map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
-      reply->set_last_complete_ondisk(lcod);
-      return shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch);
-    });
+  co_await interruptor::make_interruptible(
+    shard_services.get_store().do_transaction(coll_ref, std::move(txn))
+  );
+
+  const auto &lcod = peering_state.get_info().last_complete;
+  peering_state.update_last_complete_ondisk(lcod);
+  const auto map_epoch = get_osdmap_epoch();
+  auto reply = crimson::make_message<MOSDRepOpReply>(
+    req.get(), pg_whoami, 0,
+    map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
+  reply->set_last_complete_ondisk(lcod);
+  co_await interruptor::make_interruptible(
+    shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
+  );
+  co_return;
+}
 }
 
 void PG::log_operation(