Get rid of the undefined behavior of destroying condition variables
while they're being waited on.
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit
0f52f486ef05edf8bf89e2bbb7b79f1bc1644cfa)
Conflicts:
src/common/Throttle.cc
src/common/Throttle.h
src/test/common/Throttle.cc: in jewel, we don't have perfconter
for throttles, neither do we have backport14.h back then, so we need to
resolve these conflicts, by removing perfcounter related change in
Throttle.cc, and add the make_unique helper for test/common/Throttle.cc,
add scope_guard to Throttle.cc.
Throttle::~Throttle()
{
- while (!cond.empty()) {
- Cond *cv = cond.front();
- delete cv;
- cond.pop_front();
+ {
+ Mutex::Locker l(lock);
+ assert(cond.empty());
}
if (!use_perf)
max.set((size_t)m);
}
+template <typename F>
+struct scope_guard {
+ F f;
+ scope_guard() = delete;
+ scope_guard(const scope_guard &) = delete;
+ scope_guard(scope_guard &&) = default;
+ scope_guard & operator=(const scope_guard &) = delete;
+ scope_guard & operator=(scope_guard &&) = default;
+ scope_guard(F &&f) : f(std::move(f)) {}
+ ~scope_guard() {
+ f();
+ }
+};
+
+template <typename F>
+scope_guard<F> make_scope_guard(F &&f) {
+ return scope_guard<F>(std::forward<F>(f));
+}
+
bool Throttle::_wait(int64_t c)
{
utime_t start;
bool waited = false;
if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
- Cond *cv = new Cond;
- cond.push_back(cv);
- do {
- if (!waited) {
- ldout(cct, 2) << "_wait waiting..." << dendl;
- if (logger)
- start = ceph_clock_now(cct);
- }
+ {
+ auto cv = cond.insert(cond.end(), new Cond);
+ auto w = make_scope_guard([this, cv]() {
+ delete *cv;
+ cond.erase(cv);
+ });
waited = true;
- cv->Wait(lock);
- } while (_should_wait(c) || cv != cond.front());
+ ldout(cct, 2) << "_wait waiting..." << dendl;
+ if (logger)
+ start = ceph_clock_now(cct);
+
+ do {
+ (*cv)->Wait(lock);
+ } while ((_should_wait(c) || cv != cond.begin()));
- if (waited) {
- ldout(cct, 3) << "_wait finished waiting" << dendl;
+ ldout(cct, 2) << "_wait finished waiting" << dendl;
if (logger) {
utime_t dur = ceph_clock_now(cct) - start;
- logger->tinc(l_throttle_wait, dur);
+ logger->tinc(l_throttle_wait, dur);
}
}
-
- delete cv;
- cond.pop_front();
-
// wake up the next guy
if (!cond.empty())
cond.front()->SignalOne();
return count.read();
}
+BackoffThrottle::~BackoffThrottle()
+{
+ {
+ locker l(lock);
+ assert(waiters.empty());
+ }
+}
+
bool BackoffThrottle::set_params(
double _low_threshhold,
double _high_threshhold,
{
Mutex::Locker l(m_lock);
assert(m_current == 0);
+ assert(waiters == 0);
}
void SimpleThrottle::start_op()
{
Mutex::Locker l(m_lock);
- while (m_max == m_current)
+ while (m_max == m_current) {
+ waiters++;
m_cond.Wait(m_lock);
+ waiters--;
+ }
++m_current;
}
int SimpleThrottle::wait_for_ret()
{
Mutex::Locker l(m_lock);
- while (m_current > 0)
+ while (m_current > 0) {
+ waiters++;
m_cond.Wait(m_lock);
+ waiters--;
+ }
return m_ret;
}
m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
}
+OrderedThrottle::~OrderedThrottle() {
+ Mutex::Locker locker(m_lock);
+ assert(waiters == 0);
+}
+
C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
assert(on_finish != NULL);
complete_pending_ops();
while (m_max == m_current) {
+ ++waiters;
m_cond.Wait(m_lock);
+ --waiters;
complete_pending_ops();
}
++m_current;
complete_pending_ops();
while (m_current > 0) {
+ ++waiters;
m_cond.Wait(m_lock);
+ --waiters;
complete_pending_ops();
}
return m_ret_val;
#include <iostream>
#include <condition_variable>
#include <chrono>
+#include <stdexcept>
#include "include/atomic.h"
+
+#include "Cond.h"
#include "include/Context.h"
class CephContext;
Mutex lock;
list<Cond*> cond;
const bool use_perf;
+ bool shutting_down = false;
+ Cond shutdown_clear;
public:
Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
BackoffThrottle(
unsigned expected_concurrency ///< [in] determines size of conds
) : conds(expected_concurrency) {}
+ ~BackoffThrottle();
};
uint64_t m_current;
int m_ret;
bool m_ignore_enoent;
+ uint32_t waiters = 0;
};
class C_SimpleThrottle : public Context {
class OrderedThrottle {
public:
OrderedThrottle(uint64_t max, bool ignore_enoent);
+ ~OrderedThrottle();
C_OrderedThrottle *start_op(Context *on_finish);
void end_op(int r);
TidResult m_tid_result;
void complete_pending_ops();
+ uint32_t waiters = 0;
};
#endif
return NULL;
}
};
-
};
TEST_F(ThrottleTest, Throttle) {
} while(!waited);
}
-TEST_F(ThrottleTest, destructor) {
- Thread_get *t;
- {
- int64_t throttle_max = 10;
- Throttle *throttle = new Throttle(g_ceph_context, "throttle", throttle_max);
-
- ASSERT_FALSE(throttle->get(5));
-
- t = new Thread_get(*throttle, 7);
- t->create("t_throttle");
- bool blocked;
- useconds_t delay = 1;
- do {
- usleep(delay);
- if (throttle->get_or_fail(1)) {
- throttle->put(1);
- blocked = false;
- } else {
- blocked = true;
- }
- delay *= 2;
- } while(!blocked);
- delete throttle;
- }
+namespace ceph {
+template<typename T>
+struct uniquity {
+ using datum = std::unique_ptr<T>;
+};
- { //
- // The thread is left hanging, otherwise it will abort().
- // Deleting the Throttle on which it is waiting creates a
- // inconsistency that will be detected: the Throttle object that
- // it references no longer exists.
- //
- pthread_t id = t->get_thread_id();
- ASSERT_EQ(pthread_kill(id, 0), 0);
- delete t;
- ASSERT_EQ(pthread_kill(id, 0), 0);
- }
+template<typename T, typename... Args>
+inline typename uniquity<T>::datum make_unique(Args&&... args) {
+ return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+}
+
+TEST_F(ThrottleTest, destructor) {
+ EXPECT_DEATH({
+ int64_t throttle_max = 10;
+ auto throttle = ceph::make_unique<Throttle>(g_ceph_context, "throttle",
+ throttle_max);
+
+
+ ASSERT_FALSE(throttle->get(5));
+ unique_ptr<Thread_get> t = ceph::make_unique<Thread_get>(*throttle, 7);
+ t->create("t_throttle");
+ bool blocked;
+ useconds_t delay = 1;
+ do {
+ usleep(delay);
+ if (throttle->get_or_fail(1)) {
+ throttle->put(1);
+ blocked = false;
+ } else {
+ blocked = true;
+ }
+ delay *= 2;
+ } while (!blocked);
+ }, ".*");
}
std::pair<double, std::chrono::duration<double> > test_backoff(
wait_time / waits);
}
+TEST(BackoffThrottle, destruct) {
+ EXPECT_DEATH({
+ auto throttle = ceph::make_unique<BackoffThrottle>(10);
+ ASSERT_TRUE(throttle->set_params(0.4, 0.6, 1000, 2, 10, 6, nullptr));
+
+ throttle->get(5);
+ {
+ auto& t = *throttle;
+ std::thread([&t]() {
+ usleep(5);
+ t.get(6);
+ });
+ }
+ // No equivalent of get_or_fail()
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }, ".*");
+}
+
TEST(BackoffThrottle, undersaturated)
{
auto results = test_backoff(
ASSERT_GT(results.second.count(), 0.0005);
}
+TEST(OrderedThrottle, destruct) {
+ EXPECT_DEATH({
+ auto throttle = ceph::make_unique<OrderedThrottle>(1, false);
+ throttle->start_op(nullptr);
+ {
+ auto& t = *throttle;
+ std::thread([&t]() {
+ usleep(5);
+ t.start_op(nullptr);
+ });
+ }
+ // No equivalent of get_or_fail()
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }, ".*");
+}
+
/*
* Local Variables:
* compile-command: "cd ../.. ;
* "
* End:
*/
-