From: Bill Scales
Date: Mon, 24 Nov 2025 09:18:21 +0000 (+0000)
Subject: osd: reset_tp_timeout should reset timeout for all shards
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7264ee9e434e155bdabbc4540d0f7df531acfc79;p=ceph.git

osd: reset_tp_timeout should reset timeout for all shards

ShardedThreadPools are only used by the classic OSD process, which can have
more than one thread for the same shard. Each thread has a heartbeat timeout
used to detect stalled threads, and code that is known to take a long time
calls reset_tp_timeout to reset this timeout. For sharded pools this can be
ineffective because threads for the same shard commonly take the same locks
(e.g. the PG lock): if thread A is taking a long time and resetting its
timeout while holding a lock, thread B for the same shard is liable to be
waiting for that lock, will not be resetting its timeout and can be timed
out.

Debug for issue 72879 showed heartbeat timeouts occurring at the same time
for both shards; an attempt to fix the problem by calling reset_tp_timeout
for the slow thread still showed the other threads for the shard timing out
while waiting for the PG lock held by the slow thread. Looking at the OSD
code, most places where reset_tp_timeout is called the thread is holding the
PG lock.

This commit moves the concept of shard_index from OSD into ShardedThreadPool
and modifies reset_tp_timeout so that it resets the timeout for all threads
of the same shard.

Some code calls reset_tp_timeout from inside loops that can take a long time
without considering how long the thread has actually been running. There is
a risk that this type of call could repeatedly reset the timeout for another
thread of the shard that is genuinely stuck and hence defeat the heartbeat
checks. To prevent this, reset_tp_timeout is modified to be a NOP unless the
thread has been processing the current work item for more than 0.5 seconds,
so threads have to be slow but making forward progress to be able to reset
the timeout.
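As a minimal standalone sketch of the two ideas above (the 0.5 second guard
and the shard-stride lookup of a thread's peers), assuming the heartbeat
timeout is stored as a monotonic-clock time point; the names and types here
are illustrative stand-ins rather than the Ceph ones:

  #include <atomic>
  #include <chrono>
  #include <cstdint>
  #include <vector>

  using Clock = std::chrono::steady_clock;

  // The heartbeat timeout is armed to (time of last reset + grace), so
  // (now + grace - 0.5s) < armed_timeout means less than half a second has
  // passed since the last reset and the thread is not yet considered slow.
  bool slow_enough_to_reset(const std::atomic<Clock::time_point>& armed_timeout,
                            Clock::duration grace)
  {
    const auto now = Clock::now();
    return now + grace - std::chrono::milliseconds(500) >=
           armed_timeout.load(std::memory_order_relaxed);
  }

  // Threads are assigned to shards round-robin (shard = thread_index %
  // num_shards), so the peer threads of a shard are found by striding
  // through the thread indexes in steps of num_shards.
  std::vector<uint32_t> shard_peers(uint32_t thread_index,
                                    uint32_t num_threads,
                                    uint32_t num_shards)
  {
    std::vector<uint32_t> peers;
    for (uint32_t i = thread_index % num_shards; i < num_threads; i += num_shards) {
      peers.push_back(i);
    }
    return peers;
  }

With, say, 16 threads and 8 shards, threads 3 and 11 serve shard 3, so a
reset by thread 3 also refreshes thread 11's heartbeat.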
Fixes: https://tracker.ceph.com/issues/72879
Signed-off-by: Bill Scales
---

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 0c8efbc3a163..206e87cc32a1 100644
--- a/src/common/WorkQueue.cc
+++ b/src/common/WorkQueue.cc
@@ -16,6 +16,7 @@
 #include "WorkQueue.h"
 #include "include/compat.h"
 #include "common/errno.h"
+#include "common/ceph_time.h"
 
 #include
@@ -45,8 +46,19 @@ void ThreadPool::TPHandle::suspend_tp_timeout()
 
 void ThreadPool::TPHandle::reset_tp_timeout()
 {
-  cct->get_heartbeat_map()->reset_timeout(
-    hb, grace, suicide_grace);
+  const auto now = ceph::coarse_mono_clock::now();
+  if (now + grace - std::chrono::milliseconds(500) <
+      hb->timeout.load(std::memory_order_relaxed)) {
+    // Don't reset the timeout until 0.5 seconds have passed, indicating
+    // the thread is actually slow
+    return;
+  }
+  if (sharded_pool) {
+    sharded_pool->reset_tp_timeout(hb, grace, suicide_grace);
+  } else {
+    cct->get_heartbeat_map()->reset_timeout(
+      hb, grace, suicide_grace);
+  }
 }
 
 ThreadPool::~ThreadPool()
@@ -247,18 +259,39 @@ void ThreadPool::drain(WorkQueue_* wq)
 }
 
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, std::string nm, std::string tn,
-                                     uint32_t pnum_threads):
+                                     uint32_t pnum_threads, uint32_t pnum_shards):
   cct(pcct_),
   name(std::move(nm)),
   thread_name(std::move(tn)),
   lockname(name + "::lock"),
   shardedpool_lock(ceph::make_mutex(lockname)),
   num_threads(pnum_threads),
+  num_shards(pnum_shards),
   num_paused(0),
   num_drained(0),
   wq(NULL) {}
 
-void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
+void ShardedThreadPool::reset_tp_timeout(heartbeat_handle_d *hb,
+                                         ceph::timespan grace,
+                                         ceph::timespan suicide_grace)
+{
+  // For sharded pools reset the timeout for all threads of the shard,
+  // as threads for the same shard are likely to share locks
+  std::lock_guard lck(shardedpool_lock);
+  uint32_t thread_index = hb_to_thread_index[hb];
+  uint32_t shard_index = thread_index % num_shards;
+  for (uint32_t index = shard_index;
+       index < num_threads;
+       index += num_shards) {
+    auto shardhb = thread_index_to_hb.find(index);
+    if (shardhb != thread_index_to_hb.end()) {
+      cct->get_heartbeat_map()->reset_timeout(
+        shardhb->second, grace, suicide_grace);
+    }
+  }
+}
+
+void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index, uint32_t shard_index)
 {
   ceph_assert(wq != NULL);
   ldout(cct,10) << "worker start" << dendl;
@@ -266,6 +299,11 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   std::stringstream ss;
   ss << name << " thread " << (void *)pthread_self();
   auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
+  {
+    std::lock_guard lck(shardedpool_lock);
+    hb_to_thread_index[hb] = thread_index;
+    thread_index_to_hb[thread_index] = hb;
+  }
 
   while (!stop_threads) {
     if (pause_threads) {
@@ -285,7 +323,7 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
     }
     if (drain_threads) {
       std::unique_lock ul(shardedpool_lock);
-      if (wq->is_shard_empty(thread_index)) {
+      if (wq->is_shard_empty(thread_index, shard_index)) {
         ++num_drained;
         wait_cond.notify_all();
         while (drain_threads) {
@@ -305,11 +343,16 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
       hb,
       wq->timeout_interval.load(),
       wq->suicide_interval.load());
-    wq->_process(thread_index, hb);
+    wq->_process(thread_index, shard_index, hb);
   }
 
   ldout(cct,10) << "sharded worker finish" << dendl;
+  {
+    std::lock_guard lck(shardedpool_lock);
+    hb_to_thread_index.erase(hb);
+    thread_index_to_hb.erase(thread_index);
+  }
 
   cct->get_heartbeat_map()->remove_worker(hb);
 }
 
@@ -319,8 +362,10 @@ void ShardedThreadPool::start_threads()
   ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
   int32_t thread_index = 0;
   while (threads_shardedpool.size() < num_threads) {
-
-    WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
+    uint32_t shard_index = thread_index % num_shards;
+    WorkThreadSharded *wt = new WorkThreadSharded(this,
+                                                  thread_index,
+                                                  shard_index);
     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
     threads_shardedpool.push_back(wt);
     wt->create(thread_name.c_str());
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index c38d27d54e42..0306435b810b 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -40,6 +40,8 @@ struct ThreadPool {
 
 #include "common/HBHandle.h"
 
+class ShardedThreadPool;
+
 /// Pool of threads that share work submitted to multiple work queues.
 class ThreadPool : public md_config_obs_t {
 protected:
@@ -56,18 +58,20 @@ protected:
 
 public:
   class TPHandle : public HBHandle {
-    friend class ThreadPool;
     CephContext *cct;
     ceph::heartbeat_handle_d *hb;
     ceph::timespan grace;
    ceph::timespan suicide_grace;
+    ShardedThreadPool *sharded_pool;
  public:
    TPHandle(
      CephContext *cct,
      ceph::heartbeat_handle_d *hb,
      ceph::timespan grace,
-      ceph::timespan suicide_grace)
-      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
+      ceph::timespan suicide_grace,
+      ShardedThreadPool *sharded_pool = nullptr)
+      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace),
+        sharded_pool(sharded_pool) {}
    void reset_tp_timeout() override final;
    void suspend_tp_timeout() override final;
  };
@@ -578,7 +582,10 @@ class ShardedThreadPool {
   ceph::mutex shardedpool_lock;
   ceph::condition_variable shardedpool_cond;
   ceph::condition_variable wait_cond;
-  uint32_t num_threads;
+  const uint32_t num_threads;
+  const uint32_t num_shards;
+  std::map<ceph::heartbeat_handle_d*, uint32_t> hb_to_thread_index;
+  std::map<uint32_t, ceph::heartbeat_handle_d*> thread_index_to_hb;
 
   std::atomic<bool> stop_threads = { false };
   std::atomic<bool> pause_threads = { false };
@@ -598,10 +605,13 @@ public:
       :timeout_interval(ti), suicide_interval(sti) {}
     virtual ~BaseShardedWQ() {}
 
-    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
+    virtual void _process(uint32_t thread_index,
+                          uint32_t shard_index,
+                          ceph::heartbeat_handle_d *hb ) = 0;
     virtual void return_waiting_threads() = 0;
     virtual void stop_return_waiting_threads() = 0;
-    virtual bool is_shard_empty(uint32_t thread_index) = 0;
+    virtual bool is_shard_empty(uint32_t thread_index,
+                                uint32_t shard_index) = 0;
     void set_timeout(time_t ti) {
       timeout_interval.store(ceph::make_timespan(ti));
     }
@@ -646,18 +656,20 @@ private:
   // threads
   struct WorkThreadSharded : public Thread {
     ShardedThreadPool *pool;
-    uint32_t thread_index;
-    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
-      thread_index(pthread_index) {}
+    const uint32_t thread_index;
+    const uint32_t shard_index;
+    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index, uint32_t pshard_index): pool(p),
+      thread_index(pthread_index),
+      shard_index(pshard_index) {}
     void *entry() override {
-      pool->shardedthreadpool_worker(thread_index);
+      pool->shardedthreadpool_worker(thread_index, shard_index);
       return 0;
     }
   };
 
   std::vector<WorkThreadSharded*> threads_shardedpool;
   void start_threads();
-  void shardedthreadpool_worker(uint32_t thread_index);
+  void shardedthreadpool_worker(uint32_t thread_index, uint32_t shard_index);
   void set_wq(BaseShardedWQ* swq) {
     wq = swq;
   }
@@ -666,10 +678,14 @@ private:
 
 public:
 
-  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
+  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads, uint32_t pnum_shards);
   ~ShardedThreadPool(){};
 
+  void reset_tp_timeout(heartbeat_handle_d *hb,
+                        ceph::timespan grace,
+                        ceph::timespan suicide_grace);
+
   /// start thread pool thread
   void start();
 
   /// stop thread pool thread
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index ae60a6ba2249..5e8ef8927cd1 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2427,7 +2427,7 @@ OSD::OSD(CephContext *cct_,
                        "osd_pg_epoch_max_lag_factor")),
   osd_compat(get_osd_compat_set()),
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
-            get_num_op_threads()),
+            get_num_op_threads(), get_num_op_shards()),
   heartbeat_stop(false),
   heartbeat_need_update(true),
   hb_front_client_messenger(hb_client_front),
@@ -11069,9 +11069,8 @@ void OSD::ShardedOpWQ::_add_slot_waiter(
 #undef dout_prefix
 #define dout_prefix *_dout << "osd." << osd->whoami << " op_wq(" << shard_index << ") "
 
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, uint32_t shard_index, heartbeat_handle_d *hb)
 {
-  uint32_t shard_index = thread_index % osd->num_shards;
   auto& sdata = osd->shards[shard_index];
   ceph_assert(sdata);
 
@@ -11261,7 +11260,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
             << " waiting_peering " << slot->waiting_peering
             << dendl;
    ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval.load(),
-                                  suicide_interval.load());
+                                  suicide_interval.load(), &osd->osd_op_tp);
 
    // take next item
    auto qi = std::move(slot->to_process.front());
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 5c00a23be5f0..3b3e7092650d 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1745,7 +1745,9 @@ protected:
       OpSchedulerItem&& qi);
 
     /// try to do some work
-    void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
+    void _process(uint32_t thread_index,
+                  uint32_t shard_index,
+                  ceph::heartbeat_handle_d *hb) override;
 
     void stop_for_fast_shutdown();
 
@@ -1789,8 +1791,7 @@ protected:
       }
     }
 
-    bool is_shard_empty(uint32_t thread_index) override {
-      uint32_t shard_index = thread_index % osd->num_shards;
+    bool is_shard_empty(uint32_t thread_index, uint32_t shard_index) override {
       auto &&sdata = osd->shards[shard_index];
       ceph_assert(sdata);
       std::lock_guard l(sdata->shard_lock);