]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/.../client_request: convert ClientRequest::do_process to coroutine
authorSamuel Just <sjust@redhat.com>
Wed, 7 Feb 2024 02:05:42 +0000 (02:05 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Thu, 16 May 2024 11:39:18 +0000 (14:39 +0300)
Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit 30237472bea485cb57f5c14031fb545aed97da15)

src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h

index 2514c08993ff53dc605293fd2e7c61a1821edf99..22c32dd4e50aee7669b9fb2b8131421057074e45 100644 (file)
@@ -284,23 +284,30 @@ ClientRequest::process_op(
        [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
          DEBUGDPP("{}.{}: in process stage, calling do_process",
                   *pg, *this, this_instance_id);
-         return do_process(ihref, pg, obc, this_instance_id);
-       });
-    }
-  ).handle_error_interruptible(
-    PG::load_obc_ertr::all_same_way(
-      [FNAME, this, pg=std::move(pg), this_instance_id](
-       const auto &code
-      ) -> interruptible_future<> {
-       DEBUGDPP("{}.{}: saw error code {}",
-                *pg, *this, this_instance_id, code);
-       assert(code.value() > 0);
-       return reply_op_error(pg, -code.value());
-      })
-  );
+         return do_process(
+           ihref, pg, obc, this_instance_id
+         ).handle_error_interruptible(
+           crimson::ct_error::eagain::handle(
+             [this, pg, this_instance_id, &ihref]() mutable {
+               return process_op(ihref, pg, this_instance_id);
+             })
+         );
+       }
+      );
+    }).handle_error_interruptible(
+      PG::load_obc_ertr::all_same_way(
+       [FNAME, this, pg=std::move(pg), this_instance_id](
+         const auto &code
+       ) -> interruptible_future<> {
+         DEBUGDPP("{}.{}: saw error code {}",
+                  *pg, *this, this_instance_id, code);
+         assert(code.value() > 0);
+         return reply_op_error(pg, -code.value());
+       })
+    );
 }
 
-ClientRequest::interruptible_future<>
+ClientRequest::do_process_iertr::future<>
 ClientRequest::do_process(
   instance_handle_t &ihref,
   Ref<PG> pg, crimson::osd::ObjectContextRef obc,
@@ -308,7 +315,8 @@ ClientRequest::do_process(
 {
   LOG_PREFIX(ClientRequest::do_process);
   if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
-    return reply_op_error(pg, -EINVAL);
+    co_await reply_op_error(pg, -EINVAL);
+    co_return;
   }
   const pg_pool_t pool = pg->get_pgpool().info;
   if (pool.has_flag(pg_pool_t::FLAG_EIO)) {
@@ -316,36 +324,44 @@ ClientRequest::do_process(
     if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) {
       DEBUGDPP("{}.{}: discarding op due to pool EIO flag",
               *pg, *this, this_instance_id);
-      return seastar::now();
+      co_return;
     } else {
       DEBUGDPP("{}.{}: replying EIO due to pool EIO flag",
               *pg, *this, this_instance_id);
-      return reply_op_error(pg, -EIO);
+      co_await reply_op_error(pg, -EIO);
+      co_return;
     }
   }
   if (m->get_oid().name.size()
     > crimson::common::local_conf()->osd_max_object_name_len) {
-    return reply_op_error(pg, -ENAMETOOLONG);
+    co_await reply_op_error(pg, -ENAMETOOLONG);
+    co_return;
   } else if (m->get_hobj().get_key().size()
     > crimson::common::local_conf()->osd_max_object_name_len) {
-    return reply_op_error(pg, -ENAMETOOLONG);
+    co_await reply_op_error(pg, -ENAMETOOLONG);
+    co_return;
   } else if (m->get_hobj().nspace.size()
     > crimson::common::local_conf()->osd_max_object_namespace_len) {
-    return reply_op_error(pg, -ENAMETOOLONG);
+    co_await reply_op_error(pg, -ENAMETOOLONG);
+    co_return;
   } else if (m->get_hobj().oid.name.empty()) {
-    return reply_op_error(pg, -EINVAL);
+    co_await reply_op_error(pg, -EINVAL);
+    co_return;
   } else if (m->get_hobj().is_internal_pg_local()) {
     // clients are not allowed to write to hobject_t::INTERNAL_PG_LOCAL_NS
-    return reply_op_error(pg, -EINVAL);
+    co_await reply_op_error(pg, -EINVAL);
+    co_return;
   } else if (pg->get_osdmap()->is_blocklisted(
         get_foreign_connection().get_peer_addr())) {
     DEBUGDPP("{}.{}: {} is blocklisted",
             *pg, *this, this_instance_id, get_foreign_connection().get_peer_addr());
-    return reply_op_error(pg, -EBLOCKLISTED);
+    co_await reply_op_error(pg, -EBLOCKLISTED);
+    co_return;
   }
 
   if (!obc->obs.exists && !op_info.may_write()) {
-    return reply_op_error(pg, -ENOENT);
+    co_await reply_op_error(pg, -ENOENT);
+    co_return;
   }
 
   SnapContext snapc = get_snapc(*pg,obc);
@@ -357,7 +373,8 @@ ClientRequest::do_process(
             *pg, *this, this_instance_id,
             snapc.seq, obc->ssc->snapset.seq,
             obc->obs.oi.soid);
-    return reply_op_error(pg, -EOLDSNAPC);
+    co_await reply_op_error(pg, -EOLDSNAPC);
+    co_return;
   }
 
   if (!pg->is_primary()) {
@@ -365,50 +382,35 @@ ClientRequest::do_process(
     if (is_misdirected(*pg)) {
       DEBUGDPP("{}.{}: dropping misdirected op",
               *pg, *this, this_instance_id);
-      return seastar::now();
+      co_return;
     } else if (const hobject_t& hoid = m->get_hobj();
                !pg->get_peering_state().can_serve_replica_read(hoid)) {
       DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary",
               *pg, *this, this_instance_id);
-      return reply_op_error(pg, -EAGAIN);
+      co_await reply_op_error(pg, -EAGAIN);
+      co_return;
     } else {
       DEBUGDPP("{}.{}: serving replica read on oid {}",
               *pg, *this, this_instance_id, m->get_hobj());
     }
   }
-  return pg->do_osd_ops(
+
+  auto [submitted, all_completed] = co_await pg->do_osd_ops(
     m, r_conn, obc, op_info, snapc
-  ).safe_then_unpack_interruptible(
-    [FNAME, this, pg, this_instance_id, &ihref](
-      auto submitted, auto all_completed) mutable {
-      return submitted.then_interruptible(
-       [this, pg, &ihref] {
-       return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
-      }).then_interruptible(
-       [FNAME, this, pg, this_instance_id,
-        all_completed=std::move(all_completed), &ihref]() mutable {
-         return all_completed.safe_then_interruptible(
-           [FNAME, this, pg, this_instance_id, &ihref](
-             MURef<MOSDOpReply> reply) {
-             return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
-             ).then_interruptible(
-               [FNAME, this, pg, this_instance_id,
-                reply=std::move(reply)]() mutable {
-                 DEBUGDPP("{}.{}: sending response",
-                          *pg, *this, this_instance_id);
-                 // TODO: gate the crosscore sending
-                 return get_foreign_connection(
-                      ).send_with_throttling(std::move(reply));
-               });
-           }, crimson::ct_error::eagain::handle(
-             [this, pg, this_instance_id, &ihref]() mutable {
-               return process_op(ihref, pg, this_instance_id);
-           }));
-       });
-    }, crimson::ct_error::eagain::handle(
-      [this, pg, this_instance_id, &ihref]() mutable {
-       return process_op(ihref, pg, this_instance_id);
-      }));
+  );
+  co_await std::move(submitted);
+
+  co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+
+  auto reply = co_await std::move(all_completed);
+
+  co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
+  DEBUGDPP("{}.{}: sending response",
+          *pg, *this, this_instance_id);
+  // TODO: gate the crosscore sending
+  co_await interruptor::make_interruptible(
+    get_foreign_connection().send_with_throttling(std::move(reply))
+  );
 }
 
 bool ClientRequest::is_misdirected(const PG& pg) const
index 358a6bbe44e579c41189d502ec4df19cc07cf55d..eb27f91296499370086937a4af2bf40fc39ecf80 100644 (file)
@@ -267,7 +267,12 @@ private:
   interruptible_future<> with_sequencer(FuncT&& func);
   interruptible_future<> reply_op_error(const Ref<PG>& pg, int err);
 
-  interruptible_future<> do_process(
+
+  using do_process_iertr =
+    ::crimson::interruptible::interruptible_errorator<
+      ::crimson::osd::IOInterruptCondition,
+      ::crimson::errorator<crimson::ct_error::eagain>>;
+  do_process_iertr::future<> do_process(
     instance_handle_t &ihref,
     Ref<PG> pg,
     crimson::osd::ObjectContextRef obc,