}
}
+void ThreadPool::TPHandle::reset_tp_timeout() {
+ cct->get_heartbeat_map()->reset_timeout(
+ hb, grace, suicide_grace);
+}
+
ThreadPool::~ThreadPool()
{
assert(_threads.empty());
processing++;
ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
<< " (" << processing << " active)" << dendl;
- cct->get_heartbeat_map()->reset_timeout(hb, wq->timeout_interval, wq->suicide_interval);
+ TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
+ tp_handle.reset_tp_timeout();
_lock.Unlock();
- wq->_void_process(item);
+ wq->_void_process(item, tp_handle);
_lock.Lock();
wq->_void_process_finish(item);
processing--;
#include "Cond.h"
#include "Thread.h"
#include "common/config_obs.h"
+#include "common/HeartbeatMap.h"
class CephContext;
int _draining;
Cond _wait_cond;
+public:
+ class TPHandle {
+ friend class ThreadPool;
+ CephContext *cct;
+ heartbeat_handle_d *hb;
+ time_t grace;
+ time_t suicide_grace;
+ TPHandle(
+ CephContext *cct,
+ heartbeat_handle_d *hb,
+ time_t grace,
+ time_t suicide_grace)
+ : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
+ public:
+ void reset_tp_timeout();
+ };
+private:
+
struct WorkQueue_ {
string name;
time_t timeout_interval, suicide_interval;
virtual void _clear() = 0;
virtual bool _empty() = 0;
virtual void *_void_dequeue() = 0;
- virtual void _void_process(void *) = 0;
+ virtual void _void_process(void *item, TPHandle &handle) = 0;
virtual void _void_process_finish(void *) = 0;
};
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
virtual void _dequeue(list<T*> *) = 0;
- virtual void _process(const list<T*> &) = 0;
+ virtual void _process(const list<T*> &) { assert(0); }
+ virtual void _process(const list<T*> &items, TPHandle &handle) {
+ _process(items);
+ }
virtual void _process_finish(const list<T*> &) {}
void *_void_dequeue() {
return 0;
}
}
- void _void_process(void *p) {
- _process(*((list<T*>*)p));
+ void _void_process(void *p, TPHandle &handle) {
+ _process(*((list<T*>*)p), handle);
}
void _void_process_finish(void *p) {
_process_finish(*(list<T*>*)p);
virtual void _enqueue_front(T) = 0;
virtual bool _empty() = 0;
virtual U _dequeue() = 0;
- virtual void _process(U) = 0;
+ virtual void _process(U) { assert(0); }
+ virtual void _process(U u, TPHandle &) {
+ _process(u);
+ }
virtual void _process_finish(U) {}
void *_void_dequeue() {
}
return ((void*)1); // Not used
}
- void _void_process(void *) {
+ void _void_process(void *, TPHandle &handle) {
_lock.Lock();
assert(!to_process.empty());
U u = to_process.front();
to_process.pop_front();
_lock.Unlock();
- _process(u);
+ _process(u, handle);
_lock.Lock();
to_finish.push_back(u);
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
virtual T *_dequeue() = 0;
- virtual void _process(T *) = 0;
+ virtual void _process(T *t) { assert(0); }
+ virtual void _process(T *t, TPHandle &) {
+ _process(t);
+ }
virtual void _process_finish(T *) {}
void *_void_dequeue() {
return (void *)_dequeue();
}
- void _void_process(void *p) {
- _process((T *)p);
+ void _void_process(void *p, TPHandle &handle) {
+ _process((T *)p, handle);
}
void _void_process_finish(void *p) {
_process_finish((T *)p);