]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
config: drop config_proxy::lock when invoking config observer
authorVenky Shankar <vshankar@redhat.com>
Wed, 18 Jul 2018 12:50:49 +0000 (08:50 -0400)
committerVenky Shankar <vshankar@redhat.com>
Thu, 13 Dec 2018 16:32:41 +0000 (11:32 -0500)
To prevent deadlocking when observer acquires its own locks
(lock order: config_proxy::lock -> foo::lock) and another
thread (say IO path) attempts to fetch an config value (lock
order: foo:lock -> config_proxy::lock).

Side effect of dropping locks when invoking config observer is
that remove_observer() can sneak in when the observer is still
in execution resulting in use-after-free. To mitigate this, any
in-progress observer call need to be completed before removing
the obserer. Also, remove_observer() needs to be invoked without
holding any observer locks so as to not run into deadlocks.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/config_obs_mgr.h
src/common/config_proxy.h
src/crimson/common/config_proxy.h

index 5240764524a1efd1634435d2986281c331774f5b..25a8b05ba4b5168a4a4cc75c17a5664b994aa215 100644 (file)
@@ -17,11 +17,11 @@ class ObserverMgr : public ConfigTracker {
   // Maps configuration options to the observer listening for them.
   using obs_map_t = std::multimap<std::string, ConfigObs*>;
   obs_map_t observers;
-  /* Maps observers to the configuration options that they care about which
-   * have changed. */
-  using rev_obs_map_t = std::map<ConfigObs*, std::set<std::string>>;
 
 public:
+  typedef std::map<ConfigObs*, std::set<std::string>> rev_obs_map;
+  typedef std::function<void(ConfigObs*, const std::string&)> config_gather_cb;
+
   // Adds a new observer to this configuration. You can do this at any time,
   // but it will only receive notifications for the changes that happen after
   // you attach it, obviously.
@@ -38,12 +38,13 @@ public:
   // This function will assert if you try to delete an observer that isn't
   // there.
   void remove_observer(ConfigObs* observer);
+  // invoke callback for every observers tracking keys
+  void for_each_observer(config_gather_cb callback);
+  // invoke callback for observers keys tracking the provided change set
   template<class ConfigProxyT>
-  void call_all_observers(const ConfigProxyT& proxy);
-  template<class ConfigProxyT>
-  void apply_changes(const std::set<std::string>& changes,
-                    const ConfigProxyT& proxy,
-                    std::ostream *oss);
+  void for_each_change(const std::set<std::string>& changes,
+                       ConfigProxyT& proxy,
+                       config_gather_cb callback, std::ostream *oss);
   bool is_tracking(const std::string& name) const override;
 };
 
@@ -80,27 +81,21 @@ void ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer)
 }
 
 template<class ConfigObs>
-template<class ConfigProxyT>
-void ObserverMgr<ConfigObs>::call_all_observers(const ConfigProxyT& proxy)
+void ObserverMgr<ConfigObs>::for_each_observer(config_gather_cb callback)
 {
-  rev_obs_map_t rev_obs;
   for (const auto& [key, obs] : observers) {
-    rev_obs[obs].insert(key);
-  }
-  for (auto& [obs, keys] : rev_obs) {
-    obs->handle_conf_change(proxy, keys);
+    callback(obs, key);
   }
 }
 
 template<class ConfigObs>
-template<class ConfigProxy>
-void ObserverMgr<ConfigObs>::apply_changes(const std::set<std::string>& changes,
-                                           const ConfigProxy& proxy,
-                                           std::ostream *oss)
+template<class ConfigProxyT>
+void ObserverMgr<ConfigObs>::for_each_change(const std::set<std::string>& changes,
+                                             ConfigProxyT& proxy,
+                                             config_gather_cb callback, std::ostream *oss)
 {
   // create the reverse observer mapping, mapping observers to the set of
   // changed keys that they'll get.
-  rev_obs_map_t robs;
   string val;
   for (auto& key : changes) {
     auto [first, last] = observers.equal_range(key);
@@ -111,14 +106,9 @@ void ObserverMgr<ConfigObs>::apply_changes(const std::set<std::string>& changes,
       }
     }
     for (auto r = first; r != last; ++r) {
-      auto obs = r->second;
-      robs[obs].insert(key);
+      callback(r->second, key);
     }
   }
-  // Make any pending observer callbacks
-  for (auto& [obs, keys] : robs) {
-    obs->handle_conf_change(proxy, keys);
-  }
 }
 
 template<class ConfigObs>
index 70e373f13ba09a22a76e16ad6d3af036bac7023d..9d645abbc557e9cbc267e94698354f07c16088a0 100644 (file)
@@ -31,6 +31,79 @@ class ConfigProxy {
   mutable ceph::recursive_mutex lock =
     ceph::make_recursive_mutex("ConfigProxy::lock");
 
+  class CallGate {
+  private:
+    uint32_t call_count = 0;
+    ceph::mutex lock;
+    ceph::condition_variable cond;
+  public:
+    CallGate()
+      : lock(ceph::make_mutex("call::gate::lock")) {
+    }
+
+    void enter() {
+      std::lock_guard<ceph::mutex> locker(lock);
+      ++call_count;
+    }
+    void leave() {
+      std::lock_guard<ceph::mutex> locker(lock);
+      ceph_assert(call_count > 0);
+      if (--call_count == 0) {
+        cond.notify_all();
+      }
+    }
+    void close() {
+      std::unique_lock<ceph::mutex> locker(lock);
+      while (call_count != 0) {
+        cond.wait(locker);
+      }
+    }
+  };
+
+  void call_gate_enter(md_config_obs_t *obs) {
+    auto p = obs_call_gate.find(obs);
+    ceph_assert(p != obs_call_gate.end());
+    p->second->enter();
+  }
+  void call_gate_leave(md_config_obs_t *obs) {
+    auto p = obs_call_gate.find(obs);
+    ceph_assert(p != obs_call_gate.end());
+    p->second->leave();
+  }
+  void call_gate_close(md_config_obs_t *obs) {
+    auto p = obs_call_gate.find(obs);
+    ceph_assert(p != obs_call_gate.end());
+    p->second->close();
+  }
+
+  using rev_obs_map_t = ObserverMgr<md_config_obs_t>::rev_obs_map;
+  typedef std::unique_ptr<CallGate> CallGateRef;
+
+  std::map<md_config_obs_t*, CallGateRef> obs_call_gate;
+
+  void call_observers(rev_obs_map_t &rev_obs) {
+    for (auto& [obs, keys] : rev_obs) {
+      obs->handle_conf_change(*this, keys);
+      // this can be done outside the lock as call_gate_enter()
+      // and remove_observer() are serialized via lock
+      call_gate_leave(obs);
+    }
+  }
+
+  void map_observer_changes(md_config_obs_t *obs, const std::string &key,
+                            rev_obs_map_t *rev_obs) {
+    ceph_assert(ceph_mutex_is_locked(lock));
+
+    auto [it, new_entry] = rev_obs->emplace(obs, std::set<std::string>{});
+    it->second.emplace(key);
+    if (new_entry) {
+      // this needs to be done under lock as once this lock is
+      // dropped (before calling observers) a remove_observer()
+      // can sneak in and cause havoc.
+      call_gate_enter(obs);
+    }
+  }
+
 public:
   explicit ConfigProxy(bool is_daemon)
     : config{values, obs_mgr, is_daemon}
@@ -111,31 +184,39 @@ public:
   }
   // for those want to reexpand special meta, e.g, $pid
   void finalize_reexpand_meta() {
-    std::lock_guard l(lock);
-    if (config.finalize_reexpand_meta(values, obs_mgr)) {
-      obs_mgr.apply_changes(values.changed, *this, nullptr);
-      values.changed.clear();
+    rev_obs_map_t rev_obs;
+    {
+      std::lock_guard l(lock);
+      if (config.finalize_reexpand_meta(values, obs_mgr)) {
+        _gather_changes(values.changed, &rev_obs, nullptr);
+        values.changed.clear();
+      }
     }
+
+    call_observers(rev_obs);
   }
   void add_observer(md_config_obs_t* obs) {
     std::lock_guard l(lock);
     obs_mgr.add_observer(obs);
+    obs_call_gate.emplace(obs, std::make_unique<CallGate>());
   }
   void remove_observer(md_config_obs_t* obs) {
     std::lock_guard l(lock);
+    call_gate_close(obs);
+    obs_call_gate.erase(obs);
     obs_mgr.remove_observer(obs);
   }
   void call_all_observers() {
-    std::lock_guard l(lock);
-    // Have the scope of the lock extend to the scope of
-    // handle_conf_change since that function expects to be called with
-    // the lock held. (And the comment in config.h says that is the
-    // expected behavior.)
-    //
-    // An alternative might be to pass a std::unique_lock to
-    // handle_conf_change and have a version of get_var that can take it
-    // by reference and lock as appropriate.
-    obs_mgr.call_all_observers(*this);
+    rev_obs_map_t rev_obs;
+    {
+      std::lock_guard l(lock);
+      obs_mgr.for_each_observer(
+        [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
+          map_observer_changes(obs, key, &rev_obs);
+        });
+    }
+
+    call_observers(rev_obs);
   }
   void set_safe_to_start_threads() {
     config.set_safe_to_start_threads();
@@ -161,13 +242,26 @@ public:
   }
   // Expand all metavariables. Make any pending observer callbacks.
   void apply_changes(std::ostream* oss) {
-    std::lock_guard l{lock};
-    // apply changes until the cluster name is assigned
-    if (!values.cluster.empty()) {
-      // meta expands could have modified anything.  Copy it all out again.
-      obs_mgr.apply_changes(values.changed, *this, oss);
-      values.changed.clear();
+    rev_obs_map_t rev_obs;
+    {
+      std::lock_guard l{lock};
+      // apply changes until the cluster name is assigned
+      if (!values.cluster.empty()) {
+        // meta expands could have modified anything.  Copy it all out again.
+        _gather_changes(values.changed, &rev_obs, oss);
+        values.changed.clear();
+      }
     }
+
+    call_observers(rev_obs);
+  }
+  void _gather_changes(std::set<std::string> &changes,
+                       rev_obs_map_t *rev_obs, std::ostream* oss) {
+    obs_mgr.for_each_change(
+      changes, *this,
+      [this, rev_obs](md_config_obs_t *obs, const std::string &key) {
+        map_observer_changes(obs, key, rev_obs);
+      }, oss);
   }
   int set_val(const std::string& key, const std::string& s,
               std::stringstream* err_ss=nullptr) {
@@ -185,17 +279,29 @@ public:
   int set_mon_vals(CephContext *cct,
                   const map<std::string,std::string>& kv,
                   md_config_t::config_callback config_cb) {
-    std::lock_guard l{lock};
-    int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
-    obs_mgr.apply_changes(values.changed, *this, nullptr);
-    values.changed.clear();
+    int ret;
+    rev_obs_map_t rev_obs;
+    {
+      std::lock_guard l{lock};
+      ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
+      _gather_changes(values.changed, &rev_obs, nullptr);
+      values.changed.clear();
+    }
+
+    call_observers(rev_obs);
     return ret;
   }
   int injectargs(const std::string &s, std::ostream *oss) {
-    std::lock_guard l{lock};
-    int ret = config.injectargs(values, obs_mgr, s, oss);
-    obs_mgr.apply_changes(values.changed, *this, oss);
-    values.changed.clear();
+    int ret;
+    rev_obs_map_t rev_obs;
+    {
+      std::lock_guard l{lock};
+      ret = config.injectargs(values, obs_mgr, s, oss);
+      _gather_changes(values.changed, &rev_obs, oss);
+      values.changed.clear();
+    }
+
+    call_observers(rev_obs);
     return ret;
   }
   void parse_env(const char *env_var = "CEPH_ARGS") {
index 6a63f30f53bdc6eb75c3660be266e72f35fb203a..5cda295e213ce943dd600ff677aeba235ee484f9 100644 (file)
@@ -47,9 +47,16 @@ class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy>
 
       // always apply the new settings synchronously on the owner shard, to
       // avoid racings with other do_change() calls in parallel.
+      ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
       owner.values.reset(new_values);
-      owner.obs_mgr.apply_changes(owner.values->changed,
-                                  owner, nullptr);
+      owner.obs_mgr.for_each_change(owner.values->changed, owner,
+                                    [&rev_obs](ConfigObserver *obs,
+                                               const std::string &key) {
+                                      rev_obs[obs].insert(key);
+                                    }, nullptr);
+      for (auto& [obs, keys] : rev_obs) {
+        obs->handle_conf_change(owner, keys);
+      }
 
       return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count),
                                         [&owner, new_values] (auto cpu) {
@@ -57,8 +64,16 @@ class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy>
           [foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable {
             proxy.values.reset();
             proxy.values = std::move(foreign_values);
-            proxy.obs_mgr.apply_changes(proxy.values->changed,
-                                        proxy, nullptr);
+
+            ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
+            proxy.obs_mgr.for_each_change(proxy.values->changed, proxy,
+                                          [&rev_obs](md_config_obs_t *obs,
+                                                     const std::string &key) {
+                                            rev_obs[obs].insert(key);
+                                          }, nullptr);
+            for (auto& [obs, keys] : rev_obs) {
+              obs->handle_conf_change(proxy, keys);
+            }
           });
         }).finally([new_values] {
           new_values->changed.clear();