};
-
-typedef std::multimap < utime_t, Context *> scheduled_map_t;
-typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t;
-
-SafeTimer::SafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks)
+SafeTimer::SafeTimer(CephContext *cct_, ceph::mutex &l, bool safe_callbacks)
: cct(cct_), lock(l),
safe_callbacks(safe_callbacks),
thread(NULL),
{
ldout(cct,10) << "shutdown" << dendl;
if (thread) {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
cancel_all_events();
stopping = true;
- cond.Signal();
+ cond.notify_all();
lock.unlock();
thread->join();
lock.lock();
void SafeTimer::timer_thread()
{
- lock.lock();
+ std::unique_lock l{lock};
ldout(cct,10) << "timer_thread starting" << dendl;
while (!stopping) {
- utime_t now = ceph_clock_now();
+ auto now = clock_t::now();
while (!schedule.empty()) {
- scheduled_map_t::iterator p = schedule.begin();
+ auto p = schedule.begin();
// is the future now?
if (p->first > now)
schedule.erase(p);
ldout(cct,10) << "timer_thread executing " << callback << dendl;
- if (!safe_callbacks)
- lock.unlock();
- callback->complete(0);
- if (!safe_callbacks)
- lock.lock();
+ if (!safe_callbacks) {
+ l.unlock();
+ callback->complete(0);
+ l.lock();
+ } else {
+ callback->complete(0);
+ }
}
// recheck stopping if we dropped the lock
break;
ldout(cct,20) << "timer_thread going to sleep" << dendl;
- if (schedule.empty())
- cond.Wait(lock);
- else
- cond.WaitUntil(lock, schedule.begin()->first);
+ if (schedule.empty()) {
+ cond.wait(l);
+ } else {
+ cond.wait_until(l, schedule.begin()->first);
+ }
ldout(cct,20) << "timer_thread awake" << dendl;
}
ldout(cct,10) << "timer_thread exiting" << dendl;
- lock.unlock();
}
Context* SafeTimer::add_event_after(double seconds, Context *callback)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
- utime_t when = ceph_clock_now();
- when += seconds;
+ auto when = clock_t::now() + ceph::make_timespan(seconds);
return add_event_at(when, callback);
}
-Context* SafeTimer::add_event_at(utime_t when, Context *callback)
+Context* SafeTimer::add_event_at(SafeTimer::clock_t::time_point when, Context *callback)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
if (stopping) {
ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
/* If the event we have just inserted comes before everything else, we need to
* adjust our timeout. */
if (i == schedule.begin())
- cond.Signal();
+ cond.notify_all();
return callback;
}
bool SafeTimer::cancel_event(Context *callback)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
auto p = events.find(callback);
if (p == events.end()) {
void SafeTimer::cancel_all_events()
{
ldout(cct,10) << "cancel_all_events" << dendl;
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
while (!events.empty()) {
auto p = events.begin();
#ifndef CEPH_TIMER_H
#define CEPH_TIMER_H
-#include "Cond.h"
-#include "Mutex.h"
+#include "ceph_mutex.h"
class CephContext;
class Context;
class SafeTimer
{
CephContext *cct;
- Mutex& lock;
- Cond cond;
+ ceph::mutex& lock;
+ ceph::condition_variable cond;
bool safe_callbacks;
friend class SafeTimerThread;
void timer_thread();
void _shutdown();
- std::multimap<utime_t, Context*> schedule;
- std::map<Context*, std::multimap<utime_t, Context*>::iterator> events;
+ using clock_t = ceph::real_clock;
+ using scheduled_map_t = std::multimap<clock_t::time_point, Context*>;
+ scheduled_map_t schedule;
+ using event_lookup_map_t = std::map<Context*, scheduled_map_t::iterator>;
+ event_lookup_map_t events;
bool stopping;
void dump(const char *caller = 0) const;
* If you are able to relax requirements on cancelled callbacks, then
* setting safe_callbacks = false eliminates the lock cycle issue.
* */
- SafeTimer(CephContext *cct, Mutex &l, bool safe_callbacks=true);
+ SafeTimer(CephContext *cct, ceph::mutex &l, bool safe_callbacks=true);
virtual ~SafeTimer();
/* Call with the event_lock UNLOCKED.
/* Schedule an event in the future
* Call with the event_lock LOCKED */
Context* add_event_after(double seconds, Context *callback);
- Context* add_event_at(utime_t when, Context *callback);
+ Context* add_event_at(clock_t::time_point when, Context *callback);
/* Cancel an event.
* Call with the event_lock LOCKED
#include "common/ceph_argparse.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Timer.h"
#include "global/global_init.h"
+#include "include/Context.h"
#include <iostream>
int array_idx;
TestContext* test_contexts[MAX_TEST_CONTEXTS];
- Mutex array_lock("test_timers_mutex");
+ ceph::mutex array_lock = ceph::make_mutex("test_timers_mutex");
}
class TestContext : public Context
void finish(int r) override
{
- array_lock.Lock();
+ lock_guard locker{array_lock};
cout << "TestContext " << num << std::endl;
test_array[array_idx++] = num;
- array_lock.Unlock();
}
~TestContext() override
void finish(int r) override
{
- array_lock.Lock();
+ std::lock_guard locker{array_lock};
cout << "StrictOrderTestContext " << num << std::endl;
test_array[num] = num;
- array_lock.Unlock();
}
~StrictOrderTestContext() override
}
template <typename T>
-static int basic_timer_test(T &timer, Mutex *lock)
+static int basic_timer_test(T &timer, ceph::mutex *lock)
{
int ret = 0;
memset(&test_array, 0, sizeof(test_array));
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
if (lock)
- lock->Lock();
- utime_t inc(2 * i, 0);
- utime_t t = ceph_clock_now() + inc;
+ lock->lock();
+ auto t = ceph::real_clock::now() + std::chrono::seconds(2 * i);
timer.add_event_at(t, test_contexts[i]);
if (lock)
- lock->Unlock();
+ lock->unlock();
}
bool done = false;
do {
sleep(1);
- array_lock.Lock();
+ std::lock_guard locker{array_lock};
done = (array_idx == MAX_TEST_CONTEXTS);
- array_lock.Unlock();
} while (!done);
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
return ret;
}
-static int test_out_of_order_insertion(SafeTimer &timer, Mutex *lock)
+static int test_out_of_order_insertion(SafeTimer &timer, ceph::mutex *lock)
{
int ret = 0;
memset(&test_array, 0, sizeof(test_array));
test_contexts[1] = new StrictOrderTestContext(1);
{
- utime_t inc(100, 0);
- utime_t t = ceph_clock_now() + inc;
- lock->Lock();
+ auto t = ceph::real_clock::now() + 100s;
+ std::lock_guard locker{*lock};
timer.add_event_at(t, test_contexts[0]);
- lock->Unlock();
}
{
- utime_t inc(2, 0);
- utime_t t = ceph_clock_now() + inc;
- lock->Lock();
+ auto t = ceph::real_clock::now() + 2s;
+ std::lock_guard locker{*lock};
timer.add_event_at(t, test_contexts[1]);
- lock->Unlock();
}
int secs = 0;
for (; secs < 100 ; ++secs) {
sleep(1);
- array_lock.Lock();
+ array_lock.lock();
int a = test_array[1];
- array_lock.Unlock();
+ array_lock.unlock();
if (a == 1)
break;
}
return ret;
}
-static int safe_timer_cancel_all_test(SafeTimer &safe_timer, Mutex& safe_timer_lock)
+static int safe_timer_cancel_all_test(SafeTimer &safe_timer,
+ ceph::mutex& safe_timer_lock)
{
cout << __PRETTY_FUNCTION__ << std::endl;
test_contexts[i] = new TestContext(i);
}
- safe_timer_lock.Lock();
+ safe_timer_lock.lock();
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
- utime_t inc(4 * i, 0);
- utime_t t = ceph_clock_now() + inc;
+ auto t = ceph::real_clock::now() + std::chrono::seconds(4 * i);
safe_timer.add_event_at(t, test_contexts[i]);
}
- safe_timer_lock.Unlock();
+ safe_timer_lock.unlock();
sleep(10);
- safe_timer_lock.Lock();
+ safe_timer_lock.lock();
safe_timer.cancel_all_events();
- safe_timer_lock.Unlock();
+ safe_timer_lock.unlock();
for (int i = 0; i < array_idx; ++i) {
if (test_array[i] != i) {
return ret;
}
-static int safe_timer_cancellation_test(SafeTimer &safe_timer, Mutex& safe_timer_lock)
+static int safe_timer_cancellation_test(SafeTimer &safe_timer,
+ ceph::mutex& safe_timer_lock)
{
cout << __PRETTY_FUNCTION__ << std::endl;
test_contexts[i] = new StrictOrderTestContext(i);
}
- safe_timer_lock.Lock();
+ safe_timer_lock.lock();
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
- utime_t inc(4 * i, 0);
- utime_t t = ceph_clock_now() + inc;
+ auto t = ceph::real_clock::now() + std::chrono::seconds(4 * i);
safe_timer.add_event_at(t, test_contexts[i]);
}
- safe_timer_lock.Unlock();
+ safe_timer_lock.unlock();
// cancel the even-numbered events
for (int i = 0; i < MAX_TEST_CONTEXTS; i += 2) {
- safe_timer_lock.Lock();
+ safe_timer_lock.lock();
safe_timer.cancel_event(test_contexts[i]);
- safe_timer_lock.Unlock();
+ safe_timer_lock.unlock();
}
sleep(20);
- safe_timer_lock.Lock();
+ safe_timer_lock.lock();
safe_timer.cancel_all_events();
- safe_timer_lock.Unlock();
+ safe_timer_lock.unlock();
for (int i = 1; i < array_idx; i += 2) {
if (test_array[i] != i) {
common_init_finish(g_ceph_context);
int ret;
- Mutex safe_timer_lock("safe_timer_lock");
+ ceph::mutex safe_timer_lock = ceph::make_mutex("safe_timer_lock");
SafeTimer safe_timer(g_ceph_context, safe_timer_lock);
ret = basic_timer_test <SafeTimer>(safe_timer, &safe_timer_lock);