]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: reset_tp_timeout should reset timeout for all shards
authorBill Scales <bill_scales@uk.ibm.com>
Mon, 24 Nov 2025 09:18:21 +0000 (09:18 +0000)
committerBill Scales <bill_scales@uk.ibm.com>
Thu, 27 Nov 2025 07:44:58 +0000 (07:44 +0000)
ShardedThreadPools are only used by the classic OSD process
which can have more than one thread for the same shard. Each
thread has a heartbeat timeout used to detect stalled threads.
Some code that is known to take a long time makes calls to
reset_tp_timeout to reset this timeout. However for sharded
pools this can be ineffective because it is common for threads
for the same shard to use the same locks (e.g. PG Lock) and
therefore if thread A is taking a long time and resetting
its timeout while holding a lock, thread B for the same shard
is liable to be waiting for the same lock, will not be
resetting its timeout and can be timed out.

Debug for issue 72879 showed heartbeat timeouts occurring at
the same time for both shards, an attempt to fix the problem
by calling reset_tp_timeout for the slow thread still showed
the other threads for the shard timing out waiting for the PG
lock that was held bythe slow thread. Looking at the OSD code
most places where reset_tp_timeout is called the thread is
holding the PG lock.

This commit moves the concept of shard_index from OSD into
ShardedThreadPool and modifies reset_tp_timeout so that it resets
the timeout for all threads for the same shard.

Some code calls reset_tp_timeout from inside loops that can take
a long time without consideration for how long the thread has
actually been running for. There is a risk that this type of
call could repeatedly reset the timeout for another shard which
is genuinely stuck and hence defeat the heartbeat checks. To
prevent this reset_tp_timeout is modified to be a NOP unless
the thread has been processing the current workitem for more
than 0.5 seconds. Therefore threads have to be slow but making
forward progress to be abe to reset the timeout.

Fixes: https://tracker.ceph.com/issues/72879
Signed-off-by: Bill Scales <bill_scales@uk.ibm.com>
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/osd/OSD.cc
src/osd/OSD.h

index 0c8efbc3a163c1f3f9f33444ca1bf5c00d328397..206e87cc32a1de9acc690cc53f6486c591b7b6e5 100644 (file)
@@ -16,6 +16,7 @@
 #include "WorkQueue.h"
 #include "include/compat.h"
 #include "common/errno.h"
+#include "common/ceph_time.h"
 
 #include <sstream>
 
@@ -45,8 +46,19 @@ void ThreadPool::TPHandle::suspend_tp_timeout()
 
 void ThreadPool::TPHandle::reset_tp_timeout()
 {
-  cct->get_heartbeat_map()->reset_timeout(
-    hb, grace, suicide_grace);
+  const auto now = ceph::coarse_mono_clock::now();
+  if (now + grace - std::chrono::milliseconds(500) <
+      hb->timeout.load(std::memory_order_relaxed)) {
+    // Don't reset the timeout until 0.5 seconds has passed indiciating
+    // the thread is actually slow
+    return;
+  }
+  if (sharded_pool) {
+    sharded_pool->reset_tp_timeout(hb, grace, suicide_grace);
+  } else {
+    cct->get_heartbeat_map()->reset_timeout(
+      hb, grace, suicide_grace);
+  }
 }
 
 ThreadPool::~ThreadPool()
@@ -247,18 +259,39 @@ void ThreadPool::drain(WorkQueue_* wq)
 }
 
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, std::string nm, std::string tn,
-                                    uint32_t pnum_threads):
+                                    uint32_t pnum_threads, uint32_t pnum_shards):
   cct(pcct_),
   name(std::move(nm)),
   thread_name(std::move(tn)),
   lockname(name + "::lock"),
   shardedpool_lock(ceph::make_mutex(lockname)),
   num_threads(pnum_threads),
+  num_shards(pnum_shards),
   num_paused(0),
   num_drained(0),
   wq(NULL) {}
 
-void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
+void ShardedThreadPool::reset_tp_timeout(heartbeat_handle_d *hb,
+                                         ceph::timespan grace,
+                                         ceph::timespan suicide_grace)
+{
+  // For sharded pools reset the timeout for the set of shards
+  // as shards are likely to share locks
+  std::lock_guard lck(shardedpool_lock);
+  uint32_t thread_index = hb_to_thread_index[hb];
+  uint32_t shard_index = thread_index % num_shards;
+  for (uint32_t index = shard_index;
+       index < num_threads;
+       index += num_shards) {
+    auto shardhb = thread_index_to_hb.find(index);
+    if (shardhb != thread_index_to_hb.end()) {
+      cct->get_heartbeat_map()->reset_timeout(
+        shardhb->second, grace, suicide_grace);
+    }
+  }
+}
+
+void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index, uint32_t shard_index)
 {
   ceph_assert(wq != NULL);
   ldout(cct,10) << "worker start" << dendl;
@@ -266,6 +299,11 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   std::stringstream ss;
   ss << name << " thread " << (void *)pthread_self();
   auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
+  {
+      std::lock_guard lck(shardedpool_lock);
+      hb_to_thread_index[hb] = thread_index;
+      thread_index_to_hb[thread_index] = hb;
+  }
 
   while (!stop_threads) {
     if (pause_threads) {
@@ -285,7 +323,7 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
     }
     if (drain_threads) {
       std::unique_lock ul(shardedpool_lock);
-      if (wq->is_shard_empty(thread_index)) {
+      if (wq->is_shard_empty(thread_index, shard_index)) {
         ++num_drained;
         wait_cond.notify_all();
         while (drain_threads) {
@@ -305,11 +343,16 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
        hb,
        wq->timeout_interval.load(),
        wq->suicide_interval.load());
-       wq->_process(thread_index, hb);
+    wq->_process(thread_index, shard_index, hb);
   }
 
   ldout(cct,10) << "sharded worker finish" << dendl;
 
+  {
+      std::lock_guard lck(shardedpool_lock);
+      hb_to_thread_index.erase(hb);
+      thread_index_to_hb.erase(thread_index);
+  }
   cct->get_heartbeat_map()->remove_worker(hb);
 
 }
@@ -319,8 +362,10 @@ void ShardedThreadPool::start_threads()
   ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
   int32_t thread_index = 0;
   while (threads_shardedpool.size() < num_threads) {
-
-    WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
+    uint32_t shard_index = thread_index % num_shards;
+    WorkThreadSharded *wt = new WorkThreadSharded(this,
+                                                  thread_index,
+                                                  shard_index);
     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
     threads_shardedpool.push_back(wt);
     wt->create(thread_name.c_str());
index c38d27d54e427e63c9daa8e846dd942482df070e..0306435b810b60dd7ad85eb2d5ea2d213c339815 100644 (file)
@@ -40,6 +40,8 @@ struct ThreadPool {
 #include "common/HBHandle.h"
 
 
+class ShardedThreadPool;
+
 /// Pool of threads that share work submitted to multiple work queues.
 class ThreadPool : public md_config_obs_t {
 protected:
@@ -56,18 +58,20 @@ protected:
 
 public:
   class TPHandle : public HBHandle {
-    friend class ThreadPool;
     CephContext *cct;
     ceph::heartbeat_handle_d *hb;
     ceph::timespan grace;
     ceph::timespan suicide_grace;
+    ShardedThreadPool *sharded_pool;
   public:
     TPHandle(
       CephContext *cct,
       ceph::heartbeat_handle_d *hb,
       ceph::timespan grace,
-      ceph::timespan suicide_grace)
-      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
+      ceph::timespan suicide_grace,
+      ShardedThreadPool *sharded_pool = nullptr)
+      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace),
+        sharded_pool(sharded_pool) {}
     void reset_tp_timeout() override final;
     void suspend_tp_timeout() override final;
   };
@@ -578,7 +582,10 @@ class ShardedThreadPool {
   ceph::mutex shardedpool_lock;
   ceph::condition_variable shardedpool_cond;
   ceph::condition_variable wait_cond;
-  uint32_t num_threads;
+  const uint32_t num_threads;
+  const uint32_t num_shards;
+  std::map<heartbeat_handle_d *,uint32_t> hb_to_thread_index;
+  std::map<uint32_t,heartbeat_handle_d *> thread_index_to_hb;
 
   std::atomic<bool> stop_threads = { false };
   std::atomic<bool> pause_threads = { false };
@@ -598,10 +605,13 @@ public:
       :timeout_interval(ti), suicide_interval(sti) {}
     virtual ~BaseShardedWQ() {}
 
-    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
+    virtual void _process(uint32_t thread_index,
+                          uint32_t shard_index,
+                          ceph::heartbeat_handle_d *hb ) = 0;
     virtual void return_waiting_threads() = 0;
     virtual void stop_return_waiting_threads() = 0;
-    virtual bool is_shard_empty(uint32_t thread_index) = 0;
+    virtual bool is_shard_empty(uint32_t thread_index,
+                                uint32_t shard_index) = 0;
     void set_timeout(time_t ti) {
       timeout_interval.store(ceph::make_timespan(ti));
     }
@@ -646,18 +656,20 @@ private:
   // threads
   struct WorkThreadSharded : public Thread {
     ShardedThreadPool *pool;
-    uint32_t thread_index;
-    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
-      thread_index(pthread_index) {}
+    const uint32_t thread_index;
+    const uint32_t shard_index;
+    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index, uint32_t pshard_index): pool(p),
+      thread_index(pthread_index),
+      shard_index(pshard_index) {}
     void *entry() override {
-      pool->shardedthreadpool_worker(thread_index);
+      pool->shardedthreadpool_worker(thread_index, shard_index);
       return 0;
     }
   };
 
   std::vector<WorkThreadSharded*> threads_shardedpool;
   void start_threads();
-  void shardedthreadpool_worker(uint32_t thread_index);
+  void shardedthreadpool_worker(uint32_t thread_index, uint32_t shard_index);
   void set_wq(BaseShardedWQ* swq) {
     wq = swq;
   }
@@ -666,10 +678,14 @@ private:
 
 public:
 
-  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
+  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads, uint32_t pnum_shards);
 
   ~ShardedThreadPool(){};
 
+  void reset_tp_timeout(heartbeat_handle_d *hb,
+                        ceph::timespan grace,
+                        ceph::timespan suicide_grace);
+
   /// start thread pool thread
   void start();
   /// stop thread pool thread
index ae60a6ba22490baf89a7cb84a4c6f993e006b06a..5e8ef8927cd12d61c25c4e853f6559217dd9d248 100644 (file)
@@ -2427,7 +2427,7 @@ OSD::OSD(CephContext *cct_,
                                  "osd_pg_epoch_max_lag_factor")),
   osd_compat(get_osd_compat_set()),
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
-           get_num_op_threads()),
+           get_num_op_threads(), get_num_op_shards()),
   heartbeat_stop(false),
   heartbeat_need_update(true),
   hb_front_client_messenger(hb_client_front),
@@ -11069,9 +11069,8 @@ void OSD::ShardedOpWQ::_add_slot_waiter(
 #undef dout_prefix
 #define dout_prefix *_dout << "osd." << osd->whoami << " op_wq(" << shard_index << ") "
 
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, uint32_t shard_index, heartbeat_handle_d *hb)
 {
-  uint32_t shard_index = thread_index % osd->num_shards;
   auto& sdata = osd->shards[shard_index];
   ceph_assert(sdata);
 
@@ -11261,7 +11260,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
           << " waiting_peering " << slot->waiting_peering << dendl;
 
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval.load(),
-                                suicide_interval.load());
+                                suicide_interval.load(), &osd->osd_op_tp);
 
   // take next item
   auto qi = std::move(slot->to_process.front());
index 5c00a23be5f0e41ad1597455d7605d30a9a205fc..3b3e7092650d4e3a2be2351fee2aec1a1e78d029 100644 (file)
@@ -1745,7 +1745,9 @@ protected:
       OpSchedulerItem&& qi);
 
     /// try to do some work
-    void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
+    void _process(uint32_t thread_index,
+                  uint32_t shard_index,
+                  ceph::heartbeat_handle_d *hb) override;
 
     void stop_for_fast_shutdown();
 
@@ -1789,8 +1791,7 @@ protected:
       }
     }
 
-    bool is_shard_empty(uint32_t thread_index) override {
-      uint32_t shard_index = thread_index % osd->num_shards;
+    bool is_shard_empty(uint32_t thread_index, uint32_t shard_index) override {
       auto &&sdata = osd->shards[shard_index];
       ceph_assert(sdata);
       std::lock_guard l(sdata->shard_lock);