]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
WorkQueue: add TPHandle to allow _process to ping the hb map
authorSamuel Just <sam.just@inktank.com>
Thu, 24 Jan 2013 19:04:04 +0000 (11:04 -0800)
committerSamuel Just <sam.just@inktank.com>
Fri, 25 Jan 2013 01:25:08 +0000 (17:25 -0800)
Backport: bobtail
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/WorkQueue.cc
src/common/WorkQueue.h

index f2fc1747a9bba6409b8f4dbe287ac2a283a8d9b1..a7efcc02870d101570753136b966370773d7455e 100644 (file)
@@ -49,6 +49,11 @@ ThreadPool::ThreadPool(CephContext *cct_, string nm, int n, const char *option)
   }
 }
 
+void ThreadPool::TPHandle::reset_tp_timeout() {
+  cct->get_heartbeat_map()->reset_timeout(
+    hb, grace, suicide_grace);
+}
+
 ThreadPool::~ThreadPool()
 {
   assert(_threads.empty());
@@ -108,9 +113,10 @@ void ThreadPool::worker(WorkThread *wt)
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;
-         cct->get_heartbeat_map()->reset_timeout(hb, wq->timeout_interval, wq->suicide_interval);
+         TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
+         tp_handle.reset_tp_timeout();
          _lock.Unlock();
-         wq->_void_process(item);
+         wq->_void_process(item, tp_handle);
          _lock.Lock();
          wq->_void_process_finish(item);
          processing--;
index 2475b45d1ac0388916de49b095c51e1aae2375dd..9fb215b918843440ea90a158211fcd095ecbe66f 100644 (file)
@@ -19,6 +19,7 @@
 #include "Cond.h"
 #include "Thread.h"
 #include "common/config_obs.h"
+#include "common/HeartbeatMap.h"
 
 class CephContext;
 
@@ -33,6 +34,24 @@ class ThreadPool : public md_config_obs_t {
   int _draining;
   Cond _wait_cond;
 
+public:
+  class TPHandle {
+    friend class ThreadPool;
+    CephContext *cct;
+    heartbeat_handle_d *hb;
+    time_t grace;
+    time_t suicide_grace;
+    TPHandle(
+      CephContext *cct,
+      heartbeat_handle_d *hb,
+      time_t grace,
+      time_t suicide_grace)
+      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
+  public:
+    void reset_tp_timeout();
+  };
+private:
+
   struct WorkQueue_ {
     string name;
     time_t timeout_interval, suicide_interval;
@@ -43,7 +62,7 @@ class ThreadPool : public md_config_obs_t {
     virtual void _clear() = 0;
     virtual bool _empty() = 0;
     virtual void *_void_dequeue() = 0;
-    virtual void _void_process(void *) = 0;
+    virtual void _void_process(void *item, TPHandle &handle) = 0;
     virtual void _void_process_finish(void *) = 0;
   };
 
@@ -66,7 +85,10 @@ public:
     virtual bool _enqueue(T *) = 0;
     virtual void _dequeue(T *) = 0;
     virtual void _dequeue(list<T*> *) = 0;
-    virtual void _process(const list<T*> &) = 0;
+    virtual void _process(const list<T*> &) { assert(0); }
+    virtual void _process(const list<T*> &items, TPHandle &handle) {
+      _process(items);
+    }
     virtual void _process_finish(const list<T*> &) {}
 
     void *_void_dequeue() {
@@ -79,8 +101,8 @@ public:
        return 0;
       }
     }
-    void _void_process(void *p) {
-      _process(*((list<T*>*)p));
+    void _void_process(void *p, TPHandle &handle) {
+      _process(*((list<T*>*)p), handle);
     }
     void _void_process_finish(void *p) {
       _process_finish(*(list<T*>*)p);
@@ -141,7 +163,10 @@ public:
     virtual void _enqueue_front(T) = 0;
     virtual bool _empty() = 0;
     virtual U _dequeue() = 0;
-    virtual void _process(U) = 0;
+    virtual void _process(U) { assert(0); }
+    virtual void _process(U u, TPHandle &) {
+      _process(u);
+    }
     virtual void _process_finish(U) {}
 
     void *_void_dequeue() {
@@ -154,14 +179,14 @@ public:
       }
       return ((void*)1); // Not used
     }
-    void _void_process(void *) {
+    void _void_process(void *, TPHandle &handle) {
       _lock.Lock();
       assert(!to_process.empty());
       U u = to_process.front();
       to_process.pop_front();
       _lock.Unlock();
 
-      _process(u);
+      _process(u, handle);
 
       _lock.Lock();
       to_finish.push_back(u);
@@ -216,14 +241,17 @@ public:
     virtual bool _enqueue(T *) = 0;
     virtual void _dequeue(T *) = 0;
     virtual T *_dequeue() = 0;
-    virtual void _process(T *) = 0;
+    virtual void _process(T *t) { assert(0); }
+    virtual void _process(T *t, TPHandle &) {
+      _process(t);
+    }
     virtual void _process_finish(T *) {}
     
     void *_void_dequeue() {
       return (void *)_dequeue();
     }
-    void _void_process(void *p) {
-      _process((T *)p);
+    void _void_process(void *p, TPHandle &handle) {
+      _process((T *)p, handle);
     }
     void _void_process_finish(void *p) {
       _process_finish((T *)p);