]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: execute PGListener async operations in order 58840/head
authorSamuel Just <sjust@redhat.com>
Tue, 2 Jul 2024 22:27:55 +0000 (15:27 -0700)
committerMatan Breizman <mbreizma@redhat.com>
Thu, 25 Jul 2024 07:54:19 +0000 (10:54 +0300)
- Adds ShardServices::singleton_orderer_t mechanism to ensure that
  OSDSingleton calls are completed in order.
- Updates ShardServices accessors invoked from PeeringListener handlers
  to use orderer.
- Updates PGListener handlers and complete_rctx to use orderer.

Fixes: https://tracker.ceph.com/issues/66316
Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit e12e92c50fdd0288a7bfcf43c60fba5b938914a1)

src/crimson/osd/pg.h
src/crimson/osd/shard_services.h

index d65535c6d084479622d2149dfacc4b67cae86a4a..3d588466e0b4194bf481a5059fe018f3f019a84a 100644 (file)
@@ -166,17 +166,37 @@ public:
     return std::size(snap_trimq);
   }
 
+  /**
+   * complete_rctx
+   *
+   * complete_rctx is responsible for submitting writes and messages
+   * resulting from processing a PeeringState event as well as resolving
+   * any asyncronous actions initiated by the PeeringState::Listener
+   * callbacks below.  The caller is responsible for calling complete_rctx
+   * and waiting for the future to resolve before exiting the
+   * PGPeeringPipeline::process stage (see osd_operations/peering_event.h).
+   *
+   * orderer below ensures that operations submitted on the OSD-wide
+   * OSDSingleton instance are completed in the order initiated.  This is
+   * specifically important for operations on the local and remote async
+   * reserver instances, as well as setting and clearing pg_temp mapping
+   * requests.
+   */
+  ShardServices::singleton_orderer_t orderer;
   seastar::future<> complete_rctx(PeeringCtx &&rctx) {
-    return seastar::when_all_succeed(
-      get_need_up_thru()
-      ? shard_services.send_alive(
-       get_same_interval_since())
-      : seastar::now(),
+    shard_services.send_pg_temp(orderer);
+    if (get_need_up_thru()) {
+      shard_services.send_alive(orderer, get_same_interval_since());
+    }
+
+    ShardServices::singleton_orderer_t o;
+    std::swap(o, orderer);
+    return seastar::when_all(
       shard_services.dispatch_context(
        get_collection_ref(),
        std::move(rctx)),
-      shard_services.send_pg_temp()
-    ).then([](auto){});
+      shard_services.run_orderer(std::move(o))
+    ).then([](auto) {});
   }
 
   void send_cluster_message(
@@ -186,13 +206,21 @@ public:
     SUBDEBUGDPP(
       osd, "message {} to {} share_map_update {}",
       *this, *m, osd, share_map_update);
-    (void)shard_services.send_to_osd(osd, std::move(m), epoch);
+    /* We don't bother to queue this one in the orderer because capturing the
+     * message ref in std::function is problematic as it isn't copyable.  This
+     * is solvable, but it's not quite worth the effort at the moment as we
+     * aren't worried about ordering of message send events except between
+     * messages to the same target within an interval, which doesn't really
+     * happen while processing a single event.  It'll probably be worth
+     * generalizing the orderer structure to fix this in the future, probably
+     * by using std::move_only_function once widely available. */
+    std::ignore = shard_services.send_to_osd(osd, std::move(m), epoch);
   }
 
   void send_pg_created(pg_t pgid) final {
     LOG_PREFIX(PG::send_pg_created);
     SUBDEBUGDPP(osd, "pgid {}", *this, pgid);
-    (void)shard_services.send_pg_created(pgid);
+    shard_services.send_pg_created(orderer, pgid);
   }
 
   bool try_flush_or_schedule_async() final;
@@ -237,9 +265,8 @@ public:
     SUBDEBUGDPP(
       osd, "priority {} on_grant {} on_preempt {}",
       *this, on_grant->get_desc(), on_preempt->get_desc());
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.local_request_reservation(
+    shard_services.local_request_reservation(
+      orderer,
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
@@ -247,17 +274,17 @@ public:
       priority,
       on_preempt ? make_lambda_context(
        [this, on_preempt=std::move(on_preempt)] (int) {
-       start_peering_event_operation(std::move(*on_preempt));
-      }) : nullptr);
+         start_peering_event_operation(std::move(*on_preempt));
+       }) : nullptr
+    );
   }
 
   void update_local_background_io_priority(
     unsigned priority) final {
     LOG_PREFIX(PG::update_local_background_io_priority);
     SUBDEBUGDPP(osd, "priority {}", *this, priority);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.local_update_priority(
+    shard_services.local_update_priority(
+      orderer,
       pgid,
       priority);
   }
@@ -265,9 +292,8 @@ public:
   void cancel_local_background_io_reservation() final {
     LOG_PREFIX(PG::cancel_local_background_io_reservation);
     SUBDEBUGDPP(osd, "", *this);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.local_cancel_reservation(
+    shard_services.local_cancel_reservation(
+      orderer,
       pgid);
   }
 
@@ -279,9 +305,8 @@ public:
     SUBDEBUGDPP(
       osd, "priority {} on_grant {} on_preempt {}",
       *this, on_grant->get_desc(), on_preempt->get_desc());
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.remote_request_reservation(
+    shard_services.remote_request_reservation(
+      orderer,
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
@@ -289,17 +314,15 @@ public:
       priority,
       on_preempt ? make_lambda_context(
        [this, on_preempt=std::move(on_preempt)] (int) {
-       start_peering_event_operation(std::move(*on_preempt));
-      }) : nullptr);
+         start_peering_event_operation(std::move(*on_preempt));
+      }) : nullptr
+    );
   }
 
   void cancel_remote_recovery_reservation() final {
     LOG_PREFIX(PG::cancel_remote_recovery_reservation);
     SUBDEBUGDPP(osd, "", *this);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore =  shard_services.remote_cancel_reservation(
-      pgid);
+    shard_services.remote_cancel_reservation(orderer, pgid);
   }
 
   void schedule_event_on_commit(
@@ -326,16 +349,12 @@ public:
   void queue_want_pg_temp(const std::vector<int> &wanted) final {
     LOG_PREFIX(PG::queue_want_pg_temp);
     SUBDEBUGDPP(osd, "wanted {}", *this, wanted);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+    shard_services.queue_want_pg_temp(orderer, pgid.pgid, wanted);
   }
   void clear_want_pg_temp() final {
     LOG_PREFIX(PG::clear_want_pg_temp);
     SUBDEBUGDPP(osd, "", *this);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.remove_want_pg_temp(pgid.pgid);
+    shard_services.remove_want_pg_temp(orderer, pgid.pgid);
   }
   void check_recovery_sources(const OSDMapRef& newmap) final {
     // Not needed yet
index 7163852a7adc32208416cc617f336178cacef4c3..fb86418aba2a643f61252fe452980c5a8484700f 100644 (file)
@@ -359,6 +359,60 @@ class ShardServices : public OSDMapService {
     );
   }
 
+public:
+  /**
+   * singleton_orderer_t
+   *
+   * schedule_for_singleton/run_orderer allows users to queue a sequence
+   * of operations on the OSDSingleton instance and run them as an ordered
+   * batch.  The user may rely on operations being completed in the order
+   * submitted.
+   *
+   * Generally, users will declare a singleton_orderer_t instance, pass it
+   * by reference to ShardServices methods implemented with
+   * schedule_for_singleton or the QUEUE_FOR_OSD_SINGLETON_* macros,
+   * and finally call run_orderer to submit the batch.
+   */
+  struct singleton_orderer_t {
+    using remote_func_t = std::function<seastar::future<>(OSDSingletonState&)>;
+    std::vector<remote_func_t> queue;
+
+    singleton_orderer_t() = default;
+    singleton_orderer_t(singleton_orderer_t &&) = default;
+    singleton_orderer_t &operator=(singleton_orderer_t &&) = default;
+
+    singleton_orderer_t(const singleton_orderer_t &) = delete;
+    singleton_orderer_t &operator=(const singleton_orderer_t &) = delete;
+  };
+
+  seastar::future<> run_orderer(singleton_orderer_t &&orderer) {
+    return with_singleton([](auto &singleton, auto &&orderer) {
+      return seastar::do_with(
+       std::move(orderer),
+       [&singleton](auto &orderer) {
+         return seastar::do_for_each(
+           orderer.queue,
+           [&singleton](auto &func) {
+             return std::invoke(func, singleton);
+           });
+       });
+    }, std::move(orderer));
+  }
+
+private:
+  template <typename F, typename... Args>
+  void schedule_for_singleton(
+    singleton_orderer_t &orderer, F &&f, Args&&... args) {
+    orderer.queue.push_back(
+      [f=std::forward<F>(f),
+       args=std::make_tuple(
+        std::forward<Args>(args)...)](OSDSingletonState &state) -> seastar::future<> {
+       return seastar::futurize_apply<>(
+         std::move(f),
+         std::tuple_cat(std::make_tuple(std::ref(state)), std::move(args)));
+      });
+  }
+
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
   auto FROM_METHOD(Args&&... args) const {                     \
@@ -387,6 +441,19 @@ class ShardServices : public OSDMapService {
 #define FORWARD_TO_OSD_SINGLETON(METHOD) \
   FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
 
+#define QUEUE_FOR_OSD_SINGLETON_TARGET(METHOD, TARGET)                 \
+  template <typename... Args>                                          \
+  auto METHOD(singleton_orderer_t &orderer, Args&&... args) {          \
+    return schedule_for_singleton(                                     \
+      orderer,                                                         \
+      [](auto &local_state, auto&&... args) {                          \
+        return local_state.TARGET(                                     \
+         std::forward<decltype(args)>(args)...);                       \
+      }, std::forward<Args>(args)...);                                 \
+  }
+#define QUEUE_FOR_OSD_SINGLETON(METHOD) \
+  QUEUE_FOR_OSD_SINGLETON_TARGET(METHOD, METHOD)
+
 public:
   template <typename... PSSArgs>
   ShardServices(
@@ -527,12 +594,12 @@ public:
   FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd)
 
   FORWARD_TO_OSD_SINGLETON(osdmap_subscribe)
-  FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp)
-  FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp)
+  QUEUE_FOR_OSD_SINGLETON(queue_want_pg_temp)
+  QUEUE_FOR_OSD_SINGLETON(remove_want_pg_temp)
   FORWARD_TO_OSD_SINGLETON(requeue_pg_temp)
-  FORWARD_TO_OSD_SINGLETON(send_pg_created)
-  FORWARD_TO_OSD_SINGLETON(send_alive)
-  FORWARD_TO_OSD_SINGLETON(send_pg_temp)
+  QUEUE_FOR_OSD_SINGLETON(send_pg_created)
+  QUEUE_FOR_OSD_SINGLETON(send_alive)
+  QUEUE_FOR_OSD_SINGLETON(send_pg_temp)
   FORWARD_TO_LOCAL_CONST(get_mnow)
   FORWARD_TO_LOCAL(get_hb_stamps)
   FORWARD_TO_LOCAL(update_shard_superblock)
@@ -544,41 +611,58 @@ public:
     snap_dump_reservations,
     snap_reserver.dump)
 
-  auto local_update_priority(spg_t pgid, unsigned newprio) {
+
+  auto local_update_priority(
+    singleton_orderer_t &orderer,
+    spg_t pgid, unsigned newprio) {
     LOG_PREFIX(ShardServices::local_update_priority);
     SUBDEBUG(osd, "sending to singleton pgid {} newprio {}", pgid, newprio);
-    return with_singleton([FNAME, pgid, newprio](auto &singleton) {
-      SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
-      return singleton.local_reserver.update_priority(pgid, newprio);
-    });
+    return schedule_for_singleton(
+      orderer,
+      [FNAME, pgid, newprio](auto &singleton) {
+       SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
+       return singleton.local_reserver.update_priority(pgid, newprio);
+      });
   }
-  auto local_cancel_reservation(spg_t pgid) {
+  auto local_cancel_reservation(
+    singleton_orderer_t &orderer,
+    spg_t pgid) {
     LOG_PREFIX(ShardServices::local_cancel_reservation);
     SUBDEBUG(osd, "sending to singleton pgid {}", pgid);
-    return with_singleton([FNAME, pgid](auto &singleton) {
-      SUBDEBUG(osd, "on singleton pgid {}", pgid);
-      return singleton.local_reserver.cancel_reservation(pgid);
-    });
+    return schedule_for_singleton(
+      orderer,
+      [FNAME, pgid](auto &singleton) {
+       SUBDEBUG(osd, "on singleton pgid {}", pgid);
+       return singleton.local_reserver.cancel_reservation(pgid);
+      });
   }
   FORWARD_TO_OSD_SINGLETON_TARGET(
     local_dump_reservations,
     local_reserver.dump)
 
-  auto remote_update_priority(spg_t pgid, unsigned newprio) {
+  auto remote_update_priority(
+    singleton_orderer_t &orderer,
+    spg_t pgid, unsigned newprio) {
     LOG_PREFIX(ShardServices::remote_update_priority);
     SUBDEBUG(osd, "sending to singleton pgid {} newprio {}", pgid, newprio);
-    return with_singleton([FNAME, pgid, newprio](auto &singleton) {
-      SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
-      return singleton.remote_reserver.update_priority(pgid, newprio);
-    });
+    return schedule_for_singleton(
+      orderer,
+      [FNAME, pgid, newprio](auto &singleton) {
+       SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
+       return singleton.remote_reserver.update_priority(pgid, newprio);
+      });
   }
-  auto remote_cancel_reservation(spg_t pgid) {
+  auto remote_cancel_reservation(
+    singleton_orderer_t &orderer,
+    spg_t pgid) {
     LOG_PREFIX(ShardServices::remote_cancel_reservation);
     SUBDEBUG(osd, "sending to singleton pgid {}", pgid);
-    return with_singleton([FNAME, pgid](auto &singleton) {
-      SUBDEBUG(osd, "on singleton pgid {}", pgid);
-      return singleton.remote_reserver.cancel_reservation(pgid);
-    });
+    return schedule_for_singleton(
+      orderer,
+      [FNAME, pgid](auto &singleton) {
+       SUBDEBUG(osd, "on singleton pgid {}", pgid);
+       return singleton.remote_reserver.cancel_reservation(pgid);
+      });
   }
   FORWARD_TO_OSD_SINGLETON_TARGET(
     remote_dump_reservations,
@@ -594,14 +678,16 @@ public:
        });
     });
   }
-  seastar::future<> local_request_reservation(
+  void local_request_reservation(
+    singleton_orderer_t &orderer,
     spg_t item,
     Context *on_reserved,
     unsigned prio,
     Context *on_preempt) {
     LOG_PREFIX(ShardServices::local_request_reservation);
     SUBDEBUG(osd, "sending to singleton pgid {} prio {}", item, prio);
-    return with_singleton(
+    return schedule_for_singleton(
+      orderer,
       [FNAME, item, prio](
        OSDSingletonState &singleton,
        Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
@@ -615,14 +701,16 @@ public:
       invoke_context_on_core(seastar::this_shard_id(), on_reserved),
       invoke_context_on_core(seastar::this_shard_id(), on_preempt));
   }
-  seastar::future<> remote_request_reservation(
+  void remote_request_reservation(
+    singleton_orderer_t &orderer,
     spg_t item,
     Context *on_reserved,
     unsigned prio,
     Context *on_preempt) {
     LOG_PREFIX(ShardServices::remote_request_reservation);
     SUBDEBUG(osd, "sending to singleton pgid {} prio {}", item, prio);
-    return with_singleton(
+    return schedule_for_singleton(
+      orderer,
       [FNAME, item, prio](
        OSDSingletonState &singleton,
        Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
@@ -636,11 +724,13 @@ public:
       invoke_context_on_core(seastar::this_shard_id(), on_reserved),
       invoke_context_on_core(seastar::this_shard_id(), on_preempt));
   }
-  seastar::future<> snap_request_reservation(
+  void snap_request_reservation(
+    singleton_orderer_t &orderer,
     spg_t item,
     Context *on_reserved,
     unsigned prio) {
-    return with_singleton(
+    return schedule_for_singleton(
+      orderer,
       [item, prio](OSDSingletonState &singleton,
                   Context *wrapped_on_reserved) {
        return singleton.snap_reserver.request_reservation(