]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
workqueue: register and time out worker threads
authorSage Weil <sage.weil@dreamhost.com>
Thu, 28 Jul 2011 05:49:12 +0000 (22:49 -0700)
committerSage Weil <sage@newdream.net>
Thu, 28 Jul 2011 16:49:23 +0000 (09:49 -0700)
Register and unregister worker threads.  Periodically touch heartbeat
when idle.  Set heartbeat timeout before processing a queue item.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/common/WorkQueue.cc

index a7d6ae029b8aed8306cd5d003caa37e09c3784ce..2d4f3a930fd6e8f677d52a16c30f13ecdd6b12d9 100644 (file)
  * 
  */
 
+#include <sstream>
+
 #include "include/types.h"
+#include "include/utime.h"
 #include "WorkQueue.h"
 
 #include "common/config.h"
+#include "common/HeartbeatMap.h"
 
 #define DOUT_SUBSYS tp
 #undef dout_prefix
@@ -26,6 +30,11 @@ void ThreadPool::worker()
 {
   _lock.Lock();
   ldout(cct,10) << "worker start" << dendl;
+  
+  std::stringstream ss;
+  ss << name << '@' << this;
+  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(pthread_self(), ss.str());
+
   while (!_stop) {
     if (!_pause && work_queues.size()) {
       WorkQueue_* wq;
@@ -41,6 +50,7 @@ void ThreadPool::worker()
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item << dendl;
          _lock.Unlock();
+         cct->get_heartbeat_map()->touch_worker(hb, wq->timeout_interval);
          wq->_void_process(item);
          _lock.Lock();
          wq->_void_process_finish(item);
@@ -55,10 +65,15 @@ void ThreadPool::worker()
       if (did)
        continue;
     }
+
     ldout(cct,15) << "worker waiting" << dendl;
-    _cond.Wait(_lock);
+    cct->get_heartbeat_map()->touch_worker(hb, 4);
+    _cond.WaitInterval(cct, _lock, utime_t(2, 0));
   }
   ldout(cct,0) << "worker finish" << dendl;
+
+  cct->get_heartbeat_map()->remove_worker(hb);
+
   _lock.Unlock();
 }