From c11594a479ca24e790acf8e764e3759c4c661110 Mon Sep 17 00:00:00 2001
From: Kefu Chai
Date: Sat, 1 Aug 2020 22:50:25 +0800
Subject: [PATCH] common/WorkQueue,librbd,rgw: use ceph::timespan for
 representing intervals

Better readability.

Signed-off-by: Kefu Chai
---
 src/common/WorkQueue.cc              | 12 ++++----
 src/common/WorkQueue.h               | 38 ++++++++++++++++++----------
 src/journal/Journaler.cc             |  4 ++-
 src/librbd/ImageState.cc             |  5 ++--
 src/librbd/Journal.cc                |  3 ++-
 src/mds/MDSRank.cc                   |  4 ++-
 src/os/bluestore/BlueStore.cc        |  2 +-
 src/os/filestore/FileStore.cc        |  6 +++--
 src/os/filestore/FileStore.h         |  9 +++++--
 src/osd/OSD.cc                       |  4 +--
 src/osd/OSD.h                        |  4 +--
 src/osd/OSDMapMapping.h              |  5 +++-
 src/rgw/rgw_cr_rados.cc              |  6 +++--
 src/rgw/rgw_cr_rados.h               |  4 ++-
 src/rgw/rgw_process.h                |  9 ++++---
 src/test/heartbeat_map.cc            |  8 +++---
 src/test/journal/RadosTestFixture.cc |  3 ++-
 src/test/msgr/perf_msgr_server.cc    |  4 +--
 18 files changed, 82 insertions(+), 48 deletions(-)

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 9e68b28a7670..f8e84aeae304 100644
--- a/src/common/WorkQueue.cc
+++ b/src/common/WorkQueue.cc
@@ -278,8 +278,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
     while (pause_threads) {
       cct->get_heartbeat_map()->reset_timeout(
         hb,
-        ceph::make_timespan(wq->timeout_interval),
-        ceph::make_timespan(wq->suicide_interval));
+        wq->timeout_interval,
+        wq->suicide_interval);
       shardedpool_cond.wait_for(
         ul,
         std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
@@ -294,8 +294,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
     while (drain_threads) {
       cct->get_heartbeat_map()->reset_timeout(
         hb,
-        ceph::make_timespan(wq->timeout_interval),
-        ceph::make_timespan(wq->suicide_interval));
+        wq->timeout_interval,
+        wq->suicide_interval);
       shardedpool_cond.wait_for(
         ul,
         std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
@@ -306,8 +306,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 
     cct->get_heartbeat_map()->reset_timeout(
       hb,
-      ceph::make_timespan(wq->timeout_interval),
-      ceph::make_timespan(wq->suicide_interval));
+      wq->timeout_interval,
+      wq->suicide_interval);
     wq->_process(thread_index, hb);
   }
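Note on the types involved: ceph::timespan and ceph::make_timespan() come from
src/common/ceph_time.h. The hunks above can drop the make_timespan() calls
because wq->timeout_interval and wq->suicide_interval are now timespans
already. A minimal self-contained approximation in plain std::chrono (the real
alias may differ in representation; this is a sketch, not the Ceph definition):

  #include <chrono>
  #include <cstdint>
  #include <iostream>

  namespace approx {
    // ceph::timespan behaves like a nanosecond-resolution chrono duration.
    using timespan = std::chrono::duration<std::uint64_t, std::nano>;

    // ceph::make_timespan() turns a count of seconds into a timespan.
    inline timespan make_timespan(double d) {
      return std::chrono::duration_cast<timespan>(
          std::chrono::duration<double>(d));
    }
  }

  int main() {
    approx::timespan grace = approx::make_timespan(60);  // 60 seconds
    std::cout << std::chrono::duration_cast<std::chrono::seconds>(grace).count()
              << "s\n";  // prints "60s"
  }
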
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index 281fececc0a2..533ce84e9196 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -65,8 +65,8 @@ public:
     TPHandle(
       CephContext *cct,
       ceph::heartbeat_handle_d *hb,
-      time_t grace,
-      time_t suicide_grace)
+      ceph::timespan grace,
+      ceph::timespan suicide_grace)
       : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
     void reset_tp_timeout() override final;
     void suspend_tp_timeout() override final;
@@ -76,8 +76,9 @@ protected:
   /// Basic interface to a work queue used by the worker threads.
   struct WorkQueue_ {
     std::string name;
-    time_t timeout_interval, suicide_interval;
-    WorkQueue_(std::string n, time_t ti, time_t sti)
+    ceph::timespan timeout_interval;
+    ceph::timespan suicide_interval;
+    WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
     { }
     virtual ~WorkQueue_() {}
@@ -243,7 +244,10 @@ public:
     void _clear() override {}

  public:
-    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+    WorkQueueVal(std::string n,
+                 ceph::timespan ti,
+                 ceph::timespan sti,
+                 ThreadPool *p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -306,7 +310,9 @@ public:
     virtual void _process(T *t, TPHandle &) = 0;

  public:
-    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
+    WorkQueue(std::string n,
+              ceph::timespan ti, ceph::timespan sti,
+              ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -383,7 +389,9 @@ public:
       return _empty();
     }
   protected:
-    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
+    PointerWQ(std::string n,
+              ceph::timespan ti, ceph::timespan sti,
+              ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
     }
     void register_work_queue() {
@@ -553,7 +561,7 @@ class GenContextWQ :
   public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
   std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
 public:
-  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+  GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
     : ThreadPool::WorkQueueVal<
         GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

@@ -593,8 +601,8 @@ public:
 /// @see Finisher
 class ContextWQ : public ThreadPool::PointerWQ<Context> {
 public:
-  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
-    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
+  ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+    : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
     this->register_work_queue();
   }

@@ -654,8 +662,9 @@ public:

 class BaseShardedWQ {
 public:
-  time_t timeout_interval, suicide_interval;
-  BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+  ceph::timespan timeout_interval, suicide_interval;
+  BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
+    :timeout_interval(ti), suicide_interval(sti) {}
   virtual ~BaseShardedWQ() {}
   virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
@@ -675,8 +684,9 @@ public:

  public:
-    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
-      sharded_pool(tp) {
+    ShardedWQ(ceph::timespan ti,
+              ceph::timespan sti, ShardedThreadPool* tp)
+      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
       tp->set_wq(this);
     }
     ~ShardedWQ() override {}
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
index c720ebf323f6..1838d633ac92 100644
--- a/src/journal/Journaler.cc
+++ b/src/journal/Journaler.cc
@@ -47,7 +47,9 @@ Journaler::Threads::Threads(CephContext *cct) {
   thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
   thread_pool->start();

-  work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
+  work_queue = new ContextWQ("Journaler::work_queue",
+                             ceph::make_timespan(60),
+                             thread_pool);

   timer = new SafeTimer(cct, timer_lock, true);
   timer->init();
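The Journaler hunk shows the call-site pattern repeated throughout this patch:
a bare literal such as 60 no longer converts implicitly, so the unit has to be
spelled out via ceph::make_timespan(60). A small sketch of what the stronger
typing buys (reset_timeout() here is a hypothetical stand-in, not a function
from this patch):

  #include <chrono>

  using timespan = std::chrono::nanoseconds;  // stand-in for ceph::timespan

  void reset_timeout(timespan grace, timespan suicide_grace) {
    (void)grace;
    (void)suicide_grace;
  }

  int main() {
    // Any chrono duration that converts losslessly is accepted.
    reset_timeout(std::chrono::seconds(60), std::chrono::minutes(2));  // ok
    // reset_timeout(60, 0);  // no longer compiles: int is not a duration
  }
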
diff --git a/src/librbd/ImageState.cc b/src/librbd/ImageState.cc
index a7d47276ca89..1c81471d20fe 100644
--- a/src/librbd/ImageState.cc
+++ b/src/librbd/ImageState.cc
@@ -220,8 +220,9 @@ private:
     auto& thread_pool = *m_cct->lookup_or_create_singleton_object<
       ThreadPoolSingleton>("librbd::ImageUpdateWatchers::thread_pool", false, m_cct);
     m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::work_queue",
-                                 m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
-                                 &thread_pool);
+                                 ceph::make_timespan(
+                                   m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+                                 &thread_pool);
   }

   void destroy_work_queue() {
diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc
index 4c5301210457..11158366be50 100644
--- a/src/librbd/Journal.cc
+++ b/src/librbd/Journal.cc
@@ -51,7 +51,8 @@ public:
   explicit ThreadPoolSingleton(CephContext *cct)
     : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
       work_queue(new ContextWQ("librbd::journal::work_queue",
-                               cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
+                               ceph::make_timespan(
+                                 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
                                this)) {
     start();
   }
diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc
index 9a0e2a5390ce..fe51eea00ac4 100644
--- a/src/mds/MDSRank.cc
+++ b/src/mds/MDSRank.cc
@@ -1319,7 +1319,9 @@ void MDSRank::heartbeat_reset()
   // (by blacklisting us) when we fail to send beacons, and it's simpler to
   // only have one way of dying.
   auto grace = g_conf().get_val<double>("mds_heartbeat_grace");
-  g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
+  g_ceph_context->get_heartbeat_map()->reset_timeout(hb,
+                                                     ceph::make_timespan(grace),
+                                                     ceph::timespan::zero());
 }

 bool MDSRank::is_stale_message(const cref_t<Message> &m) const
diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index 32c2809a67ac..e71991d0dde6 100644
--- a/src/os/bluestore/BlueStore.cc
+++ b/src/os/bluestore/BlueStore.cc
@@ -7683,7 +7683,7 @@ public:
          ceph::mutex* _sb_info_lock,
          BlueStore::sb_info_map_t& _sb_info,
          BlueStoreRepairer* _repairer) :
-    WorkQueue_(n, time_t(), time_t()),
+    WorkQueue_(n, ceph::timespan::zero(), ceph::timespan::zero()),
     batchCount(_batchCount),
     store(_store),
     sb_info_lock(_sb_info_lock),
diff --git a/src/os/filestore/FileStore.cc b/src/os/filestore/FileStore.cc
index 92539a9320bd..dc304659b180 100644
--- a/src/os/filestore/FileStore.cc
+++ b/src/os/filestore/FileStore.cc
@@ -572,8 +572,10 @@ FileStore::FileStore(CephContext* cct, const std::string &base,
   m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
   m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
   op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
-  op_wq(this, cct->_conf->filestore_op_thread_timeout,
-        cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
+  op_wq(this,
+        ceph::make_timespan(cct->_conf->filestore_op_thread_timeout),
+        ceph::make_timespan(cct->_conf->filestore_op_thread_suicide_timeout),
+        &op_tp),
   logger(nullptr),
   trace_endpoint("0.0.0.0", 0, "FileStore"),
   m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
diff --git a/src/os/filestore/FileStore.h b/src/os/filestore/FileStore.h
index fb97bf46eb2e..324dbbe4daf5 100644
--- a/src/os/filestore/FileStore.h
+++ b/src/os/filestore/FileStore.h
@@ -392,8 +392,13 @@ private:
   ThreadPool op_tp;
   struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
     FileStore *store;
-    OpWQ(FileStore *fs, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
-      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, suicide_timeout, tp), store(fs) {}
+    OpWQ(FileStore *fs,
+         ceph::timespan timeout,
+         ceph::timespan suicide_timeout,
+         ThreadPool *tp)
+      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ",
+                                           timeout, suicide_timeout, tp),
+        store(fs) {}

     bool _enqueue(OpSequencer *osr) override {
       store->op_queue.push_back(osr);
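The literal 0, time_t(), and now ceph::timespan::zero() all spell the same
convention: a zero grace disables the corresponding timeout, which is why
MDSRank passes a zero suicide grace above. A sketch of that convention
(deadline_for() is illustrative only, not the HeartbeatMap API):

  #include <chrono>
  #include <optional>

  using timespan = std::chrono::nanoseconds;
  using time_point = std::chrono::time_point<std::chrono::steady_clock, timespan>;

  // Illustrative: a zero grace yields no deadline at all.
  std::optional<time_point> deadline_for(time_point now, timespan grace) {
    if (grace == timespan::zero()) {
      return std::nullopt;  // disabled: never time out
    }
    return now + grace;
  }

  int main() {
    time_point now = std::chrono::steady_clock::now();
    auto none = deadline_for(now, timespan::zero());        // no deadline
    auto soon = deadline_for(now, std::chrono::seconds(3)); // now + 3s
    return (!none && soon) ? 0 : 1;
  }
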
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index d9889daf695a..9907ce7132ff 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2182,8 +2182,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   test_ops_hook(NULL),
   op_shardedwq(
     this,
-    cct->_conf->osd_op_thread_timeout,
-    cct->_conf->osd_op_thread_suicide_timeout,
+    ceph::make_timespan(cct->_conf->osd_op_thread_timeout),
+    ceph::make_timespan(cct->_conf->osd_op_thread_suicide_timeout),
     &osd_op_tp),
   last_pg_create_epoch(0),
   boot_finisher(cct),
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 13c9eacb8706..5ebb54eb35b3 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1574,8 +1574,8 @@ protected:

  public:
    ShardedOpWQ(OSD *o,
-               time_t ti,
-               time_t si,
+               ceph::timespan ti,
+               ceph::timespan si,
                ShardedThreadPool* tp)
      : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
        osd(o) {
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index 4096c389de45..70258ff4e8d6 100644
--- a/src/osd/OSDMapMapping.h
+++ b/src/osd/OSDMapMapping.h
@@ -115,7 +115,10 @@ protected:
     ParallelPGMapper *m;

     WQ(ParallelPGMapper *m_, ThreadPool *tp)
-      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
+      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ",
+                                    ceph::timespan::zero(),
+                                    ceph::timespan::zero(),
+                                    tp),
         m(m_) {}

     bool _enqueue(Item *i) override {
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index faf0ee6a4dba..a269af767a3d 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -70,8 +70,10 @@ void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
   : cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
     req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
-    req_wq(this, g_conf()->rgw_op_thread_timeout,
-           g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+    req_wq(this,
+           ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+           ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+           &m_tp) {
 }

 void RGWAsyncRadosProcessor::start() {
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 0f0d5ba17929..0e8a94154f0e 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -75,7 +75,9 @@ protected:

   struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
     RGWAsyncRadosProcessor *processor;
-    RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+    RGWWQ(RGWAsyncRadosProcessor *p,
+          ceph::timespan timeout, ceph::timespan suicide_timeout,
+          ThreadPool *tp)
       : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}

     bool _enqueue(RGWAsyncRadosRequest *req) override;
diff --git a/src/rgw/rgw_process.h b/src/rgw/rgw_process.h
index 124c2bd1f769..c3c93409ac4b 100644
--- a/src/rgw/rgw_process.h
+++ b/src/rgw/rgw_process.h
@@ -59,7 +59,8 @@ protected:

   struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
     RGWProcess* process;
-    RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
+    RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
+          ThreadPool* tp)
       : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp),
         process(p) {}

@@ -101,8 +102,10 @@ public:
       conf(conf),
       sock_fd(-1),
       uri_prefix(pe->uri_prefix),
-      req_wq(this, g_conf()->rgw_op_thread_timeout,
-             g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+      req_wq(this,
+             ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+             ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+             &m_tp) {
   }
   virtual ~RGWProcess() = default;
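The rgw and filestore queues all follow the same mechanical pattern: the
derived work queue takes ceph::timespan parameters and forwards them to the
base-class constructor, converting the integer-second config options exactly
once at construction. A stand-in sketch of that pattern (WorkQueueBase here is
not the real ThreadPool::WorkQueue<T>, and the helper mirrors make_timespan's
shape under that assumption):

  #include <chrono>
  #include <string>
  #include <utility>

  using timespan = std::chrono::nanoseconds;

  inline timespan make_timespan(double seconds) {
    return std::chrono::duration_cast<timespan>(
        std::chrono::duration<double>(seconds));
  }

  struct WorkQueueBase {  // stand-in for ThreadPool::WorkQueue<T>
    std::string name;
    timespan timeout_interval;
    timespan suicide_interval;
    WorkQueueBase(std::string n, timespan ti, timespan sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti) {}
  };

  struct ReqWQ : WorkQueueBase {
    ReqWQ(timespan timeout, timespan suicide_timeout)
      : WorkQueueBase("ReqWQ", timeout, suicide_timeout) {}
  };

  int main() {
    int rgw_op_thread_timeout = 600;  // config values stay integer seconds
    ReqWQ wq(make_timespan(rgw_op_thread_timeout),
             make_timespan(rgw_op_thread_timeout * 2));
    return wq.timeout_interval < wq.suicide_interval ? 0 : 1;
  }
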
diff --git a/src/test/heartbeat_map.cc b/src/test/heartbeat_map.cc
index 7c98c9082349..27f1fe721e3e 100644
--- a/src/test/heartbeat_map.cc
+++ b/src/test/heartbeat_map.cc
@@ -24,9 +24,9 @@ TEST(HeartbeatMap, Healthy) {
   HeartbeatMap hm(g_ceph_context);
   heartbeat_handle_d *h = hm.add_worker("one", pthread_self());

-  hm.reset_timeout(h, 9, 18);
+  hm.reset_timeout(h, ceph::make_timespan(9), ceph::make_timespan(18));
   bool healthy = hm.is_healthy();
-  ASSERT_EQ(healthy, true);
+  ASSERT_TRUE(healthy);

   hm.remove_worker(h);
 }
@@ -35,10 +35,10 @@ TEST(HeartbeatMap, Unhealth) {
   HeartbeatMap hm(g_ceph_context);
   heartbeat_handle_d *h = hm.add_worker("one", pthread_self());

-  hm.reset_timeout(h, 1, 3);
+  hm.reset_timeout(h, ceph::make_timespan(1), ceph::make_timespan(3));
   sleep(2);
   bool healthy = hm.is_healthy();
-  ASSERT_EQ(healthy, false);
+  ASSERT_FALSE(healthy);

   hm.remove_worker(h);
 }
diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc
index 64d05eef7034..50e35e871d48 100644
--- a/src/test/journal/RadosTestFixture.cc
+++ b/src/test/journal/RadosTestFixture.cc
@@ -39,7 +39,8 @@ void RadosTestFixture::SetUp() {
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));

   CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
-  m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue", 60,
+  m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue",
+                               ceph::make_timespan(60),
                                _thread_pool);

   m_timer = new SafeTimer(cct, m_timer_lock, true);
diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc
index cb025fa49126..4a8e182fc1c0 100644
--- a/src/test/msgr/perf_msgr_server.cc
+++ b/src/test/msgr/perf_msgr_server.cc
@@ -38,7 +38,7 @@ class ServerDispatcher : public Dispatcher {
     list<Message*> messages;

    public:
-    OpWQ(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+    OpWQ(ceph::timespan timeout, ceph::timespan suicide_timeout, ThreadPool *tp)
       : ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {}

     bool _enqueue(Message *m) override {
@@ -73,7 +73,7 @@ class ServerDispatcher : public Dispatcher {
  public:
   ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
     op_tp(g_ceph_context, "ServerDispatcher::op_tp", "tp_serv_disp", threads, "serverdispatcher_op_threads"),
-    op_wq(30, 30, &op_tp) {
+    op_wq(ceph::make_timespan(30), ceph::make_timespan(30), &op_tp) {
     op_tp.start();
   }
   ~ServerDispatcher() override {
--
2.47.3
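A closing note on the updated tests: typed intervals compare and convert
across units without manual scaling, which plain time_t seconds never could.
A brief std::chrono sketch:

  #include <cassert>
  #include <chrono>

  int main() {
    using namespace std::chrono;
    nanoseconds grace = seconds(9);      // like the Healthy test above
    assert(grace < seconds(18));         // mixed-unit comparison just works
    assert(duration_cast<seconds>(grace).count() == 9);
  }
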