]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/osd_operations/client_request: refactor for OSD::start_pg_operation
authorSamuel Just <sjust@redhat.com>
Fri, 29 Apr 2022 22:37:57 +0000 (22:37 +0000)
committerSamuel Just <sjust@redhat.com>
Fri, 6 May 2022 03:45:17 +0000 (03:45 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h

index c73920c2ce7daa87fd293729b065c2e1a1a4fdff..6e1d1bf5191e2307dd72440002b1fa8e6c279a18 100644 (file)
@@ -1198,7 +1198,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
 seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
                                      Ref<MOSDOp> m)
 {
-  (void) shard_services.start_operation<ClientRequest>(
+  (void) start_pg_operation<ClientRequest>(
     *this,
     conn,
     std::move(m));
index b6ac8604046546472b6e7881405f4759930bd18c..bc0c3d943970f5f692aed0ebbb4eda1a2d63acd4 100644 (file)
@@ -49,6 +49,11 @@ void ClientRequest::dump_detail(Formatter *f) const
   }, tracking_events);
 }
 
+ConnectionPipeline &ClientRequest::get_connection_pipeline()
+{
+  return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
 ConnectionPipeline &ClientRequest::cp()
 {
   return get_osd_priv(conn.get()).client_request_conn_pipeline;
@@ -72,105 +77,98 @@ bool ClientRequest::is_pg_op() const
     [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
 }
 
-template <typename FuncT>
-ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func)
+seastar::future<seastar::stop_iteration> ClientRequest::with_pg_int(
+  ShardServices &shard_services, Ref<PG> pgref)
 {
-  return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward<FuncT>(func));
+  epoch_t same_interval_since = pgref->get_interval_start_epoch();
+  logger().debug("{} same_interval_since: {}", *this, same_interval_since);
+  if (m->finish_decode()) {
+    m->clear_payload();
+  }
+  return interruptor::with_interruption(
+    [this, &shard_services, pgref]() mutable {
+      return sequencer.start_op(
+       *this, handle, shard_services.registry,
+       interruptor::wrap_function([pgref, this, &shard_services] {
+         PG &pg = *pgref;
+         if (pg.can_discard_op(*m)) {
+           return osd.send_incremental_map(
+             conn, m->get_map_epoch()
+           ).then([this, &shard_services] {
+             sequencer.finish_op_out_of_order(*this, shard_services.registry);
+             return interruptor::now();
+           });
+         }
+         return enter_stage<interruptor>(pp(pg).await_map
+         ).then_interruptible([this, &pg] {
+           return with_blocking_event<
+             PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+             >([this, &pg] (auto&& trigger) {
+               return pg.osdmap_gate.wait_for_map(std::move(trigger),
+                                                  m->get_min_epoch());
+             });
+         }).then_interruptible([this, &pg](auto map) {
+           return enter_stage<interruptor>(pp(pg).wait_for_active);
+         }).then_interruptible([this, &pg]() {
+           return with_blocking_event<
+             PGActivationBlocker::BlockingEvent
+             >([&pg] (auto&& trigger) {
+                return pg.wait_for_active_blocker.wait(std::move(trigger));
+             });
+         }).then_interruptible(
+           [this, &shard_services, pgref=std::move(pgref)]() mutable {
+             if (is_pg_op()) {
+               return process_pg_op(
+                 pgref
+               ).then_interruptible([this, &shard_services] {
+                 sequencer.finish_op_out_of_order(*this, shard_services.registry);
+               });
+             } else {
+               return process_op(
+                 pgref
+               ).then_interruptible([this, &shard_services](const seq_mode_t mode) {
+                 if (mode == seq_mode_t::IN_ORDER) {
+                   sequencer.finish_op_in_order(*this);
+                 } else {
+                   assert(mode == seq_mode_t::OUT_OF_ORDER);
+                   sequencer.finish_op_out_of_order(*this, shard_services.registry);
+                 }
+               });
+             }
+           });
+       })).then_interruptible([] {
+         return seastar::stop_iteration::yes;
+       });
+    }, [this, pgref](std::exception_ptr eptr) {
+      if (should_abort_request(*this, std::move(eptr))) {
+       sequencer.abort();
+       return seastar::stop_iteration::yes;
+      } else {
+       sequencer.maybe_reset(*this);
+       return seastar::stop_iteration::no;
+      }
+    }, pgref);
 }
 
-seastar::future<> ClientRequest::start()
+seastar::future<> ClientRequest::with_pg(
+  ShardServices &shard_services, Ref<PG> pgref)
 {
-  logger().debug("{}: start", *this);
-
-  track_event<StartEvent>();
-  return seastar::repeat([this, opref=IRef{this}]() mutable {
-      logger().debug("{}: in repeat", *this);
-      return enter_stage<>(cp().await_map).then([this]() {
-       return with_blocking_event<OSD_OSDMapGate::OSDMapBlocker::BlockingEvent>(
-         [this](auto&& trigger) {
-         return osd.osdmap_gate.wait_for_map(std::move(trigger),
-                                             m->get_min_epoch());
-       });
-      }).then([this](epoch_t epoch) {
-       return enter_stage<>(cp().get_pg);
-      }).then([this] {
-       return with_blocking_event<PGMap::PGCreationBlockingEvent>(
-         [this] (auto&& trigger) {
-         return osd.wait_for_pg(std::move(trigger), m->get_spg());
-       });
-      }).then([this](Ref<PG> pgref) mutable {
-       return interruptor::with_interruption([this, pgref]() mutable {
-          epoch_t same_interval_since = pgref->get_interval_start_epoch();
-          logger().debug("{} same_interval_since: {}", *this, same_interval_since);
-          if (m->finish_decode()) {
-            m->clear_payload();
-          }
-          return with_sequencer(interruptor::wrap_function([pgref, this] {
-            PG &pg = *pgref;
-            if (pg.can_discard_op(*m)) {
-              return osd.send_incremental_map(
-                conn, m->get_map_epoch()).then([this] {
-                  sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
-                  return interruptor::now();
-              });
-            }
-           return enter_stage<interruptor>(
-             pp(pg).await_map
-           ).then_interruptible([this, &pg] {
-             return with_blocking_event<PG_OSDMapGate::OSDMapBlocker::BlockingEvent>(
-               [this, &pg] (auto&& trigger) {
-                  return pg.osdmap_gate.wait_for_map(std::move(trigger),
-                                                    m->get_min_epoch());
-               });
-            }).then_interruptible([this, &pg](auto&&) {
-              return enter_stage<>(pp(pg).wait_for_active);
-            }).then_interruptible([this, &pg]() {
-             return with_blocking_event<PGActivationBlocker::BlockingEvent>(
-               [&pg] (auto&& trigger) {
-                return pg.wait_for_active_blocker.wait(std::move(trigger));
-             });
-            }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
-              if (is_pg_op()) {
-                return process_pg_op(pgref).then_interruptible([this] {
-                  sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
-                });
-              } else {
-                return process_op(pgref).then_interruptible([this] (const seq_mode_t mode) {
-                  if (mode == seq_mode_t::IN_ORDER) {
-                    sequencer.finish_op_in_order(*this);
-                  } else {
-                    assert(mode == seq_mode_t::OUT_OF_ORDER);
-                    sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
-                  }
-                });
-              }
-            });
-          })).then_interruptible([pgref] {
-            return seastar::stop_iteration::yes;
-          });
-       }, [this, pgref](std::exception_ptr eptr) {
-          if (should_abort_request(*this, std::move(eptr))) {
-            sequencer.abort();
-            return seastar::stop_iteration::yes;
-          } else {
-            sequencer.maybe_reset(*this);
-            return seastar::stop_iteration::no;
-          }
-       }, pgref);
-      });
-    }).then([this] {
-      track_event<CompletionEvent>();
-    });
+  return seastar::repeat([this, &shard_services, pgref]() mutable {
+    return with_pg_int(shard_services, pgref);
+  }).then([this] {
+    track_event<CompletionEvent>();
+  });
 }
 
 ClientRequest::interruptible_future<>
 ClientRequest::process_pg_op(
   Ref<PG> &pg)
 {
-  return pg->do_pg_ops(m)
-    .then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
-      return conn->send(std::move(reply));
-    });
+  return pg->do_pg_ops(
+    m
+  ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
+    return conn->send(std::move(reply));
+  });
 }
 
 ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
index a081f46095529d8777598b774af3122f16f17e1c..21e09bc57cc9e7e272791987f4a04d74bf185345 100644 (file)
@@ -18,6 +18,7 @@
 namespace crimson::osd {
 class PG;
 class OSD;
+class ShardServices;
 
 class ClientRequest final : public PhasedOperationT<ClientRequest>,
                             private CommonClientRequest {
@@ -49,10 +50,21 @@ public:
   void print(std::ostream &) const final;
   void dump_detail(Formatter *f) const final;
 
+  static constexpr bool can_create() { return false; }
+  spg_t get_pgid() const {
+    return m->get_spg();
+  }
+  ConnectionPipeline &get_connection_pipeline();
+  PipelineHandle &get_handle() { return handle; }
+  epoch_t get_epoch() const { return m->get_min_epoch(); }
+
+  seastar::future<seastar::stop_iteration> with_pg_int(
+    ShardServices &shard_services, Ref<PG> pg);
 public:
-  seastar::future<> start();
   bool same_session_and_pg(const ClientRequest& other_op) const;
 
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pgref);
 private:
   template <typename FuncT>
   interruptible_future<> with_sequencer(FuncT&& func);