]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: use monotonic waits in Beacon 24375/head
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 17 Aug 2018 22:03:56 +0000 (15:03 -0700)
committerNathan Cutler <ncutler@suse.com>
Wed, 3 Oct 2018 19:06:02 +0000 (21:06 +0200)
This guarantees that the sender thread cannot be disrupted by system clock
changes. This commit also simplifies the sender thread by manually managing the
thread and avoiding unnecessary context creation.

Fixes: http://tracker.ceph.com/issues/26962
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit a5fc29b95281c6ca58c9177c665c379846beb4b3)

Conflicts:
src/mds/Beacon.cc
- g_conf->foo instead of g_conf()->foo
- boost::string_view instead of std::string_view
- always specify template type std::unique_lock<std::mutex>
src/mds/Beacon.h
- time::min() instead of clock::zero()
- always specify template type std::unique_lock<std::mutex>
- std::chrono::seconds instead of "1s" in std::chrono_literals namespace
  (which is a C++14ism)

src/mds/Beacon.cc
src/mds/Beacon.h

index f4289c060524ad1f23efc2099b088a66b08cd370..6e4d07de34bc4d60a638a74490eaa52124f082d0 100644 (file)
 
 #include "Beacon.h"
 
+#include <chrono>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
 #define dout_prefix *_dout << "mds.beacon." << name << ' '
 
+Beacon::Beacon(CephContext *cct, MonClient *monc, boost::string_view name)
+  :
+    Dispatcher(cct),
+    beacon_interval(g_conf->mds_beacon_interval),
+    monc(monc),
+    name(name)
+{
+}
 
-Beacon::Beacon(CephContext *cct_, MonClient *monc_, boost::string_view name_) :
-  Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
-  name(name_)
+Beacon::~Beacon()
 {
+  shutdown();
+}
+
+void Beacon::shutdown()
+{
+  std::unique_lock<std::mutex> lock(mutex);
+  if (!finished) {
+    finished = true;
+    lock.unlock();
+    sender.join();
+  }
 }
 
 void Beacon::init(const MDSMap* mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   assert(mdsmap != NULL);
 
   _notify_mdsmap(mdsmap);
@@ -51,20 +70,22 @@ void Beacon::init(const MDSMap* mdsmap)
   standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
   standby_replay = g_conf->mds_standby_replay;
 
-  // Spawn threads and start messaging
-  timer.init();
-  _send();
-}
-
-
-void Beacon::shutdown()
-{
-  Mutex::Locker l(lock);
-  if (sender) {
-    timer.cancel_event(sender);
-    sender = NULL;
-  }
-  timer.shutdown();
+  sender = std::thread([this]() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable c; // no one wakes us
+    while (!finished) {
+      auto now = clock::now();
+      auto since = std::chrono::duration<double>(now-last_send).count();
+      auto interval = beacon_interval;
+      if (since >= interval*.90) {
+        _send();
+      } else {
+        interval -= since;
+      }
+      dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
+      c.wait_for(lock, interval*std::chrono::seconds(1));
+    }
+  });
 }
 
 bool Beacon::ms_can_fast_dispatch(const Message *m) const
@@ -100,7 +121,7 @@ bool Beacon::ms_dispatch(Message *m)
  */
 void Beacon::handle_mds_beacon(MMDSBeacon *m)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   assert(m != NULL);
 
   version_t seq = m->get_seq();
@@ -125,9 +146,7 @@ void Beacon::handle_mds_beacon(MMDSBeacon *m)
     seq_stamp.erase(seq_stamp.begin(), ++it);
 
     // Wake a waiter up if present
-    if (awaiting_seq == seq) {
-      waiting_cond.Signal();
-    }
+    cvar.notify_all();
   } else {
     dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
            << " seq " << m->get_seq() << " dne" << dendl;
@@ -138,28 +157,26 @@ void Beacon::handle_mds_beacon(MMDSBeacon *m)
 
 void Beacon::send()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   _send();
 }
 
 
 void Beacon::send_and_wait(const double duration)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   _send();
-  awaiting_seq = last_seq;
+  auto awaiting_seq = last_seq;
   dout(20) << __func__ << ": awaiting " << awaiting_seq
            << " for up to " << duration << "s" << dendl;
 
-  utime_t timeout;
-  timeout.set_from_double(ceph_clock_now()+duration);
-  while (!seq_stamp.empty()
-         && seq_stamp.begin()->first <= awaiting_seq
-         && ceph_clock_now() < timeout) {
-    waiting_cond.WaitUntil(lock, timeout);
+  auto start = clock::now();
+  while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
+    auto now = clock::now();
+    auto s = duration*.95-std::chrono::duration<double>(now-start).count();
+    if (s < 0) break;
+    cvar.wait_for(lock, s*std::chrono::seconds(1));
   }
-
-  awaiting_seq = -1;
 }
 
 
@@ -168,17 +185,6 @@ void Beacon::send_and_wait(const double duration)
  */
 void Beacon::_send()
 {
-  if (sender) {
-    timer.cancel_event(sender);
-  }
-  sender = timer.add_event_after(
-    g_conf->mds_beacon_interval,
-    new FunctionContext([this](int) {
-       assert(lock.is_locked_by_me());
-       sender = nullptr;
-       _send();
-      }));
-
   auto now = clock::now();
   auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
 
@@ -218,6 +224,7 @@ void Beacon::_send()
     beacon->set_sys_info(sys_info);
   }
   monc->send_mon_message(beacon);
+  last_send = now;
 }
 
 /**
@@ -225,7 +232,7 @@ void Beacon::_send()
  */
 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   assert(mdsmap != NULL);
 
   _notify_mdsmap(mdsmap);
@@ -246,7 +253,7 @@ void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
 
 bool Beacon::is_laggy()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
 
   auto now = clock::now();
   auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
@@ -271,7 +278,7 @@ bool Beacon::is_laggy()
 
 void Beacon::set_want_state(const MDSMap* mdsmap, MDSMap::DaemonState const newstate)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
 
   // Update mdsmap epoch atomically with updating want_state, so that when
   // we send a beacon with the new want state it has the latest epoch, and
@@ -296,7 +303,7 @@ void Beacon::set_want_state(const MDSMap* mdsmap, MDSMap::DaemonState const news
  */
 void Beacon::notify_health(MDSRank const *mds)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   if (!mds) {
     // No MDS rank held
     return;
@@ -499,7 +506,7 @@ void Beacon::notify_health(MDSRank const *mds)
 
 MDSMap::DaemonState Beacon::get_want_state() const
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   return want_state;
 }
 
index 26d441ea6b34866e3464bd0e9fc9efcd6e17ad40..bf5afa2c8b08a2297f332616943a9aa7027e347f 100644 (file)
 #define BEACON_STATE_H
 
 #include <boost/utility/string_view.hpp>
+#include <mutex>
+#include <thread>
 
 #include "include/types.h"
 #include "include/Context.h"
-#include "common/Mutex.h"
 #include "msg/Dispatcher.h"
 #include "messages/MMDSBeacon.h"
 
@@ -45,8 +46,8 @@ public:
   using clock = ceph::coarse_mono_clock;
   using time = ceph::coarse_mono_time;
 
-  Beacon(CephContext *cct_, MonClient *monc_, boost::string_view name);
-  ~Beacon() override {};
+  Beacon(CephContext *cct, MonClient *monc, boost::string_view name);
+  ~Beacon() override;
 
   void init(MDSMap const *mdsmap);
   void shutdown();
@@ -78,7 +79,7 @@ public:
 
   bool is_laggy();
   double last_cleared_laggy() const {
-    Mutex::Locker l(lock);
+    std::unique_lock<std::mutex> lock(mutex);
     return std::chrono::duration<double>(clock::now()-last_laggy).count();
   }
 
@@ -86,10 +87,13 @@ private:
   void _notify_mdsmap(MDSMap const *mdsmap);
   void _send();
 
-  //CephContext *cct;
-  mutable Mutex lock;
+  mutable std::mutex mutex;
+  std::thread sender;
+  std::condition_variable cvar;
+  time last_send = time::min();
+  double beacon_interval = 5.0;
+  bool finished = false;
   MonClient*    monc;
-  SafeTimer     timer;
 
   // Items we duplicate from the MDS to have access under our own lock
   std::string name;
@@ -111,13 +115,6 @@ private:
 
   // Health status to be copied into each beacon message
   MDSHealth health;
-
-  // Ticker
-  Context *sender = nullptr;
-
-  version_t awaiting_seq = -1;
-  Cond waiting_cond;
 };
 
 #endif // BEACON_STATE_H
-