]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
config: drop config_proxy::lock when invoking config observer
authorVenky Shankar <vshankar@redhat.com>
Wed, 18 Jul 2018 12:50:49 +0000 (08:50 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 8 Jan 2019 08:45:04 +0000 (03:45 -0500)
To prevent deadlocking when observer acquires its own locks
(lock order: config_proxy::lock -> foo::lock) and another
thread (say IO path) attempts to fetch an config value (lock
order: foo:lock -> config_proxy::lock).

Side effect of dropping locks when invoking config observer is
that remove_observer() can sneak in when the observer is still
in execution resulting in use-after-free. To mitigate this, any
in-progress observer call need to be completed before removing
the obserer. Also, remove_observer() needs to be invoked without
holding any observer locks so as to not run into deadlocks.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit 909c792457f9e8efaf2e53c968afbcf96cbd03e3)

 Conflicts:
src/common/config_obs_mgr.h
src/common/config_proxy.h
crimson/common/config_proxy.h

sources config_obs_mgr.h, config_proxy.h and crimson bits are not in
luminous, therfore the appropriate logic is moved to config.{h,cc}.

src/common/Cond.h
src/common/CondVar.h [new file with mode: 0644]
src/common/config.cc
src/common/config.h

index aa53b60f2349e16ff488b9fcf6f827d564412041..1777827e3691a3cbc318e455d3b1930e5c8744c7 100644 (file)
 #define CEPH_COND_H
 
 #include "include/Context.h"
-
-class Cond {
-  // my bits
-  pthread_cond_t _c;
-
-  Mutex *waiter_mutex;
-
-  // don't allow copying.
-  void operator=(Cond &C);
-  Cond(const Cond &C);
-
- public:
-  Cond() : waiter_mutex(NULL) {
-    int r = pthread_cond_init(&_c,NULL);
-    assert(r == 0);
-  }
-  virtual ~Cond() { 
-    pthread_cond_destroy(&_c); 
-  }
-
-  int Wait(Mutex &mutex)  { 
-    // make sure this cond is used with one mutex only
-    assert(waiter_mutex == NULL || waiter_mutex == &mutex);
-    waiter_mutex = &mutex;
-
-    assert(mutex.is_locked());
-
-    mutex._pre_unlock();
-    int r = pthread_cond_wait(&_c, &mutex._m);
-    mutex._post_lock();
-    return r;
-  }
-
-  int WaitUntil(Mutex &mutex, utime_t when) {
-    // make sure this cond is used with one mutex only
-    assert(waiter_mutex == NULL || waiter_mutex == &mutex);
-    waiter_mutex = &mutex;
-
-    assert(mutex.is_locked());
-
-    struct timespec ts;
-    when.to_timespec(&ts);
-
-    mutex._pre_unlock();
-    int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
-    mutex._post_lock();
-
-    return r;
-  }
-
-  int WaitInterval(Mutex &mutex, utime_t interval) {
-    utime_t when = ceph_clock_now();
-    when += interval;
-    return WaitUntil(mutex, when);
-  }
-
-  template<typename Duration>
-  int WaitInterval(Mutex &mutex, Duration interval) {
-    ceph::real_time when(ceph::real_clock::now());
-    when += interval;
-
-    struct timespec ts = ceph::real_clock::to_timespec(when);
-
-    mutex._pre_unlock();
-    int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
-    mutex._post_lock();
-
-    return r;
-  }
-
-  int SloppySignal() { 
-    int r = pthread_cond_broadcast(&_c);
-    return r;
-  }
-  int Signal() { 
-    // make sure signaler is holding the waiter's lock.
-    assert(waiter_mutex == NULL ||
-          waiter_mutex->is_locked());
-
-    int r = pthread_cond_broadcast(&_c);
-    return r;
-  }
-  int SignalOne() { 
-    // make sure signaler is holding the waiter's lock.
-    assert(waiter_mutex == NULL ||
-          waiter_mutex->is_locked());
-
-    int r = pthread_cond_signal(&_c);
-    return r;
-  }
-  int SignalAll() { 
-    // make sure signaler is holding the waiter's lock.
-    assert(waiter_mutex == NULL ||
-          waiter_mutex->is_locked());
-
-    int r = pthread_cond_broadcast(&_c);
-    return r;
-  }
-};
+#include "CondVar.h"
 
 /**
  * context to signal a cond
diff --git a/src/common/CondVar.h b/src/common/CondVar.h
new file mode 100644 (file)
index 0000000..c193b99
--- /dev/null
@@ -0,0 +1,109 @@
+#ifndef CEPH_COND_VAR_H
+#define CEPH_COND_VAR_H
+
+#include "include/utime.h"
+
+#include "Clock.h"
+#include "Mutex.h"
+#include "pthread.h"
+
+class Cond {
+  // my bits
+  pthread_cond_t _c;
+
+  Mutex *waiter_mutex;
+
+  // don't allow copying.
+  void operator=(Cond &C);
+  Cond(const Cond &C);
+
+ public:
+  Cond() : waiter_mutex(NULL) {
+    int r = pthread_cond_init(&_c,NULL);
+    assert(r == 0);
+  }
+  virtual ~Cond() { 
+    pthread_cond_destroy(&_c); 
+  }
+
+  int Wait(Mutex &mutex)  { 
+    // make sure this cond is used with one mutex only
+    assert(waiter_mutex == NULL || waiter_mutex == &mutex);
+    waiter_mutex = &mutex;
+
+    assert(mutex.is_locked());
+
+    mutex._pre_unlock();
+    int r = pthread_cond_wait(&_c, &mutex._m);
+    mutex._post_lock();
+    return r;
+  }
+
+  int WaitUntil(Mutex &mutex, utime_t when) {
+    // make sure this cond is used with one mutex only
+    assert(waiter_mutex == NULL || waiter_mutex == &mutex);
+    waiter_mutex = &mutex;
+
+    assert(mutex.is_locked());
+
+    struct timespec ts;
+    when.to_timespec(&ts);
+
+    mutex._pre_unlock();
+    int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
+    mutex._post_lock();
+
+    return r;
+  }
+
+  int WaitInterval(Mutex &mutex, utime_t interval) {
+    utime_t when = ceph_clock_now();
+    when += interval;
+    return WaitUntil(mutex, when);
+  }
+
+  template<typename Duration>
+  int WaitInterval(Mutex &mutex, Duration interval) {
+    ceph::real_time when(ceph::real_clock::now());
+    when += interval;
+
+    struct timespec ts = ceph::real_clock::to_timespec(when);
+
+    mutex._pre_unlock();
+    int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
+    mutex._post_lock();
+
+    return r;
+  }
+
+  int SloppySignal() { 
+    int r = pthread_cond_broadcast(&_c);
+    return r;
+  }
+  int Signal() { 
+    // make sure signaler is holding the waiter's lock.
+    assert(waiter_mutex == NULL ||
+          waiter_mutex->is_locked());
+
+    int r = pthread_cond_broadcast(&_c);
+    return r;
+  }
+  int SignalOne() { 
+    // make sure signaler is holding the waiter's lock.
+    assert(waiter_mutex == NULL ||
+          waiter_mutex->is_locked());
+
+    int r = pthread_cond_signal(&_c);
+    return r;
+  }
+  int SignalAll() { 
+    // make sure signaler is holding the waiter's lock.
+    assert(waiter_mutex == NULL ||
+          waiter_mutex->is_locked());
+
+    int r = pthread_cond_broadcast(&_c);
+    return r;
+  }
+};
+
+#endif // CEPH_COND_VAR_H
index b3a98a595f2e4089bf9fb017e0a880ca8b9ed446..ef348f95d5c3d86d2a2b0f6b5c7211bb455bae13 100644 (file)
@@ -20,6 +20,7 @@
 #include "osd/osd_types.h"
 #include "common/errno.h"
 #include "common/hostname.h"
+#include "common/backport14.h"
 
 #include <boost/type_traits.hpp>
 
@@ -197,11 +198,16 @@ void md_config_t::add_observer(md_config_obs_t* observer_)
     obs_map_t::value_type val(*k, observer_);
     observers.insert(val);
   }
+  obs_call_gate.emplace(observer_, ceph::make_unique<CallGate>());
 }
 
 void md_config_t::remove_observer(md_config_obs_t* observer_)
 {
   Mutex::Locker l(lock);
+
+  call_gate_close(observer_);
+  obs_call_gate.erase(observer_);
+
   bool found_obs = false;
   for (obs_map_t::iterator o = observers.begin(); o != observers.end(); ) {
     if (o->second == observer_) {
@@ -665,12 +671,21 @@ int md_config_t::parse_injectargs(std::vector<const char*>& args,
 
 void md_config_t::apply_changes(std::ostream *oss)
 {
-  Mutex::Locker l(lock);
-  /*
-   * apply changes until the cluster name is assigned
-   */
-  if (cluster.size())
-    _apply_changes(oss);
+  rev_obs_map_t rev_obs;
+  {
+    Mutex::Locker l(lock);
+    /*
+     * apply changes until the cluster name is assigned
+     */
+    if (cluster.size()) {
+      for_each_change(
+        oss, [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
+          map_observer_changes(obs, key, &rev_obs);
+        });
+    }
+  }
+
+  call_observers(rev_obs);
 }
 
 bool md_config_t::_internal_field(const string& s)
@@ -680,12 +695,8 @@ bool md_config_t::_internal_field(const string& s)
   return false;
 }
 
-void md_config_t::_apply_changes(std::ostream *oss)
+void md_config_t::for_each_change(std::ostream *oss, config_gather_cb callback)
 {
-  /* Maps observers to the configuration options that they care about which
-   * have changed. */
-  typedef std::map < md_config_obs_t*, std::set <std::string> > rev_obs_map_t;
-
   expand_all_meta();
 
   // expand_all_meta could have modified anything.  Copy it all out again.
@@ -697,9 +708,6 @@ void md_config_t::_apply_changes(std::ostream *oss)
     update_legacy_val(option, ptr);
   }
 
-  // create the reverse observer mapping, mapping observers to the set of
-  // changed keys that they'll get.
-  rev_obs_map_t robs;
   std::set <std::string> empty_set;
   char buf[128];
   char *bufptr = (char*)buf;
@@ -717,71 +725,68 @@ void md_config_t::_apply_changes(std::ostream *oss)
       }
     }
     for (obs_map_t::iterator r = range.first; r != range.second; ++r) {
-      rev_obs_map_t::value_type robs_val(r->second, empty_set);
-      pair < rev_obs_map_t::iterator, bool > robs_ret(robs.insert(robs_val));
-      std::set <std::string> &keys(robs_ret.first->second);
-      keys.insert(key);
+      callback(r->second, key);
     }
   }
 
   changed.clear();
-
-  // Make any pending observer callbacks
-  for (rev_obs_map_t::const_iterator r = robs.begin(); r != robs.end(); ++r) {
-    md_config_obs_t *obs = r->first;
-    obs->handle_conf_change(this, r->second);
-  }
-
 }
 
 void md_config_t::call_all_observers()
 {
-  std::map<md_config_obs_t*,std::set<std::string> > obs;
+  rev_obs_map_t rev_obs;
   {
     Mutex::Locker l(lock);
 
     expand_all_meta();
 
     for (auto r = observers.begin(); r != observers.end(); ++r) {
-      obs[r->second].insert(r->first);
+      map_observer_changes(r->second, r->first, &rev_obs);
     }
   }
-  for (auto p = obs.begin();
-       p != obs.end();
-       ++p) {
-    p->first->handle_conf_change(this, p->second);
-  }
+
+  call_observers(rev_obs);
 }
 
 int md_config_t::injectargs(const std::string& s, std::ostream *oss)
 {
   int ret;
-  Mutex::Locker l(lock);
-  char b[s.length()+1];
-  strcpy(b, s.c_str());
-  std::vector<const char*> nargs;
-  char *p = b;
-  while (*p) {
-    nargs.push_back(p);
-    while (*p && *p != ' ') p++;
-    if (!*p)
-      break;
-    *p++ = 0;
-    while (*p && *p == ' ') p++;
-  }
-  ret = parse_injectargs(nargs, oss);
-  if (!nargs.empty()) {
-    *oss << " failed to parse arguments: ";
-    std::string prefix;
-    for (std::vector<const char*>::const_iterator i = nargs.begin();
-        i != nargs.end(); ++i) {
-      *oss << prefix << *i;
-      prefix = ",";
+  rev_obs_map_t rev_obs;
+  {
+    Mutex::Locker l(lock);
+
+    char b[s.length()+1];
+    strcpy(b, s.c_str());
+    std::vector<const char*> nargs;
+    char *p = b;
+    while (*p) {
+      nargs.push_back(p);
+      while (*p && *p != ' ') p++;
+      if (!*p)
+        break;
+      *p++ = 0;
+      while (*p && *p == ' ') p++;
+    }
+    ret = parse_injectargs(nargs, oss);
+    if (!nargs.empty()) {
+      *oss << " failed to parse arguments: ";
+      std::string prefix;
+      for (std::vector<const char*>::const_iterator i = nargs.begin();
+           i != nargs.end(); ++i) {
+        *oss << prefix << *i;
+        prefix = ",";
+      }
+      *oss << "\n";
+      ret = -EINVAL;
     }
-    *oss << "\n";
-    ret = -EINVAL;
+
+    for_each_change(
+      oss, [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
+        map_observer_changes(obs, key, &rev_obs);
+      });
   }
-  _apply_changes(oss);
+
+  call_observers(rev_obs);
   return ret;
 }
 
@@ -1389,3 +1394,26 @@ void md_config_t::complain_about_parse_errors(CephContext *cct)
   ::complain_about_parse_errors(cct, &parse_errors);
 }
 
+void md_config_t::call_observers(rev_obs_map_t &rev_obs) {
+  for (auto p : rev_obs) {
+    p.first->handle_conf_change(this, p.second);
+    // this can be done outside the lock as call_gate_enter()
+    // and remove_observer() are serialized via lock
+    call_gate_leave(p.first);
+  }
+}
+
+void md_config_t::map_observer_changes(md_config_obs_t *obs, const std::string &key,
+                                       rev_obs_map_t *rev_obs) {
+  ceph_assert(lock.is_locked());
+
+  auto p = rev_obs->emplace(obs, std::set<std::string>{});
+
+  p.first->second.emplace(key);
+  if (p.second) {
+    // this needs to be done under lock as once this lock is
+    // dropped (before calling observers) a remove_observer()
+    // can sneak in and cause havoc.
+    call_gate_enter(p.first->first);
+  }
+}
index 612f083d89cfe06ee536fb2b8e6803fc4b9c085f..1145e12e3c58652e39058cdecc89bb01a842aff4 100644 (file)
@@ -19,6 +19,7 @@
 #include "common/entity_name.h"
 #include "common/code_environment.h"
 #include "common/Mutex.h"
+#include "common/CondVar.h"
 #include "log/SubsystemMap.h"
 #include "common/config_obs.h"
 #include "common/options.h"
@@ -65,6 +66,62 @@ extern const char *CEPH_CONF_FILE_DEFAULT;
  * while another thread is reading them, either.
  */
 struct md_config_t {
+private:
+  class CallGate {
+  private:
+    uint32_t call_count = 0;
+    Mutex lock;
+    Cond cond;
+  public:
+    CallGate()
+      : lock("call::gate::lock", false, true) {
+    }
+
+    void enter() {
+      Mutex::Locker locker(lock);
+      ++call_count;
+    }
+    void leave() {
+      Mutex::Locker locker(lock);
+      ceph_assert(call_count > 0);
+      if (--call_count == 0) {
+        cond.Signal();
+      }
+    }
+    void close() {
+      Mutex::Locker locker(lock);
+      while (call_count != 0) {
+        cond.Wait(lock);
+      }
+    }
+  };
+
+  void call_gate_enter(md_config_obs_t *obs) {
+    auto p = obs_call_gate.find(obs);
+    ceph_assert(p != obs_call_gate.end());
+    p->second->enter();
+  }
+  void call_gate_leave(md_config_obs_t *obs) {
+    auto p = obs_call_gate.find(obs);
+    ceph_assert(p != obs_call_gate.end());
+    p->second->leave();
+  }
+  void call_gate_close(md_config_obs_t *obs) {
+    auto p = obs_call_gate.find(obs);
+    ceph_assert(p != obs_call_gate.end());
+    p->second->close();
+  }
+
+  typedef std::unique_ptr<CallGate> CallGateRef;
+  std::map<md_config_obs_t*, CallGateRef> obs_call_gate;
+
+  typedef std::map<md_config_obs_t*, std::set<std::string>> rev_obs_map_t;
+  typedef std::function<void(md_config_obs_t*, const std::string&)> config_gather_cb;
+
+  void call_observers(rev_obs_map_t &rev_obs);
+  void map_observer_changes(md_config_obs_t *obs, const std::string &key,
+                            rev_obs_map_t *rev_obs);
+
 public:
   typedef boost::variant<int64_t md_config_t::*,
                          uint64_t md_config_t::*,
@@ -135,7 +192,7 @@ public:
 
   // Expand all metavariables. Make any pending observer callbacks.
   void apply_changes(std::ostream *oss);
-  void _apply_changes(std::ostream *oss);
+  void for_each_change(std::ostream *oss, config_gather_cb callback);
   bool _internal_field(const string& k);
   void call_all_observers();