}
bool HeartbeatMap::_check(const heartbeat_handle_d *h, const char *who,
- ceph::coarse_mono_clock::rep now)
+ ceph::coarse_mono_time now)
{
bool healthy = true;
- auto was = h->timeout.load();
- if (was && was < now) {
+ if (auto was = h->timeout.load();
+ !clock::is_zero(was) && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had timed out after " << h->grace << dendl;
healthy = false;
}
- was = h->suicide_timeout;
- if (was && was < now) {
+ if (auto was = h->suicide_timeout.load();
+ !clock::is_zero(was) && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had suicide timed out after " << h->suicide_grace << dendl;
pthread_kill(h->thread_id, SIGABRT);
}
void HeartbeatMap::reset_timeout(heartbeat_handle_d *h,
- ceph::coarse_mono_clock::rep grace,
- ceph::coarse_mono_clock::rep suicide_grace)
+ ceph::timespan grace,
+ ceph::timespan suicide_grace)
{
ldout(m_cct, 20) << "reset_timeout '" << h->name << "' grace " << grace
<< " suicide " << suicide_grace << dendl;
- auto now = duration_cast<seconds>(coarse_mono_clock::now()
- .time_since_epoch()).count();
+ const auto now = clock::now();
_check(h, "reset_timeout", now);
h->timeout = now + grace;
h->grace = grace;
- if (suicide_grace)
+ if (suicide_grace > ceph::timespan::zero()) {
h->suicide_timeout = now + suicide_grace;
- else
- h->suicide_timeout = 0;
+ } else {
+ h->suicide_timeout = clock::zero();
+ }
h->suicide_grace = suicide_grace;
}
void HeartbeatMap::clear_timeout(heartbeat_handle_d *h)
{
ldout(m_cct, 20) << "clear_timeout '" << h->name << "'" << dendl;
- auto now = duration_cast<seconds>(coarse_mono_clock::now()
- .time_since_epoch()).count();
+ auto now = clock::now();
_check(h, "clear_timeout", now);
- h->timeout = 0;
- h->suicide_timeout = 0;
+ h->timeout = clock::zero();
+ h->suicide_timeout = clock::zero();
}
bool HeartbeatMap::is_healthy()
p != m_workers.end();
++p) {
heartbeat_handle_d *h = *p;
- auto epoch = duration_cast<seconds>(now.time_since_epoch()).count();
- if (!_check(h, "is_healthy", epoch)) {
+ if (!_check(h, "is_healthy", now)) {
healthy = false;
unhealthy++;
}
struct heartbeat_handle_d {
const std::string name;
- pthread_t thread_id;
- // TODO: use atomic<time_point>, once we can ditch GCC 4.8
- std::atomic<unsigned> timeout = { 0 }, suicide_timeout = { 0 };
- time_t grace, suicide_grace;
+ pthread_t thread_id = 0;
+ using clock = ceph::coarse_mono_clock;
+ using time = ceph::coarse_mono_time;
+ std::atomic<time> timeout = clock::zero();
+ std::atomic<time> suicide_timeout = clock::zero();
+ ceph::timespan grace = ceph::timespan::zero();
+ ceph::timespan suicide_grace = ceph::timespan::zero();
std::list<heartbeat_handle_d*>::iterator list_item;
explicit heartbeat_handle_d(const std::string& n)
- : name(n), thread_id(0), grace(0), suicide_grace(0)
+ : name(n)
{ }
};
// reset the timeout so that it expects another touch within grace amount of time
void reset_timeout(heartbeat_handle_d *h,
- ceph::coarse_mono_clock::rep grace,
- ceph::coarse_mono_clock::rep suicide_grace);
+ ceph::timespan grace,
+ ceph::timespan suicide_grace);
// clear the timeout so that it's not checked on
void clear_timeout(heartbeat_handle_d *h);
~HeartbeatMap();
private:
+ using clock = ceph::coarse_mono_clock;
CephContext *m_cct;
ceph::shared_mutex m_rwlock =
ceph::make_shared_mutex("HeartbeatMap::m_rwlock");
- ceph::coarse_mono_clock::time_point m_inject_unhealthy_until;
+ clock::time_point m_inject_unhealthy_until;
std::list<heartbeat_handle_d*> m_workers;
std::atomic<unsigned> m_unhealthy_workers = { 0 };
std::atomic<unsigned> m_total_workers = { 0 };
bool _check(const heartbeat_handle_d *h, const char *who,
- ceph::coarse_mono_clock::rep now);
+ ceph::coarse_mono_time now);
};
}
ldout(cct,20) << "worker waiting" << dendl;
cct->get_heartbeat_map()->reset_timeout(
hb,
- cct->_conf->threadpool_default_timeout,
- 0);
+ ceph::make_timespan(cct->_conf->threadpool_default_timeout),
+ ceph::make_timespan(0));
auto wait = std::chrono::seconds(
cct->_conf->threadpool_empty_queue_max_wait);
_cond.wait_for(ul, wait);
while (pause_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
- wq->timeout_interval, wq->suicide_interval);
+ ceph::make_timespan(wq->timeout_interval),
+ ceph::make_timespan(wq->suicide_interval));
shardedpool_cond.wait_for(
ul,
std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
while (drain_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
- wq->timeout_interval, wq->suicide_interval);
+ ceph::make_timespan(wq->timeout_interval),
+ ceph::make_timespan(wq->suicide_interval));
shardedpool_cond.wait_for(
ul,
std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
cct->get_heartbeat_map()->reset_timeout(
hb,
- wq->timeout_interval, wq->suicide_interval);
+ ceph::make_timespan(wq->timeout_interval),
+ ceph::make_timespan(wq->suicide_interval));
wq->_process(thread_index, hb);
}
friend class ThreadPool;
CephContext *cct;
ceph::heartbeat_handle_d *hb;
- ceph::coarse_mono_clock::rep grace;
- ceph::coarse_mono_clock::rep suicide_grace;
+ ceph::timespan grace;
+ ceph::timespan suicide_grace;
public:
TPHandle(
CephContext *cct,