]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: retain reference to WaitForObjectRecovery until blocker resolves
authorSamuel Just <sjust@redhat.com>
Fri, 6 May 2022 05:05:31 +0000 (05:05 +0000)
committerSamuel Just <sjust@redhat.com>
Fri, 6 May 2022 23:21:54 +0000 (23:21 +0000)
PGRecovery::on_global_recover destroys the map entry without waiting for
the future returned from

    seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) {

This commit changes WaitForObjectRecovery to be refcounted and retains a
reference until the future resolves.

Fixes: https://tracker.ceph.com/issues/55565
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/pg_recovery.cc
src/crimson/osd/recovery_backend.cc
src/crimson/osd/recovery_backend.h
src/crimson/osd/replicated_recovery_backend.cc

index 62c8730fedb4a18683a4954c4f2b7117052dde84..9ebcad8c0c81cd8b33eb686f7e1f36a5543d9967 100644 (file)
@@ -272,11 +272,11 @@ PGRecovery::recover_missing(
   const hobject_t &soid, eversion_t need)
 {
   if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
-    return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+    return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
       trigger,
       pg->get_recovery_backend()->recover_delete(soid, need));
   } else {
-    return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+    return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
       trigger,
       pg->get_recovery_backend()->recover_object(soid, need)
       .handle_exception_interruptible(
@@ -293,7 +293,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes(
   const hobject_t& soid,
   eversion_t need)
 {
-  return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+  return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
     trigger,
     pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
       [=] {
@@ -310,7 +310,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
   const hobject_t& soid,
   eversion_t need)
 {
-  return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+  return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
     trigger,
     pg->get_recovery_backend()->recover_object(soid, need)
     .handle_exception_interruptible(
index 04c9034a52c181854fbac86d9de39a1e9dc9bc6a..040cf0d88b2b676b20c385a0f55151b87cc19d7e 100644 (file)
@@ -41,13 +41,13 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t,
   temp_contents.clear();
 
   for (auto& [soid, recovery_waiter] : recovering) {
-    if ((recovery_waiter.pi && recovery_waiter.pi->is_complete())
-       || (!recovery_waiter.pi
-         && recovery_waiter.obc && recovery_waiter.obc->obs.exists)) {
-      recovery_waiter.obc->interrupt(
+    if ((recovery_waiter->pi && recovery_waiter->pi->is_complete())
+       || (!recovery_waiter->pi
+         && recovery_waiter->obc && recovery_waiter->obc->obs.exists)) {
+      recovery_waiter->obc->interrupt(
          ::crimson::common::actingset_changed(
              pg.is_primary()));
-      recovery_waiter.interrupt(why);
+      recovery_waiter->interrupt(why);
     }
   }
   recovering.clear();
index cc48dd7d0de33d286c7e32face3c6143a408b287..a2b2df71168e99356955de0e4e8aae37547141e8 100644 (file)
@@ -46,13 +46,13 @@ public:
       backend{backend} {}
   virtual ~RecoveryBackend() {}
   WaitForObjectRecovery& add_recovering(const hobject_t& soid) {
-    auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{});
+    auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
     assert(added);
-    return it->second;
+    return *(it->second);
   }
   WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
     assert(is_recovering(soid));
-    return recovering.at(soid);
+    return *(recovering.at(soid));
   }
   void remove_recovering(const hobject_t& soid) {
     recovering.erase(soid);
@@ -88,7 +88,7 @@ public:
 
   seastar::future<> stop() {
     for (auto& [soid, recovery_waiter] : recovering) {
-      recovery_waiter.stop();
+      recovery_waiter->stop();
     }
     return on_stop();
   }
@@ -120,7 +120,10 @@ protected:
   };
 
 public:
-  class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> {
+  class WaitForObjectRecovery :
+    public boost::intrusive_ref_counter<
+      WaitForObjectRecovery, boost::thread_unsafe_counter>,
+    public crimson::BlockerT<WaitForObjectRecovery> {
     seastar::shared_promise<> readable, recovered, pulled;
     std::map<pg_shard_t, seastar::shared_promise<>> pushes;
   public:
@@ -139,8 +142,17 @@ public:
     seastar::future<> wait_for_recovered() {
       return recovered.get_shared_future();
     }
+    template <typename T, typename F>
+    auto wait_track_blocking(T &trigger, F &&fut) {
+      WaitForObjectRecoveryRef ref = this;
+      return track_blocking(
+       trigger,
+       std::forward<F>(fut)
+      ).finally([ref] {});
+    }
     seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) {
-      return trigger.maybe_record_blocking(recovered.get_shared_future(), *this);
+      WaitForObjectRecoveryRef ref = this;
+      return wait_track_blocking(trigger, recovered.get_shared_future());
     }
     seastar::future<> wait_for_pull() {
       return pulled.get_shared_future();
@@ -178,8 +190,9 @@ public:
   };
   using RecoveryBlockingEvent =
     crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>;
+  using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>;
 protected:
-  std::map<hobject_t, WaitForObjectRecovery> recovering;
+  std::map<hobject_t, WaitForObjectRecoveryRef> recovering;
   hobject_t get_temp_recovery_object(
     const hobject_t& target,
     eversion_t version) const;
index a96f1b8dd2bfdd8cfe0c6ee14c6a2cb477051c2a..3adc3a0cc3b66fad9965d35ec60f06b175901958 100644 (file)
@@ -36,7 +36,7 @@ ReplicatedRecoveryBackend::recover_object(
     return pg.with_head_obc<RWState::RWREAD>(soid,
       [this, soid, need](auto obc) {
       logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
-      auto& recovery_waiter = recovering.at(soid);
+      auto& recovery_waiter = get_recovering(soid);
       recovery_waiter.obc = obc;
       recovery_waiter.obc->wait_recovery_read();
       return maybe_push_shards(soid, need);
@@ -70,11 +70,11 @@ ReplicatedRecoveryBackend::maybe_push_shards(
                                     pg.get_osdmap_epoch()))
       .then_interruptible(
         [this, soid, shard] {
-        return recovering.at(soid).wait_for_pushes(shard);
+       return get_recovering(soid).wait_for_pushes(shard);
       });
     });
   }).then_interruptible([this, soid] {
-    auto &recovery = recovering.at(soid);
+    auto &recovery = get_recovering(soid);
     if (auto push_info = recovery.pushing.begin();
         push_info != recovery.pushing.end()) {
       pg.get_recovery_handler()->on_global_recover(soid,
@@ -90,7 +90,7 @@ ReplicatedRecoveryBackend::maybe_push_shards(
     }
     return seastar::make_ready_future<>();
   }).handle_exception_interruptible([this, soid](auto e) {
-    auto &recovery = recovering.at(soid);
+    auto &recovery = get_recovering(soid);
     if (recovery.obc) {
       recovery.obc->drop_recovery_read();
     }
@@ -109,7 +109,7 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
     return seastar::make_ready_future<>();
   }
   PullOp po;
-  auto& recovery_waiter = recovering.at(soid);
+  auto& recovery_waiter = get_recovering(soid);
   recovery_waiter.pi = std::make_optional<RecoveryBackend::PullInfo>();
   auto& pi = *recovery_waiter.pi;
   prepare_pull(po, pi, soid, need);
@@ -159,7 +159,7 @@ ReplicatedRecoveryBackend::push_delete(
          shard_services.send_to_osd(shard.osd, std::move(msg),
                                     pg.get_osdmap_epoch())).then_interruptible(
        [this, soid, shard] {
-       return recovering.at(soid).wait_for_pushes(shard);
+       return get_recovering(soid).wait_for_pushes(shard);
       });
     }
     return seastar::make_ready_future<>();
@@ -293,7 +293,7 @@ ReplicatedRecoveryBackend::prep_push(
 {
   logger().debug("{}: {}, {}", __func__, soid, need);
 
-  auto& recovery_waiter = recovering.at(soid);
+  auto& recovery_waiter = get_recovering(soid);
   auto& obc = recovery_waiter.obc;
   interval_set<uint64_t> data_subset;
   if (obc->obs.oi.size) {
@@ -325,7 +325,7 @@ ReplicatedRecoveryBackend::prep_push(
 
   return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then_interruptible(
     [this, soid, pg_shard](auto pop) {
-    auto& recovery_waiter = recovering.at(soid);
+    auto& recovery_waiter = get_recovering(soid);
     auto& pi = recovery_waiter.pushing[pg_shard];
     pi.recovery_progress = pop.after_progress;
     return pop;
@@ -647,7 +647,7 @@ ReplicatedRecoveryBackend::_handle_pull_response(
       pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included);
 
   const hobject_t &hoid = pop.soid;
-  auto& recovery_waiter = recovering.at(hoid);
+  auto& recovery_waiter = get_recovering(hoid);
   auto& pi = *recovery_waiter.pi;
   if (pi.recovery_info.size == (uint64_t(-1))) {
     pi.recovery_info.size = pop.recovery_info.size;
@@ -702,7 +702,7 @@ ReplicatedRecoveryBackend::_handle_pull_response(
       if (complete) {
        pi.stat.num_objects_recovered++;
        pg.get_recovery_handler()->on_local_recover(
-           pop.soid, recovering.at(pop.soid).pi->recovery_info,
+           pop.soid, get_recovering(pop.soid).pi->recovery_info,
            false, *t);
        return true;
       } else {
@@ -754,7 +754,7 @@ ReplicatedRecoveryBackend::handle_pull_response(
     }).then_interruptible([this, m, &response](bool complete) {
       if (complete) {
        auto& pop = m->pushes[0];
-       recovering.at(pop.soid).set_pulled();
+       get_recovering(pop.soid).set_pulled();
        return seastar::make_ready_future<>();
       } else {
        auto reply = crimson::make_message<MOSDPGPull>();
@@ -860,11 +860,11 @@ ReplicatedRecoveryBackend::_handle_push_reply(
   logger().debug("{}, soid {}, from {}", __func__, soid, peer);
   auto recovering_iter = recovering.find(soid);
   if (recovering_iter == recovering.end()
-      || !recovering_iter->second.pushing.count(peer)) {
+      || !recovering_iter->second->pushing.count(peer)) {
     logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
     return seastar::make_ready_future<std::optional<PushOp>>();
   } else {
-    auto& pi = recovering_iter->second.pushing[peer];
+    auto& pi = recovering_iter->second->pushing[peer];
     bool error = pi.recovery_progress.error;
     if (!pi.recovery_progress.data_complete && !error) {
       return build_push_op(pi.recovery_info, pi.recovery_progress,
@@ -873,14 +873,14 @@ ReplicatedRecoveryBackend::_handle_push_reply(
        return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
       }).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) {
         pi.recovery_progress.error = true;
-        recovering_iter->second.set_push_failed(peer, e);
+        recovering_iter->second->set_push_failed(peer, e);
         return seastar::make_ready_future<std::optional<PushOp>>();
       });
     }
     if (!error) {
       pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
     }
-    recovering_iter->second.set_pushed(peer);
+    recovering_iter->second->set_pushed(peer);
     return seastar::make_ready_future<std::optional<PushOp>>();
   }
 }