]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
time: create RWTimer
authorYehuda Sadeh <yehuda@inktank.com>
Mon, 17 Mar 2014 19:58:02 +0000 (12:58 -0700)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:33:40 +0000 (01:33 +0100)
a timer implementation that uses RWLock

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/Timer.cc
src/common/Timer.h

index f90d9ab9694e69bd0fb5ce2d1d661c1c82e00e23..7d04823f787219035f078398877a5cf3b979c7c5 100644 (file)
@@ -191,3 +191,182 @@ void SafeTimer::dump(const char *caller) const
        ++s)
     ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
 }
+
+class RWTimerThread : public Thread {
+  RWTimer *parent;
+public:
+  RWTimerThread(RWTimer *s) : parent(s) {}
+  void *entry() {
+    parent->timer_thread();
+    return NULL;
+  }
+};
+
+RWTimer::RWTimer(CephContext *cct_, RWLock &rwl, bool safe_callbacks)
+  : cct(cct_), rwlock(rwl), lock("RWTimer::lock"),
+    safe_callbacks(safe_callbacks),
+    thread(NULL),
+    stopping(false)
+{
+}
+
+RWTimer::~RWTimer()
+{
+  assert(thread == NULL);
+}
+
+void RWTimer::init()
+{
+  ldout(cct,10) << "init" << dendl;
+  thread = new RWTimerThread(this);
+  thread->create();
+}
+
+void RWTimer::shutdown()
+{
+  ldout(cct,10) << "shutdown" << dendl;
+  if (thread) {
+    assert(rwlock.is_locked());
+    cancel_all_events();
+    stopping = true;
+    lock.Lock();
+    cond.Signal();
+    lock.Unlock();
+    thread->join();
+    delete thread;
+    thread = NULL;
+  }
+}
+
+void RWTimer::timer_thread()
+{
+  rwlock.get_write();
+  ldout(cct,10) << "timer_thread starting" << dendl;
+  lock.Lock();
+
+  while (!stopping) {
+    utime_t now = ceph_clock_now(cct);
+
+    while (!schedule.empty()) {
+      scheduled_map_t::iterator p = schedule.begin();
+
+      // is the future now?
+      if (p->first > now)
+       break;
+
+      Context *callback = p->second;
+
+      events.erase(callback);
+      schedule.erase(p);
+      ldout(cct,10) << "timer_thread executing " << callback << dendl;
+
+      lock.Unlock();
+      
+      if (!safe_callbacks)
+       rwlock.unlock();
+      callback->complete(0);
+      if (!safe_callbacks)
+       rwlock.get_read();
+
+      lock.Lock();
+    }
+
+    /* need to drop off rwlock here, don't want to wait on lock when rwlock is taken */
+
+    rwlock.unlock();
+
+    ldout(cct,20) << "timer_thread going to sleep" << dendl;
+
+    if (schedule.empty())
+      cond.Wait(lock);
+    else
+      cond.WaitUntil(lock, schedule.begin()->first);
+
+    lock.Unlock();
+    rwlock.get_write();
+    lock.Lock();
+    ldout(cct,20) << "timer_thread awake" << dendl;
+  }
+  lock.Unlock();
+  ldout(cct,10) << "timer_thread exiting" << dendl;
+  rwlock.unlock();
+}
+
+void RWTimer::add_event_after(double seconds, Context *callback)
+{
+  utime_t when = ceph_clock_now(cct);
+  when += seconds;
+  add_event_at(when, callback);
+}
+
+void RWTimer::add_event_at(utime_t when, Context *callback)
+{
+  assert(rwlock.is_locked());
+
+  ldout(cct,10) << "add_event_at " << when << " -> " << callback << dendl;
+
+  Mutex::Locker l(lock);
+
+  scheduled_map_t::value_type s_val(when, callback);
+  scheduled_map_t::iterator i = schedule.insert(s_val);
+
+  event_lookup_map_t::value_type e_val(callback, i);
+  pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val));
+
+  /* If you hit this, you tried to insert the same Context* twice. */
+  assert(rval.second);
+
+  /* If the event we have just inserted comes before everything else, we need to
+   * adjust our timeout. */
+  if (i == schedule.begin())
+    cond.Signal();
+}
+
+bool RWTimer::cancel_event(Context *callback)
+{
+  assert(rwlock.is_locked());
+
+  Mutex::Locker l(lock);
+  
+  std::map<Context*, std::multimap<utime_t, Context*>::iterator>::iterator p = events.find(callback);
+  if (p == events.end()) {
+    ldout(cct,10) << "cancel_event " << callback << " not found" << dendl;
+    return false;
+  }
+
+  ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl;
+  delete p->first;
+
+  schedule.erase(p->second);
+  events.erase(p);
+  return true;
+}
+
+void RWTimer::cancel_all_events()
+{
+  assert(rwlock.is_locked());
+
+  ldout(cct,10) << "cancel_all_events" << dendl;
+
+  Mutex::Locker l(lock);
+  
+  while (!events.empty()) {
+    std::map<Context*, std::multimap<utime_t, Context*>::iterator>::iterator p = events.begin();
+    ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl;
+    delete p->first;
+    schedule.erase(p->second);
+    events.erase(p);
+  }
+}
+
+void RWTimer::dump(const char *caller) const
+{
+  if (!caller)
+    caller = "";
+  ldout(cct,10) << "dump " << caller << dendl;
+
+  for (scheduled_map_t::const_iterator s = schedule.begin();
+       s != schedule.end();
+       ++s)
+    ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
+}
index 40d5015e18bb54a9c84e6f9df2c234ed18e55160..11333998f50d025b0ee5a105b8632b888e6fa005 100644 (file)
 
 #include "Cond.h"
 #include "Mutex.h"
+#include "RWLock.h"
 
 #include <map>
 
 class CephContext;
 class Context;
 class SafeTimerThread;
+class RWTimerThread;
 
 class SafeTimer
 {
@@ -92,4 +94,75 @@ public:
   void cancel_all_events();
 
 };
+
+class RWTimer
+{
+  // This class isn't supposed to be copied
+  RWTimer(const RWTimer &rhs);
+  RWTimer& operator=(const RWTimer &rhs);
+
+  CephContext *cct;
+  RWLock& rwlock;
+  Mutex lock;
+  Cond cond;
+  bool safe_callbacks;
+
+  friend class RWTimerThread;
+  RWTimerThread *thread;
+
+  void timer_thread();
+  void _shutdown();
+
+  std::multimap<utime_t, Context*> schedule;
+  std::map<Context*, std::multimap<utime_t, Context*>::iterator> events;
+  bool stopping;
+
+  void dump(const char *caller = 0) const;
+
+public:
+  /* Safe callbacks determines whether callbacks are called with the lock
+   * held.
+   *
+   * safe_callbacks = true (default option) guarantees that a cancelled
+   * event's callback will never be called.
+   *
+   * Under some circumstances, holding the lock can cause lock cycles.
+   * If you are able to relax requirements on cancelled callbacks, then
+   * setting safe_callbacks = false eliminates the lock cycle issue.
+   * */
+  RWTimer(CephContext *cct, RWLock &rwl, bool safe_callbacks=true);
+  ~RWTimer();
+
+  /* Call with the event_lock UNLOCKED.
+   *
+   * Cancel all events and stop the timer thread.
+   *
+   * If there are any events that still have to run, they will need to take
+   * the event_lock first. */
+  void init();
+  void shutdown();
+
+  /* Schedule an event in the future
+   * Call with the event_lock LOCKED */
+  void add_event_after(double seconds, Context *callback);
+  void add_event_at(utime_t when, Context *callback);
+
+  /* Cancel an event.
+   * Call with the event_lock LOCKED
+   *
+   * Returns true if the callback was cancelled.
+   * Returns false if you never addded the callback in the first place.
+   */
+  bool cancel_event(Context *callback);
+
+  /* Cancel all events.
+   * Call with the event_lock LOCKED
+   *
+   * When this function returns, all events have been cancelled, and there are no
+   * more in progress.
+   */
+  void cancel_all_events();
+
+};
+
 #endif