From a5fc29b95281c6ca58c9177c665c379846beb4b3 Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Fri, 17 Aug 2018 15:03:56 -0700 Subject: [PATCH] mds: use monotonic waits in Beacon This guarantees that the sender thread cannot be disrupted by system clock changes. This commit also simplifies the sender thread by manually managing the thread and avoiding unnecessary context creation. Fixes: http://tracker.ceph.com/issues/26962 Signed-off-by: Patrick Donnelly --- src/mds/Beacon.cc | 107 +++++++++++++++++++++++++--------------------- src/mds/Beacon.h | 25 +++++------ 2 files changed, 69 insertions(+), 63 deletions(-) diff --git a/src/mds/Beacon.cc b/src/mds/Beacon.cc index 36c0161f648c4..8d42cf9541629 100644 --- a/src/mds/Beacon.cc +++ b/src/mds/Beacon.cc @@ -27,21 +27,42 @@ #include "Beacon.h" +#include <chrono> + #define dout_context g_ceph_context #define dout_subsys ceph_subsys_mds #undef dout_prefix #define dout_prefix *_dout << "mds.beacon." << name << ' ' +using namespace std::chrono_literals; + +Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name) + : + Dispatcher(cct), + beacon_interval(g_conf()->mds_beacon_interval), + monc(monc), + name(name) +{ +} + +Beacon::~Beacon() +{ + shutdown(); +} -Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string_view name_) : - Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock), - name(name_) +void Beacon::shutdown() { + std::unique_lock lock(mutex); + if (!finished) { + finished = true; + lock.unlock(); + sender.join(); + } } void Beacon::init(const MDSMap &mdsmap) { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); _notify_mdsmap(mdsmap); standby_for_rank = mds_rank_t(g_conf()->mds_standby_for_rank); @@ -49,20 +70,22 @@ void Beacon::init(const MDSMap &mdsmap) standby_for_fscid = fs_cluster_id_t(g_conf()->mds_standby_for_fscid); standby_replay = g_conf()->mds_standby_replay; - // Spawn threads and start messaging - timer.init(); - _send(); -} - - -void Beacon::shutdown()
-{ - Mutex::Locker l(lock); - if (sender) { - timer.cancel_event(sender); - sender = NULL; - } - timer.shutdown(); + sender = std::thread([this]() { + std::unique_lock lock(mutex); + std::condition_variable c; // no one wakes us + while (!finished) { + auto now = clock::now(); + auto since = std::chrono::duration<double>(now-last_send).count(); + auto interval = beacon_interval; + if (since >= interval*.90) { + _send(); + } else { + interval -= since; + } + dout(20) << "sender thread waiting interval " << interval << "s" << dendl; + c.wait_for(lock, interval*1s); + } + }); } bool Beacon::ms_can_fast_dispatch2(const Message::const_ref& m) const @@ -96,7 +119,7 @@ bool Beacon::ms_dispatch2(const Message::ref& m) */ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m) { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); version_t seq = m->get_seq(); @@ -120,9 +143,7 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m) seq_stamp.erase(seq_stamp.begin(), ++it); // Wake a waiter up if present - if (awaiting_seq == seq) { - waiting_cond.Signal(); - } + cvar.notify_all(); } else { dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " dne" << dendl; @@ -132,28 +153,26 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m) void Beacon::send() { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); _send(); } void Beacon::send_and_wait(const double duration) { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); _send(); - awaiting_seq = last_seq; + auto awaiting_seq = last_seq; dout(20) << __func__ << ": awaiting " << awaiting_seq << " for up to " << duration << "s" << dendl; - utime_t timeout; - timeout.set_from_double(ceph_clock_now()+duration); - while (!seq_stamp.empty() - && seq_stamp.begin()->first <= awaiting_seq - && ceph_clock_now() < timeout) { - waiting_cond.WaitUntil(lock, timeout); + auto start = clock::now(); + while (!seq_stamp.empty() &&
seq_stamp.begin()->first <= awaiting_seq) { + auto now = clock::now(); + auto s = duration*.95-std::chrono::duration<double>(now-start).count(); + if (s < 0) break; + cvar.wait_for(lock, s*1s); } - - awaiting_seq = -1; } @@ -162,17 +181,6 @@ void Beacon::send_and_wait(const double duration) */ void Beacon::_send() { - if (sender) { - timer.cancel_event(sender); - } - sender = timer.add_event_after( - g_conf()->mds_beacon_interval, - new FunctionContext([this](int) { - assert(lock.is_locked_by_me()); - sender = nullptr; - _send(); - })); - auto now = clock::now(); auto since = std::chrono::duration<double>(now-last_acked_stamp).count(); @@ -212,6 +220,7 @@ void Beacon::_send() beacon->set_sys_info(sys_info); } monc->send_mon_message(beacon.detach()); + last_send = now; } /** @@ -219,7 +228,7 @@ void Beacon::_send() */ void Beacon::notify_mdsmap(const MDSMap &mdsmap) { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); _notify_mdsmap(mdsmap); } @@ -238,7 +247,7 @@ void Beacon::_notify_mdsmap(const MDSMap &mdsmap) bool Beacon::is_laggy() { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); auto now = clock::now(); auto since = std::chrono::duration<double>(now-last_acked_stamp).count(); @@ -261,7 +270,7 @@ bool Beacon::is_laggy() void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const newstate) { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); // Update mdsmap epoch atomically with updating want_state, so that when // we send a beacon with the new want state it has the latest epoch, and @@ -286,7 +295,7 @@ void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const news */ void Beacon::notify_health(MDSRank const *mds) { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); if (!mds) { // No MDS rank held return; @@ -489,7 +498,7 @@ void Beacon::notify_health(MDSRank const *mds) MDSMap::DaemonState Beacon::get_want_state() const { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); return want_state; } diff --git
a/src/mds/Beacon.h b/src/mds/Beacon.h index 0a278e3de73cd..a06df4cbe68d8 100644 --- a/src/mds/Beacon.h +++ b/src/mds/Beacon.h @@ -16,11 +16,12 @@ #ifndef BEACON_STATE_H #define BEACON_STATE_H +#include <mutex> #include <string_view> +#include <thread> #include "include/types.h" #include "include/Context.h" -#include "common/Mutex.h" #include "msg/Dispatcher.h" #include "messages/MMDSBeacon.h" @@ -44,8 +45,8 @@ public: using clock = ceph::coarse_mono_clock; using time = ceph::coarse_mono_time; - Beacon(CephContext *cct_, MonClient *monc_, std::string_view name); - ~Beacon() override {}; + Beacon(CephContext *cct, MonClient *monc, std::string_view name); + ~Beacon() override; void init(const MDSMap &mdsmap); void shutdown(); @@ -77,7 +78,7 @@ public: bool is_laggy(); double last_cleared_laggy() const { - Mutex::Locker l(lock); + std::unique_lock lock(mutex); return std::chrono::duration<double>(clock::now()-last_laggy).count(); } @@ -85,10 +86,13 @@ private: void _notify_mdsmap(const MDSMap &mdsmap); void _send(); - //CephContext *cct; - mutable Mutex lock; + mutable std::mutex mutex; + std::thread sender; + std::condition_variable cvar; + time last_send = clock::zero(); + double beacon_interval = 5.0; + bool finished = false; MonClient* monc; - SafeTimer timer; // Items we duplicate from the MDS to have access under our own lock std::string name; @@ -110,13 +114,6 @@ private: // Health status to be copied into each beacon message MDSHealth health; - - // Ticker - Context *sender = nullptr; - - version_t awaiting_seq = -1; - Cond waiting_cond; }; #endif // BEACON_STATE_H - -- 2.39.5