crimson/osd/osd_operations/peering_event: refactor to use OSD::start_pg_operation
author     Samuel Just <sjust@redhat.com>
           Thu, 27 Jan 2022 22:55:11 +0000 (22:55 +0000)
committer  Samuel Just <sjust@redhat.com>
           Fri, 6 May 2022 03:45:17 +0000 (03:45 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd_operations/compound_peering_request.cc
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/peering_event.h

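The call sites below switch from shard_services.start_operation<RemotePeeringEvent>(*this, ...) to OSD::start_pg_operation<RemotePeeringEvent>(...), and the peering event classes drop their own get_pg() lookup in favour of a with_pg(shard_services, pg) entry point. The following is a self-contained toy sketch of that call shape only; the types and the lookup logic are stand-ins, not crimson's real API, and the Seastar future/pipeline plumbing is omitted.

// Toy illustration of the refactor's call shape: the OSD-side
// start_pg_operation() owns the "resolve the PG" step, and each operation
// type only implements with_pg().  All names here are stand-ins.
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <utility>

struct PG { std::string id; };
struct ShardServices {};

struct OSD {
  ShardServices shard_services;

  // Pretend only pg "1.0" exists on this OSD.
  std::optional<PG> lookup_pg(const std::string &pgid) {
    if (pgid == "1.0") return PG{pgid};
    return std::nullopt;
  }

  // Common front half: construct the operation, resolve its PG once,
  // then hand both shard services and the PG (or nullptr) to with_pg().
  template <typename Op, typename... Args>
  void start_pg_operation(Args&&... args) {
    auto op = std::make_unique<Op>(std::forward<Args>(args)...);
    auto pg = lookup_pg(op->get_pgid());
    op->with_pg(shard_services, pg ? &*pg : nullptr);
  }
};

// An operation no longer needs an OSD reference or its own get_pg();
// it only says what to do once the PG is (or is not) available.
struct PeeringOp {
  std::string pgid;
  explicit PeeringOp(std::string id) : pgid(std::move(id)) {}
  std::string get_pgid() const { return pgid; }
  void with_pg(ShardServices &, PG *pg) {
    if (!pg) {
      std::cout << pgid << ": pg absent, did not create\n";
      return;
    }
    std::cout << pg->id << ": handling peering event\n";
  }
};

int main() {
  OSD osd;
  osd.start_pg_operation<PeeringOp>("1.0");  // pg present path
  osd.start_pg_operation<PeeringOp>("2.3");  // pg absent path
}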
index 17339eaddd8dea3dd95b15e7e8514a8499f9b984..c73920c2ce7daa87fd293729b065c2e1a1a4fdff 100644 (file)
@@ -1267,8 +1267,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
     pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
                           pgid.shard};
     PeeringState::RequestScrub scrub_request{m->deep, m->repair};
-    return shard_services.start_operation<RemotePeeringEvent>(
-      *this,
+    return start_pg_operation<RemotePeeringEvent>(
       conn,
       shard_services,
       from_shard,
@@ -1381,8 +1380,7 @@ seastar::future<> OSD::handle_peering_op(
   const int from = m->get_source().num();
   logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
   std::unique_ptr<PGPeeringEvent> evt(m->get_event());
-  (void) shard_services.start_operation<RemotePeeringEvent>(
-    *this,
+  (void) start_pg_operation<RemotePeeringEvent>(
     conn,
     shard_services,
     pg_shard_t{from, m->get_spg().shard},
index 86da1a47006a7999e39fbcfe46f3b70568f01a0a..ec6487dfa21bb4af8c596c8d0fc776b2c701d94e 100644 (file)
@@ -52,7 +52,7 @@ public:
       ceph_assert(ctx.transaction.empty());
       return seastar::now();
     } else {
-      return osd.get_shard_services().dispatch_context_transaction(
+      return shard_services.dispatch_context_transaction(
        pg->get_collection_ref(), ctx);
     }
   }
@@ -83,9 +83,8 @@ std::vector<crimson::OperationRef> handle_pg_create(
         pgid, m->epoch,
         pi, history);
     } else {
-      auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+      auto op = osd.start_pg_operation<PeeringSubEvent>(
          state,
-         osd,
          conn,
          osd.get_shard_services(),
          pg_shard_t(),
@@ -106,7 +105,8 @@ std::vector<crimson::OperationRef> handle_pg_create(
 namespace crimson::osd {
 
 CompoundPeeringRequest::CompoundPeeringRequest(
-  OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m)
+  OSD &osd,
+  crimson::net::ConnectionRef conn, Ref<Message> m)
   : osd(osd),
     conn(conn),
     m(m)
index 19cf992031c6c0d3becf354ecc01d2aa7d21b031..0b26833fd75c3f8088f670b05296d65d1affc427 100644 (file)
@@ -53,63 +53,50 @@ PGPeeringPipeline &PeeringEvent<T>::pp(PG &pg)
 }
 
 template <class T>
-seastar::future<> PeeringEvent<T>::start()
+seastar::future<> PeeringEvent<T>::with_pg(
+  ShardServices &shard_services, Ref<PG> pg)
 {
-  logger().debug("{}: start", *this);
-
-  typename T::IRef ref = static_cast<T*>(this);
-  auto maybe_delay = seastar::now();
-  if (delay) {
-    maybe_delay = seastar::sleep(
-      std::chrono::milliseconds(std::lround(delay * 1000)));
+  if (!pg) {
+    logger().warn("{}: pg absent, did not create", *this);
+    on_pg_absent();
+    handle.exit();
+    return complete_rctx_no_pg();
   }
-  return maybe_delay.then([this] {
-    return get_pg();
-  }).then([this](Ref<PG> pg) {
-    if (!pg) {
-      logger().warn("{}: pg absent, did not create", *this);
-      on_pg_absent();
-      handle.exit();
-      return complete_rctx_no_pg();
-    }
-    using interruptor = typename T::interruptor;
-    return interruptor::with_interruption([this, pg] {
-      logger().debug("{}: pg present", *this);
-      return this->template enter_stage<interruptor>(
-        pp(*pg).await_map
-      ).then_interruptible([this, pg] {
-        return this->template with_blocking_event<PG_OSDMapGate::OSDMapBlocker::BlockingEvent>(
-        [this, pg] (auto&& trigger) {
-          return pg->osdmap_gate.wait_for_map(std::move(trigger),
-                                              evt.get_epoch_sent());
+
+  using interruptor = typename T::interruptor;
+  return interruptor::with_interruption([this, pg, &shard_services] {
+    logger().debug("{}: pg present", *this);
+    return this->template enter_stage<interruptor>(pp(*pg).await_map
+    ).then_interruptible([this, pg] {
+      return this->template with_blocking_event<
+       PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+       >([this, pg](auto &&trigger) {
+         return pg->osdmap_gate.wait_for_map(
+           std::move(trigger), evt.get_epoch_sent());
        });
-      }).then_interruptible([this, pg](auto) {
-        return this->template enter_stage<interruptor>(pp(*pg).process);
-      }).then_interruptible([this, pg] {
-        // TODO: likely we should synchronize also with the pg log-based
-        // recovery.
-        return this->template enter_stage<interruptor>(BackfillRecovery::bp(*pg).process);
-      }).then_interruptible([this, pg] {
-        pg->do_peering_event(evt, ctx);
-        handle.exit();
-        return complete_rctx(pg);
-      }).then_interruptible([this, pg] () -> typename T::template interruptible_future<> {
+    }).then_interruptible([this, pg](auto) {
+      return this->template enter_stage<interruptor>(pp(*pg).process);
+    }).then_interruptible([this, pg] {
+      // TODO: likely we should synchronize also with the pg log-based
+      // recovery.
+      return this->template enter_stage<interruptor>(
+       BackfillRecovery::bp(*pg).process);
+    }).then_interruptible([this, pg] {
+      pg->do_peering_event(evt, ctx);
+      handle.exit();
+      return complete_rctx(pg);
+    }).then_interruptible([pg, &shard_services]()
+                         -> typename T::template interruptible_future<> {
         if (!pg->get_need_up_thru()) {
           return seastar::now();
         }
         return shard_services.send_alive(pg->get_same_interval_since());
-      }).then_interruptible([this] {
+      }).then_interruptible([&shard_services] {
         return shard_services.send_pg_temp();
       });
-    },
-    [this](std::exception_ptr ep) {
-      logger().debug("{}: interrupted with {}", *this, ep);
-      return seastar::now();
-    },
-    pg);
-  }).finally([ref=std::move(ref)] {
-    logger().debug("{}: complete", *ref);
-  });
+  }, [this](std::exception_ptr ep) {
+    logger().debug("{}: interrupted with {}", *this, ep);
+  }, pg);
 }
 
 template <class T>
@@ -128,16 +115,11 @@ PeeringEvent<T>::complete_rctx(Ref<PG> pg)
     std::move(ctx));
 }
 
-ConnectionPipeline &RemotePeeringEvent::cp()
+ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
 {
   return get_osd_priv(conn.get()).peering_request_conn_pipeline;
 }
 
-RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op()
-{
-  return osd.peering_request_osd_pipeline;
-}
-
 void RemotePeeringEvent::on_pg_absent()
 {
   if (auto& e = get_event().get_event();
@@ -166,54 +148,32 @@ RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref
   if (pg) {
     return PeeringEvent::complete_rctx(pg);
   } else {
-    logger().debug("{}: OSDState is {}", *this, osd.state);
-    return osd.state.when_active().then([this] {
-      assert(osd.state.is_active());
-      return shard_services.dispatch_context_messages(std::move(ctx));
-    });
+    return shard_services.dispatch_context_messages(std::move(ctx));
   }
 }
 
 seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
 {
-  logger().debug("{}: OSDState is {}", *this, osd.state);
-  return osd.state.when_active().then([this] {
-    assert(osd.state.is_active());
-    return shard_services.dispatch_context_messages(std::move(ctx));
-  });
+  return shard_services.dispatch_context_messages(std::move(ctx));
 }
 
-seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
+seastar::future<> LocalPeeringEvent::start()
 {
-  return enter_stage<>(op().await_active).then([this] {
-    return osd.state.when_active();
-  }).then([this] {
-    return enter_stage<>(cp().await_map);
-  }).then([this] {
-    using OSDMapBlockingEvent =
-      OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
-    return with_blocking_event<OSDMapBlockingEvent>(
-      [this] (auto&& trigger) {
-      return osd.osdmap_gate.wait_for_map(std::move(trigger),
-                                         evt.get_epoch_sent());
-    });
-  }).then([this](auto epoch) {
-    logger().debug("{}: got map {}", *this, epoch);
-    return enter_stage<>(cp().get_pg);
-  }).then([this] {
-    return with_blocking_event<PGMap::PGCreationBlockingEvent>(
-      [this] (auto&& trigger) {
-      return osd.get_or_create_pg(std::move(trigger),
-                                 pgid,
-                                 evt.get_epoch_sent(),
-                                 std::move(evt.create_info));
-    });
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  auto maybe_delay = seastar::now();
+  if (delay) {
+    maybe_delay = seastar::sleep(
+      std::chrono::milliseconds(std::lround(delay * 1000)));
+  }
+  return maybe_delay.then([this] {
+    return with_pg(shard_services, pg);
+  }).finally([ref=std::move(ref)] {
+    logger().debug("{}: complete", *ref);
   });
 }
 
-seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
-  return seastar::make_ready_future<Ref<PG>>(pg);
-}
 
 LocalPeeringEvent::~LocalPeeringEvent() {}
 
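The header diff below gives RemotePeeringEvent a small static surface that a generic start_pg_operation<T>() can consume: can_create(), get_pgid(), get_epoch(), get_create_info(), get_connection_pipeline() and get_handle(). As a rough sketch of that contract, here is a hypothetical C++20 concept written against toy stand-in types; the concept itself is not part of crimson, and spg_t, epoch_t and the pipeline classes below are placeholders for the real definitions.

// Hypothetical concept describing the hook set added in the header below.
// Everything here is a stand-in used only to show the required interface.
#include <concepts>
#include <string>
#include <utility>

using spg_t = std::string;
using epoch_t = unsigned;
struct ConnectionPipeline {};
struct PipelineHandle {};

template <typename T>
concept PGOperationHooks = requires(T t, const T ct) {
  { T::can_create() } -> std::convertible_to<bool>;
  { ct.get_pgid() } -> std::convertible_to<spg_t>;
  { ct.get_epoch() } -> std::convertible_to<epoch_t>;
  { t.get_connection_pipeline() } -> std::same_as<ConnectionPipeline &>;
  { t.get_handle() } -> std::same_as<PipelineHandle &>;
  t.get_create_info();
};

// A stand-in event exposing the same surface as the refactored
// RemotePeeringEvent, just to show the concept is satisfiable.
struct ToyRemotePeeringEvent {
  spg_t pgid = "1.0";
  epoch_t epoch = 42;
  ConnectionPipeline conn_pipeline;
  PipelineHandle handle;
  int create_info = 0;

  static constexpr bool can_create() { return true; }
  spg_t get_pgid() const { return pgid; }
  epoch_t get_epoch() const { return epoch; }
  ConnectionPipeline &get_connection_pipeline() { return conn_pipeline; }
  PipelineHandle &get_handle() { return handle; }
  auto get_create_info() { return std::move(create_info); }
};

static_assert(PGOperationHooks<ToyRemotePeeringEvent>);

int main() {}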
index ac81345a76d0306030f5f2501ea0ff8aef8186ca..5c1b707c8b612f1f1a76d2cbeb99c80a1f7b4d8c 100644 (file)
@@ -71,7 +71,6 @@ protected:
   complete_rctx(Ref<PG>);
 
   virtual seastar::future<> complete_rctx_no_pg() { return seastar::now();}
-  virtual seastar::future<Ref<PG>> get_pg() = 0;
 
 public:
   template <typename... Args>
@@ -96,18 +95,17 @@ public:
 
   void print(std::ostream &) const final;
   void dump_detail(ceph::Formatter* f) const final;
-  seastar::future<> start();
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
 };
 
 class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
 protected:
-  OSD &osd;
   crimson::net::ConnectionRef conn;
 
   void on_pg_absent() final;
   PeeringEvent::interruptible_future<> complete_rctx(Ref<PG> pg) override;
   seastar::future<> complete_rctx_no_pg() override;
-  seastar::future<Ref<PG>> get_pg() final;
 
 public:
   class OSDPipeline {
@@ -119,9 +117,8 @@ public:
   };
 
   template <typename... Args>
-  RemotePeeringEvent(OSD &osd, crimson::net::ConnectionRef conn, Args&&... args) :
+  RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) :
     PeeringEvent(std::forward<Args>(args)...),
-    osd(osd),
     conn(conn)
   {}
 
@@ -152,15 +149,19 @@ public:
 #endif
     CompletionEvent
   > tracking_events;
-private:
-  ConnectionPipeline &cp();
-  OSDPipeline &op();
+
+  static constexpr bool can_create() { return true; }
+  auto get_create_info() { return std::move(evt.create_info); }
+  spg_t get_pgid() const {
+    return pgid;
+  }
+  ConnectionPipeline &get_connection_pipeline();
+  PipelineHandle &get_handle() { return handle; }
+  epoch_t get_epoch() const { return evt.get_epoch_sent(); }
 };
 
 class LocalPeeringEvent final : public PeeringEvent<LocalPeeringEvent> {
 protected:
-  seastar::future<Ref<PG>> get_pg() final;
-
   Ref<PG> pg;
 
 public:
@@ -170,6 +171,7 @@ public:
     pg(pg)
   {}
 
+  seastar::future<> start();
   virtual ~LocalPeeringEvent();
 
   std::tuple<