From d7b45882f2b70550f89b9b619e355b8bb6693599 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 27 Jul 2011 22:47:41 -0700 Subject: [PATCH] workqueue: provide op timeout to workqueue constructor Signed-off-by: Sage Weil --- src/common/WorkQueue.h | 7 +++++-- src/common/config.cc | 9 +++++++++ src/common/config.h | 10 ++++++++++ src/os/FileStore.cc | 3 ++- src/os/FileStore.h | 3 ++- src/osd/OSD.cc | 23 +++++++++++++++-------- src/osd/OSD.h | 26 ++++++++++++++++---------- src/rgw/rgw_main.cc | 7 +++++-- 8 files changed, 64 insertions(+), 24 deletions(-) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 1e14d18548233..cee53d17ef02d 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -32,7 +32,10 @@ class ThreadPool { struct WorkQueue_ { string name; - WorkQueue_(string n) : name(n) {} + time_t timeout_interval; + WorkQueue_(string n, time_t ti) + : name(n), timeout_interval(ti) + { } virtual ~WorkQueue_() {} virtual void _clear() = 0; virtual bool _empty() = 0; @@ -63,7 +66,7 @@ public: } public: - WorkQueue(string n, ThreadPool* p) : WorkQueue_(n), pool(p) { + WorkQueue(string n, time_t ti, ThreadPool* p) : WorkQueue_(n, ti), pool(p) { pool->add_work_queue(this); } ~WorkQueue() { diff --git a/src/common/config.cc b/src/common/config.cc index 5d5748e69b65b..755af7eb2ae12 100644 --- a/src/common/config.cc +++ b/src/common/config.cc @@ -344,6 +344,13 @@ struct config_option config_optionsp[] = { OPTION(osd_max_opq, OPT_INT, 10), OPTION(osd_disk_threads, OPT_INT, 1), OPTION(osd_recovery_threads, OPT_INT, 1), + OPTION(osd_op_thread_timeout, OPT_INT, 30), + OPTION(osd_backlog_thread_timeout, OPT_INT, 60*60*1), + OPTION(osd_recovery_thread_timeout, OPT_INT, 30), + OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1), + OPTION(osd_scrub_thread_timeout, OPT_INT, 60), + OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10), + OPTION(osd_remove_thread_timeout, OPT_INT, 60*60), OPTION(osd_age, OPT_FLOAT, .8), OPTION(osd_age_time, OPT_INT, 0), OPTION(osd_heartbeat_interval, OPT_INT, 1), @@ -394,6 +401,7 @@ struct config_option config_optionsp[] = { OPTION(filestore_queue_committing_max_ops, OPT_INT, 500), // this is ON TOP of filestore_queue_max_* OPTION(filestore_queue_committing_max_bytes, OPT_INT, 100 << 20), // " OPTION(filestore_op_threads, OPT_INT, 2), + OPTION(filestore_op_thread_timeout, OPT_INT, 60), OPTION(filestore_commit_timeout, OPT_FLOAT, 600), OPTION(filestore_fiemap_threshold, OPT_INT, 4096), OPTION(journal_dio, OPT_BOOL, true), @@ -417,6 +425,7 @@ struct config_option config_optionsp[] = { OPTION(rgw_cache_enabled, OPT_BOOL, false), // rgw cache enabled OPTION(rgw_cache_lru_size, OPT_INT, 10000), // num of entries in rgw cache OPTION(rgw_socket_path, OPT_STR, NULL), // path to unix domain socket, if not specified, rgw will not run as external fcgi + OPTION(rgw_op_thread_timeout, OPT_INT, 10*60), // see config.h OPTION(internal_safe_to_start_threads, OPT_BOOL, false), diff --git a/src/common/config.h b/src/common/config.h index 6161cc7e24e71..4de48ba9ad1db 100644 --- a/src/common/config.h +++ b/src/common/config.h @@ -419,6 +419,14 @@ public: int osd_disk_threads; int osd_recovery_threads; + int osd_op_thread_timeout; + int osd_backlog_thread_timeout; + int osd_recovery_thread_timeout; + int osd_snap_trim_thread_timeout; + int osd_scrub_thread_timeout; + int osd_scrub_finalize_thread_timeout; + int osd_remove_thread_timeout; + float osd_age; int osd_age_time; int osd_heartbeat_interval; @@ -479,6 +487,7 @@ public: int filestore_queue_committing_max_ops; int filestore_queue_committing_max_bytes; int filestore_op_threads; + int filestore_op_thread_timeout; float filestore_commit_timeout; int filestore_fiemap_threshold; @@ -508,6 +517,7 @@ public: bool rgw_cache_enabled; int rgw_cache_lru_size; string rgw_socket_path; + int rgw_op_thread_timeout; // This will be set to true when it is safe to start threads. // Once it is true, it will never change. diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index df11d8cf48f23..8bdee6b466bc4 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -875,7 +875,8 @@ FileStore::FileStore(const std::string &base, const std::string &jdev) : timer(g_ceph_context, sync_entry_timeo_lock), stop(false), sync_thread(this), op_queue_len(0), op_queue_bytes(0), op_finisher(g_ceph_context), next_finish(0), - op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads), op_wq(this, &op_tp), + op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads), + op_wq(this, g_conf->filestore_op_thread_timeout, &op_tp), flusher_queue_len(0), flusher_thread(this), logger(NULL) { diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 0892b83abff86..dbe9d644c7be7 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -172,7 +172,8 @@ class FileStore : public JournalingObjectStore { ThreadPool op_tp; struct OpWQ : public ThreadPool::WorkQueue { FileStore *store; - OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue("FileStore::OpWQ", tp), store(fs) {} + OpWQ(FileStore *fs, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("FileStore::OpWQ", ti, tp), store(fs) {} bool _enqueue(OpSequencer *osr) { store->op_queue.push_back(osr); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 51e6ad0393fd9..516c9f484e6cc 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -74,6 +74,8 @@ #include "common/Timer.h" #include "common/LogClient.h" #include "common/safe_io.h" +#include "common/HeartbeatMap.h" + #include "include/color.h" #include "perfglue/cpu_profiler.h" #include "perfglue/heap_profiler.h" @@ -428,7 +430,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, heartbeat_dispatcher(this), stat_lock("OSD::stat_lock"), finished_lock("OSD::finished_lock"), - op_wq(this, &op_tp), + op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), osdmap(NULL), map_lock("OSD::map_lock"), peer_map_epoch_lock("OSD::peer_map_epoch_lock"), @@ -438,19 +440,19 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, osd_stat_updated(false), last_tid(0), tid_lock("OSD::tid_lock"), - backlog_wq(this, &disk_tp), + backlog_wq(this, g_conf->osd_backlog_thread_timeout, &disk_tp), recovery_ops_active(0), - recovery_wq(this, &recovery_tp), + recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp), remove_list_lock("OSD::remove_list_lock"), replay_queue_lock("OSD::replay_queue_lock"), - snap_trim_wq(this, &disk_tp), + snap_trim_wq(this, g_conf->osd_snap_trim_thread_timeout, &disk_tp), sched_scrub_lock("OSD::sched_scrub_lock"), scrubs_pending(0), scrubs_active(0), - scrub_wq(this, &disk_tp), - scrub_finalize_wq(this, &op_tp), - rep_scrub_wq(this, &disk_tp), - remove_wq(this, &disk_tp), + scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp), + scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp), + rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp), + remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp), watch_lock("OSD::watch_lock"), watch_timer(hbm->cct, watch_lock) { @@ -1702,6 +1704,11 @@ void OSD::tick() assert(osd_lock.is_locked()); dout(5) << "tick" << dendl; + if (!cct->get_heartbeat_map()->is_healthy()) + dout(0) << "tick internal heartbeat_map reports NOT HEALTHY" << dendl; + else + dout(20) << "tick internal heartbeat_map reports healthy" << dendl; + logger->set(l_osd_buf, buffer::get_total_alloc()); if (got_sigterm) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 8a6e8a6600664..9aee7a19869ae 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -315,7 +315,8 @@ private: struct OpWQ : public ThreadPool::WorkQueue { OSD *osd; - OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::OpWQ", tp), osd(o) {} + OpWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::OpWQ", ti, tp), osd(o) {} bool _enqueue(PG *pg) { pg->get(); @@ -591,7 +592,8 @@ protected: struct BacklogWQ : public ThreadPool::WorkQueue { OSD *osd; - BacklogWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::BacklogWQ", tp), osd(o) {} + BacklogWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::BacklogWQ", ti, tp), osd(o) {} bool _empty() { return osd->backlog_queue.empty(); @@ -642,7 +644,8 @@ protected: struct RecoveryWQ : public ThreadPool::WorkQueue { OSD *osd; - RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::RecoveryWQ", tp), osd(o) {} + RecoveryWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::RecoveryWQ", ti, tp), osd(o) {} bool _empty() { return osd->recovery_queue.empty(); @@ -716,7 +719,8 @@ protected: struct SnapTrimWQ : public ThreadPool::WorkQueue { OSD *osd; - SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::SnapTrimWQ", tp), osd(o) {} + SnapTrimWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::SnapTrimWQ", ti, tp), osd(o) {} bool _empty() { return osd->snap_trim_queue.empty(); @@ -775,7 +779,8 @@ protected: struct ScrubWQ : public ThreadPool::WorkQueue { OSD *osd; - ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::ScrubWQ", tp), osd(o) {} + ScrubWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::ScrubWQ", ti, tp), osd(o) {} bool _empty() { return osd->scrub_queue.empty(); @@ -818,8 +823,8 @@ protected: xlist scrub_finalize_queue; public: - ScrubFinalizeWQ(OSD *o, ThreadPool *tp) : - ThreadPool::WorkQueue("OSD::ScrubFinalizeWQ", tp), osd(o) {} + ScrubFinalizeWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::ScrubFinalizeWQ", ti, tp), osd(o) {} bool _empty() { return scrub_finalize_queue.empty(); @@ -863,8 +868,8 @@ protected: list rep_scrub_queue; public: - RepScrubWQ(OSD *o, ThreadPool *tp) : - ThreadPool::WorkQueue("OSD::RepScrubWQ", tp), osd(o) {} + RepScrubWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::RepScrubWQ", ti, tp), osd(o) {} bool _empty() { return rep_scrub_queue.empty(); @@ -910,7 +915,8 @@ protected: struct RemoveWQ : public ThreadPool::WorkQueue { OSD *osd; - RemoveWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::RemoveWQ", tp), osd(o) {} + RemoveWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("OSD::RemoveWQ", ti, tp), osd(o) {} bool _empty() { return osd->remove_queue.empty(); diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 19d38e70d28a0..67542c5a3a5e7 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -61,7 +61,8 @@ class RGWProcess { struct RGWWQ : public ThreadPool::WorkQueue { RGWProcess *process; - RGWWQ(RGWProcess *p, ThreadPool *tp) : ThreadPool::WorkQueue("RGWWQ", tp), process(p) {} + RGWWQ(RGWProcess *p, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("RGWWQ", ti, tp), process(p) {} bool _enqueue(FCGX_Request *req) { process->m_fcgx_queue.push_back(req); @@ -104,7 +105,9 @@ class RGWProcess { } req_wq; public: - RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), req_wq(this, &m_tp) {} + RGWProcess(CephContext *cct, int num_threads) + : m_tp(cct, "RGWProcess::m_tp", num_threads), + req_wq(this, g_conf->rgw_op_thread_timeout, &m_tp) {} void run(); void handle_request(FCGX_Request *fcgx); }; -- 2.39.5