]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
common: resolve config proxy deadlock using refcounted pointers
authorPatrick Donnelly <pdonnell@redhat.com>
Wed, 20 Sep 2023 20:57:01 +0000 (16:57 -0400)
committerPatrick Donnelly <pdonnell@redhat.com>
Mon, 6 Nov 2023 20:28:43 +0000 (15:28 -0500)
7e8c683 introduced some gymnastics with a "CallGate" to maintain a count for
each observer we may be "calling into" with a config change (namely:
handle_conf_change). This was to prevent remove_observer coming in and deleting
the observer in the middle of the call. More importantly, it was to avoid
holding the lock while traversing the observers so that the config_proxy lock
can be dropped while calling handle_conf_change. This is important as e.g. the
MDS may attempt to acquire the config_proxy lock in its
MDSRank::handle_conf_change method (what prompted the change).

However, this introduces a new deadlock:

- Thread 2 acquires the config_proxy lock and then removes an observer. It blocks
  waiting for the observer's CallGate to close.

- Thread 1 had dropped the config_proxy lock while traversing the observers to call each
  observer's handle_conf_change method. Those methods may attempt to reacquire the
  config_proxy lock. This creates the deadlock as it's waiting for Thread 2 to drop the lock
  while Thread 1 cannot release the CallGate.

The solution, I believe, is to properly refcount "uses" of the observers for the purposes
of flushing these changes. Use std::shared_ptr to effect this.

Reproducing this is fairly simply with several parallel calls to `config set`.
During the course of executing `config set`, the Objecter may receive config
updates that will be flushed and potentially race with cleanup of observers
during shutdown.

Fixes: https://tracker.ceph.com/issues/62832
Partial-revert: 7e8c683
Partial-revert: 4458a72
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit 0c70dd8e39cc3d0cdef8bbcc8a0c6f214e54c770)

src/common/config_obs_mgr.h
src/common/config_proxy.h
src/crimson/common/config_proxy.h

index 06b3cf934a53b1500d57a0d255823b6de7b3dce6..759930df92d9bc37954ed6865a255eb864c5513f 100644 (file)
@@ -14,13 +14,11 @@ class ConfigValues;
 // the changes of settings at runtime.
 template<class ConfigObs>
 class ObserverMgr : public ConfigTracker {
-  // Maps configuration options to the observer listening for them.
-  using obs_map_t = std::multimap<std::string, ConfigObs*>;
-  obs_map_t observers;
-
 public:
-  typedef std::map<ConfigObs*, std::set<std::string>> rev_obs_map;
-  typedef std::function<void(ConfigObs*, const std::string&)> config_gather_cb;
+  using config_obs_ptr = std::shared_ptr<ConfigObs*>;
+  using config_obs_wptr = std::weak_ptr<ConfigObs*>;
+  typedef std::map<config_obs_ptr, std::set<std::string>> rev_obs_map;
+  typedef std::function<void(config_obs_ptr, const std::string&)> config_gather_cb;
 
   // Adds a new observer to this configuration. You can do this at any time,
   // but it will only receive notifications for the changes that happen after
@@ -37,15 +35,18 @@ public:
   // you need to delete it yourself.
   // This function will assert if you try to delete an observer that isn't
   // there.
-  void remove_observer(ConfigObs* observer);
+  config_obs_wptr remove_observer(ConfigObs* observer);
   // invoke callback for every observers tracking keys
   void for_each_observer(config_gather_cb callback);
   // invoke callback for observers keys tracking the provided change set
-  template<class ConfigProxyT>
-  void for_each_change(const std::set<std::string>& changes,
-                       ConfigProxyT& proxy,
+  void for_each_change(const std::map<std::string,bool>& changes,
                        config_gather_cb callback, std::ostream *oss);
   bool is_tracking(const std::string& name) const override;
+
+private:
+  // Maps configuration options to the observer listening for them.
+  using obs_map_t = std::multimap<std::string, config_obs_ptr>;
+  obs_map_t observers;
 };
 
 // we could put the implementations in a .cc file, and only instantiate the
@@ -60,17 +61,20 @@ template<class ConfigObs>
 void ObserverMgr<ConfigObs>::add_observer(ConfigObs* observer)
 {
   const char **keys = observer->get_tracked_conf_keys();
+  auto ptr = std::make_shared<ConfigObs*>(observer);
   for (const char ** k = keys; *k; ++k) {
-    observers.emplace(*k, observer);
+    observers.emplace(*k, ptr);
   }
 }
 
 template<class ConfigObs>
-void ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer)
+typename ObserverMgr<ConfigObs>::config_obs_wptr ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer)
 {
   [[maybe_unused]] bool found_obs = false;
+  config_obs_ptr ptr;
   for (auto o = observers.begin(); o != observers.end(); ) {
-    if (o->second == observer) {
+    if (*o->second == observer) {
+      ptr = std::move(o->second);
       observers.erase(o++);
       found_obs = true;
     } else {
@@ -78,6 +82,7 @@ void ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer)
     }
   }
   ceph_assert(found_obs);
+  return config_obs_wptr(ptr);
 }
 
 template<class ConfigObs>
@@ -89,17 +94,15 @@ void ObserverMgr<ConfigObs>::for_each_observer(config_gather_cb callback)
 }
 
 template<class ConfigObs>
-template<class ConfigProxyT>
-void ObserverMgr<ConfigObs>::for_each_change(const std::set<std::string>& changes,
-                                             ConfigProxyT& proxy,
+void ObserverMgr<ConfigObs>::for_each_change(const std::map<std::string,bool>& changes,
                                              config_gather_cb callback, std::ostream *oss)
 {
   // create the reverse observer mapping, mapping observers to the set of
   // changed keys that they'll get.
   std::string val;
-  for (auto& key : changes) {
+  for (auto& [key, present] : changes) {
     auto [first, last] = observers.equal_range(key);
-    if ((oss) && !proxy.get_val(key, &val)) {
+    if ((oss) && present) {
       (*oss) << key << " = '" << val << "' ";
       if (first == last) {
         (*oss) << "(not observed, change may require restart) ";
index 400aa4ed052d6046efb95c447d771fb93acd45fa..b9b47d9cef472f7477f44ba6e7bb8b3434470f22 100644 (file)
@@ -18,91 +18,51 @@ class ConfigProxy {
    */
   ConfigValues values;
   using md_config_obs_t = ceph::md_config_obs_impl<ConfigProxy>;
-  ObserverMgr<md_config_obs_t> obs_mgr;
+  using ObsMgr = ObserverMgr<md_config_obs_t>;
+  ObsMgr obs_mgr;
   md_config_t config;
   /** A lock that protects the md_config_t internals. It is
    * recursive, for simplicity.
    * It is best if this lock comes first in the lock hierarchy. We will
    * hold this lock when calling configuration observers.  */
-  mutable ceph::recursive_mutex lock =
-    ceph::make_recursive_mutex("ConfigProxy::lock");
+  mutable ceph::mutex lock = ceph::make_mutex("ConfigProxy::lock");
+  ceph::condition_variable cond;
 
-  class CallGate {
-  private:
-    uint32_t call_count = 0;
-    ceph::mutex lock;
-    ceph::condition_variable cond;
-  public:
-    CallGate()
-      : lock(ceph::make_mutex("call::gate::lock")) {
-    }
+  using rev_obs_map_t = ObsMgr::rev_obs_map;
 
-    void enter() {
-      std::lock_guard<ceph::mutex> locker(lock);
-      ++call_count;
-    }
-    void leave() {
-      std::lock_guard<ceph::mutex> locker(lock);
-      ceph_assert(call_count > 0);
-      if (--call_count == 0) {
-        cond.notify_all();
-      }
+  void _call_observers(rev_obs_map_t& rev_obs) {
+    ceph_assert(!ceph::mutex_debugging || !ceph_mutex_is_locked_by_me(lock));
+    for (auto& [obs, keys] : rev_obs) {
+      (*obs)->handle_conf_change(*this, keys);
     }
-    void close() {
-      std::unique_lock<ceph::mutex> locker(lock);
-      while (call_count != 0) {
-        cond.wait(locker);
-      }
+    rev_obs.clear(); // drop shared_ptrs
+    {
+      std::lock_guard l{lock};
+      cond.notify_all();
     }
-  };
-
-  void call_gate_enter(md_config_obs_t *obs) {
-    auto p = obs_call_gate.find(obs);
-    ceph_assert(p != obs_call_gate.end());
-    p->second->enter();
-  }
-  void call_gate_leave(md_config_obs_t *obs) {
-    auto p = obs_call_gate.find(obs);
-    ceph_assert(p != obs_call_gate.end());
-    p->second->leave();
-  }
-  void call_gate_close(md_config_obs_t *obs) {
-    auto p = obs_call_gate.find(obs);
-    ceph_assert(p != obs_call_gate.end());
-    p->second->close();
   }
-
-  using rev_obs_map_t = ObserverMgr<md_config_obs_t>::rev_obs_map;
-  typedef std::unique_ptr<CallGate> CallGateRef;
-
-  std::map<md_config_obs_t*, CallGateRef> obs_call_gate;
-
-  void call_observers(std::unique_lock<ceph::recursive_mutex>& locker,
-                      rev_obs_map_t& rev_obs) {
-    // observers are notified outside of lock
-    locker.unlock();
-    for (auto& [obs, keys] : rev_obs) {
-      obs->handle_conf_change(*this, keys);
-    }
-    locker.lock();
-
-    for (auto& rev_ob : rev_obs) {
-      call_gate_leave(rev_ob.first);
+  void _gather_changes(std::set<std::string> &changes,
+                       rev_obs_map_t *rev_obs, std::ostream* oss) {
+    ceph_assert(ceph_mutex_is_locked_by_me(lock));
+    std::map<std::string,bool> changes_present;
+    for (auto& change : changes) {
+      std::string dummy;
+      changes_present[change] = (0 == config.get_val(values, change, &dummy));
     }
+    obs_mgr.for_each_change(
+      changes_present,
+      [this, rev_obs](auto obs, const std::string &key) {
+        _map_observer_changes(obs, key, rev_obs);
+      }, oss);
+    changes.clear();
   }
 
-  void map_observer_changes(md_config_obs_t *obs, const std::string &key,
+  void _map_observer_changes(ObsMgr::config_obs_ptr obs, const std::string& key,
                             rev_obs_map_t *rev_obs) {
-    ceph_assert(ceph_mutex_is_locked(lock));
+    ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
     auto [it, new_entry] = rev_obs->emplace(obs, std::set<std::string>{});
     it->second.emplace(key);
-    if (new_entry) {
-      // this needs to be done under lock as once this lock is
-      // dropped (before calling observers) a remove_observer()
-      // can sneak in and cause havoc.
-      call_gate_enter(obs);
-    }
   }
 
 public:
@@ -200,34 +160,39 @@ public:
   }
   // for those want to reexpand special meta, e.g, $pid
   void finalize_reexpand_meta() {
-    std::unique_lock locker(lock);
     rev_obs_map_t rev_obs;
-    if (config.finalize_reexpand_meta(values, obs_mgr)) {
-      _gather_changes(values.changed, &rev_obs, nullptr);
+    {
+      std::lock_guard locker(lock);
+      if (config.finalize_reexpand_meta(values, obs_mgr)) {
+        _gather_changes(values.changed, &rev_obs, nullptr);
+      }
     }
 
-    call_observers(locker, rev_obs);
+    _call_observers(rev_obs);
   }
   void add_observer(md_config_obs_t* obs) {
     std::lock_guard l(lock);
     obs_mgr.add_observer(obs);
-    obs_call_gate.emplace(obs, std::make_unique<CallGate>());
+    cond.notify_all();
   }
   void remove_observer(md_config_obs_t* obs) {
-    std::lock_guard l(lock);
-    call_gate_close(obs);
-    obs_call_gate.erase(obs);
-    obs_mgr.remove_observer(obs);
+    std::unique_lock l(lock);
+    auto wptr = obs_mgr.remove_observer(obs);
+    while (!wptr.expired()) {
+      cond.wait(l);
+    }
   }
   void call_all_observers() {
-    std::unique_lock locker(lock);
     rev_obs_map_t rev_obs;
-    obs_mgr.for_each_observer(
-      [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
-        map_observer_changes(obs, key, &rev_obs);
-      });
+    {
+      std::lock_guard locker(lock);
+      obs_mgr.for_each_observer(
+        [this, &rev_obs](auto obs, const std::string& key) {
+          _map_observer_changes(obs, key, &rev_obs);
+        });
+    }
 
-    call_observers(locker, rev_obs);
+    _call_observers(rev_obs);
   }
   void set_safe_to_start_threads() {
     std::lock_guard l(lock);
@@ -255,25 +220,18 @@ public:
   }
   // Expand all metavariables. Make any pending observer callbacks.
   void apply_changes(std::ostream* oss) {
-    std::unique_lock locker(lock);
     rev_obs_map_t rev_obs;
 
-    // apply changes until the cluster name is assigned
-    if (!values.cluster.empty()) {
-      // meta expands could have modified anything.  Copy it all out again.
-      _gather_changes(values.changed, &rev_obs, oss);
+    {
+      std::lock_guard locker(lock);
+      // apply changes until the cluster name is assigned
+      if (!values.cluster.empty()) {
+        // meta expands could have modified anything.  Copy it all out again.
+        _gather_changes(values.changed, &rev_obs, oss);
+      }
     }
 
-    call_observers(locker, rev_obs);
-  }
-  void _gather_changes(std::set<std::string> &changes,
-                       rev_obs_map_t *rev_obs, std::ostream* oss) {
-    obs_mgr.for_each_change(
-      changes, *this,
-      [this, rev_obs](md_config_obs_t *obs, const std::string &key) {
-        map_observer_changes(obs, key, rev_obs);
-      }, oss);
-      changes.clear();
+    _call_observers(rev_obs);
   }
   int set_val(const std::string_view key, const std::string& s,
               std::stringstream* err_ss=nullptr) {
@@ -291,23 +249,27 @@ public:
   int set_mon_vals(CephContext *cct,
                   const std::map<std::string,std::string,std::less<>>& kv,
                   md_config_t::config_callback config_cb) {
-    std::unique_lock locker(lock);
-    int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
-
+    int ret;
     rev_obs_map_t rev_obs;
-    _gather_changes(values.changed, &rev_obs, nullptr);
 
-    call_observers(locker, rev_obs);
+    {
+      std::lock_guard locker(lock);
+      ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
+      _gather_changes(values.changed, &rev_obs, nullptr);
+    }
+
+    _call_observers(rev_obs);
     return ret;
   }
   int injectargs(const std::string &s, std::ostream *oss) {
-    std::unique_lock locker(lock);
-    int ret = config.injectargs(values, obs_mgr, s, oss);
-
+    int ret;
     rev_obs_map_t rev_obs;
-    _gather_changes(values.changed, &rev_obs, oss);
-
-    call_observers(locker, rev_obs);
+    {
+      std::lock_guard locker(lock);
+      ret = config.injectargs(values, obs_mgr, s, oss);
+      _gather_changes(values.changed, &rev_obs, oss);
+    }
+    _call_observers(rev_obs);
     return ret;
   }
   void parse_env(unsigned entity_type,
index 4c0e655075adb92f1ffc5cd5879dd5b99bf0fc18..822db34f61a4fd626a56ffaff8bd3ab9268bf0e4 100644 (file)
@@ -54,13 +54,18 @@ class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy>
       // avoid racings with other do_change() calls in parallel.
       ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
       owner.values.reset(new_values);
-      owner.obs_mgr.for_each_change(owner.values->changed, owner,
-                                    [&rev_obs](ConfigObserver *obs,
+      std::map<std::string, bool> changes_present;
+      for (const auto& change : owner.values->changed) {
+        std::string dummy;
+        changes_present[change] = owner.get_val(change, &dummy);
+      }
+      owner.obs_mgr.for_each_change(changes_present,
+                                    [&rev_obs](auto obs,
                                                const std::string &key) {
                                       rev_obs[obs].insert(key);
                                     }, nullptr);
       for (auto& [obs, keys] : rev_obs) {
-        obs->handle_conf_change(owner, keys);
+        (*obs)->handle_conf_change(owner, keys);
       }
 
       return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count),
@@ -70,13 +75,19 @@ class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy>
             proxy.values.reset();
             proxy.values = std::move(foreign_values);
 
+            std::map<std::string, bool> changes_present;
+            for (const auto& change : proxy.values->changed) {
+              std::string dummy;
+              changes_present[change] = proxy.get_val(change, &dummy);
+            }
+
             ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
-            proxy.obs_mgr.for_each_change(proxy.values->changed, proxy,
-              [&rev_obs](ConfigObserver *obs, const std::string& key) {
+            proxy.obs_mgr.for_each_change(changes_present,
+              [&rev_obs](auto obs, const std::string& key) {
                 rev_obs[obs].insert(key);
               }, nullptr);
-            for (auto& obs_keys : rev_obs) {
-              obs_keys.first->handle_conf_change(proxy, obs_keys.second);
+            for (auto& [obs, keys] : rev_obs) {
+              (*obs)->handle_conf_change(proxy, keys);
             }
           });
         }).finally([new_values] {