]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: convert entire ClientRequest to the new infra
authorRadosław Zarzyński <rzarzyns@redhat.com>
Tue, 12 Apr 2022 08:37:16 +0000 (10:37 +0200)
committerRadosław Zarzyński <rzarzyns@redhat.com>
Thu, 5 May 2022 02:06:31 +0000 (04:06 +0200)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h

index eaccd501a09ce8732ccf4e477181103812a895a5..3eedead0468242d0e7ccba53ea1816d17612eca8 100644 (file)
@@ -9,6 +9,8 @@
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/recovery_subrequest.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/pg_activation_blocker.h"
+#include "crimson/osd/pg_map.h"
 
 namespace crimson::osd {
 
@@ -17,6 +19,17 @@ struct LttngBackend
   : ClientRequest::StartEvent::Backend,
     ClientRequest::ConnectionPipeline::AwaitMap::BlockingEvent::Backend,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
+    ClientRequest::ConnectionPipeline::GetPG::BlockingEvent::Backend,
+    PGMap::PGCreationBlockingEvent::Backend,
+    ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend,
+    PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
+    ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
+    PGActivationBlocker::BlockingEvent::Backend,
+    ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
+    ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
+    ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
+    ClientRequest::PGPipeline::WaitRepop::BlockingEvent::Backend,
+    ClientRequest::PGPipeline::SendReply::BlockingEvent::Backend,
     ClientRequest::CompletionEvent::Backend
 {
   void handle(ClientRequest::StartEvent&,
@@ -32,6 +45,61 @@ struct LttngBackend
               const OSD_OSDMapGate::OSDMapBlocker&) override {
   }
 
+  void handle(ClientRequest::ConnectionPipeline::GetPG::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::ConnectionPipeline::GetPG& blocker) override {
+  }
+
+  void handle(PGMap::PGCreationBlockingEvent&,
+              const Operation&,
+              const PGMap::PGCreationBlocker&) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::AwaitMap::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::AwaitMap& blocker) override {
+  }
+
+  void handle(PG_OSDMapGate::OSDMapBlocker::BlockingEvent&,
+              const Operation&,
+              const PG_OSDMapGate::OSDMapBlocker&) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::WaitForActive::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::WaitForActive& blocker) override {
+  }
+
+  void handle(PGActivationBlocker::BlockingEvent& ev,
+              const Operation& op,
+              const PGActivationBlocker& blocker) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::GetOBC& blocker) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::Process::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::Process& blocker) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::WaitRepop::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::WaitRepop& blocker) override {
+  }
+
+  void handle(ClientRequest::PGPipeline::SendReply::BlockingEvent& ev,
+              const Operation& op,
+              const ClientRequest::PGPipeline::SendReply& blocker) override {
+  }
+
   void handle(ClientRequest::CompletionEvent&,
               const Operation&) override {}
 };
index bca3952c772d44afcc66d840558a1cd9de0b4566..649293460f3b8e4ea00b7293bf25dfedc6f1f342 100644 (file)
@@ -92,9 +92,12 @@ seastar::future<> ClientRequest::start()
                                              m->get_min_epoch());
        });
       }).then([this](epoch_t epoch) {
-       return with_blocking_future(handle.enter(cp().get_pg));
+       return enter_stage<>(cp().get_pg);
       }).then([this] {
-       return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+       return with_blocking_event<PGMap::PGCreationBlockingEvent>(
+         [this] (auto&& trigger) {
+         return osd.wait_for_pg(std::move(trigger), m->get_spg());
+       });
       }).then([this](Ref<PG> pgref) mutable {
        return interruptor::with_interruption([this, pgref]() mutable {
           epoch_t same_interval_since = pgref->get_interval_start_epoch();
@@ -111,17 +114,21 @@ seastar::future<> ClientRequest::start()
                   return interruptor::now();
               });
             }
-            return with_blocking_future_interruptible<interruptor::condition>(
-              handle.enter(pp(pg).await_map)
-            ).then_interruptible([this, &pg] {
-              return with_blocking_future_interruptible<interruptor::condition>(
-                pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
-            }).then_interruptible([this, &pg](auto map) {
-              return with_blocking_future_interruptible<interruptor::condition>(
-                handle.enter(pp(pg).wait_for_active));
+           return enter_stage<interruptor>(
+             pp(pg).await_map
+           ).then_interruptible([this, &pg] {
+             return with_blocking_event<PG_OSDMapGate::OSDMapBlocker::BlockingEvent>(
+               [this, &pg] (auto&& trigger) {
+                  return pg.osdmap_gate.wait_for_map(std::move(trigger),
+                                                    m->get_min_epoch());
+               });
+            }).then_interruptible([this, &pg](auto&&) {
+              return enter_stage<>(pp(pg).wait_for_active);
             }).then_interruptible([this, &pg]() {
-              return with_blocking_future_interruptible<interruptor::condition>(
-                pg.wait_for_active_blocker.wait());
+             return with_blocking_event<PGActivationBlocker::BlockingEvent>(
+               [&pg] (auto&& trigger) {
+                return pg.wait_for_active_blocker.wait(std::move(trigger));
+             });
             }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
               if (is_pg_op()) {
                 return process_pg_op(pgref).then_interruptible([this] {
@@ -169,9 +176,9 @@ ClientRequest::process_pg_op(
 ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
 ClientRequest::process_op(Ref<PG> &pg)
 {
-  return with_blocking_future_interruptible<interruptor::condition>(
-      handle.enter(pp(*pg).recover_missing))
-  .then_interruptible(
+  return enter_stage<interruptor>(
+    pp(*pg).recover_missing
+  ).then_interruptible(
     [this, pg]() mutable {
     return do_recover_missing(pg, m->get_hobj());
   }).then_interruptible([this, pg]() mutable {
@@ -186,8 +193,7 @@ ClientRequest::process_op(Ref<PG> &pg)
           return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
         });
       } else {
-        return with_blocking_future_interruptible<interruptor::condition>(
-            handle.enter(pp(*pg).get_obc)).then_interruptible(
+        return enter_stage<interruptor>(pp(*pg).get_obc).then_interruptible(
           [this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
           logger().debug("{}: got obc lock", *this);
           op_info.set_from_op(&*m, *pg->get_osdmap());
@@ -196,9 +202,8 @@ ClientRequest::process_op(Ref<PG> &pg)
           return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
             return pg->with_locked_obc(m->get_hobj(), op_info,
                                        [this, pg, &mode](auto obc) mutable {
-              return with_blocking_future_interruptible<interruptor::condition>(
-                handle.enter(pp(*pg).process)
-              ).then_interruptible([this, pg, obc, &mode]() mutable {
+              return enter_stage<interruptor>(pp(*pg).process).then_interruptible(
+              [this, pg, obc, &mode]() mutable {
                 return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
                   mode = _mode;
                   return seastar::now();
@@ -264,21 +269,18 @@ ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
 
   return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
     [this, pg](auto submitted, auto all_completed) mutable {
-    return submitted.then_interruptible(
-      [this, pg] {
-        return with_blocking_future_interruptible<interruptor::condition>(
-            handle.enter(pp(*pg).wait_repop));
+    return submitted.then_interruptible([this, pg] {
+      return enter_stage<interruptor>(pp(*pg).wait_repop);
     }).then_interruptible(
       [this, pg, all_completed=std::move(all_completed)]() mutable {
       return all_completed.safe_then_interruptible(
         [this, pg](MURef<MOSDOpReply> reply) {
-        return with_blocking_future_interruptible<interruptor::condition>(
-            handle.enter(pp(*pg).send_reply)).then_interruptible(
-              [this, reply=std::move(reply)]() mutable{
-              return conn->send(std::move(reply)).then([] {
-                return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
-              });
-            });
+        return enter_stage<interruptor>(pp(*pg).send_reply).then_interruptible(
+          [this, reply=std::move(reply)]() mutable {
+          return conn->send(std::move(reply)).then([] {
+            return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
+          });
+        });
       }, crimson::ct_error::eagain::handle([this, pg]() mutable {
         return process_op(pg);
       }));
index f0fbf4f88aa77b02ad7744283d78dc8f39df61b5..024b514db65bae5d9cb90d60c054abed1eb59eef 100644 (file)
@@ -10,6 +10,8 @@
 #include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/client_request_common.h"
 #include "crimson/osd/osd_operations/common/pg_pipeline.h"
+#include "crimson/osd/pg_activation_blocker.h"
+#include "crimson/osd/pg_map.h"
 #include "crimson/common/type_helpers.h"
 #include "messages/MOSDOp.h"
 
@@ -109,6 +111,17 @@ public:
     StartEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
     OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+    ConnectionPipeline::GetPG::BlockingEvent,
+    PGMap::PGCreationBlockingEvent,
+    PGPipeline::AwaitMap::BlockingEvent,
+    PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+    PGPipeline::WaitForActive::BlockingEvent,
+    PGActivationBlocker::BlockingEvent,
+    PGPipeline::RecoverMissing::BlockingEvent,
+    PGPipeline::GetOBC::BlockingEvent,
+    PGPipeline::Process::BlockingEvent,
+    PGPipeline::WaitRepop::BlockingEvent,
+    PGPipeline::SendReply::BlockingEvent,
     CompletionEvent
   > tracking_events;