#include "Beacon.h"
+#include <chrono>
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds.beacon." << name << ' '
+using namespace std::chrono_literals;
+
+Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
+ :
+ Dispatcher(cct),
+ beacon_interval(g_conf()->mds_beacon_interval),
+ monc(monc),
+ name(name)
+{
+}
+
+Beacon::~Beacon()
+{
+ shutdown();
+}
-Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string_view name_) :
- Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
- name(name_)
+void Beacon::shutdown()
{
+ std::unique_lock<std::mutex> lock(mutex);
+ if (!finished) {
+ finished = true;
+ lock.unlock();
+ sender.join();
+ }
}
void Beacon::init(const MDSMap &mdsmap)
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
_notify_mdsmap(mdsmap);
standby_for_rank = mds_rank_t(g_conf()->mds_standby_for_rank);
standby_for_fscid = fs_cluster_id_t(g_conf()->mds_standby_for_fscid);
standby_replay = g_conf()->mds_standby_replay;
- // Spawn threads and start messaging
- timer.init();
- _send();
-}
-
-
-void Beacon::shutdown()
-{
- Mutex::Locker l(lock);
- if (sender) {
- timer.cancel_event(sender);
- sender = NULL;
- }
- timer.shutdown();
+ sender = std::thread([this]() {
+ std::unique_lock<std::mutex> lock(mutex);
+ std::condition_variable c; // no one wakes us
+ while (!finished) {
+ auto now = clock::now();
+ auto since = std::chrono::duration<double>(now-last_send).count();
+ auto interval = beacon_interval;
+ if (since >= interval*.90) {
+ _send();
+ } else {
+ interval -= since;
+ }
+ dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
+ c.wait_for(lock, interval*1s);
+ }
+ });
}
bool Beacon::ms_can_fast_dispatch2(const Message::const_ref& m) const
*/
void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
version_t seq = m->get_seq();
seq_stamp.erase(seq_stamp.begin(), ++it);
// Wake a waiter up if present
- if (awaiting_seq == seq) {
- waiting_cond.Signal();
- }
+ cvar.notify_all();
} else {
dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " dne" << dendl;
void Beacon::send()
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
_send();
}
void Beacon::send_and_wait(const double duration)
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
_send();
- awaiting_seq = last_seq;
+ auto awaiting_seq = last_seq;
dout(20) << __func__ << ": awaiting " << awaiting_seq
<< " for up to " << duration << "s" << dendl;
- utime_t timeout;
- timeout.set_from_double(ceph_clock_now()+duration);
- while (!seq_stamp.empty()
- && seq_stamp.begin()->first <= awaiting_seq
- && ceph_clock_now() < timeout) {
- waiting_cond.WaitUntil(lock, timeout);
+ auto start = clock::now();
+ while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
+ auto now = clock::now();
+ auto s = duration*.95-std::chrono::duration<double>(now-start).count();
+ if (s < 0) break;
+ cvar.wait_for(lock, s*1s);
}
-
- awaiting_seq = -1;
}
*/
void Beacon::_send()
{
- if (sender) {
- timer.cancel_event(sender);
- }
- sender = timer.add_event_after(
- g_conf()->mds_beacon_interval,
- new FunctionContext([this](int) {
- assert(lock.is_locked_by_me());
- sender = nullptr;
- _send();
- }));
-
auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
beacon->set_sys_info(sys_info);
}
monc->send_mon_message(beacon.detach());
+ last_send = now;
}
/**
*/
void Beacon::notify_mdsmap(const MDSMap &mdsmap)
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
_notify_mdsmap(mdsmap);
}
bool Beacon::is_laggy()
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const newstate)
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
// Update mdsmap epoch atomically with updating want_state, so that when
// we send a beacon with the new want state it has the latest epoch, and
*/
void Beacon::notify_health(MDSRank const *mds)
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
if (!mds) {
// No MDS rank held
return;
MDSMap::DaemonState Beacon::get_want_state() const
{
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
return want_state;
}
#ifndef BEACON_STATE_H
#define BEACON_STATE_H
+#include <mutex>
#include <string_view>
+#include <thread>
#include "include/types.h"
#include "include/Context.h"
-#include "common/Mutex.h"
#include "msg/Dispatcher.h"
#include "messages/MMDSBeacon.h"
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;
- Beacon(CephContext *cct_, MonClient *monc_, std::string_view name);
- ~Beacon() override {};
+ Beacon(CephContext *cct, MonClient *monc, std::string_view name);
+ ~Beacon() override;
void init(const MDSMap &mdsmap);
void shutdown();
bool is_laggy();
double last_cleared_laggy() const {
- Mutex::Locker l(lock);
+ std::unique_lock lock(mutex);
return std::chrono::duration<double>(clock::now()-last_laggy).count();
}
void _notify_mdsmap(const MDSMap &mdsmap);
void _send();
- //CephContext *cct;
- mutable Mutex lock;
+ mutable std::mutex mutex;
+ std::thread sender;
+ std::condition_variable cvar;
+ time last_send = clock::zero();
+ double beacon_interval = 5.0;
+ bool finished = false;
MonClient* monc;
- SafeTimer timer;
// Items we duplicate from the MDS to have access under our own lock
std::string name;
// Health status to be copied into each beacon message
MDSHealth health;
-
- // Ticker
- Context *sender = nullptr;
-
- version_t awaiting_seq = -1;
- Cond waiting_cond;
};
#endif // BEACON_STATE_H
-