common/WorkQueue,librbd,rgw: use ceph::timespan for representing interval
author    Kefu Chai <kchai@redhat.com>    Sat, 1 Aug 2020 14:50:25 +0000 (22:50 +0800)
committer Kefu Chai <kchai@redhat.com>    Tue, 4 Aug 2020 03:20:49 +0000 (11:20 +0800)
better readability

Signed-off-by: Kefu Chai <kchai@redhat.com>
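
For context, a minimal standalone sketch (not Ceph code) of what this change buys: passing a strongly typed duration instead of a bare time_t count of seconds. The timespan alias and make_timespan() helper below are stand-ins that merely mirror how ceph::timespan, ceph::make_timespan() and ceph::timespan::zero() are used in the hunks that follow; the real definitions are Ceph's own (ceph_time.h, not shown here).

    #include <chrono>
    #include <cstdio>

    // Stand-in for ceph::timespan (assumption: a std::chrono duration type).
    using timespan = std::chrono::nanoseconds;

    // Stand-in for ceph::make_timespan(): seconds -> timespan.
    constexpr timespan make_timespan(double seconds) {
      return std::chrono::duration_cast<timespan>(
          std::chrono::duration<double>(seconds));
    }

    // Old style: reset_timeout(hb, 9, 18) -- the caller must know both ints
    // are seconds. New style: the signature carries the unit, and "no timeout"
    // becomes timespan::zero() instead of a magic 0.
    void reset_timeout(timespan grace, timespan suicide_grace) {
      std::printf("grace=%lld ns, suicide_grace=%lld ns\n",
                  static_cast<long long>(grace.count()),
                  static_cast<long long>(suicide_grace.count()));
    }

    int main() {
      reset_timeout(make_timespan(9), make_timespan(18));  // like the heartbeat_map test
      reset_timeout(make_timespan(60), timespan::zero());  // like ContextWQ's "no suicide timeout"
    }

At the call sites below this is exactly the pattern: raw second counts are wrapped in ceph::make_timespan(), literal zeros become ceph::timespan::zero(), and interfaces that already pass timespans no longer need any conversion.
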
18 files changed:
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/journal/Journaler.cc
src/librbd/ImageState.cc
src/librbd/Journal.cc
src/mds/MDSRank.cc
src/os/bluestore/BlueStore.cc
src/os/filestore/FileStore.cc
src/os/filestore/FileStore.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OSDMapMapping.h
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_process.h
src/test/heartbeat_map.cc
src/test/journal/RadosTestFixture.cc
src/test/msgr/perf_msgr_server.cc

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 9e68b28a767028a76fe2280a0c319779b2371a23..f8e84aeae304f7864aa95151f681c000c4c50de4 100644
@@ -278,8 +278,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
       while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
                hb,
-               ceph::make_timespan(wq->timeout_interval),
-               ceph::make_timespan(wq->suicide_interval));
+               wq->timeout_interval,
+               wq->suicide_interval);
        shardedpool_cond.wait_for(
         ul,
         std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
@@ -294,8 +294,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
         while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
-           ceph::make_timespan(wq->timeout_interval),
-           ceph::make_timespan(wq->suicide_interval));
+           wq->timeout_interval,
+           wq->suicide_interval);
           shardedpool_cond.wait_for(
            ul,
            std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
@@ -306,8 +306,8 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 
     cct->get_heartbeat_map()->reset_timeout(
       hb,
-      ceph::make_timespan(wq->timeout_interval),
-      ceph::make_timespan(wq->suicide_interval));
+      wq->timeout_interval,
+      wq->suicide_interval);
     wq->_process(thread_index, hb);
 
   }
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index 281fececc0a2df8d55fb3d0cdfd1211d6aff0967..533ce84e91967cf5e6b1891b5b5f6aa901938a4b 100644
@@ -65,8 +65,8 @@ public:
     TPHandle(
       CephContext *cct,
       ceph::heartbeat_handle_d *hb,
-      time_t grace,
-      time_t suicide_grace)
+      ceph::timespan grace,
+      ceph::timespan suicide_grace)
       : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
     void reset_tp_timeout() override final;
     void suspend_tp_timeout() override final;
@@ -76,8 +76,9 @@ protected:
   /// Basic interface to a work queue used by the worker threads.
   struct WorkQueue_ {
     std::string name;
-    time_t timeout_interval, suicide_interval;
-    WorkQueue_(std::string n, time_t ti, time_t sti)
+    ceph::timespan timeout_interval;
+    ceph::timespan suicide_interval;
+    WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
     { }
     virtual ~WorkQueue_() {}
@@ -243,7 +244,10 @@ public:
     void _clear() override {}
 
   public:
-    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+    WorkQueueVal(std::string n,
+                ceph::timespan ti,
+                ceph::timespan sti,
+                ThreadPool *p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -306,7 +310,9 @@ public:
     virtual void _process(T *t, TPHandle &) = 0;
 
   public:
-    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
+    WorkQueue(std::string n,
+             ceph::timespan ti, ceph::timespan sti,
+             ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -383,7 +389,9 @@ public:
       return _empty();
     }
   protected:
-    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
+    PointerWQ(std::string n,
+             ceph::timespan ti, ceph::timespan sti,
+             ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
     }
     void register_work_queue() {
@@ -553,7 +561,7 @@ class GenContextWQ :
   public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
   std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
 public:
-  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+  GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
     : ThreadPool::WorkQueueVal<
       GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
   
@@ -593,8 +601,8 @@ public:
 /// @see Finisher
 class ContextWQ : public ThreadPool::PointerWQ<Context> {
 public:
-  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
-    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
+  ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+    : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
     this->register_work_queue();
   }
 
@@ -654,8 +662,9 @@ public:
   class BaseShardedWQ {
   
   public:
-    time_t timeout_interval, suicide_interval;
-    BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+    ceph::timespan timeout_interval, suicide_interval;
+    BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
+      :timeout_interval(ti), suicide_interval(sti) {}
     virtual ~BaseShardedWQ() {}
 
     virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
@@ -675,8 +684,9 @@ public:
 
 
   public:
-    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti), 
-                                                                 sharded_pool(tp) {
+    ShardedWQ(ceph::timespan ti,
+             ceph::timespan sti, ShardedThreadPool* tp)
+      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
       tp->set_wq(this);
     }
     ~ShardedWQ() override {}
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
index c720ebf323f68d229be85dd7ec950bc39f1b7010..1838d633ac9298247339d7ce3fc552024c4f5cac 100644
@@ -47,7 +47,9 @@ Journaler::Threads::Threads(CephContext *cct) {
   thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
   thread_pool->start();
 
-  work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
+  work_queue = new ContextWQ("Journaler::work_queue",
+                             ceph::make_timespan(60),
+                             thread_pool);
 
   timer = new SafeTimer(cct, timer_lock, true);
   timer->init();
diff --git a/src/librbd/ImageState.cc b/src/librbd/ImageState.cc
index a7d47276ca898911c33eabef28f9e44439d42af8..1c81471d20fef0a7bee0fec63c647230a65ba499 100644
@@ -220,8 +220,9 @@ private:
       ThreadPoolSingleton>("librbd::ImageUpdateWatchers::thread_pool",
                           false, m_cct);
     m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::work_queue",
-                                m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
-                                &thread_pool);
+                                 ceph::make_timespan(
+                                   m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+                                 &thread_pool);
   }
 
   void destroy_work_queue() {
diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc
index 4c53012104573248ec4a774ef7d3a7001a69b253..11158366be501c913e0e5ef54f4df94ad36b17cc 100644
@@ -51,7 +51,8 @@ public:
   explicit ThreadPoolSingleton(CephContext *cct)
     : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
       work_queue(new ContextWQ("librbd::journal::work_queue",
-                               cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
+                               ceph::make_timespan(
+                                 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
                                this)) {
     start();
   }
diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc
index 9a0e2a5390cea22332942c97161e3545c77f659a..fe51eea00ac4f1675fabc6a1186e6efdc7c88840 100644
@@ -1319,7 +1319,9 @@ void MDSRank::heartbeat_reset()
   // (by blacklisting us) when we fail to send beacons, and it's simpler to
   // only have one way of dying.
   auto grace = g_conf().get_val<double>("mds_heartbeat_grace");
-  g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
+  g_ceph_context->get_heartbeat_map()->reset_timeout(hb,
+    ceph::make_timespan(grace),
+    ceph::timespan::zero());
 }
 
 bool MDSRank::is_stale_message(const cref_t<Message> &m) const
diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index 32c2809a67ac05d510b13e6b7a66063b157cb445..e71991d0dde68a42348b5e4fe3a38b2c54f7d812 100644
@@ -7683,7 +7683,7 @@ public:
                   ceph::mutex* _sb_info_lock,
                   BlueStore::sb_info_map_t& _sb_info,
                   BlueStoreRepairer* _repairer) :
-      WorkQueue_(n, time_t(), time_t()),
+      WorkQueue_(n, ceph::timespan::zero(), ceph::timespan::zero()),
       batchCount(_batchCount),
       store(_store),
       sb_info_lock(_sb_info_lock),
diff --git a/src/os/filestore/FileStore.cc b/src/os/filestore/FileStore.cc
index 92539a9320bd98ea87905a1350ee076fab6512f3..dc304659b1801da526bb3eccc4343397978da6cb 100644
@@ -572,8 +572,10 @@ FileStore::FileStore(CephContext* cct, const std::string &base,
   m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
   m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
   op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
-  op_wq(this, cct->_conf->filestore_op_thread_timeout,
-       cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
+  op_wq(this,
+       ceph::make_timespan(cct->_conf->filestore_op_thread_timeout),
+       ceph::make_timespan(cct->_conf->filestore_op_thread_suicide_timeout),
+       &op_tp),
   logger(nullptr),
   trace_endpoint("0.0.0.0", 0, "FileStore"),
   m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
diff --git a/src/os/filestore/FileStore.h b/src/os/filestore/FileStore.h
index fb97bf46eb2ebfd520cc2c4266007db77e0abdcc..324dbbe4daf57b511780b1cc3811bf2663156679 100644
@@ -392,8 +392,13 @@ private:
   ThreadPool op_tp;
   struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
     FileStore *store;
-    OpWQ(FileStore *fs, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
-      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, suicide_timeout, tp), store(fs) {}
+    OpWQ(FileStore *fs,
+        ceph::timespan timeout,
+        ceph::timespan suicide_timeout,
+        ThreadPool *tp)
+      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ",
+                                          timeout, suicide_timeout, tp),
+       store(fs) {}
 
     bool _enqueue(OpSequencer *osr) override {
       store->op_queue.push_back(osr);
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index d9889daf695abb76eebc0ac9dca22cceb306fef1..9907ce7132ffda2581a5314e50c64d8a1519c523 100644
@@ -2182,8 +2182,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   test_ops_hook(NULL),
   op_shardedwq(
     this,
-    cct->_conf->osd_op_thread_timeout,
-    cct->_conf->osd_op_thread_suicide_timeout,
+    ceph::make_timespan(cct->_conf->osd_op_thread_timeout),
+    ceph::make_timespan(cct->_conf->osd_op_thread_suicide_timeout),
     &osd_op_tp),
   last_pg_create_epoch(0),
   boot_finisher(cct),
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 13c9eacb8706acb8637080c71a229e0332f21c5a..5ebb54eb35b3d3127a0f3d310d02ae5778090777 100644
@@ -1574,8 +1574,8 @@ protected:
 
   public:
     ShardedOpWQ(OSD *o,
-               time_t ti,
-               time_t si,
+               ceph::timespan ti,
+               ceph::timespan si,
                ShardedThreadPool* tp)
       : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
         osd(o) {
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index 4096c389de455ecc714548559075b699750e67ad..70258ff4e8d69652d2f6c6664f442a8f058e0f8f 100644
@@ -115,7 +115,10 @@ protected:
     ParallelPGMapper *m;
 
     WQ(ParallelPGMapper *m_, ThreadPool *tp)
-      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
+      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ",
+                                   ceph::timespan::zero(),
+                                   ceph::timespan::zero(),
+                                   tp),
         m(m_) {}
 
     bool _enqueue(Item *i) override {
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index faf0ee6a4dba8a54d81b7028c08601f504561651..a269af767a3d1485e577fa5218d3ee21d5501dcb 100644
@@ -70,8 +70,10 @@ void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
   : cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
     req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
-    req_wq(this, g_conf()->rgw_op_thread_timeout,
-    g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+    req_wq(this,
+          ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+          ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+          &m_tp) {
 }
 
 void RGWAsyncRadosProcessor::start() {
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 0f0d5ba17929bfa0112e7b401e2f4d75a64f2d04..0e8a94154f0e564998bd9aec26ded724ab14748a 100644
@@ -75,7 +75,9 @@ protected:
 
   struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
     RGWAsyncRadosProcessor *processor;
-    RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+    RGWWQ(RGWAsyncRadosProcessor *p,
+         ceph::timespan timeout, ceph::timespan suicide_timeout,
+         ThreadPool *tp)
       : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
 
     bool _enqueue(RGWAsyncRadosRequest *req) override;
diff --git a/src/rgw/rgw_process.h b/src/rgw/rgw_process.h
index 124c2bd1f769ca95dd5b0041dc0dd8eed7f6eaad..c3c93409ac4bc0972fe47b2f793dfc3896e5a26a 100644
@@ -59,7 +59,8 @@ protected:
 
   struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
     RGWProcess* process;
-    RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
+    RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
+         ThreadPool* tp)
       : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
                                          tp), process(p) {}
 
@@ -101,8 +102,10 @@ public:
       conf(conf),
       sock_fd(-1),
       uri_prefix(pe->uri_prefix),
-      req_wq(this, g_conf()->rgw_op_thread_timeout,
-            g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+      req_wq(this,
+            ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+            ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+            &m_tp) {
   }
   
   virtual ~RGWProcess() = default;
diff --git a/src/test/heartbeat_map.cc b/src/test/heartbeat_map.cc
index 7c98c9082349db55d7bc59ccb641fbe2f7cbd8f4..27f1fe721e3e7c590360fbece242d7be60c82bad 100644
@@ -24,9 +24,9 @@ TEST(HeartbeatMap, Healthy) {
   HeartbeatMap hm(g_ceph_context);
   heartbeat_handle_d *h = hm.add_worker("one", pthread_self());
 
-  hm.reset_timeout(h, 9, 18);
+  hm.reset_timeout(h, ceph::make_timespan(9), ceph::make_timespan(18));
   bool healthy = hm.is_healthy();
-  ASSERT_EQ(healthy, true);
+  ASSERT_TRUE(healthy);
 
   hm.remove_worker(h);
 }
@@ -35,10 +35,10 @@ TEST(HeartbeatMap, Unhealth) {
   HeartbeatMap hm(g_ceph_context);
   heartbeat_handle_d *h = hm.add_worker("one", pthread_self());
 
-  hm.reset_timeout(h, 1, 3);
+  hm.reset_timeout(h, ceph::make_timespan(1), ceph::make_timespan(3));
   sleep(2);
   bool healthy = hm.is_healthy();
-  ASSERT_EQ(healthy, false);
+  ASSERT_FALSE(healthy);
 
   hm.remove_worker(h);
 }
diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc
index 64d05eef703455f82a26f0ce0e8ba8e594fceb9a..50e35e871d482280246e4206860b37f7e9f2ee75 100644
@@ -39,7 +39,8 @@ void RadosTestFixture::SetUp() {
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
 
   CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
-  m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue", 60,
+  m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue",
+                               ceph::make_timespan(60),
                                _thread_pool);
 
   m_timer = new SafeTimer(cct, m_timer_lock, true);
diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc
index cb025fa4912604c78cb7cfa4e7b5e336c1a96c41..4a8e182fc1c08ff26a928ef47f68d0d0b3078c20 100644
@@ -38,7 +38,7 @@ class ServerDispatcher : public Dispatcher {
     list<Message*> messages;
 
    public:
-    OpWQ(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+    OpWQ(ceph::timespan timeout, ceph::timespan suicide_timeout, ThreadPool *tp)
       : ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {}
 
     bool _enqueue(Message *m) override {
@@ -73,7 +73,7 @@ class ServerDispatcher : public Dispatcher {
  public:
   ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
     op_tp(g_ceph_context, "ServerDispatcher::op_tp", "tp_serv_disp", threads, "serverdispatcher_op_threads"),
-    op_wq(30, 30, &op_tp) {
+    op_wq(ceph::make_timespan(30), ceph::make_timespan(30), &op_tp) {
     op_tp.start();
   }
   ~ServerDispatcher() override {