]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
workqueue: provide op timeout to workqueue constructor
authorSage Weil <sage.weil@dreamhost.com>
Thu, 28 Jul 2011 05:47:41 +0000 (22:47 -0700)
committerSage Weil <sage@newdream.net>
Thu, 28 Jul 2011 16:49:23 +0000 (09:49 -0700)
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/common/WorkQueue.h
src/common/config.cc
src/common/config.h
src/os/FileStore.cc
src/os/FileStore.h
src/osd/OSD.cc
src/osd/OSD.h
src/rgw/rgw_main.cc

index 1e14d18548233eea7938986b44a56aee38ba2e13..cee53d17ef02d4b4d078e9a637acafb0fb169f8b 100644 (file)
@@ -32,7 +32,10 @@ class ThreadPool {
 
   struct WorkQueue_ {
     string name;
-    WorkQueue_(string n) : name(n) {}
+    time_t timeout_interval;
+    WorkQueue_(string n, time_t ti)
+      : name(n), timeout_interval(ti)
+    { }
     virtual ~WorkQueue_() {}
     virtual void _clear() = 0;
     virtual bool _empty() = 0;
@@ -63,7 +66,7 @@ public:
     }
 
   public:
-    WorkQueue(string n, ThreadPool* p) : WorkQueue_(n), pool(p) {
+    WorkQueue(string n, time_t ti, ThreadPool* p) : WorkQueue_(n, ti), pool(p) {
       pool->add_work_queue(this);
     }
     ~WorkQueue() {
index 5d5748e69b65bc92978f5e14d39b97cfca6a91ad..755af7eb2ae123bcffd9b84403cdc8d9a6c3a13c 100644 (file)
@@ -344,6 +344,13 @@ struct config_option config_optionsp[] = {
   OPTION(osd_max_opq, OPT_INT, 10),
   OPTION(osd_disk_threads, OPT_INT, 1),
   OPTION(osd_recovery_threads, OPT_INT, 1),
+  OPTION(osd_op_thread_timeout, OPT_INT, 30),
+  OPTION(osd_backlog_thread_timeout, OPT_INT, 60*60*1),
+  OPTION(osd_recovery_thread_timeout, OPT_INT, 30),
+  OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1),
+  OPTION(osd_scrub_thread_timeout, OPT_INT, 60),
+  OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10),
+  OPTION(osd_remove_thread_timeout, OPT_INT, 60*60),
   OPTION(osd_age, OPT_FLOAT, .8),
   OPTION(osd_age_time, OPT_INT, 0),
   OPTION(osd_heartbeat_interval, OPT_INT, 1),
@@ -394,6 +401,7 @@ struct config_option config_optionsp[] = {
   OPTION(filestore_queue_committing_max_ops, OPT_INT, 500),        // this is ON TOP of filestore_queue_max_*
   OPTION(filestore_queue_committing_max_bytes, OPT_INT, 100 << 20), //  "
   OPTION(filestore_op_threads, OPT_INT, 2),
+  OPTION(filestore_op_thread_timeout, OPT_INT, 60),
   OPTION(filestore_commit_timeout, OPT_FLOAT, 600),
   OPTION(filestore_fiemap_threshold, OPT_INT, 4096),
   OPTION(journal_dio, OPT_BOOL, true),
@@ -417,6 +425,7 @@ struct config_option config_optionsp[] = {
   OPTION(rgw_cache_enabled, OPT_BOOL, false),   // rgw cache enabled
   OPTION(rgw_cache_lru_size, OPT_INT, 10000),   // num of entries in rgw cache
   OPTION(rgw_socket_path, OPT_STR, NULL),   // path to unix domain socket, if not specified, rgw will not run as external fcgi
+  OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
 
   // see config.h
   OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
index 6161cc7e24e71cbe71fd4b03d41d43986a873442..4de48ba9ad1db3500e05d50bfba7fcb7eb5835e3 100644 (file)
@@ -419,6 +419,14 @@ public:
   int   osd_disk_threads;
   int   osd_recovery_threads;
 
+  int   osd_op_thread_timeout;
+  int   osd_backlog_thread_timeout;
+  int   osd_recovery_thread_timeout;
+  int   osd_snap_trim_thread_timeout;
+  int   osd_scrub_thread_timeout;
+  int   osd_scrub_finalize_thread_timeout;
+  int   osd_remove_thread_timeout;
+
   float   osd_age;
   int   osd_age_time;
   int   osd_heartbeat_interval;
@@ -479,6 +487,7 @@ public:
   int filestore_queue_committing_max_ops;
   int filestore_queue_committing_max_bytes;
   int filestore_op_threads;
+  int filestore_op_thread_timeout;
   float filestore_commit_timeout;
   int filestore_fiemap_threshold;
 
@@ -508,6 +517,7 @@ public:
   bool  rgw_cache_enabled;
   int   rgw_cache_lru_size;
   string rgw_socket_path;
+  int rgw_op_thread_timeout;
 
   // This will be set to true when it is safe to start threads.
   // Once it is true, it will never change.
index df11d8cf48f237b3c07d41188774985323b5e507..8bdee6b466bc47df61b2fc83bbaf7994ad199ee3 100644 (file)
@@ -875,7 +875,8 @@ FileStore::FileStore(const std::string &base, const std::string &jdev) :
   timer(g_ceph_context, sync_entry_timeo_lock),
   stop(false), sync_thread(this),
   op_queue_len(0), op_queue_bytes(0), op_finisher(g_ceph_context), next_finish(0),
-  op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads), op_wq(this, &op_tp),
+  op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads),
+  op_wq(this, g_conf->filestore_op_thread_timeout, &op_tp),
   flusher_queue_len(0), flusher_thread(this),
   logger(NULL)
 {
index 0892b83abff86d1c98a7b15f3c0810038a232c6c..dbe9d644c7be7fafa50c0c763caa8c2a6d1c4769 100644 (file)
@@ -172,7 +172,8 @@ class FileStore : public JournalingObjectStore {
   ThreadPool op_tp;
   struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
     FileStore *store;
-    OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", tp), store(fs) {}
+    OpWQ(FileStore *fs, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", ti, tp), store(fs) {}
 
     bool _enqueue(OpSequencer *osr) {
       store->op_queue.push_back(osr);
index 51e6ad0393fd9b9b5accc03a47fa456d62a01338..516c9f484e6cc50821651ae2caa560ffe3407758 100644 (file)
@@ -74,6 +74,8 @@
 #include "common/Timer.h"
 #include "common/LogClient.h"
 #include "common/safe_io.h"
+#include "common/HeartbeatMap.h"
+
 #include "include/color.h"
 #include "perfglue/cpu_profiler.h"
 #include "perfglue/heap_profiler.h"
@@ -428,7 +430,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   heartbeat_dispatcher(this),
   stat_lock("OSD::stat_lock"),
   finished_lock("OSD::finished_lock"),
-  op_wq(this, &op_tp),
+  op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
   osdmap(NULL),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
@@ -438,19 +440,19 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   osd_stat_updated(false),
   last_tid(0),
   tid_lock("OSD::tid_lock"),
-  backlog_wq(this, &disk_tp),
+  backlog_wq(this, g_conf->osd_backlog_thread_timeout, &disk_tp),
   recovery_ops_active(0),
-  recovery_wq(this, &recovery_tp),
+  recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
   remove_list_lock("OSD::remove_list_lock"),
   replay_queue_lock("OSD::replay_queue_lock"),
-  snap_trim_wq(this, &disk_tp),
+  snap_trim_wq(this, g_conf->osd_snap_trim_thread_timeout, &disk_tp),
   sched_scrub_lock("OSD::sched_scrub_lock"),
   scrubs_pending(0),
   scrubs_active(0),
-  scrub_wq(this, &disk_tp),
-  scrub_finalize_wq(this, &op_tp),
-  rep_scrub_wq(this, &disk_tp),
-  remove_wq(this, &disk_tp),
+  scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
+  scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
+  rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
+  remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
   watch_lock("OSD::watch_lock"),
   watch_timer(hbm->cct, watch_lock)
 {
@@ -1702,6 +1704,11 @@ void OSD::tick()
   assert(osd_lock.is_locked());
   dout(5) << "tick" << dendl;
 
+  if (!cct->get_heartbeat_map()->is_healthy())
+    dout(0) << "tick internal heartbeat_map reports NOT HEALTHY" << dendl;
+  else
+    dout(20) << "tick internal heartbeat_map reports healthy" << dendl;
+
   logger->set(l_osd_buf, buffer::get_total_alloc());
 
   if (got_sigterm) {
index 8a6e8a660066455f18eb3f558bf8b5d5dfb0e6df..9aee7a19869aedfeb0f7efb1ca9e5473830b8bc1 100644 (file)
@@ -315,7 +315,8 @@ private:
   
   struct OpWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
+    OpWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       pg->get();
@@ -591,7 +592,8 @@ protected:
 
   struct BacklogWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    BacklogWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", tp), osd(o) {}
+    BacklogWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return osd->backlog_queue.empty();
@@ -642,7 +644,8 @@ protected:
 
   struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
+    RecoveryWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return osd->recovery_queue.empty();
@@ -716,7 +719,8 @@ protected:
   
   struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
+    SnapTrimWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return osd->snap_trim_queue.empty();
@@ -775,7 +779,8 @@ protected:
 
   struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
+    ScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return osd->scrub_queue.empty();
@@ -818,8 +823,8 @@ protected:
     xlist<PG*> scrub_finalize_queue;
 
   public:
-    ScrubFinalizeWQ(OSD *o, ThreadPool *tp) : 
-      ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", tp), osd(o) {}
+    ScrubFinalizeWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return scrub_finalize_queue.empty();
@@ -863,8 +868,8 @@ protected:
     list<MOSDRepScrub*> rep_scrub_queue;
 
   public:
-    RepScrubWQ(OSD *o, ThreadPool *tp) : 
-      ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", tp), osd(o) {}
+    RepScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return rep_scrub_queue.empty();
@@ -910,7 +915,8 @@ protected:
 
   struct RemoveWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    RemoveWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", tp), osd(o) {}
+    RemoveWQ(OSD *o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, tp), osd(o) {}
 
     bool _empty() {
       return osd->remove_queue.empty();
index 19d38e70d28a06348411c5e506e52750c5404ac5..67542c5a3a5e74365242e294144683f0349ba093 100644 (file)
@@ -61,7 +61,8 @@ class RGWProcess {
 
   struct RGWWQ : public ThreadPool::WorkQueue<FCGX_Request> {
     RGWProcess *process;
-    RGWWQ(RGWProcess *p, ThreadPool *tp) : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", tp), process(p) {}
+    RGWWQ(RGWProcess *p, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", ti, tp), process(p) {}
 
     bool _enqueue(FCGX_Request *req) {
       process->m_fcgx_queue.push_back(req);
@@ -104,7 +105,9 @@ class RGWProcess {
   } req_wq;
 
 public:
-  RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), req_wq(this, &m_tp) {}
+  RGWProcess(CephContext *cct, int num_threads)
+    : m_tp(cct, "RGWProcess::m_tp", num_threads),
+      req_wq(this, g_conf->rgw_op_thread_timeout, &m_tp) {}
   void run();
   void handle_request(FCGX_Request *fcgx);
 };