++s)
ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
}
+
+class RWTimerThread : public Thread {
+ RWTimer *parent;
+public:
+ RWTimerThread(RWTimer *s) : parent(s) {}
+ void *entry() {
+ parent->timer_thread();
+ return NULL;
+ }
+};
+
+RWTimer::RWTimer(CephContext *cct_, RWLock &rwl, bool safe_callbacks)
+ : cct(cct_), rwlock(rwl), lock("RWTimer::lock"),
+ safe_callbacks(safe_callbacks),
+ thread(NULL),
+ stopping(false)
+{
+}
+
+RWTimer::~RWTimer()
+{
+ assert(thread == NULL);
+}
+
+void RWTimer::init()
+{
+ ldout(cct,10) << "init" << dendl;
+ thread = new RWTimerThread(this);
+ thread->create();
+}
+
+void RWTimer::shutdown()
+{
+ ldout(cct,10) << "shutdown" << dendl;
+ if (thread) {
+ assert(rwlock.is_locked());
+ cancel_all_events();
+ stopping = true;
+ lock.Lock();
+ cond.Signal();
+ lock.Unlock();
+ thread->join();
+ delete thread;
+ thread = NULL;
+ }
+}
+
+void RWTimer::timer_thread()
+{
+ rwlock.get_write();
+ ldout(cct,10) << "timer_thread starting" << dendl;
+ lock.Lock();
+
+ while (!stopping) {
+ utime_t now = ceph_clock_now(cct);
+
+ while (!schedule.empty()) {
+ scheduled_map_t::iterator p = schedule.begin();
+
+ // is the future now?
+ if (p->first > now)
+ break;
+
+ Context *callback = p->second;
+
+ events.erase(callback);
+ schedule.erase(p);
+ ldout(cct,10) << "timer_thread executing " << callback << dendl;
+
+ lock.Unlock();
+
+ if (!safe_callbacks)
+ rwlock.unlock();
+ callback->complete(0);
+ if (!safe_callbacks)
+ rwlock.get_read();
+
+ lock.Lock();
+ }
+
+ /* need to drop off rwlock here, don't want to wait on lock when rwlock is taken */
+
+ rwlock.unlock();
+
+ ldout(cct,20) << "timer_thread going to sleep" << dendl;
+
+ if (schedule.empty())
+ cond.Wait(lock);
+ else
+ cond.WaitUntil(lock, schedule.begin()->first);
+
+ lock.Unlock();
+ rwlock.get_write();
+ lock.Lock();
+ ldout(cct,20) << "timer_thread awake" << dendl;
+ }
+ lock.Unlock();
+ ldout(cct,10) << "timer_thread exiting" << dendl;
+ rwlock.unlock();
+}
+
+void RWTimer::add_event_after(double seconds, Context *callback)
+{
+ utime_t when = ceph_clock_now(cct);
+ when += seconds;
+ add_event_at(when, callback);
+}
+
+void RWTimer::add_event_at(utime_t when, Context *callback)
+{
+ assert(rwlock.is_locked());
+
+ ldout(cct,10) << "add_event_at " << when << " -> " << callback << dendl;
+
+ Mutex::Locker l(lock);
+
+ scheduled_map_t::value_type s_val(when, callback);
+ scheduled_map_t::iterator i = schedule.insert(s_val);
+
+ event_lookup_map_t::value_type e_val(callback, i);
+ pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val));
+
+ /* If you hit this, you tried to insert the same Context* twice. */
+ assert(rval.second);
+
+ /* If the event we have just inserted comes before everything else, we need to
+ * adjust our timeout. */
+ if (i == schedule.begin())
+ cond.Signal();
+}
+
+bool RWTimer::cancel_event(Context *callback)
+{
+ assert(rwlock.is_locked());
+
+ Mutex::Locker l(lock);
+
+ std::map<Context*, std::multimap<utime_t, Context*>::iterator>::iterator p = events.find(callback);
+ if (p == events.end()) {
+ ldout(cct,10) << "cancel_event " << callback << " not found" << dendl;
+ return false;
+ }
+
+ ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl;
+ delete p->first;
+
+ schedule.erase(p->second);
+ events.erase(p);
+ return true;
+}
+
+void RWTimer::cancel_all_events()
+{
+ assert(rwlock.is_locked());
+
+ ldout(cct,10) << "cancel_all_events" << dendl;
+
+ Mutex::Locker l(lock);
+
+ while (!events.empty()) {
+ std::map<Context*, std::multimap<utime_t, Context*>::iterator>::iterator p = events.begin();
+ ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl;
+ delete p->first;
+ schedule.erase(p->second);
+ events.erase(p);
+ }
+}
+
+void RWTimer::dump(const char *caller) const
+{
+ if (!caller)
+ caller = "";
+ ldout(cct,10) << "dump " << caller << dendl;
+
+ for (scheduled_map_t::const_iterator s = schedule.begin();
+ s != schedule.end();
+ ++s)
+ ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
+}
#include "Cond.h"
#include "Mutex.h"
+#include "RWLock.h"
#include <map>
class CephContext;
class Context;
class SafeTimerThread;
+class RWTimerThread;
class SafeTimer
{
void cancel_all_events();
};
+
+class RWTimer
+{
+ // This class isn't supposed to be copied
+ RWTimer(const RWTimer &rhs);
+ RWTimer& operator=(const RWTimer &rhs);
+
+ CephContext *cct;
+ RWLock& rwlock;
+ Mutex lock;
+ Cond cond;
+ bool safe_callbacks;
+
+ friend class RWTimerThread;
+ RWTimerThread *thread;
+
+ void timer_thread();
+ void _shutdown();
+
+ std::multimap<utime_t, Context*> schedule;
+ std::map<Context*, std::multimap<utime_t, Context*>::iterator> events;
+ bool stopping;
+
+ void dump(const char *caller = 0) const;
+
+public:
+ /* Safe callbacks determines whether callbacks are called with the lock
+ * held.
+ *
+ * safe_callbacks = true (default option) guarantees that a cancelled
+ * event's callback will never be called.
+ *
+ * Under some circumstances, holding the lock can cause lock cycles.
+ * If you are able to relax requirements on cancelled callbacks, then
+ * setting safe_callbacks = false eliminates the lock cycle issue.
+ * */
+ RWTimer(CephContext *cct, RWLock &rwl, bool safe_callbacks=true);
+ ~RWTimer();
+
+ /* Call with the event_lock UNLOCKED.
+ *
+ * Cancel all events and stop the timer thread.
+ *
+ * If there are any events that still have to run, they will need to take
+ * the event_lock first. */
+ void init();
+ void shutdown();
+
+ /* Schedule an event in the future
+ * Call with the event_lock LOCKED */
+ void add_event_after(double seconds, Context *callback);
+ void add_event_at(utime_t when, Context *callback);
+
+ /* Cancel an event.
+ * Call with the event_lock LOCKED
+ *
+ * Returns true if the callback was cancelled.
+ * Returns false if you never addded the callback in the first place.
+ */
+ bool cancel_event(Context *callback);
+
+ /* Cancel all events.
+ * Call with the event_lock LOCKED
+ *
+ * When this function returns, all events have been cancelled, and there are no
+ * more in progress.
+ */
+ void cancel_all_events();
+
+};
+
#endif