#include "WorkQueue.h"
#include "include/compat.h"
#include "common/errno.h"
+#include "common/ceph_time.h"
#include <sstream>
void ThreadPool::TPHandle::reset_tp_timeout()
{
- cct->get_heartbeat_map()->reset_timeout(
- hb, grace, suicide_grace);
+ const auto now = ceph::coarse_mono_clock::now();
+ if (now + grace - std::chrono::milliseconds(500) <
+ hb->timeout.load(std::memory_order_relaxed)) {
+    // Don't reset the timeout until at least 0.5 seconds have passed,
+    // indicating the thread is actually slow
+ return;
+ }
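+  // A sharded pool fans the reset out to every thread on the same shard;
+  // a plain pool only resets this thread's handle.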
+ if (sharded_pool) {
+ sharded_pool->reset_tp_timeout(hb, grace, suicide_grace);
+ } else {
+ cct->get_heartbeat_map()->reset_timeout(
+ hb, grace, suicide_grace);
+ }
}
ThreadPool::~ThreadPool()
}
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, std::string nm, std::string tn,
- uint32_t pnum_threads):
+ uint32_t pnum_threads, uint32_t pnum_shards):
cct(pcct_),
name(std::move(nm)),
thread_name(std::move(tn)),
lockname(name + "::lock"),
shardedpool_lock(ceph::make_mutex(lockname)),
num_threads(pnum_threads),
+ num_shards(pnum_shards),
num_paused(0),
num_drained(0),
wq(NULL) {}
-void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
+void ShardedThreadPool::reset_tp_timeout(ceph::heartbeat_handle_d *hb,
+                                         ceph::timespan grace,
+                                         ceph::timespan suicide_grace)
+{
+  // For sharded pools, reset the timeout for every thread working on the
+  // same shard as hb, since those threads are likely to share locks
+ std::lock_guard lck(shardedpool_lock);
+ uint32_t thread_index = hb_to_thread_index[hb];
+ uint32_t shard_index = thread_index % num_shards;
+ for (uint32_t index = shard_index;
+ index < num_threads;
+ index += num_shards) {
+ auto shardhb = thread_index_to_hb.find(index);
+ if (shardhb != thread_index_to_hb.end()) {
+ cct->get_heartbeat_map()->reset_timeout(
+ shardhb->second, grace, suicide_grace);
+ }
+ }
+}
+
+void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index, uint32_t shard_index)
{
ceph_assert(wq != NULL);
ldout(cct,10) << "worker start" << dendl;
std::stringstream ss;
ss << name << " thread " << (void *)pthread_self();
auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
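+  // Record the handle <-> thread index mapping so reset_tp_timeout() can
+  // find the other threads working on the same shard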
+ {
+ std::lock_guard lck(shardedpool_lock);
+ hb_to_thread_index[hb] = thread_index;
+ thread_index_to_hb[thread_index] = hb;
+ }
while (!stop_threads) {
if (pause_threads) {
}
if (drain_threads) {
std::unique_lock ul(shardedpool_lock);
- if (wq->is_shard_empty(thread_index)) {
+ if (wq->is_shard_empty(thread_index, shard_index)) {
++num_drained;
wait_cond.notify_all();
while (drain_threads) {
hb,
wq->timeout_interval.load(),
wq->suicide_interval.load());
- wq->_process(thread_index, hb);
+ wq->_process(thread_index, shard_index, hb);
}
ldout(cct,10) << "sharded worker finish" << dendl;
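+  // Unregister this worker's handle before it is removed from the
+  // heartbeat map so reset_tp_timeout() cannot reach a freed handle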
+ {
+ std::lock_guard lck(shardedpool_lock);
+ hb_to_thread_index.erase(hb);
+ thread_index_to_hb.erase(thread_index);
+ }
cct->get_heartbeat_map()->remove_worker(hb);
}
ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
int32_t thread_index = 0;
while (threads_shardedpool.size() < num_threads) {
-
- WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
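+    // threads are assigned to shards round-robin: thread i serves shard i % num_shards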
+ uint32_t shard_index = thread_index % num_shards;
+ WorkThreadSharded *wt = new WorkThreadSharded(this,
+ thread_index,
+ shard_index);
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
threads_shardedpool.push_back(wt);
wt->create(thread_name.c_str());
#include "common/HBHandle.h"
+class ShardedThreadPool;
+
/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
protected:
public:
class TPHandle : public HBHandle {
- friend class ThreadPool;
CephContext *cct;
ceph::heartbeat_handle_d *hb;
ceph::timespan grace;
ceph::timespan suicide_grace;
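+    // non-null when the handle belongs to a ShardedThreadPool; resets then
+    // cover every thread on the same shard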
+ ShardedThreadPool *sharded_pool;
public:
TPHandle(
CephContext *cct,
ceph::heartbeat_handle_d *hb,
ceph::timespan grace,
- ceph::timespan suicide_grace)
- : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
+ ceph::timespan suicide_grace,
+ ShardedThreadPool *sharded_pool = nullptr)
+ : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace),
+ sharded_pool(sharded_pool) {}
void reset_tp_timeout() override final;
void suspend_tp_timeout() override final;
};
ceph::mutex shardedpool_lock;
ceph::condition_variable shardedpool_cond;
ceph::condition_variable wait_cond;
- uint32_t num_threads;
+ const uint32_t num_threads;
+ const uint32_t num_shards;
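+  // handle <-> thread index mappings, protected by shardedpool_lock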
+  std::map<ceph::heartbeat_handle_d *, uint32_t> hb_to_thread_index;
+  std::map<uint32_t, ceph::heartbeat_handle_d *> thread_index_to_hb;
std::atomic<bool> stop_threads = { false };
std::atomic<bool> pause_threads = { false };
:timeout_interval(ti), suicide_interval(sti) {}
virtual ~BaseShardedWQ() {}
- virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
+  virtual void _process(uint32_t thread_index,
+                        uint32_t shard_index,
+                        ceph::heartbeat_handle_d *hb) = 0;
virtual void return_waiting_threads() = 0;
virtual void stop_return_waiting_threads() = 0;
- virtual bool is_shard_empty(uint32_t thread_index) = 0;
+ virtual bool is_shard_empty(uint32_t thread_index,
+ uint32_t shard_index) = 0;
void set_timeout(time_t ti) {
timeout_interval.store(ceph::make_timespan(ti));
}
// threads
struct WorkThreadSharded : public Thread {
ShardedThreadPool *pool;
- uint32_t thread_index;
- WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
- thread_index(pthread_index) {}
+ const uint32_t thread_index;
+ const uint32_t shard_index;
+    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index,
+                      uint32_t pshard_index)
+      : pool(p), thread_index(pthread_index), shard_index(pshard_index) {}
void *entry() override {
- pool->shardedthreadpool_worker(thread_index);
+ pool->shardedthreadpool_worker(thread_index, shard_index);
return 0;
}
};
std::vector<WorkThreadSharded*> threads_shardedpool;
void start_threads();
- void shardedthreadpool_worker(uint32_t thread_index);
+ void shardedthreadpool_worker(uint32_t thread_index, uint32_t shard_index);
void set_wq(BaseShardedWQ* swq) {
wq = swq;
}
public:
- ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
+  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn,
+                    uint32_t pnum_threads, uint32_t pnum_shards);
~ShardedThreadPool(){};
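+  /// reset heartbeat timeouts for every thread working on the same shard as hb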
+  void reset_tp_timeout(ceph::heartbeat_handle_d *hb,
+                        ceph::timespan grace,
+                        ceph::timespan suicide_grace);
+
/// start thread pool thread
void start();
/// stop thread pool thread
"osd_pg_epoch_max_lag_factor")),
osd_compat(get_osd_compat_set()),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
- get_num_op_threads()),
+ get_num_op_threads(), get_num_op_shards()),
heartbeat_stop(false),
heartbeat_need_update(true),
hb_front_client_messenger(hb_client_front),
#undef dout_prefix
#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq(" << shard_index << ") "
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, uint32_t shard_index, heartbeat_handle_d *hb)
{
- uint32_t shard_index = thread_index % osd->num_shards;
auto& sdata = osd->shards[shard_index];
ceph_assert(sdata);
<< " waiting_peering " << slot->waiting_peering << dendl;
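+    // pass the sharded pool so TPHandle::reset_tp_timeout() refreshes every
+    // thread working on this shard rather than just this one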
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval.load(),
- suicide_interval.load());
+ suicide_interval.load(), &osd->osd_op_tp);
// take next item
auto qi = std::move(slot->to_process.front());
OpSchedulerItem&& qi);
/// try to do some work
- void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
+ void _process(uint32_t thread_index,
+ uint32_t shard_index,
+ ceph::heartbeat_handle_d *hb) override;
void stop_for_fast_shutdown();
}
}
- bool is_shard_empty(uint32_t thread_index) override {
- uint32_t shard_index = thread_index % osd->num_shards;
+ bool is_shard_empty(uint32_t thread_index, uint32_t shard_index) override {
auto &&sdata = osd->shards[shard_index];
ceph_assert(sdata);
std::lock_guard l(sdata->shard_lock);