*
*/
+#include <sstream>
+
#include "include/types.h"
+#include "include/utime.h"
#include "WorkQueue.h"
#include "common/config.h"
+#include "common/HeartbeatMap.h"
#define DOUT_SUBSYS tp
#undef dout_prefix
{
_lock.Lock();
ldout(cct,10) << "worker start" << dendl;
+
+ std::stringstream ss;
+ ss << name << '@' << this;
+ heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(pthread_self(), ss.str());
+
while (!_stop) {
if (!_pause && work_queues.size()) {
WorkQueue_* wq;
processing++;
ldout(cct,12) << "worker wq " << wq->name << " start processing " << item << dendl;
_lock.Unlock();
+ cct->get_heartbeat_map()->touch_worker(hb, wq->timeout_interval);
wq->_void_process(item);
_lock.Lock();
wq->_void_process_finish(item);
if (did)
continue;
}
+
ldout(cct,15) << "worker waiting" << dendl;
- _cond.Wait(_lock);
+ cct->get_heartbeat_map()->touch_worker(hb, 4);
+ _cond.WaitInterval(cct, _lock, utime_t(2, 0));
}
ldout(cct,0) << "worker finish" << dendl;
+
+ cct->get_heartbeat_map()->remove_worker(hb);
+
_lock.Unlock();
}