]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: use monotonic waits in Beacon 23640/head
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 17 Aug 2018 22:03:56 +0000 (15:03 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Tue, 21 Aug 2018 21:42:05 +0000 (14:42 -0700)
This guarantees that the sender thread cannot be disrupted by system clock
changes. This commit also simplifies the sender thread by manually managing the
thread and avoiding unnecessary context creation.

Fixes: http://tracker.ceph.com/issues/26962
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/mds/Beacon.cc
src/mds/Beacon.h

index 36c0161f648c4c7078b1f6d0f16e2a22d703a390..8d42cf9541629df790c6c3a3625dd523f51dee78 100644 (file)
 
 #include "Beacon.h"
 
+#include <chrono>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
 #define dout_prefix *_dout << "mds.beacon." << name << ' '
 
+using namespace std::chrono_literals;
+
+Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
+  :
+    Dispatcher(cct),
+    beacon_interval(g_conf()->mds_beacon_interval),
+    monc(monc),
+    name(name)
+{
+}
+
+Beacon::~Beacon()
+{
+  shutdown();
+}
 
-Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string_view name_) :
-  Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
-  name(name_)
+void Beacon::shutdown()
 {
+  std::unique_lock<std::mutex> lock(mutex);
+  if (!finished) {
+    finished = true;
+    lock.unlock();
+    sender.join();
+  }
 }
 
 void Beacon::init(const MDSMap &mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   _notify_mdsmap(mdsmap);
   standby_for_rank = mds_rank_t(g_conf()->mds_standby_for_rank);
@@ -49,20 +70,22 @@ void Beacon::init(const MDSMap &mdsmap)
   standby_for_fscid = fs_cluster_id_t(g_conf()->mds_standby_for_fscid);
   standby_replay = g_conf()->mds_standby_replay;
 
-  // Spawn threads and start messaging
-  timer.init();
-  _send();
-}
-
-
-void Beacon::shutdown()
-{
-  Mutex::Locker l(lock);
-  if (sender) {
-    timer.cancel_event(sender);
-    sender = NULL;
-  }
-  timer.shutdown();
+  sender = std::thread([this]() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable c; // no one wakes us
+    while (!finished) {
+      auto now = clock::now();
+      auto since = std::chrono::duration<double>(now-last_send).count();
+      auto interval = beacon_interval;
+      if (since >= interval*.90) {
+        _send();
+      } else {
+        interval -= since;
+      }
+      dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
+      c.wait_for(lock, interval*1s);
+    }
+  });
 }
 
 bool Beacon::ms_can_fast_dispatch2(const Message::const_ref& m) const
@@ -96,7 +119,7 @@ bool Beacon::ms_dispatch2(const Message::ref& m)
  */
 void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   version_t seq = m->get_seq();
 
@@ -120,9 +143,7 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
     seq_stamp.erase(seq_stamp.begin(), ++it);
 
     // Wake a waiter up if present
-    if (awaiting_seq == seq) {
-      waiting_cond.Signal();
-    }
+    cvar.notify_all();
   } else {
     dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
            << " seq " << m->get_seq() << " dne" << dendl;
@@ -132,28 +153,26 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
 
 void Beacon::send()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   _send();
 }
 
 
 void Beacon::send_and_wait(const double duration)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   _send();
-  awaiting_seq = last_seq;
+  auto awaiting_seq = last_seq;
   dout(20) << __func__ << ": awaiting " << awaiting_seq
            << " for up to " << duration << "s" << dendl;
 
-  utime_t timeout;
-  timeout.set_from_double(ceph_clock_now()+duration);
-  while (!seq_stamp.empty()
-         && seq_stamp.begin()->first <= awaiting_seq
-         && ceph_clock_now() < timeout) {
-    waiting_cond.WaitUntil(lock, timeout);
+  auto start = clock::now();
+  while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
+    auto now = clock::now();
+    auto s = duration*.95-std::chrono::duration<double>(now-start).count();
+    if (s < 0) break;
+    cvar.wait_for(lock, s*1s);
   }
-
-  awaiting_seq = -1;
 }
 
 
@@ -162,17 +181,6 @@ void Beacon::send_and_wait(const double duration)
  */
 void Beacon::_send()
 {
-  if (sender) {
-    timer.cancel_event(sender);
-  }
-  sender = timer.add_event_after(
-    g_conf()->mds_beacon_interval,
-    new FunctionContext([this](int) {
-       assert(lock.is_locked_by_me());
-       sender = nullptr;
-       _send();
-      }));
-
   auto now = clock::now();
   auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
 
@@ -212,6 +220,7 @@ void Beacon::_send()
     beacon->set_sys_info(sys_info);
   }
   monc->send_mon_message(beacon.detach());
+  last_send = now;
 }
 
 /**
@@ -219,7 +228,7 @@ void Beacon::_send()
  */
 void Beacon::notify_mdsmap(const MDSMap &mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   _notify_mdsmap(mdsmap);
 }
@@ -238,7 +247,7 @@ void Beacon::_notify_mdsmap(const MDSMap &mdsmap)
 
 bool Beacon::is_laggy()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   auto now = clock::now();
   auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
@@ -261,7 +270,7 @@ bool Beacon::is_laggy()
 
 void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const newstate)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   // Update mdsmap epoch atomically with updating want_state, so that when
   // we send a beacon with the new want state it has the latest epoch, and
@@ -286,7 +295,7 @@ void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const news
  */
 void Beacon::notify_health(MDSRank const *mds)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   if (!mds) {
     // No MDS rank held
     return;
@@ -489,7 +498,7 @@ void Beacon::notify_health(MDSRank const *mds)
 
 MDSMap::DaemonState Beacon::get_want_state() const
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   return want_state;
 }
 
index 0a278e3de73cd06b26c3fad4dea219c745e7bb2d..a06df4cbe68d8e9776e1ab325c64587002c13ccf 100644 (file)
 #ifndef BEACON_STATE_H
 #define BEACON_STATE_H
 
+#include <mutex>
 #include <string_view>
+#include <thread>
 
 #include "include/types.h"
 #include "include/Context.h"
-#include "common/Mutex.h"
 #include "msg/Dispatcher.h"
 
 #include "messages/MMDSBeacon.h"
@@ -44,8 +45,8 @@ public:
   using clock = ceph::coarse_mono_clock;
   using time = ceph::coarse_mono_time;
 
-  Beacon(CephContext *cct_, MonClient *monc_, std::string_view name);
-  ~Beacon() override {};
+  Beacon(CephContext *cct, MonClient *monc, std::string_view name);
+  ~Beacon() override;
 
   void init(const MDSMap &mdsmap);
   void shutdown();
@@ -77,7 +78,7 @@ public:
 
   bool is_laggy();
   double last_cleared_laggy() const {
-    Mutex::Locker l(lock);
+    std::unique_lock lock(mutex);
     return std::chrono::duration<double>(clock::now()-last_laggy).count();
   }
 
@@ -85,10 +86,13 @@ private:
   void _notify_mdsmap(const MDSMap &mdsmap);
   void _send();
 
-  //CephContext *cct;
-  mutable Mutex lock;
+  mutable std::mutex mutex;
+  std::thread sender;
+  std::condition_variable cvar;
+  time last_send = clock::zero();
+  double beacon_interval = 5.0;
+  bool finished = false;
   MonClient*    monc;
-  SafeTimer     timer;
 
   // Items we duplicate from the MDS to have access under our own lock
   std::string name;
@@ -110,13 +114,6 @@ private:
 
   // Health status to be copied into each beacon message
   MDSHealth health;
-
-  // Ticker
-  Context *sender = nullptr;
-
-  version_t awaiting_seq = -1;
-  Cond waiting_cond;
 };
 
 #endif // BEACON_STATE_H
-