]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: allow osd_op_thread_timeout and suicide_timeout to be adjusted on the fly 49628/head
authorhaoyixing <haoyixing@kuaishou.com>
Wed, 4 Jan 2023 06:56:04 +0000 (14:56 +0800)
committerhaoyixing <haoyixing@kuaishou.com>
Sun, 29 Jan 2023 06:43:05 +0000 (14:43 +0800)
And make timeout_interval/suicide_interval atomic for WorkQueue_ and BaseShardedWQ.

Signed-off-by: haoyixing <haoyixing@kuaishou.com>
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/os/bluestore/BlueStore.cc
src/osd/OSD.cc
src/test/test_workqueue.cc

index ea7ff393902013dbb937af5122eef7b769ae1cab..d181d30e5ef7586418769a5a5d853c0fb2317bfd 100644 (file)
@@ -114,7 +114,7 @@ void ThreadPool::worker(WorkThread *wt)
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;
          ul.unlock();
-         TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
+         TPHandle tp_handle(cct, hb, wq->timeout_interval.load(), wq->suicide_interval.load());
          tp_handle.reset_tp_timeout();
          wq->_void_process(item, tp_handle);
          ul.lock();
@@ -280,8 +280,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
       while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
                hb,
-               wq->timeout_interval,
-               wq->suicide_interval);
+               wq->timeout_interval.load(),
+               wq->suicide_interval.load());
        shardedpool_cond.wait_for(
         ul,
         std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
@@ -296,8 +296,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
         while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
-           wq->timeout_interval,
-           wq->suicide_interval);
+           wq->timeout_interval.load(),
+           wq->suicide_interval.load());
           shardedpool_cond.wait_for(
            ul,
            std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
@@ -307,11 +307,10 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
     }
 
     cct->get_heartbeat_map()->reset_timeout(
-      hb,
-      wq->timeout_interval,
-      wq->suicide_interval);
-    wq->_process(thread_index, hb);
-
+       hb,
+       wq->timeout_interval.load(),
+       wq->suicide_interval.load());
+       wq->_process(thread_index, hb);
   }
 
   ldout(cct,10) << "sharded worker finish" << dendl;
@@ -410,4 +409,3 @@ void ShardedThreadPool::drain()
   shardedpool_cond.notify_all();
   ldout(cct,10) << "drained" << dendl;
 }
-
index bb9e1b66b633d126d2a27594547ef91fb9a2a562..816be3b1834570e66e036008ed5a1b7a8c483323 100644 (file)
@@ -76,8 +76,8 @@ protected:
   /// Basic interface to a work queue used by the worker threads.
   struct WorkQueue_ {
     std::string name;
-    ceph::timespan timeout_interval;
-    ceph::timespan suicide_interval;
+    std::atomic<ceph::timespan> timeout_interval = ceph::timespan::zero();
+    std::atomic<ceph::timespan> suicide_interval = ceph::timespan::zero();
     WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
     { }
@@ -98,10 +98,10 @@ protected:
      * It can be used for non-thread-safe finalization. */
     virtual void _void_process_finish(void *) = 0;
     void set_timeout(time_t ti){
-      timeout_interval = ceph::make_timespan(ti);
+      timeout_interval.store(ceph::make_timespan(ti));
     }
     void set_suicide_timeout(time_t sti){
-      suicide_interval = ceph::make_timespan(sti);
+      suicide_interval.store(ceph::make_timespan(sti));
     }
   };
 
@@ -589,7 +589,8 @@ public:
   class BaseShardedWQ {
   
   public:
-    ceph::timespan timeout_interval, suicide_interval;
+    std::atomic<ceph::timespan> timeout_interval = ceph::timespan::zero();
+    std::atomic<ceph::timespan> suicide_interval = ceph::timespan::zero();
     BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
       :timeout_interval(ti), suicide_interval(sti) {}
     virtual ~BaseShardedWQ() {}
@@ -598,6 +599,12 @@ public:
     virtual void return_waiting_threads() = 0;
     virtual void stop_return_waiting_threads() = 0;
     virtual bool is_shard_empty(uint32_t thread_index) = 0;
+    void set_timeout(time_t ti) {
+      timeout_interval.store(ceph::make_timespan(ti));
+    }
+    void set_suicide_timeout(time_t sti) {
+      suicide_interval.store(ceph::make_timespan(sti));
+    }
   };
 
   template <typename T>
index 33d1d9983087c282e6d02932c440331c57613f2e..45c54e4184a2214d3b031ef0951d0d9c56de7175 100644 (file)
@@ -8510,7 +8510,7 @@ public:
       void* item = wq->_void_dequeue();
       if (item) {
         processing++;
-        TPHandle tp_handle(cct, nullptr, wq->timeout_interval, wq->suicide_interval);
+        TPHandle tp_handle(cct, nullptr, wq->timeout_interval.load(), wq->suicide_interval.load());
         wq->_void_process(item, tp_handle);
         processing--;
       }
@@ -8712,8 +8712,8 @@ public:
         if (batch.entry_count) {
           TPHandle tp_handle(store->cct,
             nullptr,
-            timeout_interval,
-            suicide_interval);
+            timeout_interval.load(),
+            suicide_interval.load());
           ceph_assert(batch.running == 0);
 
           batch.running++; // just to be on-par with the regular call
index 9a7e04a7d2d0e2411c2ccf0e8ac9be8d4657aa5c..94e334a2939adc0418a3b3521754778847084d90 100644 (file)
@@ -9634,6 +9634,8 @@ const char** OSD::get_tracked_conf_keys() const
     "osd_object_clean_region_max_num_intervals",
     "osd_scrub_min_interval",
     "osd_scrub_max_interval",
+    "osd_op_thread_timeout",
+    "osd_op_thread_suicide_timeout",
     NULL
   };
   return KEYS;
@@ -9757,6 +9759,12 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
     service.poolctx.stop();
     service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
   }
+  if (changed.count("osd_op_thread_timeout")) {
+    op_shardedwq.set_timeout(g_conf().get_val<int64_t>("osd_op_thread_timeout"));
+  }
+  if (changed.count("osd_op_thread_suicide_timeout")) {
+    op_shardedwq.set_suicide_timeout(g_conf().get_val<int64_t>("osd_op_thread_suicide_timeout"));
+  }
 }
 
 void OSD::maybe_override_max_osd_capacity_for_qos()
@@ -10639,7 +10647,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       }
       // found a work item; reapply default wq timeouts
       osd->cct->get_heartbeat_map()->reset_timeout(hb,
-        timeout_interval, suicide_interval);
+        timeout_interval.load(), suicide_interval.load());
     } else {
       dout(20) << __func__ << " need return immediately" << dendl;
       wait_lock.unlock();
@@ -10700,7 +10708,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       sdata->shard_lock.lock();
       // Reapply default wq timeouts
       osd->cct->get_heartbeat_map()->reset_timeout(hb,
-        timeout_interval, suicide_interval);
+        timeout_interval.load(), suicide_interval.load());
       // Populate the oncommits list if there were any additions
       // to the context_queue while we were waiting
       if (is_smallest_thread_index) {
@@ -10796,8 +10804,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
           << " waiting " << slot->waiting
           << " waiting_peering " << slot->waiting_peering << dendl;
 
-  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
-                                suicide_interval);
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval.load(),
+                                suicide_interval.load());
 
   // take next item
   auto qi = std::move(slot->to_process.front());
index d3354d633ca0f1a5557e8d9f5161a121af273651..771b9d65952fd2de71f6857b39ccef0c460f1724 100644 (file)
@@ -86,13 +86,13 @@ TEST(WorkQueue, change_timeout){
     tp.start();
     twq wq(2, 20, &tp);
     // check timeout and suicide
-    ASSERT_EQ(ceph::make_timespan(2), wq.timeout_interval);
-    ASSERT_EQ(ceph::make_timespan(20), wq.suicide_interval);
+    ASSERT_EQ(ceph::make_timespan(2), wq.timeout_interval.load());
+    ASSERT_EQ(ceph::make_timespan(20), wq.suicide_interval.load());
 
     // change the timeout and suicide and then check them
     wq.set_timeout(4);
     wq.set_suicide_timeout(40);
-    ASSERT_EQ(ceph::make_timespan(4), wq.timeout_interval);
-    ASSERT_EQ(ceph::make_timespan(40), wq.suicide_interval);
+    ASSERT_EQ(ceph::make_timespan(4), wq.timeout_interval.load());
+    ASSERT_EQ(ceph::make_timespan(40), wq.suicide_interval.load());
     tp.stop();
 }