From 83e9db3a8b070527880673ec23ebcae7abdca96c Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 6 May 2022 05:05:31 +0000 Subject: [PATCH] crimson: retain reference to WaitForObjectRecovery until blocker resolves PGRecovery::on_global_recover destroys the map entry without waiting for the future returned from seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) { This commit changes WaitForObjectRecovery to be refcounted and retains a reference until the future resolves. Fixes: https://tracker.ceph.com/issues/55565 Signed-off-by: Samuel Just --- src/crimson/osd/pg_recovery.cc | 8 ++--- src/crimson/osd/recovery_backend.cc | 10 +++---- src/crimson/osd/recovery_backend.h | 27 ++++++++++++----- .../osd/replicated_recovery_backend.cc | 30 +++++++++---------- 4 files changed, 44 insertions(+), 31 deletions(-) diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 62c8730fedb4a..9ebcad8c0c81c 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -272,11 +272,11 @@ PGRecovery::recover_missing( const hobject_t &soid, eversion_t need) { if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) { - return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking( trigger, pg->get_recovery_backend()->recover_delete(soid, need)); } else { - return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking( trigger, pg->get_recovery_backend()->recover_object(soid, need) .handle_exception_interruptible( @@ -293,7 +293,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes( const hobject_t& soid, eversion_t need) { - return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking( trigger, 
pg->get_recovery_backend()->push_delete(soid, need).then_interruptible( [=] { @@ -310,7 +310,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes( const hobject_t& soid, eversion_t need) { - return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking( trigger, pg->get_recovery_backend()->recover_object(soid, need) .handle_exception_interruptible( diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index 04c9034a52c18..040cf0d88b2b6 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -41,13 +41,13 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t, temp_contents.clear(); for (auto& [soid, recovery_waiter] : recovering) { - if ((recovery_waiter.pi && recovery_waiter.pi->is_complete()) - || (!recovery_waiter.pi - && recovery_waiter.obc && recovery_waiter.obc->obs.exists)) { - recovery_waiter.obc->interrupt( + if ((recovery_waiter->pi && recovery_waiter->pi->is_complete()) + || (!recovery_waiter->pi + && recovery_waiter->obc && recovery_waiter->obc->obs.exists)) { + recovery_waiter->obc->interrupt( ::crimson::common::actingset_changed( pg.is_primary())); - recovery_waiter.interrupt(why); + recovery_waiter->interrupt(why); } } recovering.clear(); diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index cc48dd7d0de33..a2b2df71168e9 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -46,13 +46,13 @@ public: backend{backend} {} virtual ~RecoveryBackend() {} WaitForObjectRecovery& add_recovering(const hobject_t& soid) { - auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{}); + auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{}); assert(added); - return it->second; + return *(it->second); } WaitForObjectRecovery& get_recovering(const hobject_t& soid) { 
assert(is_recovering(soid)); - return recovering.at(soid); + return *(recovering.at(soid)); } void remove_recovering(const hobject_t& soid) { recovering.erase(soid); @@ -88,7 +88,7 @@ public: seastar::future<> stop() { for (auto& [soid, recovery_waiter] : recovering) { - recovery_waiter.stop(); + recovery_waiter->stop(); } return on_stop(); } @@ -120,7 +120,10 @@ protected: }; public: - class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> { + class WaitForObjectRecovery : + public boost::intrusive_ref_counter< + WaitForObjectRecovery, boost::thread_unsafe_counter>, + public crimson::BlockerT<WaitForObjectRecovery> { seastar::shared_promise<> readable, recovered, pulled; std::map<pg_shard_t, seastar::shared_promise<>> pushes; public: @@ -139,8 +142,17 @@ public: seastar::future<> wait_for_recovered() { return recovered.get_shared_future(); } + template <typename T, typename F> + auto wait_track_blocking(T &trigger, F &&fut) { + WaitForObjectRecoveryRef ref = this; + return track_blocking( + trigger, + std::forward<F>(fut) + ).finally([ref] {}); + } seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) { - return trigger.maybe_record_blocking(recovered.get_shared_future(), *this); + WaitForObjectRecoveryRef ref = this; + return wait_track_blocking(trigger, recovered.get_shared_future()); } seastar::future<> wait_for_pull() { return pulled.get_shared_future(); @@ -178,8 +190,9 @@ public: }; using RecoveryBlockingEvent = crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>; + using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>; protected: - std::map<hobject_t, WaitForObjectRecovery> recovering; + std::map<hobject_t, WaitForObjectRecoveryRef> recovering; hobject_t get_temp_recovery_object( const hobject_t& target, eversion_t version) const; diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index a96f1b8dd2bfd..3adc3a0cc3b66 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -36,7 +36,7 @@ ReplicatedRecoveryBackend::recover_object( return pg.with_head_obc(soid, [this, soid, need](auto obc) { 
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid); - auto& recovery_waiter = recovering.at(soid); + auto& recovery_waiter = get_recovering(soid); recovery_waiter.obc = obc; recovery_waiter.obc->wait_recovery_read(); return maybe_push_shards(soid, need); @@ -70,11 +70,11 @@ ReplicatedRecoveryBackend::maybe_push_shards( pg.get_osdmap_epoch())) .then_interruptible( [this, soid, shard] { - return recovering.at(soid).wait_for_pushes(shard); + return get_recovering(soid).wait_for_pushes(shard); }); }); }).then_interruptible([this, soid] { - auto &recovery = recovering.at(soid); + auto &recovery = get_recovering(soid); if (auto push_info = recovery.pushing.begin(); push_info != recovery.pushing.end()) { pg.get_recovery_handler()->on_global_recover(soid, @@ -90,7 +90,7 @@ ReplicatedRecoveryBackend::maybe_push_shards( } return seastar::make_ready_future<>(); }).handle_exception_interruptible([this, soid](auto e) { - auto &recovery = recovering.at(soid); + auto &recovery = get_recovering(soid); if (recovery.obc) { recovery.obc->drop_recovery_read(); } @@ -109,7 +109,7 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj( return seastar::make_ready_future<>(); } PullOp po; - auto& recovery_waiter = recovering.at(soid); + auto& recovery_waiter = get_recovering(soid); recovery_waiter.pi = std::make_optional(); auto& pi = *recovery_waiter.pi; prepare_pull(po, pi, soid, need); @@ -159,7 +159,7 @@ ReplicatedRecoveryBackend::push_delete( shard_services.send_to_osd(shard.osd, std::move(msg), pg.get_osdmap_epoch())).then_interruptible( [this, soid, shard] { - return recovering.at(soid).wait_for_pushes(shard); + return get_recovering(soid).wait_for_pushes(shard); }); } return seastar::make_ready_future<>(); @@ -293,7 +293,7 @@ ReplicatedRecoveryBackend::prep_push( { logger().debug("{}: {}, {}", __func__, soid, need); - auto& recovery_waiter = recovering.at(soid); + auto& recovery_waiter = get_recovering(soid); auto& obc = recovery_waiter.obc; interval_set data_subset; 
if (obc->obs.oi.size) { @@ -325,7 +325,7 @@ ReplicatedRecoveryBackend::prep_push( return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then_interruptible( [this, soid, pg_shard](auto pop) { - auto& recovery_waiter = recovering.at(soid); + auto& recovery_waiter = get_recovering(soid); auto& pi = recovery_waiter.pushing[pg_shard]; pi.recovery_progress = pop.after_progress; return pop; @@ -647,7 +647,7 @@ ReplicatedRecoveryBackend::_handle_pull_response( pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included); const hobject_t &hoid = pop.soid; - auto& recovery_waiter = recovering.at(hoid); + auto& recovery_waiter = get_recovering(hoid); auto& pi = *recovery_waiter.pi; if (pi.recovery_info.size == (uint64_t(-1))) { pi.recovery_info.size = pop.recovery_info.size; @@ -702,7 +702,7 @@ ReplicatedRecoveryBackend::_handle_pull_response( if (complete) { pi.stat.num_objects_recovered++; pg.get_recovery_handler()->on_local_recover( - pop.soid, recovering.at(pop.soid).pi->recovery_info, + pop.soid, get_recovering(pop.soid).pi->recovery_info, false, *t); return true; } else { @@ -754,7 +754,7 @@ ReplicatedRecoveryBackend::handle_pull_response( }).then_interruptible([this, m, &response](bool complete) { if (complete) { auto& pop = m->pushes[0]; - recovering.at(pop.soid).set_pulled(); + get_recovering(pop.soid).set_pulled(); return seastar::make_ready_future<>(); } else { auto reply = crimson::make_message<MOSDPGPull>(); @@ -860,11 +860,11 @@ ReplicatedRecoveryBackend::_handle_push_reply( logger().debug("{}, soid {}, from {}", __func__, soid, peer); auto recovering_iter = recovering.find(soid); if (recovering_iter == recovering.end() - || !recovering_iter->second.pushing.count(peer)) { + || !recovering_iter->second->pushing.count(peer)) { logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer); return seastar::make_ready_future<std::optional<PushOp>>(); } else { - auto& pi = recovering_iter->second.pushing[peer]; + auto& pi = recovering_iter->second->pushing[peer]; 
bool error = pi.recovery_progress.error; if (!pi.recovery_progress.data_complete && !error) { return build_push_op(pi.recovery_info, pi.recovery_progress, @@ -873,14 +873,14 @@ ReplicatedRecoveryBackend::_handle_push_reply( return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop)); }).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) { pi.recovery_progress.error = true; - recovering_iter->second.set_push_failed(peer, e); + recovering_iter->second->set_push_failed(peer, e); return seastar::make_ready_future<std::optional<PushOp>>(); }); } if (!error) { pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info); } - recovering_iter->second.set_pushed(peer); + recovering_iter->second->set_pushed(peer); return seastar::make_ready_future<std::optional<PushOp>>(); } } -- 2.39.5