From dd8aec3cc896667d4d668bf479f93dc357826955 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Thu, 27 Jul 2017 00:55:36 -0400 Subject: [PATCH] throttle: Do not destroy condition variables with waiters Destroying a condition variable on which someone is waiting is Undefined Behavior. It's bad and terrible and awful. On some machines it makes the destructor just outright hang. Signed-off-by: Adam C. Emerson --- src/common/Throttle.cc | 216 ++++++++++++++++-------------------- src/common/Throttle.h | 52 ++++----- src/test/common/Throttle.cc | 12 +- 3 files changed, 122 insertions(+), 158 deletions(-) diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 1d84be68d4f..d0d3f612e33 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -4,7 +4,10 @@ #include "include/scope_guard.h" #include "common/Throttle.h" +#include "common/ceph_time.h" #include "common/perf_counters.h" +#include "common/Throttle.h" + // re-include our assert to clobber the system one; fix dout: #include "include/assert.h" @@ -14,6 +17,10 @@ #undef dout_prefix #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") " +using ceph::mono_clock; +using ceph::mono_time; +using ceph::uniquely_lock; + enum { l_throttle_first = 532430, l_throttle_val, @@ -31,11 +38,9 @@ enum { l_throttle_last, }; -Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf) - : cct(cct), name(n), logger(NULL), - max(m), - lock("Throttle::lock"), - use_perf(_use_perf) +Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, + bool _use_perf) + : cct(cct), name(n), max(m), use_perf(_use_perf) { assert(m >= 0); @@ -57,69 +62,55 @@ Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_ b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data"); b.add_time_avg(l_throttle_wait, "wait", "Waiting latency"); - logger = b.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); + logger = { 
b.create_perf_counters(), cct }; + cct->get_perfcounters_collection()->add(logger.get()); logger->set(l_throttle_max, max); } } Throttle::~Throttle() { - { - Mutex::Locker l(lock); - assert(cond.empty()); - } - - if (!use_perf) - return; - - if (logger) { - cct->get_perfcounters_collection()->remove(logger); - delete logger; - } + auto l = uniquely_lock(lock); + assert(conds.empty()); } void Throttle::_reset_max(int64_t m) { - assert(lock.is_locked()); + // lock must be held. if (static_cast(max) == m) return; - if (!cond.empty()) - cond.front()->SignalOne(); + if (!conds.empty()) + conds.front().notify_one(); if (logger) logger->set(l_throttle_max, m); max = m; } -bool Throttle::_wait(int64_t c) +bool Throttle::_wait(int64_t c, UNIQUE_LOCK_T(lock)& l) { - utime_t start; + mono_time start; bool waited = false; - if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. + if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters. { - auto cv = cond.insert(cond.end(), new Cond); + auto cv = conds.emplace(conds.end()); auto w = make_scope_guard([this, cv]() { - delete *cv; - cond.erase(cv); + conds.erase(cv); }); waited = true; ldout(cct, 2) << "_wait waiting..." 
<< dendl; if (logger) - start = ceph_clock_now(); - - do { - (*cv)->Wait(lock); - } while ((_should_wait(c) || cv != cond.begin())); + start = mono_clock::now(); + cv->wait(l, [this, c, cv]() { return (!_should_wait(c) && + cv == conds.begin()); }); ldout(cct, 2) << "_wait finished waiting" << dendl; if (logger) { - utime_t dur = ceph_clock_now() - start; - logger->tinc(l_throttle_wait, dur); + logger->tinc(l_throttle_wait, mono_clock::now() - start); } } // wake up the next guy - if (!cond.empty()) - cond.front()->SignalOne(); + if (!conds.empty()) + conds.front().notify_one(); } return waited; } @@ -130,13 +121,13 @@ bool Throttle::wait(int64_t m) return false; } - Mutex::Locker l(lock); + auto l = uniquely_lock(lock); if (m) { assert(m > 0); _reset_max(m); } ldout(cct, 10) << "wait" << dendl; - return _wait(0); + return _wait(0, l); } int64_t Throttle::take(int64_t c) @@ -147,7 +138,7 @@ int64_t Throttle::take(int64_t c) assert(c >= 0); ldout(cct, 10) << "take " << c << dendl; { - Mutex::Locker l(lock); + auto l = uniquely_lock(lock); count += c; } if (logger) { @@ -171,12 +162,12 @@ bool Throttle::get(int64_t c, int64_t m) } bool waited = false; { - Mutex::Locker l(lock); + auto l = uniquely_lock(lock); if (m) { assert(m > 0); _reset_max(m); } - waited = _wait(c); + waited = _wait(c, l); count += c; } if (logger) { @@ -197,15 +188,16 @@ bool Throttle::get_or_fail(int64_t c) } assert (c >= 0); - Mutex::Locker l(lock); - if (_should_wait(c) || !cond.empty()) { + auto l = uniquely_lock(lock); + if (_should_wait(c) || !conds.empty()) { ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl; if (logger) { logger->inc(l_throttle_get_or_fail_fail); } return false; } else { - ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; + ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() + << " -> " << (count.load() + c) << ")" << dendl; count += c; if (logger) { 
logger->inc(l_throttle_get_or_fail_success); @@ -224,12 +216,14 @@ int64_t Throttle::put(int64_t c) } assert(c >= 0); - ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl; - Mutex::Locker l(lock); + ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " + << (count.load()-c) << ")" << dendl; + auto l = uniquely_lock(lock); if (c) { - if (!cond.empty()) - cond.front()->SignalOne(); - assert(static_cast(count) >= c); // if count goes negative, we failed somewhere! + if (!conds.empty()) + conds.front().notify_one(); + // if count goes negative, we failed somewhere! + assert(static_cast(count) >= c); count -= c; if (logger) { logger->inc(l_throttle_put); @@ -242,9 +236,9 @@ int64_t Throttle::put(int64_t c) void Throttle::reset() { - Mutex::Locker l(lock); - if (!cond.empty()) - cond.front()->SignalOne(); + auto l = uniquely_lock(lock); + if (!conds.empty()) + conds.front().notify_one(); count = 0; if (logger) { logger->set(l_throttle_val, 0); @@ -265,8 +259,9 @@ enum { l_backoff_throttle_last, }; -BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf) - : cct(cct), name(n), logger(NULL), +BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, + unsigned expected_concurrency, bool _use_perf) + : cct(cct), name(n), conds(expected_concurrency),///< [in] determines size of conds use_perf(_use_perf) { @@ -274,7 +269,8 @@ BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigne return; if (cct->_conf->throttler_perf_counter) { - PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last); + PerfCountersBuilder b(cct, string("throttle-") + name, + l_backoff_throttle_first, l_backoff_throttle_last); b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle"); b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle"); 
b.add_u64_counter(l_backoff_throttle_get, "get", "Gets"); @@ -285,26 +281,16 @@ BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigne b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data"); b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency"); - logger = b.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); + logger = { b.create_perf_counters(), cct }; + cct->get_perfcounters_collection()->add(logger.get()); logger->set(l_backoff_throttle_max, max); } } BackoffThrottle::~BackoffThrottle() { - { - locker l(lock); - assert(waiters.empty()); - } - - if (!use_perf) - return; - - if (logger) { - cct->get_perfcounters_collection()->remove(logger); - delete logger; - } + auto l = uniquely_lock(lock); + assert(waiters.empty()); } bool BackoffThrottle::set_params( @@ -451,7 +437,7 @@ std::chrono::duration BackoffThrottle::get(uint64_t c) } auto ticket = _push_waiter(); - utime_t wait_from = ceph_clock_now(); + auto wait_from = mono_clock::now(); bool waited = false; while (waiters.begin() != ticket) { @@ -482,7 +468,7 @@ std::chrono::duration BackoffThrottle::get(uint64_t c) if (logger) { logger->set(l_backoff_throttle_val, current); if (waited) { - logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from); + logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from); } } @@ -532,55 +518,45 @@ uint64_t BackoffThrottle::get_max() } SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent) - : m_lock("SimpleThrottle"), - m_max(max), - m_current(0), - m_ret(0), - m_ignore_enoent(ignore_enoent) -{ -} + : m_max(max), m_ignore_enoent(ignore_enoent) {} SimpleThrottle::~SimpleThrottle() { - Mutex::Locker l(m_lock); + auto l = uniquely_lock(m_lock); assert(m_current == 0); assert(waiters == 0); } void SimpleThrottle::start_op() { - Mutex::Locker l(m_lock); - while (m_max == m_current) { - waiters++; - m_cond.Wait(m_lock); - waiters--; - } + auto l = uniquely_lock(m_lock); + 
waiters++; + m_cond.wait(l, [this]() { return m_max != m_current; }); + waiters--; ++m_current; } void SimpleThrottle::end_op(int r) { - Mutex::Locker l(m_lock); + auto l = uniquely_lock(m_lock); --m_current; if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent)) m_ret = r; - m_cond.Signal(); + m_cond.notify_all(); } bool SimpleThrottle::pending_error() const { - Mutex::Locker l(m_lock); + auto l = uniquely_lock(m_lock); return (m_ret < 0); } int SimpleThrottle::wait_for_ret() { - Mutex::Locker l(m_lock); - while (m_current > 0) { - waiters++; - m_cond.Wait(m_lock); - waiters--; - } + auto l = uniquely_lock(m_lock); + waiters++; + m_cond.wait(l, [this]() { return m_current == 0; }); + waiters--; return m_ret; } @@ -589,80 +565,76 @@ void C_OrderedThrottle::finish(int r) { } OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent) - : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0), - m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) { -} + : m_max(max), m_ignore_enoent(ignore_enoent) {} OrderedThrottle::~OrderedThrottle() { - Mutex::Locker locker(m_lock); + auto l = uniquely_lock(m_lock); assert(waiters == 0); } C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) { - assert(on_finish != NULL); + assert(on_finish); - Mutex::Locker locker(m_lock); + auto l = uniquely_lock(m_lock); uint64_t tid = m_next_tid++; m_tid_result[tid] = Result(on_finish); - C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid); + auto ctx = make_unique(this, tid); - complete_pending_ops(); + complete_pending_ops(l); while (m_max == m_current) { ++waiters; - m_cond.Wait(m_lock); + m_cond.wait(l); --waiters; - complete_pending_ops(); + complete_pending_ops(l); } ++m_current; - return ctx; + return ctx.release(); } void OrderedThrottle::end_op(int r) { - Mutex::Locker locker(m_lock); + auto l = uniquely_lock(m_lock); assert(m_current > 0); if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) { m_ret_val = r; } 
--m_current; - m_cond.Signal(); + m_cond.notify_all(); } void OrderedThrottle::finish_op(uint64_t tid, int r) { - Mutex::Locker locker(m_lock); + auto l = uniquely_lock(m_lock); - TidResult::iterator it = m_tid_result.find(tid); + auto it = m_tid_result.find(tid); assert(it != m_tid_result.end()); it->second.finished = true; it->second.ret_val = r; - m_cond.Signal(); + m_cond.notify_all(); } bool OrderedThrottle::pending_error() const { - Mutex::Locker locker(m_lock); + auto l = uniquely_lock(m_lock); return (m_ret_val < 0); } int OrderedThrottle::wait_for_ret() { - Mutex::Locker locker(m_lock); - complete_pending_ops(); + auto l = uniquely_lock(m_lock); + complete_pending_ops(l); while (m_current > 0) { ++waiters; - m_cond.Wait(m_lock); + m_cond.wait(l); --waiters; - complete_pending_ops(); + complete_pending_ops(l); } return m_ret_val; } -void OrderedThrottle::complete_pending_ops() { - assert(m_lock.is_locked()); - +void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) { while (true) { - TidResult::iterator it = m_tid_result.begin(); + auto it = m_tid_result.begin(); if (it == m_tid_result.end() || it->first != m_complete_tid || !it->second.finished) { break; @@ -671,9 +643,9 @@ void OrderedThrottle::complete_pending_ops() { Result result = it->second; m_tid_result.erase(it); - m_lock.Unlock(); + l.unlock(); result.on_finish->complete(result.ret_val); - m_lock.Lock(); + l.lock(); ++m_complete_tid; } diff --git a/src/common/Throttle.h b/src/common/Throttle.h index efc5ba037ae..162e597a655 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -4,19 +4,17 @@ #ifndef CEPH_THROTTLE_H #define CEPH_THROTTLE_H -#include -#include -#include #include -#include +#include #include -#include +#include +#include +#include +#include -#include "Cond.h" #include "include/Context.h" - -class CephContext; -class PerfCounters; +#include "common/convenience.h" +#include "common/perf_counters.h" /** * @class Throttle @@ -29,13 +27,11 @@ class PerfCounters; 
class Throttle { CephContext *cct; const std::string name; - PerfCounters *logger; + PerfCountersRef logger; std::atomic count = { 0 }, max = { 0 }; - Mutex lock; - list cond; + std::mutex lock; + std::list conds; const bool use_perf; - bool shutting_down = false; - Cond shutdown_clear; public: Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true); @@ -52,7 +48,7 @@ private: (c >= m && cur > m)); // except for large c } - bool _wait(int64_t c); + bool _wait(int64_t c, UNIQUE_LOCK_T(lock)& l); public: /** @@ -124,7 +120,7 @@ public: return _should_wait(c); } void reset_max(int64_t m) { - Mutex::Locker l(lock); + auto l = ceph::uniquely_lock(lock); _reset_max(m); } }; @@ -158,7 +154,7 @@ public: class BackoffThrottle { CephContext *cct; const std::string name; - PerfCounters *logger; + PerfCountersRef logger; std::mutex lock; using locker = std::unique_lock; @@ -256,11 +252,11 @@ public: bool pending_error() const; int wait_for_ret(); private: - mutable Mutex m_lock; - Cond m_cond; + mutable std::mutex m_lock; + std::condition_variable m_cond; uint64_t m_max; - uint64_t m_current; - int m_ret; + uint64_t m_current = 0; + int m_ret = 0; bool m_ignore_enoent; uint32_t waiters = 0; }; @@ -318,19 +314,19 @@ private: typedef std::map TidResult; - mutable Mutex m_lock; - Cond m_cond; + mutable std::mutex m_lock; + std::condition_variable m_cond; uint64_t m_max; - uint64_t m_current; - int m_ret_val; + uint64_t m_current = 0; + int m_ret_val = 0; bool m_ignore_enoent; - uint64_t m_next_tid; - uint64_t m_complete_tid; + uint64_t m_next_tid = 0; + uint64_t m_complete_tid = 0; TidResult m_tid_result; - void complete_pending_ops(); + void complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l); uint32_t waiters = 0; }; diff --git a/src/test/common/Throttle.cc b/src/test/common/Throttle.cc index 836f22fe7a2..1e80dc1f35b 100644 --- a/src/test/common/Throttle.cc +++ b/src/test/common/Throttle.cc @@ -22,6 +22,7 @@ #include #include #include "gtest/gtest.h" 
+#include "common/backport14.h" #include "common/Mutex.h" #include "common/Thread.h" #include "common/Throttle.h" @@ -42,20 +43,16 @@ protected: public: Throttle &throttle; int64_t count; - bool waited; + bool waited = false; Thread_get(Throttle& _throttle, int64_t _count) : - throttle(_throttle), - count(_count), - waited(false) - { - } + throttle(_throttle), count(_count) {} void *entry() override { usleep(5); waited = throttle.get(count); throttle.put(count); - return NULL; + return nullptr; } }; }; @@ -216,7 +213,6 @@ TEST_F(ThrottleTest, wait) { } while(!waited); } - TEST_F(ThrottleTest, destructor) { EXPECT_DEATH({ int64_t throttle_max = 10; -- 2.47.3