delete h;
}
-void HeartbeatMap::reset_timeout(heartbeat_handle_d *h, time_t grace)
+bool HeartbeatMap::_check(heartbeat_handle_d *h, const char *who, time_t now)
{
- ldout(m_cct, 20) << "reset_timeout '" << h->name << "' grace " << grace << dendl;
- time_t now = time(NULL);
- time_t was = h->timeout.read();
+ bool healthy = true;
+ time_t was;
+
+ was = h->timeout.read();
if (was && was < now) {
- ldout(m_cct, 1) << "reset_timeout '" << h->name << "'"
+ ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had timed out after " << h->grace << dendl;
+ healthy = false;
+ }
+ was = h->suicide_timeout.read();
+ if (was && was < now) {
+ ldout(m_cct, 1) << who << " '" << h->name << "'"
+ << " had suicide timed out after " << h->suicide_grace << dendl;
+ assert(0 == "hit suicide timeout");
}
+ return healthy;
+}
+
+void HeartbeatMap::reset_timeout(heartbeat_handle_d *h, time_t grace, time_t suicide_grace)
+{
+ ldout(m_cct, 20) << "reset_timeout '" << h->name << "' grace " << grace
+ << " suicide " << suicide_grace << dendl;
+ time_t now = time(NULL);
+ _check(h, "reset_timeout", now);
+
h->timeout.set(now + grace);
h->grace = grace;
+
+ if (suicide_grace)
+ h->suicide_timeout.set(now + suicide_grace);
+ h->suicide_grace = suicide_grace;
}
void HeartbeatMap::clear_timeout(heartbeat_handle_d *h)
{
ldout(m_cct, 20) << "clear_timeout '" << h->name << "'" << dendl;
time_t now = time(NULL);
- time_t was = h->timeout.read();
- if (was && was < now) {
- ldout(m_cct, 1) << "clear_timeout '" << h->name << "'"
- << " had timed out after " << h->grace << dendl;
- }
+ _check(h, "clear_timeout", now);
h->timeout.set(0);
+ h->suicide_timeout.set(0);
}
bool HeartbeatMap::is_healthy()
p != m_workers.end();
++p) {
heartbeat_handle_d *h = *p;
- time_t timeout = h->timeout.read();
- if (timeout && timeout < now) {
- ldout(m_cct, 1) << "is_healthy '" << h->name << "'"
- << " timed out after " << h->grace << dendl;
+ if (!_check(h, "is_healthy", now)) {
healthy = false;
}
}
struct heartbeat_handle_d {
std::string name;
- atomic_t timeout;
- time_t grace;
+ atomic_t timeout, suicide_timeout;
+ time_t grace, suicide_grace;
std::list<heartbeat_handle_d*>::iterator list_item;
heartbeat_handle_d(const std::string& n)
- : name(n), grace(0)
+ : name(n), grace(0), suicide_grace(0)
{ }
};
void remove_worker(heartbeat_handle_d *h);
// reset the timeout so that it expects another touch within grace amount of time
- void reset_timeout(heartbeat_handle_d *h, time_t grace);
+ void reset_timeout(heartbeat_handle_d *h, time_t grace, time_t suicide_grace);
// clear the timeout so that it's not checked on
void clear_timeout(heartbeat_handle_d *h);
CephContext *m_cct;
RWLock m_rwlock;
std::list<heartbeat_handle_d*> m_workers;
+
+ bool _check(heartbeat_handle_d *h, const char *who, time_t now);
};
}
processing++;
ldout(cct,12) << "worker wq " << wq->name << " start processing " << item << dendl;
_lock.Unlock();
- cct->get_heartbeat_map()->reset_timeout(hb, wq->timeout_interval);
+ cct->get_heartbeat_map()->reset_timeout(hb, wq->timeout_interval, wq->suicide_interval);
wq->_void_process(item);
_lock.Lock();
wq->_void_process_finish(item);
}
ldout(cct,15) << "worker waiting" << dendl;
- cct->get_heartbeat_map()->reset_timeout(hb, 4);
+ cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
_cond.WaitInterval(cct, _lock, utime_t(2, 0));
}
ldout(cct,1) << "worker finish" << dendl;
struct WorkQueue_ {
string name;
- time_t timeout_interval;
- WorkQueue_(string n, time_t ti)
- : name(n), timeout_interval(ti)
+ time_t timeout_interval, suicide_interval;
+ WorkQueue_(string n, time_t ti, time_t sti)
+ : name(n), timeout_interval(ti), suicide_interval(sti)
{ }
virtual ~WorkQueue_() {}
virtual void _clear() = 0;
}
public:
- WorkQueue(string n, time_t ti, ThreadPool* p) : WorkQueue_(n, ti), pool(p) {
+ WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(n, ti, sti), pool(p) {
pool->add_work_queue(this);
}
~WorkQueue() {
OPTION(rgw_cache_lru_size, OPT_INT, 10000), // num of entries in rgw cache
OPTION(rgw_socket_path, OPT_STR, NULL), // path to unix domain socket, if not specified, rgw will not run as external fcgi
OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
+ OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 60*60),
// see config.h
OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
int rgw_cache_lru_size;
string rgw_socket_path;
int rgw_op_thread_timeout;
+ int rgw_op_thread_suicide_timeout;
// This will be set to true when it is safe to start threads.
// Once it is true, it will never change.
ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
FileStore *store;
- OpWQ(FileStore *fs, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", ti, tp), store(fs) {}
+ OpWQ(FileStore *fs, time_t timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, 0, tp), store(fs) {}
bool _enqueue(OpSequencer *osr) {
store->op_queue.push_back(osr);
struct OpWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
OpWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, 0, tp), osd(o) {}
bool _enqueue(PG *pg) {
pg->get();
struct BacklogWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
BacklogWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return osd->backlog_queue.empty();
struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
RecoveryWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return osd->recovery_queue.empty();
struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
SnapTrimWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return osd->snap_trim_queue.empty();
struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
ScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return osd->scrub_queue.empty();
public:
ScrubFinalizeWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return scrub_finalize_queue.empty();
public:
RepScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return rep_scrub_queue.empty();
struct RemoveWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
RemoveWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, tp), osd(o) {}
+ : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, 0, tp), osd(o) {}
bool _empty() {
return osd->remove_queue.empty();
public:
ExportLocalFileWQ(IoCtxDistributor *io_ctx_dist, time_t ti,
ThreadPool *tp, ExportDir *export_dir, bool force)
- : RadosSyncWQ(io_ctx_dist, ti, tp),
+ : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
m_export_dir(export_dir),
m_force(force)
{
public:
ExportValidateExistingWQ(IoCtxDistributor *io_ctx_dist, time_t ti,
ThreadPool *tp, const char *dir_name)
- : RadosSyncWQ(io_ctx_dist, ti, tp),
+ : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
m_dir_name(dir_name)
{
}
public:
ImportLocalFileWQ(const char *dir_name, bool force,
IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
- : RadosSyncWQ(io_ctx_dist, ti, tp),
+ : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
m_dir_name(dir_name),
m_force(force)
{
public:
ImportValidateExistingWQ(ExportDir *export_dir,
IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
- : RadosSyncWQ(io_ctx_dist, ti, tp),
+ : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
m_export_dir(export_dir)
{
}
clear();
}
-RadosSyncWQ::RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<std::string>("FileStore::OpWQ", ti, tp),
+RadosSyncWQ::RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<std::string>("FileStore::OpWQ", timeout, suicide_timeout, tp),
m_io_ctx_dist(io_ctx_dist)
{
}
class RadosSyncWQ : public ThreadPool::WorkQueue<std::string> {
public:
- RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp);
+ RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t timeout, time_t suicide_timeout, ThreadPool *tp);
protected:
IoCtxDistributor *m_io_ctx_dist;
private:
struct RGWWQ : public ThreadPool::WorkQueue<FCGX_Request> {
RGWProcess *process;
- RGWWQ(RGWProcess *p, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", ti, tp), process(p) {}
+ RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
bool _enqueue(FCGX_Request *req) {
process->m_fcgx_queue.push_back(req);
public:
RGWProcess(CephContext *cct, int num_threads)
: m_tp(cct, "RGWProcess::m_tp", num_threads),
- req_wq(this, g_conf->rgw_op_thread_timeout, &m_tp) {}
+ req_wq(this, g_conf->rgw_op_thread_timeout,
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
void run();
void handle_request(FCGX_Request *fcgx);
};