while (pause_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
- ceph::make_timespan(wq->timeout_interval),
- ceph::make_timespan(wq->suicide_interval));
+ wq->timeout_interval,
+ wq->suicide_interval);
shardedpool_cond.wait_for(
ul,
std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
while (drain_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
- ceph::make_timespan(wq->timeout_interval),
- ceph::make_timespan(wq->suicide_interval));
+ wq->timeout_interval,
+ wq->suicide_interval);
shardedpool_cond.wait_for(
ul,
std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
cct->get_heartbeat_map()->reset_timeout(
hb,
- ceph::make_timespan(wq->timeout_interval),
- ceph::make_timespan(wq->suicide_interval));
+ wq->timeout_interval,
+ wq->suicide_interval);
wq->_process(thread_index, hb);
}
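// A minimal standalone sketch of the wait pattern in the two loops above:
// re-arm a liveness deadline, then block on a condition variable with a
// bounded wait so the heartbeat can be refreshed each iteration. All names
// here are local to this sketch, not Ceph APIs.
#include <chrono>
#include <condition_variable>
#include <mutex>

struct PauseLoopSketch {
  std::mutex m;
  std::condition_variable cond;
  bool pause_threads = true;

  void wait_while_paused(std::chrono::seconds max_wait) {
    std::unique_lock ul(m);
    while (pause_threads) {
      // the real loop re-arms the heartbeat (reset_timeout) here
      cond.wait_for(ul, max_wait);
    }
  }
};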
TPHandle(
CephContext *cct,
ceph::heartbeat_handle_d *hb,
- time_t grace,
- time_t suicide_grace)
+ ceph::timespan grace,
+ ceph::timespan suicide_grace)
: cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
void reset_tp_timeout() override final;
void suspend_tp_timeout() override final;
/// Basic interface to a work queue used by the worker threads.
struct WorkQueue_ {
std::string name;
- time_t timeout_interval, suicide_interval;
- WorkQueue_(std::string n, time_t ti, time_t sti)
+ ceph::timespan timeout_interval;
+ ceph::timespan suicide_interval;
+ WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
: name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
{ }
virtual ~WorkQueue_() {}
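// A minimal standalone sketch of the change this struct undergoes: timeouts
// carried as a chrono duration instead of raw time_t seconds, so the unit
// travels with the value and cannot be confused with an absolute timestamp.
// `timespan`/`make_timespan` below only approximate the ceph::timespan and
// ceph::make_timespan helpers; all names are local to this sketch.
#include <chrono>
#include <cstdint>
#include <string>
#include <utility>

using timespan = std::chrono::duration<std::int64_t, std::nano>;

inline timespan make_timespan(double secs) {
  return std::chrono::duration_cast<timespan>(
      std::chrono::duration<double>(secs));
}

struct WorkQueueSketch {
  std::string name;
  timespan timeout_interval;
  timespan suicide_interval;
  WorkQueueSketch(std::string n, timespan ti, timespan sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti) {}
};

// Callers that used to pass a bare `60` (implicitly seconds) now spell
// the unit out at the call site:
inline WorkQueueSketch example_wq() {
  return WorkQueueSketch("example", make_timespan(60), make_timespan(600));
}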
void _clear() override {}
public:
- WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+ WorkQueueVal(std::string n,
+ ceph::timespan ti,
+ ceph::timespan sti,
+ ThreadPool *p)
: WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
virtual void _process(T *t, TPHandle &) = 0;
public:
- WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
+ WorkQueue(std::string n,
+ ceph::timespan ti, ceph::timespan sti,
+ ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
return _empty();
}
protected:
- PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
+ PointerWQ(std::string n,
+ ceph::timespan ti, ceph::timespan sti,
+ ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
}
void register_work_queue() {
public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
- GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+ GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
: ThreadPool::WorkQueueVal<
GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
- ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
+ ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
this->register_work_queue();
}
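// Sketch of the "disabled timeout" idiom above: a typed zero duration
// (ceph::timespan::zero() in the patch) replaces the old magic `0`, and
// callers can test for it explicitly. Plain std::chrono shows the same
// shape; the helper name is local to this sketch.
#include <chrono>

inline bool suicide_disabled(std::chrono::nanoseconds suicide_interval) {
  return suicide_interval == std::chrono::nanoseconds::zero();
}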
class BaseShardedWQ {
public:
- time_t timeout_interval, suicide_interval;
- BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+ ceph::timespan timeout_interval;
+ ceph::timespan suicide_interval;
+ BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
+ : timeout_interval(ti), suicide_interval(sti) {}
virtual ~BaseShardedWQ() {}
virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) = 0;
public:
- ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
- sharded_pool(tp) {
+ ShardedWQ(ceph::timespan ti,
+ ceph::timespan sti, ShardedThreadPool* tp)
+ : BaseShardedWQ(ti, sti), sharded_pool(tp) {
tp->set_wq(this);
}
~ShardedWQ() override {}
thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
thread_pool->start();
- work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
+ work_queue = new ContextWQ("Journaler::work_queue",
+ ceph::make_timespan(60),
+ thread_pool);
timer = new SafeTimer(cct, timer_lock, true);
timer->init();
ThreadPoolSingleton>("librbd::ImageUpdateWatchers::thread_pool",
false, m_cct);
m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::work_queue",
- m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
- &thread_pool);
+ ceph::make_timespan(
+ m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+ &thread_pool);
}
void destroy_work_queue() {
explicit ThreadPoolSingleton(CephContext *cct)
: ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
work_queue(new ContextWQ("librbd::journal::work_queue",
- cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
+ ceph::make_timespan(
+ cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
this)) {
start();
}
// (by blacklisting us) when we fail to send beacons, and it's simpler to
// only have one way of dying.
auto grace = g_conf().get_val<double>("mds_heartbeat_grace");
- g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
+ g_ceph_context->get_heartbeat_map()->reset_timeout(hb,
+ ceph::make_timespan(grace),
+ ceph::timespan::zero());
}
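// Sketch of the conversion above: a floating-point "seconds" config value
// becomes a duration before reaching reset_timeout(). This mirrors what
// ceph::make_timespan(double) appears to do; the helper name below is
// illustrative only, not a Ceph API.
#include <chrono>

inline std::chrono::nanoseconds grace_from_conf(double grace_secs) {
  return std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::duration<double>(grace_secs));
}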
bool MDSRank::is_stale_message(const cref_t<Message> &m) const
ceph::mutex* _sb_info_lock,
BlueStore::sb_info_map_t& _sb_info,
BlueStoreRepairer* _repairer) :
- WorkQueue_(n, time_t(), time_t()),
+ WorkQueue_(n, ceph::timespan::zero(), ceph::timespan::zero()),
batchCount(_batchCount),
store(_store),
sb_info_lock(_sb_info_lock),
m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
- op_wq(this, cct->_conf->filestore_op_thread_timeout,
- cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
+ op_wq(this,
+ ceph::make_timespan(cct->_conf->filestore_op_thread_timeout),
+ ceph::make_timespan(cct->_conf->filestore_op_thread_suicide_timeout),
+ &op_tp),
logger(nullptr),
trace_endpoint("0.0.0.0", 0, "FileStore"),
m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
FileStore *store;
- OpWQ(FileStore *fs, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
- : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, suicide_timeout, tp), store(fs) {}
+ OpWQ(FileStore *fs,
+ ceph::timespan timeout,
+ ceph::timespan suicide_timeout,
+ ThreadPool *tp)
+ : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ",
+ timeout, suicide_timeout, tp),
+ store(fs) {}
bool _enqueue(OpSequencer *osr) override {
store->op_queue.push_back(osr);
test_ops_hook(NULL),
op_shardedwq(
this,
- cct->_conf->osd_op_thread_timeout,
- cct->_conf->osd_op_thread_suicide_timeout,
+ ceph::make_timespan(cct->_conf->osd_op_thread_timeout),
+ ceph::make_timespan(cct->_conf->osd_op_thread_suicide_timeout),
&osd_op_tp),
last_pg_create_epoch(0),
boot_finisher(cct),
public:
ShardedOpWQ(OSD *o,
- time_t ti,
- time_t si,
+ ceph::timespan ti,
+ ceph::timespan si,
ShardedThreadPool* tp)
: ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
osd(o) {
ParallelPGMapper *m;
WQ(ParallelPGMapper *m_, ThreadPool *tp)
- : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
+ : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ",
+ ceph::timespan::zero(),
+ ceph::timespan::zero(),
+ tp),
m(m_) {}
bool _enqueue(Item *i) override {
RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
: cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
- req_wq(this, g_conf()->rgw_op_thread_timeout,
- g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+ req_wq(this,
+ ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+ ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+ &m_tp) {
}
void RGWAsyncRadosProcessor::start() {
struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
RGWAsyncRadosProcessor *processor;
- RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ RGWWQ(RGWAsyncRadosProcessor *p,
+ ceph::timespan timeout, ceph::timespan suicide_timeout,
+ ThreadPool *tp)
: ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
bool _enqueue(RGWAsyncRadosRequest *req) override;
struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
RGWProcess* process;
- RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
+ RGWWQ(RGWProcess* p, ceph::timespan timeout, ceph::timespan suicide_timeout,
+ ThreadPool* tp)
: ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
tp), process(p) {}
conf(conf),
sock_fd(-1),
uri_prefix(pe->uri_prefix),
- req_wq(this, g_conf()->rgw_op_thread_timeout,
- g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+ req_wq(this,
+ ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+ ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+ &m_tp) {
}
virtual ~RGWProcess() = default;
HeartbeatMap hm(g_ceph_context);
heartbeat_handle_d *h = hm.add_worker("one", pthread_self());
- hm.reset_timeout(h, 9, 18);
+ hm.reset_timeout(h, ceph::make_timespan(9), ceph::make_timespan(18));
bool healthy = hm.is_healthy();
- ASSERT_EQ(healthy, true);
+ ASSERT_TRUE(healthy);
hm.remove_worker(h);
}
HeartbeatMap hm(g_ceph_context);
heartbeat_handle_d *h = hm.add_worker("one", pthread_self());
- hm.reset_timeout(h, 1, 3);
+ hm.reset_timeout(h, ceph::make_timespan(1), ceph::make_timespan(3));
sleep(2);
bool healthy = hm.is_healthy();
- ASSERT_EQ(healthy, false);
+ ASSERT_FALSE(healthy);
hm.remove_worker(h);
}
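// Standalone sketch of the property these two tests exercise: a worker is
// healthy while "now" has not passed its grace deadline, and resetting the
// timeout pushes the deadline forward. Types and names are local to this
// sketch, not the HeartbeatMap API.
#include <chrono>

struct WorkerSketch {
  std::chrono::steady_clock::time_point grace_deadline;
};

inline void reset_timeout(WorkerSketch& w, std::chrono::seconds grace) {
  w.grace_deadline = std::chrono::steady_clock::now() + grace;
}

inline bool is_healthy(const WorkerSketch& w) {
  return std::chrono::steady_clock::now() <= w.grace_deadline;
}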
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
- m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue", 60,
+ m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue",
+ ceph::make_timespan(60),
_thread_pool);
m_timer = new SafeTimer(cct, m_timer_lock, true);
list<Message*> messages;
public:
- OpWQ(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ OpWQ(ceph::timespan timeout, ceph::timespan suicide_timeout, ThreadPool *tp)
: ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {}
bool _enqueue(Message *m) override {
public:
ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
op_tp(g_ceph_context, "ServerDispatcher::op_tp", "tp_serv_disp", threads, "serverdispatcher_op_threads"),
- op_wq(30, 30, &op_tp) {
+ op_wq(ceph::make_timespan(30), ceph::make_timespan(30), &op_tp) {
op_tp.start();
}
~ServerDispatcher() override {