git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg_recovery: rewrite start_recovery_ops
author    Matan Breizman <mbreizma@redhat.com>
          Wed, 16 Apr 2025 11:58:51 +0000 (11:58 +0000)
committer Matan Breizman <mbreizma@redhat.com>
          Tue, 6 May 2025 07:49:07 +0000 (10:49 +0300)
We had some confusion around the return value of start_recovery_ops.
This commit is groundwork for the return type change.

* Move to coroutines
* Update logging macro

Signed-off-by: Matan Breizman <mbreizma@redhat.com>
(cherry picked from commit ce4e9aaad8f2cafae24511fe1687c61dc41affc1)
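
As context for the "Move to coroutines" item above, here is a minimal before/after
sketch of that conversion. It is illustrative only: the function names are hypothetical
and only Seastar's plain future and coroutine support is assumed, not the crimson
interruptible_future machinery used in the actual change below.

// Illustrative sketch only; hypothetical names, not crimson code.
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>

static seastar::future<> do_recovery_step() {
  return seastar::make_ready_future<>();
}

// Before: continuation style. The bool result has to be wrapped in a
// ready future at every exit point, inside nested lambdas.
seastar::future<bool> recover_continuation(bool cancelled) {
  if (cancelled) {
    return seastar::make_ready_future<bool>(false);
  }
  return do_recovery_step().then([] {
    return seastar::make_ready_future<bool>(true);
  });
}

// After: coroutine style. co_await suspends on the pending work and
// co_return yields the plain bool, so early exits read like normal code.
seastar::future<bool> recover_coroutine(bool cancelled) {
  if (cancelled) {
    co_return false;
  }
  co_await do_recovery_step();
  co_return true;
}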

src/crimson/common/interruptible_future.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h

index a3cdacbc6d66171a89d903a8897d3d667590fc18..b773b0b8720eccb674f3bd228bf6f6963677b26d 100644 (file)
@@ -1486,6 +1486,17 @@ public:
        futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
   }
 
+  // This is a simpler implementation than seastar::when_all_succeed.
+  // We are not using ::seastar::internal::complete_when_all
+  template <typename T>
+  static inline auto when_all_succeed(std::vector<interruptible_future<InterruptCond, T>>&& futures) noexcept {
+    return interruptor::parallel_for_each(futures,
+    [] (auto&& ifut) -> interruptible_future<InterruptCond, T> {
+      return std::move(ifut);
+    });
+  }
+
+
   template <typename Func,
            typename Result = futurize_t<std::invoke_result_t<Func>>>
   static inline Result async(Func&& func) {
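
A standalone analogue of the when_all_succeed helper added above, written against plain
Seastar futures rather than interruptible_future; the name wait_for_all and the headers
are assumptions for illustration, not crimson code. It shows what the
parallel_for_each-based implementation does: wait for every future in the vector and
surface any failure as an exceptional future, discarding individual results (unlike
seastar::when_all_succeed, which returns them).

// Illustrative analogue only; assumes seastar::parallel_for_each from
// <seastar/core/loop.hh>. Results are dropped: the caller only learns
// whether everything completed.
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <vector>

inline seastar::future<> wait_for_all(std::vector<seastar::future<>>&& futures) {
  return seastar::parallel_for_each(std::move(futures), [](auto&& f) {
    // parallel_for_each awaits the future returned here; moving each
    // element out lets it be waited on in place.
    return std::move(f);
  });
}

In the rewritten start_recovery_ops below, the crimson helper plays the same role and
lets the started recovery futures be awaited with a single co_await.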
index 84a8b2c235b419a767f8978943998cd5442acc79..b890c55312e08d11da6608ba2588390dae61be64 100644 (file)
@@ -46,13 +46,14 @@ PGRecovery::start_recovery_ops(
   PglogBasedRecovery &recover_op,
   size_t max_to_start)
 {
+  LOG_PREFIX(PGRecovery::start_recovery_ops);
   assert(pg->is_primary());
   assert(pg->is_peered());
 
   if (pg->has_reset_since(recover_op.get_epoch_started()) ||
       recover_op.is_cancelled()) {
-    logger().debug("recovery {} cancelled.", recover_op);
-    return seastar::make_ready_future<bool>(false);
+    DEBUGDPP("recovery {} cancelled.", pg->get_pgid(), recover_op);
+    co_return false;
   }
   ceph_assert(pg->is_recovering());
 
@@ -71,51 +72,32 @@ PGRecovery::start_recovery_ops(
   if (max_to_start > 0) {
     max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
   }
-  using interruptor =
-    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
-  return interruptor::parallel_for_each(started,
-                                       [] (auto&& ifut) {
-    return std::move(ifut);
-  }).then_interruptible([this, &recover_op] {
-    //TODO: maybe we should implement a recovery race interruptor in the future
-    if (pg->has_reset_since(recover_op.get_epoch_started()) ||
-       recover_op.is_cancelled()) {
-      logger().debug("recovery {} cancelled.", recover_op);
-      return seastar::make_ready_future<bool>(false);
-    }
-    ceph_assert(pg->is_recovering());
-    ceph_assert(!pg->is_backfilling());
-
-    bool do_recovery = pg->get_peering_state().needs_recovery();
-    if (!do_recovery) {
-      logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
-                     pg->get_pgid());
-      using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
-      if (!pg->get_peering_state().needs_backfill()) {
-        logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
-                      pg->get_pgid());
-        (void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
-          static_cast<crimson::osd::PG*>(pg),
-          pg->get_pg_whoami(),
-          pg->get_pgid(),
-          pg->get_osdmap_epoch(),
-          pg->get_osdmap_epoch(),
-          PeeringState::AllReplicasRecovered{});
-      } else {
-        logger().debug("start_recovery_ops: RequestBackfill for pg: {}",
-                      pg->get_pgid());
-        (void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
-          static_cast<crimson::osd::PG*>(pg),
-          pg->get_pg_whoami(),
-          pg->get_pgid(),
-          pg->get_osdmap_epoch(),
-          pg->get_osdmap_epoch(),
-          PeeringState::RequestBackfill{});
-      }
-      pg->reset_pglog_based_recovery_op();
-    }
-    return seastar::make_ready_future<bool>(do_recovery);
+
+  co_await interruptor::when_all_succeed(std::move(started));
+
+  //TODO: maybe we should implement a recovery race interruptor in the future
+  if (pg->has_reset_since(recover_op.get_epoch_started()) ||
+      recover_op.is_cancelled()) {
+    DEBUGDPP("recovery {} cancelled.", pg->get_pgid(), recover_op);
+    co_return false;
+  }
+  ceph_assert(pg->is_recovering());
+  ceph_assert(!pg->is_backfilling());
+
+  // move to unnamed placeholder when C++26 is available
+  auto reset_pglog_based_recovery_op = seastar::defer([this] {
+    pg->reset_pglog_based_recovery_op();
   });
+
+  if (!pg->get_peering_state().needs_recovery()) {
+    if (pg->get_peering_state().needs_backfill()) {
+      request_backfill();
+    } else {
+      all_replicas_recovered();
+    }
+    co_return false;
+  }
+  co_return true;
 }
 
 size_t PGRecovery::start_primary_recovery_ops(
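
In the hunk above, pg->reset_pglog_based_recovery_op() is hung off a seastar::defer
guard, so it runs when the coroutine's remaining scope unwinds on either co_return path;
the TODO about an unnamed placeholder refers to C++26 placeholder variables (the guard
could then be declared as _ rather than a named local). A minimal, hypothetical sketch
of the pattern, assuming only seastar::defer and Seastar coroutines:

// Illustrative only, not crimson code.
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/defer.hh>

seastar::future<bool> run_step(bool more_work) {
  // The deferred action fires when the guard goes out of scope, i.e. on
  // every co_return below and during exception unwinding. With C++26
  // placeholder variables the guard could simply be named _.
  auto cleanup = seastar::defer([]() noexcept {
    /* e.g. reset per-operation recovery state */
  });
  if (!more_work) {
    co_return false;
  }
  co_await seastar::make_ready_future<>();
  co_return true;
}

Keeping the cleanup in one scope guard, rather than repeating it at each co_return,
means later exit points added to the function pick it up automatically.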
@@ -642,6 +624,8 @@ void PGRecovery::on_pg_clean()
 
 void PGRecovery::backfilled()
 {
+  LOG_PREFIX(PGRecovery::backfilled);
+  DEBUGDPP("", pg->get_pgid());
   using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
   std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
     static_cast<crimson::osd::PG*>(pg),
@@ -652,6 +636,35 @@ void PGRecovery::backfilled()
     PeeringState::Backfilled{});
 }
 
+void PGRecovery::request_backfill()
+{
+  LOG_PREFIX(PGRecovery::request_backfill);
+  DEBUGDPP("", pg->get_pgid());
+  using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
+  std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
+    static_cast<crimson::osd::PG*>(pg),
+    pg->get_pg_whoami(),
+    pg->get_pgid(),
+    pg->get_osdmap_epoch(),
+    pg->get_osdmap_epoch(),
+    PeeringState::RequestBackfill{});
+}
+
+
+void PGRecovery::all_replicas_recovered()
+{
+  LOG_PREFIX(PGRecovery::all_replicas_recovered);
+  DEBUGDPP("", pg->get_pgid());
+  using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
+  std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
+    static_cast<crimson::osd::PG*>(pg),
+    pg->get_pg_whoami(),
+    pg->get_pgid(),
+    pg->get_osdmap_epoch(),
+    pg->get_osdmap_epoch(),
+    PeeringState::AllReplicasRecovered{});
+}
+
 void PGRecovery::backfill_suspended()
 {
   using BackfillState = crimson::osd::BackfillState;
index a44118cac5a008ea2eaa1760518048ede41e7e55..015e5114518c2abe0bc030ae651569e45b5d4744 100644 (file)
@@ -137,7 +137,12 @@ private:
   void update_peers_last_backfill(
     const hobject_t& new_last_backfill) final;
   bool budget_available() const final;
+
+  // TODO: move to start_peering_event_operation
   void backfilled() final;
+  void request_backfill();
+  void all_replicas_recovered();
+
   friend crimson::osd::BackfillState::PGFacade;
   friend crimson::osd::PG;
   // backfill end