]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/msgr: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Wed, 17 Jul 2019 08:28:11 +0000 (16:28 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 03:27:20 +0000 (11:27 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/test/msgr/test_async_driver.cc
src/test/msgr/test_msgr.cc

index 8b0f329ab24b48a645edf2641ffc45857539d087..e6a217af309e954d2a9051de8b76819ae1969707 100644 (file)
@@ -24,7 +24,7 @@
 #include <stdint.h>
 #include <arpa/inet.h>
 #include "include/Context.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Cond.h"
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
@@ -295,24 +295,25 @@ class Worker : public Thread {
 
 class CountEvent: public EventCallback {
   std::atomic<unsigned> *count;
-  Mutex *lock;
-  Cond *cond;
+  ceph::mutex *lock;
+  ceph::condition_variable *cond;
 
  public:
-  CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
+  CountEvent(std::atomic<unsigned> *atomic,
+             ceph::mutex *l, ceph::condition_variable *c)
+    : count(atomic), lock(l), cond(c) {}
   void do_request(uint64_t id) override {
-    lock->Lock();
+    std::scoped_lock l{*lock};
     (*count)--;
-    cond->Signal();
-    lock->Unlock();
+    cond->notify_all();
   }
 };
 
 TEST(EventCenterTest, DispatchTest) {
   Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
   std::atomic<unsigned> count = { 0 };
-  Mutex lock("DispatchTest::lock");
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("DispatchTest::lock");
+  ceph::condition_variable cond;
   worker1.create("worker_1");
   worker2.create("worker_2");
   for (int i = 0; i < 10000; ++i) {
@@ -320,9 +321,8 @@ TEST(EventCenterTest, DispatchTest) {
     worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
     count++;
     worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
-    Mutex::Locker l(lock);
-    while (count)
-      cond.Wait(lock);
+    std::unique_lock l{lock};
+    cond.wait(l, [&] { return count == 0; });
   }
   worker1.stop();
   worker2.stop();
index dd4c52c010ab2b0b63e59f30205a9f22c6e21321..c107b3695f7966809397598e239c0d4c2bc1a808 100644 (file)
@@ -21,8 +21,7 @@
 #include <time.h>
 #include <set>
 #include <list>
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
 #include "msg/Dispatcher.h"
@@ -102,8 +101,8 @@ class FakeDispatcher : public Dispatcher {
     uint64_t get_count() { return count; }
   };
 
-  Mutex lock;
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
+  ceph::condition_variable cond;
   bool is_server;
   bool got_new;
   bool got_remote_reset;
@@ -111,7 +110,7 @@ class FakeDispatcher : public Dispatcher {
   bool loopback;
   entity_addrvec_t last_accept;
 
-  explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
+  explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
                           is_server(s), got_new(false), got_remote_reset(false),
                           got_connect(false), loopback(false) {
   }
@@ -126,7 +125,7 @@ class FakeDispatcher : public Dispatcher {
   }
 
   void ms_handle_fast_connect(Connection *con) override {
-    lock.Lock();
+    std::scoped_lock l{lock};
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
     auto s = con->get_priv();
     if (!s) {
@@ -136,8 +135,7 @@ class FakeDispatcher : public Dispatcher {
                            << " count: " << session->count << dendl;
     }
     got_connect = true;
-    cond.Signal();
-    lock.Unlock();
+    cond.notify_all();
   }
   void ms_handle_fast_accept(Connection *con) override {
     last_accept = con->get_peer_addrs();
@@ -158,14 +156,14 @@ class FakeDispatcher : public Dispatcher {
     if (is_server) {
       reply_message(m);
     }
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     got_new = true;
-    cond.Signal();
+    cond.notify_all();
     m->put();
     return true;
   }
   bool ms_handle_reset(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
     auto priv = con->get_priv();
     if (auto s = static_cast<Session*>(priv.get()); s) {
@@ -175,7 +173,7 @@ class FakeDispatcher : public Dispatcher {
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
     auto priv = con->get_priv();
     if (auto s = static_cast<Session*>(priv.get()); s) {
@@ -183,7 +181,7 @@ class FakeDispatcher : public Dispatcher {
       con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     got_remote_reset = true;
-    cond.Signal();
+    cond.notify_all();
   }
   bool ms_handle_refused(Connection *con) override {
     return false;
@@ -207,9 +205,9 @@ class FakeDispatcher : public Dispatcher {
       ceph_assert(m->get_source().is_client());
     }
     m->put();
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     got_new = true;
-    cond.Signal();
+    cond.notify_all();
   }
 
   int ms_handle_authentication(Connection *con) override {
@@ -284,9 +282,7 @@ struct TestInterceptor : public Interceptor {
       return *(decisions[step]);
     }
     waiting = true;
-    while(waiting) {
-      cond_var.wait(l);
-    }
+    cond_var.wait(l, [this] { return !waiting; });
     return *(decisions[step]);
   }
 
@@ -386,16 +382,14 @@ TEST_P(MessengerTest, ConnectionRaceTest) {
   srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
 
   {
-    Mutex::Locker l(srv_dispatcher.lock);
-    while (!srv_dispatcher.got_new)
-      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    std::unique_lock l{srv_dispatcher.lock};
+    srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
     srv_dispatcher.got_new = false;
   }
   
@@ -465,16 +459,14 @@ TEST_P(MessengerTest, MissingServerIdenTest) {
   srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
 
   {
-    Mutex::Locker l(srv_dispatcher.lock);
-    while (!srv_dispatcher.got_new)
-      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    std::unique_lock l{srv_dispatcher.lock};
+    srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
     srv_dispatcher.got_new = false;
   }
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   
@@ -543,9 +535,8 @@ TEST_P(MessengerTest, MissingServerIdenTest2) {
   srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   
@@ -604,9 +595,8 @@ TEST_P(MessengerTest, ReconnectTest) {
   ASSERT_EQ(c2s->send_message(m1), 0);
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
 
@@ -650,9 +640,8 @@ TEST_P(MessengerTest, ReconnectTest) {
   srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
 
@@ -702,9 +691,8 @@ TEST_P(MessengerTest, ReconnectRaceTest) {
   ASSERT_EQ(c2s->send_message(m1), 0);
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
 
@@ -776,9 +764,8 @@ TEST_P(MessengerTest, ReconnectRaceTest) {
   srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
 
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
 
@@ -808,9 +795,8 @@ TEST_P(MessengerTest, SimpleTest) {
                                               server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
@@ -834,9 +820,8 @@ TEST_P(MessengerTest, SimpleTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -860,9 +845,8 @@ TEST_P(MessengerTest, SimpleTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   srv_dispatcher.loopback = false;
@@ -896,9 +880,8 @@ TEST_P(MessengerTest, SimpleMsgr2Test) {
     server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
@@ -923,9 +906,8 @@ TEST_P(MessengerTest, SimpleMsgr2Test) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -949,9 +931,8 @@ TEST_P(MessengerTest, SimpleMsgr2Test) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   srv_dispatcher.loopback = false;
@@ -978,9 +959,8 @@ TEST_P(MessengerTest, NameAddrTest) {
                                               server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1044,9 +1024,8 @@ TEST_P(MessengerTest, FeatureTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1075,9 +1054,8 @@ TEST_P(MessengerTest, TimeoutTest) {
                                                    server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
@@ -1118,9 +1096,8 @@ TEST_P(MessengerTest, StatefulTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1137,18 +1114,16 @@ TEST_P(MessengerTest, StatefulTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
                                        srv_dispatcher.last_accept);
   {
-    Mutex::Locker l(srv_dispatcher.lock);
-    while (!srv_dispatcher.got_remote_reset)
-      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    std::unique_lock l{srv_dispatcher.lock};
+    srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
   }
 
   // 2. test for client reconnect
@@ -1160,15 +1135,13 @@ TEST_P(MessengerTest, StatefulTest) {
   ASSERT_FALSE(server_conn->is_connected());
   // ensure client detect server socket closed
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_remote_reset)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
     cli_dispatcher.got_remote_reset = false;
   }
   {
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_connect)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
     cli_dispatcher.got_connect = false;
   }
   CHECK_AND_WAIT_TRUE(conn->is_connected());
@@ -1178,9 +1151,8 @@ TEST_P(MessengerTest, StatefulTest) {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
     ASSERT_TRUE(conn->is_connected());
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   // resetcheck happen
@@ -1218,9 +1190,8 @@ TEST_P(MessengerTest, StatelessTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1233,9 +1204,8 @@ TEST_P(MessengerTest, StatelessTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1243,9 +1213,8 @@ TEST_P(MessengerTest, StatelessTest) {
                                                      srv_dispatcher.last_accept);
   // server lose state
   {
-    Mutex::Locker l(srv_dispatcher.lock);
-    while (!srv_dispatcher.got_new)
-      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    std::unique_lock l{srv_dispatcher.lock};
+    srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
   }
   ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
@@ -1260,9 +1229,8 @@ TEST_P(MessengerTest, StatelessTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1295,9 +1263,8 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1316,21 +1283,18 @@ TEST_P(MessengerTest, ClientStandbyTest) {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
     {
-      Mutex::Locker l(cli_dispatcher.lock);
-      while (!cli_dispatcher.got_remote_reset)
-        cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+      std::unique_lock l{cli_dispatcher.lock};
+      cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
       cli_dispatcher.got_remote_reset = false;
-      while (!cli_dispatcher.got_connect)
-        cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+      cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
       cli_dispatcher.got_connect = false;
     }
     CHECK_AND_WAIT_TRUE(conn->is_connected());
     ASSERT_TRUE(conn->is_connected());
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
@@ -1364,9 +1328,8 @@ TEST_P(MessengerTest, AuthTest) {
                                                    server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
@@ -1383,9 +1346,8 @@ TEST_P(MessengerTest, AuthTest) {
   {
     MPing *m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
@@ -1427,11 +1389,8 @@ TEST_P(MessengerTest, MessageTest) {
     MCommand *m = new MCommand(uuid);
     m->cmd = cmds;
     conn->send_message(m);
-    utime_t t;
-    t += 1000*1000*500;
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
     ASSERT_TRUE(cli_dispatcher.got_new);
     cli_dispatcher.got_new = false;
   }
@@ -1447,9 +1406,8 @@ TEST_P(MessengerTest, MessageTest) {
     conn->send_message(m);
     utime_t t;
     t += 1000*1000*500;
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
     ASSERT_TRUE(cli_dispatcher.got_new);
     cli_dispatcher.got_new = false;
   }
@@ -1492,8 +1450,8 @@ ostream& operator<<(ostream& out, const Payload &pl)
 
 class SyntheticDispatcher : public Dispatcher {
  public:
-  Mutex lock;
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
+  ceph::condition_variable cond;
   bool is_server;
   bool got_new;
   bool got_remote_reset;
@@ -1504,7 +1462,7 @@ class SyntheticDispatcher : public Dispatcher {
   SyntheticWorkload *workload;
 
   SyntheticDispatcher(bool s, SyntheticWorkload *wl):
-      Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
+      Dispatcher(g_ceph_context), is_server(s), got_new(false),
       got_remote_reset(false), got_connect(false), index(0), workload(wl) {
   }
   bool ms_can_fast_dispatch_any() const override { return true; }
@@ -1519,30 +1477,30 @@ class SyntheticDispatcher : public Dispatcher {
   }
 
   void ms_handle_fast_connect(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     list<uint64_t> c = conn_sent[con];
     for (list<uint64_t>::iterator it = c.begin();
          it != c.end(); ++it)
       sent.erase(*it);
     conn_sent.erase(con);
     got_connect = true;
-    cond.Signal();
+    cond.notify_all();
   }
   void ms_handle_fast_accept(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     list<uint64_t> c = conn_sent[con];
     for (list<uint64_t>::iterator it = c.begin();
          it != c.end(); ++it)
       sent.erase(*it);
     conn_sent.erase(con);
-    cond.Signal();
+    cond.notify_all();
   }
   bool ms_dispatch(Message *m) override {
     ceph_abort();
   }
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     list<uint64_t> c = conn_sent[con];
     for (list<uint64_t>::iterator it = c.begin();
          it != c.end(); ++it)
@@ -1567,11 +1525,11 @@ class SyntheticDispatcher : public Dispatcher {
       lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
       reply_message(m, pl);
       m->put();
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       got_new = true;
-      cond.Signal();
+      cond.notify_all();
     } else {
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       if (sent.count(pl.seq)) {
        lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
        ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
@@ -1581,7 +1539,7 @@ class SyntheticDispatcher : public Dispatcher {
       }
       m->put();
       got_new = true;
-      cond.Signal();
+      cond.notify_all();
     }
   }
 
@@ -1606,7 +1564,7 @@ class SyntheticDispatcher : public Dispatcher {
     encode(pl, bl);
     m->set_data(bl);
     if (!con->get_messenger()->get_default_policy().lossy) {
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       sent[pl.seq] = pl.data;
       conn_sent[con].push_back(pl.seq);
     }
@@ -1615,12 +1573,12 @@ class SyntheticDispatcher : public Dispatcher {
   }
 
   uint64_t get_pending() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     return sent.size();
   }
 
   void clear_pending(ConnectionRef con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
 
     for (list<uint64_t>::iterator it = conn_sent[con].begin();
          it != conn_sent[con].end(); ++it)
@@ -1639,8 +1597,8 @@ class SyntheticDispatcher : public Dispatcher {
 
 
 class SyntheticWorkload {
-  Mutex lock;
-  Cond cond;
+  ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
+  ceph::condition_variable cond;
   set<Messenger*> available_servers;
   set<Messenger*> available_clients;
   Messenger::Policy client_policy;
@@ -1657,8 +1615,7 @@ class SyntheticWorkload {
 
   SyntheticWorkload(int servers, int clients, string type, int random_num,
                     Messenger::Policy srv_policy, Messenger::Policy cli_policy)
-    : lock("SyntheticWorkload::lock"),
-      client_policy(cli_policy),
+    : client_policy(cli_policy),
       dispatcher(false, this),
       rng(time(NULL)),
       dummy_auth(g_ceph_context) {
@@ -1721,11 +1678,11 @@ class SyntheticWorkload {
 
   ConnectionRef _get_random_connection() {
     while (dispatcher.get_pending() > max_in_flight) {
-      lock.Unlock();
+      lock.unlock();
       usleep(500);
-      lock.Lock();
+      lock.lock();
     }
-    ceph_assert(lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(lock));
     boost::uniform_int<> choose(0, available_connections.size() - 1);
     int index = choose(rng);
     map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
@@ -1738,7 +1695,7 @@ class SyntheticWorkload {
   }
 
   void generate_connection() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (!can_create_connection())
       return ;
 
@@ -1776,7 +1733,7 @@ class SyntheticWorkload {
   }
 
   void send_message() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     ConnectionRef conn = _get_random_connection();
     boost::uniform_int<> true_false(0, 99);
     int val = true_false(rng);
@@ -1796,7 +1753,7 @@ class SyntheticWorkload {
   }
 
   void drop_connection() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (available_connections.size() < 10)
       return;
     ConnectionRef conn = _get_random_connection();
@@ -1820,7 +1777,7 @@ class SyntheticWorkload {
   }
 
   void print_internal_state(bool detail=false) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     lderr(g_ceph_context) << "available_connections: " << available_connections.size()
          << " inflight messages: " << dispatcher.get_pending() << dendl;
     if (detail && !available_connections.empty()) {
@@ -1860,7 +1817,7 @@ class SyntheticWorkload {
   }
 
   void handle_reset(Connection *con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     available_connections.erase(con);
     dispatcher.clear_pending(con);
   }
@@ -2075,12 +2032,12 @@ TEST_P(MessengerTest, SyntheticInjectTest4) {
 
 
 class MarkdownDispatcher : public Dispatcher {
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
   set<ConnectionRef> conns;
   bool last_mark;
  public:
   std::atomic<uint64_t> count = { 0 };
-  explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
+  explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
                               last_mark(false) {
   }
   bool ms_can_fast_dispatch_any() const override { return false; }
@@ -2095,16 +2052,16 @@ class MarkdownDispatcher : public Dispatcher {
 
   void ms_handle_fast_connect(Connection *con) override {
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     conns.insert(con);
   }
   void ms_handle_fast_accept(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     conns.insert(con);
   }
   bool ms_dispatch(Message *m) override {
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     count++;
     conns.insert(m->get_connection());
     if (conns.size() < 2 && !last_mark) {
@@ -2128,13 +2085,13 @@ class MarkdownDispatcher : public Dispatcher {
   }
   bool ms_handle_reset(Connection *con) override {
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     conns.erase(con);
     usleep(rand() % 500);
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     conns.erase(con);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
   }