]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: use monotonic waits in Beacon
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 17 Aug 2018 22:03:56 +0000 (15:03 -0700)
committerVenky Shankar <vshankar@redhat.com>
Tue, 6 Nov 2018 04:17:26 +0000 (09:47 +0530)
This guarantees that the sender thread cannot be disrupted by system clock
changes. This commit also simplifies the sender thread by manually managing the
thread and avoiding unnecessary context creation.

Fixes: http://tracker.ceph.com/issues/32090
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit a5fc29b95281c6ca58c9177c665c379846beb4b3)

 Conflicts:
src/mds/Beacon.cc

src/mds/Beacon.cc
src/mds/Beacon.h

index 4ebbebdaffd7cc220014d9c8630082477a8e52ef..a013c391398b4fcd51f1f592c35f102545969cc7 100644 (file)
 
 #include "Beacon.h"
 
+#include <chrono>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
 #define dout_prefix *_dout << "mds.beacon." << name << ' '
 
+using namespace std::chrono_literals;
+
+Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
+  :
+    Dispatcher(cct),
+    beacon_interval(g_conf->mds_beacon_interval),
+    monc(monc),
+    name(name)
+{
+}
+
+Beacon::~Beacon()
+{
+  shutdown();
+}
 
-Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string_view name_) :
-  Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
-  name(name_)
+void Beacon::shutdown()
 {
+  std::unique_lock<std::mutex> lock(mutex);
+  if (!finished) {
+    finished = true;
+    lock.unlock();
+    sender.join();
+  }
 }
 
 void Beacon::init(MDSMap const *mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   assert(mdsmap != NULL);
 
   _notify_mdsmap(mdsmap);
@@ -51,20 +72,22 @@ void Beacon::init(MDSMap const *mdsmap)
   standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
   standby_replay = g_conf->mds_standby_replay;
 
-  // Spawn threads and start messaging
-  timer.init();
-  _send();
-}
-
-
-void Beacon::shutdown()
-{
-  Mutex::Locker l(lock);
-  if (sender) {
-    timer.cancel_event(sender);
-    sender = NULL;
-  }
-  timer.shutdown();
+  sender = std::thread([this]() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable c; // no one wakes us
+    while (!finished) {
+      auto now = clock::now();
+      auto since = std::chrono::duration<double>(now-last_send).count();
+      auto interval = beacon_interval;
+      if (since >= interval*.90) {
+        _send();
+      } else {
+        interval -= since;
+      }
+      dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
+      c.wait_for(lock, interval*1s);
+    }
+  });
 }
 
 bool Beacon::ms_can_fast_dispatch(const Message *m) const
@@ -100,7 +123,7 @@ bool Beacon::ms_dispatch(Message *m)
  */
 void Beacon::handle_mds_beacon(MMDSBeacon *m)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   assert(m != NULL);
 
   version_t seq = m->get_seq();
@@ -125,9 +148,7 @@ void Beacon::handle_mds_beacon(MMDSBeacon *m)
     seq_stamp.erase(seq_stamp.begin(), ++it);
 
     // Wake a waiter up if present
-    if (awaiting_seq == seq) {
-      waiting_cond.Signal();
-    }
+    cvar.notify_all();
   } else {
     dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
            << " seq " << m->get_seq() << " dne" << dendl;
@@ -138,28 +159,26 @@ void Beacon::handle_mds_beacon(MMDSBeacon *m)
 
 void Beacon::send()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   _send();
 }
 
 
 void Beacon::send_and_wait(const double duration)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   _send();
-  awaiting_seq = last_seq;
+  auto awaiting_seq = last_seq;
   dout(20) << __func__ << ": awaiting " << awaiting_seq
            << " for up to " << duration << "s" << dendl;
 
-  utime_t timeout;
-  timeout.set_from_double(ceph_clock_now()+duration);
-  while (!seq_stamp.empty()
-         && seq_stamp.begin()->first <= awaiting_seq
-         && ceph_clock_now() < timeout) {
-    waiting_cond.WaitUntil(lock, timeout);
+  auto start = clock::now();
+  while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
+    auto now = clock::now();
+    auto s = duration*.95-std::chrono::duration<double>(now-start).count();
+    if (s < 0) break;
+    cvar.wait_for(lock, s*1s);
   }
-
-  awaiting_seq = -1;
 }
 
 
@@ -168,17 +187,6 @@ void Beacon::send_and_wait(const double duration)
  */
 void Beacon::_send()
 {
-  if (sender) {
-    timer.cancel_event(sender);
-  }
-  sender = timer.add_event_after(
-    g_conf->mds_beacon_interval,
-    new FunctionContext([this](int) {
-       assert(lock.is_locked_by_me());
-       sender = nullptr;
-       _send();
-      }));
-
   auto now = clock::now();
   auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
 
@@ -218,6 +226,7 @@ void Beacon::_send()
     beacon->set_sys_info(sys_info);
   }
   monc->send_mon_message(beacon);
+  last_send = now;
 }
 
 /**
@@ -225,7 +234,7 @@ void Beacon::_send()
  */
 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   assert(mdsmap != NULL);
 
   _notify_mdsmap(mdsmap);
@@ -246,7 +255,7 @@ void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
 
 bool Beacon::is_laggy()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   auto now = clock::now();
   auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
@@ -271,7 +280,7 @@ bool Beacon::is_laggy()
 
 void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
 
   // Update mdsmap epoch atomically with updating want_state, so that when
   // we send a beacon with the new want state it has the latest epoch, and
@@ -296,7 +305,7 @@ void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const news
  */
 void Beacon::notify_health(MDSRank const *mds)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   if (!mds) {
     // No MDS rank held
     return;
@@ -500,7 +509,7 @@ void Beacon::notify_health(MDSRank const *mds)
 
 MDSMap::DaemonState Beacon::get_want_state() const
 {
-  Mutex::Locker l(lock);
+  std::unique_lock lock(mutex);
   return want_state;
 }
 
index 74e2359235f8df5d1f9278ae1caa2776e1d588a6..b7fbe27ec19c874772017effcba9ddfc4612e0e0 100644 (file)
 #ifndef BEACON_STATE_H
 #define BEACON_STATE_H
 
+#include <mutex>
 #include <string_view>
+#include <thread>
 
 #include "include/types.h"
 #include "include/Context.h"
-#include "common/Mutex.h"
 #include "msg/Dispatcher.h"
 #include "messages/MMDSBeacon.h"
 
@@ -45,8 +46,8 @@ public:
   using clock = ceph::coarse_mono_clock;
   using time = ceph::coarse_mono_time;
 
-  Beacon(CephContext *cct_, MonClient *monc_, std::string_view name);
-  ~Beacon() override {};
+  Beacon(CephContext *cct, MonClient *monc, std::string_view name);
+  ~Beacon() override;
 
   void init(MDSMap const *mdsmap);
   void shutdown();
@@ -78,7 +79,7 @@ public:
 
   bool is_laggy();
   double last_cleared_laggy() const {
-    Mutex::Locker l(lock);
+    std::unique_lock lock(mutex);
     return std::chrono::duration<double>(clock::now()-last_laggy).count();
   }
 
@@ -86,10 +87,13 @@ private:
   void _notify_mdsmap(MDSMap const *mdsmap);
   void _send();
 
-  //CephContext *cct;
-  mutable Mutex lock;
+  mutable std::mutex mutex;
+  std::thread sender;
+  std::condition_variable cvar;
+  time last_send = time::min();
+  double beacon_interval = 5.0;
+  bool finished = false;
   MonClient*    monc;
-  SafeTimer     timer;
 
   // Items we duplicate from the MDS to have access under our own lock
   std::string name;
@@ -111,13 +115,6 @@ private:
 
   // Health status to be copied into each beacon message
   MDSHealth health;
-
-  // Ticker
-  Context *sender = nullptr;
-
-  version_t awaiting_seq = -1;
-  Cond waiting_cond;
 };
 
 #endif // BEACON_STATE_H
-