]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
throttle: Do not destroy condition variables with waiters 16618/head
authorAdam C. Emerson <aemerson@redhat.com>
Thu, 27 Jul 2017 04:55:36 +0000 (00:55 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 9 Aug 2017 02:53:12 +0000 (22:53 -0400)
Destroying a condition variable on which someone is waiting is Undefined
Behavior. it's bad and terrible and awful. On some machines it makes
the destructor just outright hang.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/common/Throttle.cc
src/common/Throttle.h
src/test/common/Throttle.cc

index 1d84be68d4f183641f951b9510697f56d04ebe9d..d0d3f612e33c4acefb8617b36503e5f34df624d6 100644 (file)
@@ -4,7 +4,10 @@
 #include "include/scope_guard.h"
 
 #include "common/Throttle.h"
+#include "common/ceph_time.h"
 #include "common/perf_counters.h"
+#include "common/Throttle.h"
+
 
 // re-include our assert to clobber the system one; fix dout:
 #include "include/assert.h"
 #undef dout_prefix
 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
 
+using ceph::mono_clock;
+using ceph::mono_time;
+using ceph::uniquely_lock;
+
 enum {
   l_throttle_first = 532430,
   l_throttle_val,
@@ -31,11 +38,9 @@ enum {
   l_throttle_last,
 };
 
-Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf)
-  : cct(cct), name(n), logger(NULL),
-    max(m),
-    lock("Throttle::lock"),
-    use_perf(_use_perf)
+Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m,
+                  bool _use_perf)
+  : cct(cct), name(n), max(m), use_perf(_use_perf)
 {
   assert(m >= 0);
 
@@ -57,69 +62,55 @@ Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_
     b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
     b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
 
-    logger = b.create_perf_counters();
-    cct->get_perfcounters_collection()->add(logger);
+    logger = { b.create_perf_counters(), cct };
+    cct->get_perfcounters_collection()->add(logger.get());
     logger->set(l_throttle_max, max);
   }
 }
 
 Throttle::~Throttle()
 {
-  {
-    Mutex::Locker l(lock);
-    assert(cond.empty());
-  }
-
-  if (!use_perf)
-    return;
-
-  if (logger) {
-    cct->get_perfcounters_collection()->remove(logger);
-    delete logger;
-  }
+  auto l = uniquely_lock(lock);
+  assert(conds.empty());
 }
 
 void Throttle::_reset_max(int64_t m)
 {
-  assert(lock.is_locked());
+  // lock must be held.
   if (static_cast<int64_t>(max) == m)
     return;
-  if (!cond.empty())
-    cond.front()->SignalOne();
+  if (!conds.empty())
+    conds.front().notify_one();
   if (logger)
     logger->set(l_throttle_max, m);
   max = m;
 }
 
-bool Throttle::_wait(int64_t c)
+bool Throttle::_wait(int64_t c, UNIQUE_LOCK_T(lock)& l)
 {
-  utime_t start;
+  mono_time start;
   bool waited = false;
-  if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+  if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters.
     {
-      auto cv = cond.insert(cond.end(), new Cond);
+      auto cv = conds.emplace(conds.end());
       auto w = make_scope_guard([this, cv]() {
-         delete *cv;
-         cond.erase(cv);
+         conds.erase(cv);
        });
       waited = true;
       ldout(cct, 2) << "_wait waiting..." << dendl;
       if (logger)
-       start = ceph_clock_now();
-
-      do {
-       (*cv)->Wait(lock);
-      } while ((_should_wait(c) || cv != cond.begin()));
+       start = mono_clock::now();
 
+      cv->wait(l, [this, c, cv]() { return (!_should_wait(c) &&
+                                           cv == conds.begin()); });
       ldout(cct, 2) << "_wait finished waiting" << dendl;
       if (logger) {
-       utime_t dur = ceph_clock_now() - start;
-       logger->tinc(l_throttle_wait, dur);
+       logger->tinc(l_throttle_wait, mono_clock::now() - start);
       }
     }
     // wake up the next guy
-    if (!cond.empty())
-      cond.front()->SignalOne();
+    if (!conds.empty())
+      conds.front().notify_one();
   }
   return waited;
 }
@@ -130,13 +121,13 @@ bool Throttle::wait(int64_t m)
     return false;
   }
 
-  Mutex::Locker l(lock);
+  auto l = uniquely_lock(lock);
   if (m) {
     assert(m > 0);
     _reset_max(m);
   }
   ldout(cct, 10) << "wait" << dendl;
-  return _wait(0);
+  return _wait(0, l);
 }
 
 int64_t Throttle::take(int64_t c)
@@ -147,7 +138,7 @@ int64_t Throttle::take(int64_t c)
   assert(c >= 0);
   ldout(cct, 10) << "take " << c << dendl;
   {
-    Mutex::Locker l(lock);
+    auto l = uniquely_lock(lock);
     count += c;
   }
   if (logger) {
@@ -171,12 +162,12 @@ bool Throttle::get(int64_t c, int64_t m)
   }
   bool waited = false;
   {
-    Mutex::Locker l(lock);
+    auto l = uniquely_lock(lock);
     if (m) {
       assert(m > 0);
       _reset_max(m);
     }
-    waited = _wait(c);
+    waited = _wait(c, l);
     count += c;
   }
   if (logger) {
@@ -197,15 +188,16 @@ bool Throttle::get_or_fail(int64_t c)
   }
 
   assert (c >= 0);
-  Mutex::Locker l(lock);
-  if (_should_wait(c) || !cond.empty()) {
+  auto l = uniquely_lock(lock);
+  if (_should_wait(c) || !conds.empty()) {
     ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
     if (logger) {
       logger->inc(l_throttle_get_or_fail_fail);
     }
     return false;
   } else {
-    ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
+    ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
+                  << " -> " << (count.load() + c) << ")" << dendl;
     count += c;
     if (logger) {
       logger->inc(l_throttle_get_or_fail_success);
@@ -224,12 +216,14 @@ int64_t Throttle::put(int64_t c)
   }
 
   assert(c >= 0);
-  ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
-  Mutex::Locker l(lock);
+  ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
+                << (count.load()-c) << ")" << dendl;
+  auto l = uniquely_lock(lock);
   if (c) {
-    if (!cond.empty())
-      cond.front()->SignalOne();
-    assert(static_cast<int64_t>(count) >= c); // if count goes negative, we failed somewhere!
+    if (!conds.empty())
+      conds.front().notify_one();
+    // if count goes negative, we failed somewhere!
+    assert(static_cast<int64_t>(count) >= c);
     count -= c;
     if (logger) {
       logger->inc(l_throttle_put);
@@ -242,9 +236,9 @@ int64_t Throttle::put(int64_t c)
 
 void Throttle::reset()
 {
-  Mutex::Locker l(lock);
-  if (!cond.empty())
-    cond.front()->SignalOne();
+  auto l = uniquely_lock(lock);
+  if (!conds.empty())
+    conds.front().notify_one();
   count = 0;
   if (logger) {
     logger->set(l_throttle_val, 0);
@@ -265,8 +259,9 @@ enum {
   l_backoff_throttle_last,
 };
 
-BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf)
-  : cct(cct), name(n), logger(NULL),
+BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n,
+                                unsigned expected_concurrency, bool _use_perf)
+  : cct(cct), name(n),
     conds(expected_concurrency),///< [in] determines size of conds
     use_perf(_use_perf)
 {
@@ -274,7 +269,8 @@ BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigne
     return;
 
   if (cct->_conf->throttler_perf_counter) {
-    PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last);
+    PerfCountersBuilder b(cct, string("throttle-") + name,
+                         l_backoff_throttle_first, l_backoff_throttle_last);
     b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
     b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
     b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
@@ -285,26 +281,16 @@ BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigne
     b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
     b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
 
-    logger = b.create_perf_counters();
-    cct->get_perfcounters_collection()->add(logger);
+    logger = { b.create_perf_counters(), cct };
+    cct->get_perfcounters_collection()->add(logger.get());
     logger->set(l_backoff_throttle_max, max);
   }
 }
 
 BackoffThrottle::~BackoffThrottle()
 {
-  {
-    locker l(lock);
-    assert(waiters.empty());
-  }
-
-  if (!use_perf)
-    return;
-
-  if (logger) {
-    cct->get_perfcounters_collection()->remove(logger);
-    delete logger;
-  }
+  auto l = uniquely_lock(lock);
+  assert(waiters.empty());
 }
 
 bool BackoffThrottle::set_params(
@@ -451,7 +437,7 @@ std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
   }
 
   auto ticket = _push_waiter();
-  utime_t wait_from = ceph_clock_now();
+  auto wait_from = mono_clock::now();
   bool waited = false;
 
   while (waiters.begin() != ticket) {
@@ -482,7 +468,7 @@ std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
   if (logger) {
     logger->set(l_backoff_throttle_val, current);
     if (waited) {
-      logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from);
+      logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from);
     }
   }
 
@@ -532,55 +518,45 @@ uint64_t BackoffThrottle::get_max()
 }
 
 SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
-  : m_lock("SimpleThrottle"),
-    m_max(max),
-    m_current(0),
-    m_ret(0),
-    m_ignore_enoent(ignore_enoent)
-{
-}
+  : m_max(max), m_ignore_enoent(ignore_enoent) {}
 
 SimpleThrottle::~SimpleThrottle()
 {
-  Mutex::Locker l(m_lock);
+  auto l = uniquely_lock(m_lock);
   assert(m_current == 0);
   assert(waiters == 0);
 }
 
 void SimpleThrottle::start_op()
 {
-  Mutex::Locker l(m_lock);
-  while (m_max == m_current) {
-    waiters++;
-    m_cond.Wait(m_lock);
-    waiters--;
-  }
+  auto l = uniquely_lock(m_lock);
+  waiters++;
+  m_cond.wait(l, [this]() { return m_max != m_current; });
+  waiters--;
   ++m_current;
 }
 
 void SimpleThrottle::end_op(int r)
 {
-  Mutex::Locker l(m_lock);
+  auto l = uniquely_lock(m_lock);
   --m_current;
   if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
     m_ret = r;
-  m_cond.Signal();
+  m_cond.notify_all();
 }
 
 bool SimpleThrottle::pending_error() const
 {
-  Mutex::Locker l(m_lock);
+  auto l = uniquely_lock(m_lock);
   return (m_ret < 0);
 }
 
 int SimpleThrottle::wait_for_ret()
 {
-  Mutex::Locker l(m_lock);
-  while (m_current > 0) {
-    waiters++;
-    m_cond.Wait(m_lock);
-    waiters--;
-  }
+  auto l = uniquely_lock(m_lock);
+  waiters++;
+  m_cond.wait(l, [this]() { return m_current == 0; });
+  waiters--;
   return m_ret;
 }
 
@@ -589,80 +565,76 @@ void C_OrderedThrottle::finish(int r) {
 }
 
 OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
-  : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
-    m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
-}
+  : m_max(max), m_ignore_enoent(ignore_enoent) {}
 
 OrderedThrottle::~OrderedThrottle() {
-  Mutex::Locker locker(m_lock);
+  auto l  = uniquely_lock(m_lock);
   assert(waiters == 0);
 }
 
 C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
-  assert(on_finish != NULL);
+  assert(on_finish);
 
-  Mutex::Locker locker(m_lock);
+  auto l = uniquely_lock(m_lock);
   uint64_t tid = m_next_tid++;
   m_tid_result[tid] = Result(on_finish);
-  C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
+  auto ctx = make_unique<C_OrderedThrottle>(this, tid);
 
-  complete_pending_ops();
+  complete_pending_ops(l);
   while (m_max == m_current) {
     ++waiters;
-    m_cond.Wait(m_lock);
+    m_cond.wait(l);
     --waiters;
-    complete_pending_ops();
+    complete_pending_ops(l);
   }
   ++m_current;
 
-  return ctx;
+  return ctx.release();
 }
 
 void OrderedThrottle::end_op(int r) {
-  Mutex::Locker locker(m_lock);
+  auto l = uniquely_lock(m_lock);
   assert(m_current > 0);
 
   if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
     m_ret_val = r;
   }
   --m_current;
-  m_cond.Signal();
+  m_cond.notify_all();
 }
 
 void OrderedThrottle::finish_op(uint64_t tid, int r) {
-  Mutex::Locker locker(m_lock);
+  auto l = uniquely_lock(m_lock);
 
-  TidResult::iterator it = m_tid_result.find(tid);
+  auto it = m_tid_result.find(tid);
   assert(it != m_tid_result.end());
 
   it->second.finished = true;
   it->second.ret_val = r;
-  m_cond.Signal();
+  m_cond.notify_all();
 }
 
 bool OrderedThrottle::pending_error() const {
-  Mutex::Locker locker(m_lock);
+  auto l = uniquely_lock(m_lock);
   return (m_ret_val < 0);
 }
 
 int OrderedThrottle::wait_for_ret() {
-  Mutex::Locker locker(m_lock);
-  complete_pending_ops();
+  auto l = uniquely_lock(m_lock);
+  complete_pending_ops(l);
 
   while (m_current > 0) {
     ++waiters;
-    m_cond.Wait(m_lock);
+    m_cond.wait(l);
     --waiters;
-    complete_pending_ops();
+    complete_pending_ops(l);
   }
   return m_ret_val;
 }
 
-void OrderedThrottle::complete_pending_ops() {
-  assert(m_lock.is_locked());
-
+void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) {
   while (true) {
-    TidResult::iterator it = m_tid_result.begin();
+    auto it = m_tid_result.begin();
     if (it == m_tid_result.end() || it->first != m_complete_tid ||
         !it->second.finished) {
       break;
@@ -671,9 +643,9 @@ void OrderedThrottle::complete_pending_ops() {
     Result result = it->second;
     m_tid_result.erase(it);
 
-    m_lock.Unlock();
+    l.unlock();
     result.on_finish->complete(result.ret_val);
-    m_lock.Lock();
+    l.lock();
 
     ++m_complete_tid;
   }
index efc5ba037ae80e0b918641ff0702979118a684c6..162e597a655bb3f15372efb01e2b01d7129afb60 100644 (file)
@@ -4,19 +4,17 @@
 #ifndef CEPH_THROTTLE_H
 #define CEPH_THROTTLE_H
 
-#include <map>
-#include <list>
-#include <chrono>
 #include <atomic>
-#include <iostream>
+#include <chrono>
 #include <condition_variable>
-#include <stdexcept>
+#include <iostream>
+#include <list>
+#include <map>
+#include <mutex>
 
-#include "Cond.h"
 #include "include/Context.h"
-
-class CephContext;
-class PerfCounters;
+#include "common/convenience.h"
+#include "common/perf_counters.h"
 
 /**
  * @class Throttle
@@ -29,13 +27,11 @@ class PerfCounters;
 class Throttle {
   CephContext *cct;
   const std::string name;
-  PerfCounters *logger;
+  PerfCountersRef logger;
   std::atomic<unsigned> count = { 0 }, max = { 0 };
-  Mutex lock;
-  list<Cond*> cond;
+  std::mutex lock;
+  std::list<std::condition_variable> conds;
   const bool use_perf;
-  bool shutting_down = false;
-  Cond shutdown_clear;
 
 public:
   Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
@@ -52,7 +48,7 @@ private:
        (c >= m && cur > m));     // except for large c
   }
 
-  bool _wait(int64_t c);
+  bool _wait(int64_t c, UNIQUE_LOCK_T(lock)& l);
 
 public:
   /**
@@ -124,7 +120,7 @@ public:
     return _should_wait(c);
   }
   void reset_max(int64_t m) {
-    Mutex::Locker l(lock);
+    auto l = ceph::uniquely_lock(lock);
     _reset_max(m);
   }
 };
@@ -158,7 +154,7 @@ public:
 class BackoffThrottle {
   CephContext *cct;
   const std::string name;
-  PerfCounters *logger;
+  PerfCountersRef logger;
 
   std::mutex lock;
   using locker = std::unique_lock<std::mutex>;
@@ -256,11 +252,11 @@ public:
   bool pending_error() const;
   int wait_for_ret();
 private:
-  mutable Mutex m_lock;
-  Cond m_cond;
+  mutable std::mutex m_lock;
+  std::condition_variable m_cond;
   uint64_t m_max;
-  uint64_t m_current;
-  int m_ret;
+  uint64_t m_current = 0;
+  int m_ret = 0;
   bool m_ignore_enoent;
   uint32_t waiters = 0;
 };
@@ -318,19 +314,19 @@ private:
 
   typedef std::map<uint64_t, Result> TidResult;
 
-  mutable Mutex m_lock;
-  Cond m_cond;
+  mutable std::mutex m_lock;
+  std::condition_variable m_cond;
   uint64_t m_max;
-  uint64_t m_current;
-  int m_ret_val;
+  uint64_t m_current = 0;
+  int m_ret_val = 0;
   bool m_ignore_enoent;
 
-  uint64_t m_next_tid;
-  uint64_t m_complete_tid;
+  uint64_t m_next_tid = 0;
+  uint64_t m_complete_tid = 0;
 
   TidResult m_tid_result;
 
-  void complete_pending_ops();
+  void complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l);
   uint32_t waiters = 0;
 };
 
index 836f22fe7a2c8e52bb2eb484ed554056e17e4c79..1e80dc1f35b8d860df8a3bae9cb3d2732dc12ef9 100644 (file)
@@ -22,6 +22,7 @@
 #include <stdio.h>
 #include <signal.h>
 #include "gtest/gtest.h"
+#include "common/backport14.h"
 #include "common/Mutex.h"
 #include "common/Thread.h"
 #include "common/Throttle.h"
@@ -42,20 +43,16 @@ protected:
   public:
     Throttle &throttle;
     int64_t count;
-    bool waited;
+    bool waited = false;
 
     Thread_get(Throttle& _throttle, int64_t _count) :
-      throttle(_throttle),
-      count(_count),
-      waited(false)
-    {
-    }
+      throttle(_throttle), count(_count) {}
 
     void *entry() override {
       usleep(5);
       waited = throttle.get(count);
       throttle.put(count);
-      return NULL;
+      return nullptr;
     }
   };
 };
@@ -216,7 +213,6 @@ TEST_F(ThrottleTest, wait) {
   } while(!waited);
 }
 
-
 TEST_F(ThrottleTest, destructor) {
   EXPECT_DEATH({
       int64_t throttle_max = 10;