]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
throttle: Minimal destructor fix for Luminous 20306/head
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 28 Jul 2017 18:14:48 +0000 (14:14 -0400)
committerKefu Chai <kchai@redhat.com>
Mon, 5 Feb 2018 07:40:31 +0000 (15:40 +0800)
Get rid of the undefined behavior of destroying condition variables
while they're being waited on.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit 0f52f486ef05edf8bf89e2bbb7b79f1bc1644cfa)

Conflicts:
src/common/Throttle.cc
src/common/Throttle.h
src/test/common/Throttle.cc: in jewel, we don't have perfconter
for throttles, neither do we have backport14.h back then, so we need to
resolve these conflicts, by removing perfcounter related change in
Throttle.cc, and add the make_unique helper for test/common/Throttle.cc,
add scope_guard to Throttle.cc.

src/common/Throttle.cc
src/common/Throttle.h
src/test/common/Throttle.cc

index 8a52906cd5e5de436ac1470eb80e4f877be4c248..24e1b096c985534b75aa650b0acb993d973b7cac 100644 (file)
@@ -63,10 +63,9 @@ Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_
 
 Throttle::~Throttle()
 {
-  while (!cond.empty()) {
-    Cond *cv = cond.front();
-    delete cv;
-    cond.pop_front();
+  {
+    Mutex::Locker l(lock);
+    assert(cond.empty());
   }
 
   if (!use_perf)
@@ -90,34 +89,51 @@ void Throttle::_reset_max(int64_t m)
   max.set((size_t)m);
 }
 
+template <typename F>
+struct scope_guard {
+  F f;
+  scope_guard() = delete;
+  scope_guard(const scope_guard &) = delete;
+  scope_guard(scope_guard &&) = default;
+  scope_guard & operator=(const scope_guard &) = delete;
+  scope_guard & operator=(scope_guard &&) = default;
+  scope_guard(F &&f) : f(std::move(f)) {}
+  ~scope_guard() {
+    f();
+  }
+};
+
+template <typename F>
+scope_guard<F> make_scope_guard(F &&f) {
+  return scope_guard<F>(std::forward<F>(f));
+}
+
 bool Throttle::_wait(int64_t c)
 {
   utime_t start;
   bool waited = false;
   if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
-    Cond *cv = new Cond;
-    cond.push_back(cv);
-    do {
-      if (!waited) {
-       ldout(cct, 2) << "_wait waiting..." << dendl;
-       if (logger)
-         start = ceph_clock_now(cct);
-      }
+    {
+      auto cv = cond.insert(cond.end(), new Cond);
+      auto w = make_scope_guard([this, cv]() {
+         delete *cv;
+         cond.erase(cv);
+       });
       waited = true;
-      cv->Wait(lock);
-    } while (_should_wait(c) || cv != cond.front());
+      ldout(cct, 2) << "_wait waiting..." << dendl;
+      if (logger)
+       start = ceph_clock_now(cct);
+
+      do {
+       (*cv)->Wait(lock);
+      } while ((_should_wait(c) || cv != cond.begin()));
 
-    if (waited) {
-      ldout(cct, 3) << "_wait finished waiting" << dendl;
+      ldout(cct, 2) << "_wait finished waiting" << dendl;
       if (logger) {
        utime_t dur = ceph_clock_now(cct) - start;
-        logger->tinc(l_throttle_wait, dur);
+       logger->tinc(l_throttle_wait, dur);
       }
     }
-
-    delete cv;
-    cond.pop_front();
-
     // wake up the next guy
     if (!cond.empty())
       cond.front()->SignalOne();
@@ -238,6 +254,14 @@ int64_t Throttle::put(int64_t c)
   return count.read();
 }
 
+BackoffThrottle::~BackoffThrottle()
+{
+  {
+    locker l(lock);
+    assert(waiters.empty());
+  }
+}
+
 bool BackoffThrottle::set_params(
   double _low_threshhold,
   double _high_threshhold,
@@ -435,13 +459,17 @@ SimpleThrottle::~SimpleThrottle()
 {
   Mutex::Locker l(m_lock);
   assert(m_current == 0);
+  assert(waiters == 0);
 }
 
 void SimpleThrottle::start_op()
 {
   Mutex::Locker l(m_lock);
-  while (m_max == m_current)
+  while (m_max == m_current) {
+    waiters++;
     m_cond.Wait(m_lock);
+    waiters--;
+  }
   ++m_current;
 }
 
@@ -463,8 +491,11 @@ bool SimpleThrottle::pending_error() const
 int SimpleThrottle::wait_for_ret()
 {
   Mutex::Locker l(m_lock);
-  while (m_current > 0)
+  while (m_current > 0) {
+    waiters++;
     m_cond.Wait(m_lock);
+    waiters--;
+  }
   return m_ret;
 }
 
@@ -477,6 +508,11 @@ OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
     m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
 }
 
+OrderedThrottle::~OrderedThrottle() {
+  Mutex::Locker locker(m_lock);
+  assert(waiters == 0);
+}
+
 C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
   assert(on_finish != NULL);
 
@@ -487,7 +523,9 @@ C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
 
   complete_pending_ops();
   while (m_max == m_current) {
+    ++waiters;
     m_cond.Wait(m_lock);
+    --waiters;
     complete_pending_ops();
   }
   ++m_current;
@@ -527,7 +565,9 @@ int OrderedThrottle::wait_for_ret() {
   complete_pending_ops();
 
   while (m_current > 0) {
+    ++waiters;
     m_cond.Wait(m_lock);
+    --waiters;
     complete_pending_ops();
   }
   return m_ret_val;
index f182c0c8797d69bd56074f97d4a47df18d412e15..3d692f4f4c949cdb1696f5d5e53eec4f35bccfcc 100644 (file)
 #include <iostream>
 #include <condition_variable>
 #include <chrono>
+#include <stdexcept>
 #include "include/atomic.h"
+
+#include "Cond.h"
 #include "include/Context.h"
 
 class CephContext;
@@ -33,6 +36,8 @@ class Throttle {
   Mutex lock;
   list<Cond*> cond;
   const bool use_perf;
+  bool shutting_down = false;
+  Cond shutdown_clear;
 
 public:
   Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
@@ -211,6 +216,7 @@ public:
   BackoffThrottle(
     unsigned expected_concurrency ///< [in] determines size of conds
     ) : conds(expected_concurrency) {}
+  ~BackoffThrottle();
 };
 
 
@@ -240,6 +246,7 @@ private:
   uint64_t m_current;
   int m_ret;
   bool m_ignore_enoent;
+  uint32_t waiters = 0;
 };
 
 class C_SimpleThrottle : public Context {
@@ -280,6 +287,7 @@ private:
 class OrderedThrottle {
 public:
   OrderedThrottle(uint64_t max, bool ignore_enoent);
+  ~OrderedThrottle();
 
   C_OrderedThrottle *start_op(Context *on_finish);
   void end_op(int r);
@@ -318,6 +326,7 @@ private:
   TidResult m_tid_result;
 
   void complete_pending_ops();
+  uint32_t waiters = 0;
 };
 
 #endif
index 279b62edbab42f8990815aa923c5f2e407e7e2de..096bdb007c07244e0ee56b742b407c5fd25537f3 100644 (file)
@@ -56,7 +56,6 @@ protected:
       return NULL;
     }
   };
-
 };
 
 TEST_F(ThrottleTest, Throttle) {
@@ -220,42 +219,41 @@ TEST_F(ThrottleTest, wait) {
   } while(!waited);
 }
 
-TEST_F(ThrottleTest, destructor) {
-  Thread_get *t;
-  {
-    int64_t throttle_max = 10;
-    Throttle *throttle = new Throttle(g_ceph_context, "throttle", throttle_max);
-
-    ASSERT_FALSE(throttle->get(5));
-
-    t = new Thread_get(*throttle, 7);
-    t->create("t_throttle");
-    bool blocked;
-    useconds_t delay = 1;
-    do {
-      usleep(delay);
-      if (throttle->get_or_fail(1)) {
-       throttle->put(1);
-       blocked = false;
-      } else {
-       blocked = true;
-      }
-      delay *= 2;
-    } while(!blocked);
-    delete throttle;
-  }
+namespace ceph {
+template<typename T>
+struct uniquity {
+  using datum = std::unique_ptr<T>;
+};
 
-  { //
-    // The thread is left hanging, otherwise it will abort().
-    // Deleting the Throttle on which it is waiting creates a
-    // inconsistency that will be detected: the Throttle object that
-    // it references no longer exists.
-    //
-    pthread_t id = t->get_thread_id();
-    ASSERT_EQ(pthread_kill(id, 0), 0);
-    delete t;
-    ASSERT_EQ(pthread_kill(id, 0), 0);
-  }
+template<typename T, typename... Args>
+inline typename uniquity<T>::datum make_unique(Args&&... args) {
+  return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+}
+
+TEST_F(ThrottleTest, destructor) {
+  EXPECT_DEATH({
+      int64_t throttle_max = 10;
+      auto throttle = ceph::make_unique<Throttle>(g_ceph_context, "throttle",
+                                                 throttle_max);
+
+
+      ASSERT_FALSE(throttle->get(5));
+      unique_ptr<Thread_get> t = ceph::make_unique<Thread_get>(*throttle, 7);
+      t->create("t_throttle");
+      bool blocked;
+      useconds_t delay = 1;
+      do {
+       usleep(delay);
+       if (throttle->get_or_fail(1)) {
+         throttle->put(1);
+         blocked = false;
+       } else {
+         blocked = true;
+       }
+       delay *= 2;
+      } while (!blocked);
+    }, ".*");
 }
 
 std::pair<double, std::chrono::duration<double> > test_backoff(
@@ -357,6 +355,24 @@ std::pair<double, std::chrono::duration<double> > test_backoff(
     wait_time / waits);
 }
 
+TEST(BackoffThrottle, destruct) {
+  EXPECT_DEATH({
+      auto throttle = ceph::make_unique<BackoffThrottle>(10);
+      ASSERT_TRUE(throttle->set_params(0.4, 0.6, 1000, 2, 10, 6, nullptr));
+
+      throttle->get(5);
+      {
+       auto& t = *throttle;
+       std::thread([&t]() {
+           usleep(5);
+           t.get(6);
+         });
+      }
+      // No equivalent of get_or_fail()
+      std::this_thread::sleep_for(std::chrono::milliseconds(250));
+    }, ".*");
+}
+
 TEST(BackoffThrottle, undersaturated)
 {
   auto results = test_backoff(
@@ -411,6 +427,22 @@ TEST(BackoffThrottle, oversaturated)
   ASSERT_GT(results.second.count(), 0.0005);
 }
 
+TEST(OrderedThrottle, destruct) {
+  EXPECT_DEATH({
+      auto throttle = ceph::make_unique<OrderedThrottle>(1, false);
+      throttle->start_op(nullptr);
+      {
+       auto& t = *throttle;
+       std::thread([&t]() {
+           usleep(5);
+           t.start_op(nullptr);
+         });
+      }
+      // No equivalent of get_or_fail()
+      std::this_thread::sleep_for(std::chrono::milliseconds(250));
+    }, ".*");
+}
+
 /*
  * Local Variables:
  * compile-command: "cd ../.. ;
@@ -420,4 +452,3 @@ TEST(BackoffThrottle, oversaturated)
  * "
  * End:
  */
-