]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: migrate AggregateBlocker-related ops to new tracking infra
authorRadosław Zarzyński <rzarzyns@redhat.com>
Fri, 22 Apr 2022 14:50:47 +0000 (16:50 +0200)
committerRadosław Zarzyński <rzarzyns@redhat.com>
Thu, 5 May 2022 02:06:32 +0000 (04:06 +0200)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/background_recovery.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h
src/crimson/osd/recovery_backend.h

index d3f57d7dee68539bbacd3100b543f2e7020ec6e0..0c40e2705340ad11a18de57d46e0b74dd5199053 100644 (file)
@@ -18,6 +18,22 @@ namespace {
   }
 }
 
+namespace crimson {
+  template <>
+  struct EventBackendRegistry<osd::UrgentRecovery> {
+    static std::tuple<> get_backends() {
+      return {};
+    }
+  };
+
+  template <>
+  struct EventBackendRegistry<osd::PglogBasedRecovery> {
+    static std::tuple<> get_backends() {
+      return {};
+    }
+  };
+}
+
 namespace crimson::osd {
 
 template <class T>
@@ -97,9 +113,10 @@ UrgentRecovery::do_recovery()
 {
   logger().debug("{}: {}", __func__, *this);
   if (!pg->has_reset_since(epoch_started)) {
-    return with_blocking_future_interruptible<interruptor::condition>(
-      pg->get_recovery_handler()->recover_missing(soid, need)
-    ).then_interruptible([] {
+    return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
+                              interruptor>([this] (auto&& trigger) {
+      return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+    }).then_interruptible([] {
       return seastar::make_ready_future<bool>(false);
     });
   }
@@ -143,9 +160,12 @@ PglogBasedRecovery::do_recovery()
   if (pg->has_reset_since(epoch_started)) {
     return seastar::make_ready_future<bool>(false);
   }
-  return with_blocking_future_interruptible<interruptor::condition>(
-    pg->get_recovery_handler()->start_recovery_ops(
-      crimson::common::local_conf()->osd_recovery_max_single_start));
+  return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
+                            interruptor>([this] (auto&& trigger) {
+    return pg->get_recovery_handler()->start_recovery_ops(
+      trigger,
+      crimson::common::local_conf()->osd_recovery_max_single_start);
+  });
 }
 
 BackfillRecovery::BackfillRecoveryPipeline &BackfillRecovery::bp(PG &pg)
index 8f726c51a6370f5fa7cc610bc9bbbbeb70c042c3..1f293e12ba74da63907cb5703e72d12a4aa76475 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "crimson/net/Connection.h"
 #include "crimson/osd/osd_operation.h"
+#include "crimson/osd/recovery_backend.h"
 #include "crimson/common/type_helpers.h"
 
 namespace crimson::osd {
@@ -63,6 +64,10 @@ public:
     epoch_t epoch_started);
   void print(std::ostream&) const final;
 
+  std::tuple<
+    RecoveryBackend::RecoveryBlockingEvent
+  > tracking_events;
+
 private:
   void dump_detail(Formatter* f) const final;
   interruptible_future<bool> do_recovery() override;
@@ -78,6 +83,10 @@ public:
     epoch_t epoch_started,
     float delay = 0);
 
+  std::tuple<
+    RecoveryBackend::RecoveryBlockingEvent
+  > tracking_events;
+
 private:
   interruptible_future<bool> do_recovery() override;
 };
index f4a4058dc908e44b4304eccc7709406c9e3203ce..62c8730fedb4a18683a4954c4f2b7117052dde84 100644 (file)
@@ -34,8 +34,10 @@ void PGRecovery::start_pglogbased_recovery()
     float(0.001));
 }
 
-PGRecovery::blocking_interruptible_future<bool>
-PGRecovery::start_recovery_ops(size_t max_to_start)
+PGRecovery::interruptible_future<bool>
+PGRecovery::start_recovery_ops(
+  RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
+  size_t max_to_start)
 {
   assert(pg->is_primary());
   assert(pg->is_peered());
@@ -49,16 +51,20 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
   assert(!pg->is_backfilling());
   assert(!pg->get_peering_state().is_deleting());
 
-  std::vector<blocking_interruptible_future<>> started;
+  std::vector<RecoveryBackend::interruptible_future<>> new_started;
+  std::vector<interruptible_future<>> started;
+  new_started.reserve(max_to_start);
   started.reserve(max_to_start);
-  max_to_start -= start_primary_recovery_ops(max_to_start, &started);
+  max_to_start -= start_primary_recovery_ops(trigger, max_to_start, &started);
   if (max_to_start > 0) {
-    max_to_start -= start_replica_recovery_ops(max_to_start, &started);
+    max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
   }
-  return crimson::join_blocking_interruptible_futures<
-    ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible<
-    ::crimson::osd::IOInterruptCondition>(
-    [this] {
+  using interruptor =
+    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
+  return interruptor::parallel_for_each(std::move(new_started),
+                                       [] (auto&& ifut) {
+    return std::move(ifut);
+  }).then_interruptible([this] {
     bool done = !pg->get_peering_state().needs_recovery();
     if (done) {
       logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
@@ -93,8 +99,9 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
 }
 
 size_t PGRecovery::start_primary_recovery_ops(
+  RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
   size_t max_to_start,
-  std::vector<PGRecovery::blocking_interruptible_future<>> *out)
+  std::vector<PGRecovery::interruptible_future<>> *out)
 {
   if (!pg->is_recovering()) {
     return 0;
@@ -149,13 +156,13 @@ size_t PGRecovery::start_primary_recovery_ops(
     // TODO: handle lost/unfound
     if (pg->get_recovery_backend()->is_recovering(soid)) {
       auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
-      out->push_back(recovery_waiter.wait_for_recovered_blocking<
-           ::crimson::osd::IOInterruptCondition>());
+      out->emplace_back(recovery_waiter.wait_for_recovered(
+       *trigger.create_part_trigger()));
       ++started;
     } else if (pg->get_recovery_backend()->is_recovering(head)) {
       ++skipped;
     } else {
-      out->push_back(recover_missing(soid, item.need));
+      out->emplace_back(recover_missing(trigger, soid, item.need));
       ++started;
     }
 
@@ -169,8 +176,9 @@ size_t PGRecovery::start_primary_recovery_ops(
 }
 
 size_t PGRecovery::start_replica_recovery_ops(
+  RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
   size_t max_to_start,
-  std::vector<PGRecovery::blocking_interruptible_future<>> *out)
+  std::vector<PGRecovery::interruptible_future<>> *out)
 {
   if (!pg->is_recovering()) {
     return 0;
@@ -217,8 +225,8 @@ size_t PGRecovery::start_replica_recovery_ops(
       if (pg->get_recovery_backend()->is_recovering(soid)) {
        logger().debug("{}: already recovering object {}", __func__, soid);
        auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
-       out->push_back(recovery_waiter.wait_for_recovered_blocking<
-           ::crimson::osd::IOInterruptCondition>());
+       out->emplace_back(recovery_waiter.wait_for_recovered(
+         *trigger.create_part_trigger()));
        started++;
        continue;
       }
@@ -227,8 +235,9 @@ size_t PGRecovery::start_replica_recovery_ops(
        logger().debug("{}: soid {} is a delete, removing", __func__, soid);
        map<hobject_t,pg_missing_item>::const_iterator r =
          pm.get_items().find(soid);
-       started += prep_object_replica_deletes(
-         soid, r->second.need, out);
+       started++;
+       out->emplace_back(
+         prep_object_replica_deletes(trigger, soid, r->second.need));
        continue;
       }
 
@@ -248,23 +257,27 @@ size_t PGRecovery::start_replica_recovery_ops(
       logger().debug("{}: recover_object_replicas({})", __func__,soid);
       map<hobject_t,pg_missing_item>::const_iterator r = pm.get_items().find(
        soid);
-      started += prep_object_replica_pushes(
-       soid, r->second.need, out);
+      started++;
+      out->emplace_back(
+       prep_object_replica_pushes(trigger, soid, r->second.need));
     }
   }
 
   return started;
 }
 
-PGRecovery::blocking_interruptible_future<>
+PGRecovery::interruptible_future<>
 PGRecovery::recover_missing(
+  RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
   const hobject_t &soid, eversion_t need)
 {
   if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
-    return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
-       pg->get_recovery_backend()->recover_delete(soid, need));
+    return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+      trigger,
+      pg->get_recovery_backend()->recover_delete(soid, need));
   } else {
-    return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
+    return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+      trigger,
       pg->get_recovery_backend()->recover_object(soid, need)
       .handle_exception_interruptible(
        [=, soid = std::move(soid)] (auto e) {
@@ -275,41 +288,37 @@ PGRecovery::recover_missing(
   }
 }
 
-size_t PGRecovery::prep_object_replica_deletes(
+RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes(
+  RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
   const hobject_t& soid,
-  eversion_t need,
-  std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
+  eversion_t need)
 {
-  in_progress->push_back(
-    pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
-      pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
-       [=] {
-       object_stat_sum_t stat_diff;
-       stat_diff.num_objects_recovered = 1;
-       on_global_recover(soid, stat_diff, true);
-       return seastar::make_ready_future<>();
-      })
-    )
+  return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+    trigger,
+    pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
+      [=] {
+      object_stat_sum_t stat_diff;
+      stat_diff.num_objects_recovered = 1;
+      on_global_recover(soid, stat_diff, true);
+      return seastar::make_ready_future<>();
+    })
   );
-  return 1;
 }
 
-size_t PGRecovery::prep_object_replica_pushes(
+RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
+  RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
   const hobject_t& soid,
-  eversion_t need,
-  std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
+  eversion_t need)
 {
-  in_progress->push_back(
-    pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
-      pg->get_recovery_backend()->recover_object(soid, need)
-      .handle_exception_interruptible(
-       [=, soid = std::move(soid)] (auto e) {
-       on_failed_recover({ pg->get_pg_whoami() }, soid, need);
-       return seastar::make_ready_future<>();
-      })
-    )
+  return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+    trigger,
+    pg->get_recovery_backend()->recover_object(soid, need)
+    .handle_exception_interruptible(
+      [=, soid = std::move(soid)] (auto e) {
+      on_failed_recover({ pg->get_pg_whoami() }, soid, need);
+      return seastar::make_ready_future<>();
+    })
   );
-  return 1;
 }
 
 void PGRecovery::on_local_recover(
index cdb07cc5c3945035f239c0858980a533fecdc149..7840d85be08591a5441ba6356d6f8106b301058f 100644 (file)
@@ -20,14 +20,14 @@ class PGBackend;
 class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
 public:
   template <typename T = void>
-  using blocking_interruptible_future =
-    ::crimson::blocking_interruptible_future<
-      ::crimson::osd::IOInterruptCondition, T>;
+  using interruptible_future = RecoveryBackend::interruptible_future<T>;
   PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
   virtual ~PGRecovery() {}
   void start_pglogbased_recovery();
 
-  blocking_interruptible_future<bool> start_recovery_ops(size_t max_to_start);
+  interruptible_future<bool> start_recovery_ops(
+    RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
+    size_t max_to_start);
   void on_backfill_reserved();
   void dispatch_backfill_event(
     boost::intrusive_ptr<const boost::statechart::event_base> evt);
@@ -36,25 +36,28 @@ public:
 private:
   PGRecoveryListener* pg;
   size_t start_primary_recovery_ops(
+    RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
     size_t max_to_start,
-    std::vector<blocking_interruptible_future<>> *out);
+    std::vector<interruptible_future<>> *out);
   size_t start_replica_recovery_ops(
+    RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
     size_t max_to_start,
-    std::vector<blocking_interruptible_future<>> *out);
+    std::vector<interruptible_future<>> *out);
 
   std::vector<pg_shard_t> get_replica_recovery_order() const {
     return pg->get_replica_recovery_order();
   }
-  blocking_interruptible_future<> recover_missing(
+  RecoveryBackend::interruptible_future<> recover_missing(
+    RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
     const hobject_t &soid, eversion_t need);
-  size_t prep_object_replica_deletes(
+  RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
+    RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
     const hobject_t& soid,
-    eversion_t need,
-    std::vector<blocking_interruptible_future<>> *in_progress);
-  size_t prep_object_replica_pushes(
+    eversion_t need);
+  RecoveryBackend::interruptible_future<> prep_object_replica_pushes(
+    RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
     const hobject_t& soid,
-    eversion_t need,
-    std::vector<blocking_interruptible_future<>> *in_progress);
+    eversion_t need);
 
   void on_local_recover(
     const hobject_t& soid,
index eb621487a18a6f4412751bb0b98e44f8df738444..cc48dd7d0de33d286c7e32face3c6143a408b287 100644 (file)
@@ -25,7 +25,7 @@ namespace crimson::osd{
 class PGBackend;
 
 class RecoveryBackend {
-protected:
+public:
   class WaitForObjectRecovery;
 public:
   template <typename T = void>
@@ -119,6 +119,7 @@ protected:
     object_stat_sum_t stat;
   };
 
+public:
   class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> {
     seastar::shared_promise<> readable, recovered, pulled;
     std::map<pg_shard_t, seastar::shared_promise<>> pushes;
@@ -138,11 +139,8 @@ protected:
     seastar::future<> wait_for_recovered() {
       return recovered.get_shared_future();
     }
-    template <typename InterruptCond>
-    crimson::blocking_interruptible_future<InterruptCond>
-    wait_for_recovered_blocking() {
-      return make_blocking_interruptible_future<InterruptCond>(
-         recovered.get_shared_future());
+    seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) {
+      return trigger.maybe_record_blocking(recovered.get_shared_future(), *this);
     }
     seastar::future<> wait_for_pull() {
       return pulled.get_shared_future();
@@ -178,6 +176,9 @@ protected:
     void dump_detail(Formatter* f) const {
     }
   };
+  using RecoveryBlockingEvent =
+    crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>;
+protected:
   std::map<hobject_t, WaitForObjectRecovery> recovering;
   hobject_t get_temp_recovery_object(
     const hobject_t& target,