struct WorkQueue_ {
string name;
- WorkQueue_(string n) : name(n) {}
+ time_t timeout_interval;
+ WorkQueue_(string n, time_t ti)
+ : name(n), timeout_interval(ti)
+ { }
virtual ~WorkQueue_() {}
virtual void _clear() = 0;
virtual bool _empty() = 0;
}
public:
- WorkQueue(string n, ThreadPool* p) : WorkQueue_(n), pool(p) {
+ WorkQueue(string n, time_t ti, ThreadPool* p) : WorkQueue_(n, ti), pool(p) {
pool->add_work_queue(this);
}
~WorkQueue() {
OPTION(osd_max_opq, OPT_INT, 10),
OPTION(osd_disk_threads, OPT_INT, 1),
OPTION(osd_recovery_threads, OPT_INT, 1),
+ OPTION(osd_op_thread_timeout, OPT_INT, 30),
+ OPTION(osd_backlog_thread_timeout, OPT_INT, 60*60*1),
+ OPTION(osd_recovery_thread_timeout, OPT_INT, 30),
+ OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1),
+ OPTION(osd_scrub_thread_timeout, OPT_INT, 60),
+ OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10),
+ OPTION(osd_remove_thread_timeout, OPT_INT, 60*60),
OPTION(osd_age, OPT_FLOAT, .8),
OPTION(osd_age_time, OPT_INT, 0),
OPTION(osd_heartbeat_interval, OPT_INT, 1),
OPTION(filestore_queue_committing_max_ops, OPT_INT, 500), // this is ON TOP of filestore_queue_max_*
OPTION(filestore_queue_committing_max_bytes, OPT_INT, 100 << 20), // "
OPTION(filestore_op_threads, OPT_INT, 2),
+ OPTION(filestore_op_thread_timeout, OPT_INT, 60),
OPTION(filestore_commit_timeout, OPT_FLOAT, 600),
OPTION(filestore_fiemap_threshold, OPT_INT, 4096),
OPTION(journal_dio, OPT_BOOL, true),
OPTION(rgw_cache_enabled, OPT_BOOL, false), // rgw cache enabled
OPTION(rgw_cache_lru_size, OPT_INT, 10000), // num of entries in rgw cache
OPTION(rgw_socket_path, OPT_STR, NULL), // path to unix domain socket, if not specified, rgw will not run as external fcgi
+ OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
// see config.h
OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
int osd_disk_threads;
int osd_recovery_threads;
+ int osd_op_thread_timeout;
+ int osd_backlog_thread_timeout;
+ int osd_recovery_thread_timeout;
+ int osd_snap_trim_thread_timeout;
+ int osd_scrub_thread_timeout;
+ int osd_scrub_finalize_thread_timeout;
+ int osd_remove_thread_timeout;
+
float osd_age;
int osd_age_time;
int osd_heartbeat_interval;
int filestore_queue_committing_max_ops;
int filestore_queue_committing_max_bytes;
int filestore_op_threads;
+ int filestore_op_thread_timeout;
float filestore_commit_timeout;
int filestore_fiemap_threshold;
bool rgw_cache_enabled;
int rgw_cache_lru_size;
string rgw_socket_path;
+ int rgw_op_thread_timeout;
// This will be set to true when it is safe to start threads.
// Once it is true, it will never change.
timer(g_ceph_context, sync_entry_timeo_lock),
stop(false), sync_thread(this),
op_queue_len(0), op_queue_bytes(0), op_finisher(g_ceph_context), next_finish(0),
- op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads), op_wq(this, &op_tp),
+ op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads),
+ op_wq(this, g_conf->filestore_op_thread_timeout, &op_tp),
flusher_queue_len(0), flusher_thread(this),
logger(NULL)
{
ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
FileStore *store;
- OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", tp), store(fs) {}
+ OpWQ(FileStore *fs, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", ti, tp), store(fs) {}
bool _enqueue(OpSequencer *osr) {
store->op_queue.push_back(osr);
#include "common/Timer.h"
#include "common/LogClient.h"
#include "common/safe_io.h"
+#include "common/HeartbeatMap.h"
+
#include "include/color.h"
#include "perfglue/cpu_profiler.h"
#include "perfglue/heap_profiler.h"
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
- op_wq(this, &op_tp),
+ op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
osdmap(NULL),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
osd_stat_updated(false),
last_tid(0),
tid_lock("OSD::tid_lock"),
- backlog_wq(this, &disk_tp),
+ backlog_wq(this, g_conf->osd_backlog_thread_timeout, &disk_tp),
recovery_ops_active(0),
- recovery_wq(this, &recovery_tp),
+ recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
remove_list_lock("OSD::remove_list_lock"),
replay_queue_lock("OSD::replay_queue_lock"),
- snap_trim_wq(this, &disk_tp),
+ snap_trim_wq(this, g_conf->osd_snap_trim_thread_timeout, &disk_tp),
sched_scrub_lock("OSD::sched_scrub_lock"),
scrubs_pending(0),
scrubs_active(0),
- scrub_wq(this, &disk_tp),
- scrub_finalize_wq(this, &op_tp),
- rep_scrub_wq(this, &disk_tp),
- remove_wq(this, &disk_tp),
+ scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
+ scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
+ rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
+ remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
watch_lock("OSD::watch_lock"),
watch_timer(hbm->cct, watch_lock)
{
assert(osd_lock.is_locked());
dout(5) << "tick" << dendl;
+ if (!cct->get_heartbeat_map()->is_healthy())
+ dout(0) << "tick internal heartbeat_map reports NOT HEALTHY" << dendl;
+ else
+ dout(20) << "tick internal heartbeat_map reports healthy" << dendl;
+
logger->set(l_osd_buf, buffer::get_total_alloc());
if (got_sigterm) {
struct OpWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
+ OpWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, tp), osd(o) {}
bool _enqueue(PG *pg) {
pg->get();
struct BacklogWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- BacklogWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", tp), osd(o) {}
+ BacklogWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", ti, tp), osd(o) {}
bool _empty() {
return osd->backlog_queue.empty();
struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
+ RecoveryWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, tp), osd(o) {}
bool _empty() {
return osd->recovery_queue.empty();
struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
+ SnapTrimWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, tp), osd(o) {}
bool _empty() {
return osd->snap_trim_queue.empty();
struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
+ ScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, tp), osd(o) {}
bool _empty() {
return osd->scrub_queue.empty();
xlist<PG*> scrub_finalize_queue;
public:
- ScrubFinalizeWQ(OSD *o, ThreadPool *tp) :
- ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", tp), osd(o) {}
+ ScrubFinalizeWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", ti, tp), osd(o) {}
bool _empty() {
return scrub_finalize_queue.empty();
list<MOSDRepScrub*> rep_scrub_queue;
public:
- RepScrubWQ(OSD *o, ThreadPool *tp) :
- ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", tp), osd(o) {}
+ RepScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, tp), osd(o) {}
bool _empty() {
return rep_scrub_queue.empty();
struct RemoveWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- RemoveWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", tp), osd(o) {}
+ RemoveWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, tp), osd(o) {}
bool _empty() {
return osd->remove_queue.empty();
struct RGWWQ : public ThreadPool::WorkQueue<FCGX_Request> {
RGWProcess *process;
- RGWWQ(RGWProcess *p, ThreadPool *tp) : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", tp), process(p) {}
+ RGWWQ(RGWProcess *p, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", ti, tp), process(p) {}
bool _enqueue(FCGX_Request *req) {
process->m_fcgx_queue.push_back(req);
} req_wq;
public:
- RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), req_wq(this, &m_tp) {}
+ RGWProcess(CephContext *cct, int num_threads)
+ : m_tp(cct, "RGWProcess::m_tp", num_threads),
+ req_wq(this, g_conf->rgw_op_thread_timeout, &m_tp) {}
void run();
void handle_request(FCGX_Request *fcgx);
};