From 5e6e360aa1d78eb545944cf7f2d65e3915467597 Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Wed, 20 Sep 2023 16:57:01 -0400 Subject: [PATCH] common: resolve config proxy deadlock using refcounted pointers 7e8c683 introduced some gymnastics with a "CallGate" to maintain a count for each observer we may be "calling into" with a config change (namely: handle_conf_change). This was to prevent remove_observer coming in and deleting the observer in the middle of the call. More importantly, it was to avoid holding the lock while traversing the observers so that the config_proxy lock can be dropped while calling handle_conf_change. This is important as e.g. the MDS may attempt to acquire the config_proxy lock in its MDSRank::handle_conf_change method (what prompted the change). However, this introduces a new deadlock: - Thread 2 acquires the config_proxy lock and then removes an observer. It blocks waiting for the observer's CallGate to close. - Thread 1 had dropped the config_proxy lock while traversing the observers to call each observer's handle_conf_change method. Those methods may attempt to reacquire the config_proxy lock. This creates the deadlock as it's waiting for Thread 2 to drop the lock while Thread 1 cannot release the CallGate. The solution, I believe, is to properly refcount "uses" of the observers for the purposes of flushing these changes. Use std::shared_ptr to effect this. Reproducing this is fairly simple with several parallel calls to `config set`. During the course of executing `config set`, the Objecter may receive config updates that will be flushed and potentially race with cleanup of observers during shutdown. 
Fixes: https://tracker.ceph.com/issues/62832 Partial-revert: 7e8c683 Partial-revert: 4458a72 Signed-off-by: Patrick Donnelly (cherry picked from commit 0c70dd8e39cc3d0cdef8bbcc8a0c6f214e54c770) --- src/common/config_obs_mgr.h | 39 ++++--- src/common/config_proxy.h | 180 ++++++++++++------------------ src/crimson/common/config_proxy.h | 25 +++-- 3 files changed, 110 insertions(+), 134 deletions(-) diff --git a/src/common/config_obs_mgr.h b/src/common/config_obs_mgr.h index 06b3cf934a5..759930df92d 100644 --- a/src/common/config_obs_mgr.h +++ b/src/common/config_obs_mgr.h @@ -14,13 +14,11 @@ class ConfigValues; // the changes of settings at runtime. template class ObserverMgr : public ConfigTracker { - // Maps configuration options to the observer listening for them. - using obs_map_t = std::multimap; - obs_map_t observers; - public: - typedef std::map> rev_obs_map; - typedef std::function config_gather_cb; + using config_obs_ptr = std::shared_ptr; + using config_obs_wptr = std::weak_ptr; + typedef std::map> rev_obs_map; + typedef std::function config_gather_cb; // Adds a new observer to this configuration. You can do this at any time, // but it will only receive notifications for the changes that happen after @@ -37,15 +35,18 @@ public: // you need to delete it yourself. // This function will assert if you try to delete an observer that isn't // there. - void remove_observer(ConfigObs* observer); + config_obs_wptr remove_observer(ConfigObs* observer); // invoke callback for every observers tracking keys void for_each_observer(config_gather_cb callback); // invoke callback for observers keys tracking the provided change set - template - void for_each_change(const std::set& changes, - ConfigProxyT& proxy, + void for_each_change(const std::map& changes, config_gather_cb callback, std::ostream *oss); bool is_tracking(const std::string& name) const override; + +private: + // Maps configuration options to the observer listening for them. 
+ using obs_map_t = std::multimap; + obs_map_t observers; }; // we could put the implementations in a .cc file, and only instantiate the @@ -60,17 +61,20 @@ template void ObserverMgr::add_observer(ConfigObs* observer) { const char **keys = observer->get_tracked_conf_keys(); + auto ptr = std::make_shared(observer); for (const char ** k = keys; *k; ++k) { - observers.emplace(*k, observer); + observers.emplace(*k, ptr); } } template -void ObserverMgr::remove_observer(ConfigObs* observer) +typename ObserverMgr::config_obs_wptr ObserverMgr::remove_observer(ConfigObs* observer) { [[maybe_unused]] bool found_obs = false; + config_obs_ptr ptr; for (auto o = observers.begin(); o != observers.end(); ) { - if (o->second == observer) { + if (*o->second == observer) { + ptr = std::move(o->second); observers.erase(o++); found_obs = true; } else { @@ -78,6 +82,7 @@ void ObserverMgr::remove_observer(ConfigObs* observer) } } ceph_assert(found_obs); + return config_obs_wptr(ptr); } template @@ -89,17 +94,15 @@ void ObserverMgr::for_each_observer(config_gather_cb callback) } template -template -void ObserverMgr::for_each_change(const std::set& changes, - ConfigProxyT& proxy, +void ObserverMgr::for_each_change(const std::map& changes, config_gather_cb callback, std::ostream *oss) { // create the reverse observer mapping, mapping observers to the set of // changed keys that they'll get. 
std::string val; - for (auto& key : changes) { + for (auto& [key, present] : changes) { auto [first, last] = observers.equal_range(key); - if ((oss) && !proxy.get_val(key, &val)) { + if ((oss) && present) { (*oss) << key << " = '" << val << "' "; if (first == last) { (*oss) << "(not observed, change may require restart) "; diff --git a/src/common/config_proxy.h b/src/common/config_proxy.h index 400aa4ed052..b9b47d9cef4 100644 --- a/src/common/config_proxy.h +++ b/src/common/config_proxy.h @@ -18,91 +18,51 @@ class ConfigProxy { */ ConfigValues values; using md_config_obs_t = ceph::md_config_obs_impl; - ObserverMgr obs_mgr; + using ObsMgr = ObserverMgr; + ObsMgr obs_mgr; md_config_t config; /** A lock that protects the md_config_t internals. It is * recursive, for simplicity. * It is best if this lock comes first in the lock hierarchy. We will * hold this lock when calling configuration observers. */ - mutable ceph::recursive_mutex lock = - ceph::make_recursive_mutex("ConfigProxy::lock"); + mutable ceph::mutex lock = ceph::make_mutex("ConfigProxy::lock"); + ceph::condition_variable cond; - class CallGate { - private: - uint32_t call_count = 0; - ceph::mutex lock; - ceph::condition_variable cond; - public: - CallGate() - : lock(ceph::make_mutex("call::gate::lock")) { - } + using rev_obs_map_t = ObsMgr::rev_obs_map; - void enter() { - std::lock_guard locker(lock); - ++call_count; - } - void leave() { - std::lock_guard locker(lock); - ceph_assert(call_count > 0); - if (--call_count == 0) { - cond.notify_all(); - } + void _call_observers(rev_obs_map_t& rev_obs) { + ceph_assert(!ceph::mutex_debugging || !ceph_mutex_is_locked_by_me(lock)); + for (auto& [obs, keys] : rev_obs) { + (*obs)->handle_conf_change(*this, keys); } - void close() { - std::unique_lock locker(lock); - while (call_count != 0) { - cond.wait(locker); - } + rev_obs.clear(); // drop shared_ptrs + { + std::lock_guard l{lock}; + cond.notify_all(); } - }; - - void call_gate_enter(md_config_obs_t *obs) { - 
auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->enter(); - } - void call_gate_leave(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->leave(); - } - void call_gate_close(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->close(); } - - using rev_obs_map_t = ObserverMgr::rev_obs_map; - typedef std::unique_ptr CallGateRef; - - std::map obs_call_gate; - - void call_observers(std::unique_lock& locker, - rev_obs_map_t& rev_obs) { - // observers are notified outside of lock - locker.unlock(); - for (auto& [obs, keys] : rev_obs) { - obs->handle_conf_change(*this, keys); - } - locker.lock(); - - for (auto& rev_ob : rev_obs) { - call_gate_leave(rev_ob.first); + void _gather_changes(std::set &changes, + rev_obs_map_t *rev_obs, std::ostream* oss) { + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + std::map changes_present; + for (auto& change : changes) { + std::string dummy; + changes_present[change] = (0 == config.get_val(values, change, &dummy)); } + obs_mgr.for_each_change( + changes_present, + [this, rev_obs](auto obs, const std::string &key) { + _map_observer_changes(obs, key, rev_obs); + }, oss); + changes.clear(); } - void map_observer_changes(md_config_obs_t *obs, const std::string &key, + void _map_observer_changes(ObsMgr::config_obs_ptr obs, const std::string& key, rev_obs_map_t *rev_obs) { - ceph_assert(ceph_mutex_is_locked(lock)); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); auto [it, new_entry] = rev_obs->emplace(obs, std::set{}); it->second.emplace(key); - if (new_entry) { - // this needs to be done under lock as once this lock is - // dropped (before calling observers) a remove_observer() - // can sneak in and cause havoc. 
- call_gate_enter(obs); - } } public: @@ -200,34 +160,39 @@ public: } // for those want to reexpand special meta, e.g, $pid void finalize_reexpand_meta() { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - if (config.finalize_reexpand_meta(values, obs_mgr)) { - _gather_changes(values.changed, &rev_obs, nullptr); + { + std::lock_guard locker(lock); + if (config.finalize_reexpand_meta(values, obs_mgr)) { + _gather_changes(values.changed, &rev_obs, nullptr); + } } - call_observers(locker, rev_obs); + _call_observers(rev_obs); } void add_observer(md_config_obs_t* obs) { std::lock_guard l(lock); obs_mgr.add_observer(obs); - obs_call_gate.emplace(obs, std::make_unique()); + cond.notify_all(); } void remove_observer(md_config_obs_t* obs) { - std::lock_guard l(lock); - call_gate_close(obs); - obs_call_gate.erase(obs); - obs_mgr.remove_observer(obs); + std::unique_lock l(lock); + auto wptr = obs_mgr.remove_observer(obs); + while (!wptr.expired()) { + cond.wait(l); + } } void call_all_observers() { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - obs_mgr.for_each_observer( - [this, &rev_obs](md_config_obs_t *obs, const std::string &key) { - map_observer_changes(obs, key, &rev_obs); - }); + { + std::lock_guard locker(lock); + obs_mgr.for_each_observer( + [this, &rev_obs](auto obs, const std::string& key) { + _map_observer_changes(obs, key, &rev_obs); + }); + } - call_observers(locker, rev_obs); + _call_observers(rev_obs); } void set_safe_to_start_threads() { std::lock_guard l(lock); @@ -255,25 +220,18 @@ public: } // Expand all metavariables. Make any pending observer callbacks. void apply_changes(std::ostream* oss) { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - // apply changes until the cluster name is assigned - if (!values.cluster.empty()) { - // meta expands could have modified anything. Copy it all out again. 
- _gather_changes(values.changed, &rev_obs, oss); + { + std::lock_guard locker(lock); + // apply changes until the cluster name is assigned + if (!values.cluster.empty()) { + // meta expands could have modified anything. Copy it all out again. + _gather_changes(values.changed, &rev_obs, oss); + } } - call_observers(locker, rev_obs); - } - void _gather_changes(std::set &changes, - rev_obs_map_t *rev_obs, std::ostream* oss) { - obs_mgr.for_each_change( - changes, *this, - [this, rev_obs](md_config_obs_t *obs, const std::string &key) { - map_observer_changes(obs, key, rev_obs); - }, oss); - changes.clear(); + _call_observers(rev_obs); } int set_val(const std::string_view key, const std::string& s, std::stringstream* err_ss=nullptr) { @@ -291,23 +249,27 @@ public: int set_mon_vals(CephContext *cct, const std::map>& kv, md_config_t::config_callback config_cb) { - std::unique_lock locker(lock); - int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb); - + int ret; rev_obs_map_t rev_obs; - _gather_changes(values.changed, &rev_obs, nullptr); - call_observers(locker, rev_obs); + { + std::lock_guard locker(lock); + ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb); + _gather_changes(values.changed, &rev_obs, nullptr); + } + + _call_observers(rev_obs); return ret; } int injectargs(const std::string &s, std::ostream *oss) { - std::unique_lock locker(lock); - int ret = config.injectargs(values, obs_mgr, s, oss); - + int ret; rev_obs_map_t rev_obs; - _gather_changes(values.changed, &rev_obs, oss); - - call_observers(locker, rev_obs); + { + std::lock_guard locker(lock); + ret = config.injectargs(values, obs_mgr, s, oss); + _gather_changes(values.changed, &rev_obs, oss); + } + _call_observers(rev_obs); return ret; } void parse_env(unsigned entity_type, diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h index 4c0e655075a..822db34f61a 100644 --- a/src/crimson/common/config_proxy.h +++ b/src/crimson/common/config_proxy.h @@ 
-54,13 +54,18 @@ class ConfigProxy : public seastar::peering_sharded_service // avoid racings with other do_change() calls in parallel. ObserverMgr::rev_obs_map rev_obs; owner.values.reset(new_values); - owner.obs_mgr.for_each_change(owner.values->changed, owner, - [&rev_obs](ConfigObserver *obs, + std::map changes_present; + for (const auto& change : owner.values->changed) { + std::string dummy; + changes_present[change] = owner.get_val(change, &dummy); + } + owner.obs_mgr.for_each_change(changes_present, + [&rev_obs](auto obs, const std::string &key) { rev_obs[obs].insert(key); }, nullptr); for (auto& [obs, keys] : rev_obs) { - obs->handle_conf_change(owner, keys); + (*obs)->handle_conf_change(owner, keys); } return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count), @@ -70,13 +75,19 @@ class ConfigProxy : public seastar::peering_sharded_service proxy.values.reset(); proxy.values = std::move(foreign_values); + std::map changes_present; + for (const auto& change : proxy.values->changed) { + std::string dummy; + changes_present[change] = proxy.get_val(change, &dummy); + } + ObserverMgr::rev_obs_map rev_obs; - proxy.obs_mgr.for_each_change(proxy.values->changed, proxy, - [&rev_obs](ConfigObserver *obs, const std::string& key) { + proxy.obs_mgr.for_each_change(changes_present, + [&rev_obs](auto obs, const std::string& key) { rev_obs[obs].insert(key); }, nullptr); - for (auto& obs_keys : rev_obs) { - obs_keys.first->handle_conf_change(proxy, obs_keys.second); + for (auto& [obs, keys] : rev_obs) { + (*obs)->handle_conf_change(proxy, keys); } }); }).finally([new_values] { -- 2.39.5