From bfb8c741cd1e588a3a9dd6bbeff5024d5713dd90 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 17 Jul 2019 16:28:11 +0800 Subject: [PATCH] test/msgr: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/test/msgr/test_async_driver.cc | 24 +-- src/test/msgr/test_msgr.cc | 279 ++++++++++++----------------- 2 files changed, 130 insertions(+), 173 deletions(-) diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index 8b0f329ab24..e6a217af309 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -24,7 +24,7 @@ #include #include #include "include/Context.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Cond.h" #include "global/global_init.h" #include "common/ceph_argparse.h" @@ -295,24 +295,25 @@ class Worker : public Thread { class CountEvent: public EventCallback { std::atomic *count; - Mutex *lock; - Cond *cond; + ceph::mutex *lock; + ceph::condition_variable *cond; public: - CountEvent(std::atomic *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {} + CountEvent(std::atomic *atomic, + ceph::mutex *l, ceph::condition_variable *c) + : count(atomic), lock(l), cond(c) {} void do_request(uint64_t id) override { - lock->Lock(); + std::scoped_lock l{*lock}; (*count)--; - cond->Signal(); - lock->Unlock(); + cond->notify_all(); } }; TEST(EventCenterTest, DispatchTest) { Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2); std::atomic count = { 0 }; - Mutex lock("DispatchTest::lock"); - Cond cond; + ceph::mutex lock = ceph::make_mutex("DispatchTest::lock"); + ceph::condition_variable cond; worker1.create("worker_1"); worker2.create("worker_2"); for (int i = 0; i < 10000; ++i) { @@ -320,9 +321,8 @@ TEST(EventCenterTest, DispatchTest) { worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); count++; worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); - Mutex::Locker l(lock); - while (count) - cond.Wait(lock); + std::unique_lock l{lock}; + cond.wait(l, [&] { return count == 0; }); } worker1.stop(); worker2.stop(); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index dd4c52c010a..c107b3695f7 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -21,8 +21,7 @@ #include #include #include -#include "common/Mutex.h" -#include "common/Cond.h" +#include "common/ceph_mutex.h" #include "common/ceph_argparse.h" #include "global/global_init.h" #include "msg/Dispatcher.h" @@ -102,8 +101,8 @@ class FakeDispatcher : public Dispatcher { uint64_t get_count() { return count; } }; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock"); + ceph::condition_variable cond; bool is_server; bool got_new; bool got_remote_reset; @@ -111,7 +110,7 @@ class FakeDispatcher : public Dispatcher { bool loopback; entity_addrvec_t last_accept; - explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), + explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), is_server(s), got_new(false), got_remote_reset(false), got_connect(false), loopback(false) { } @@ -126,7 +125,7 @@ class FakeDispatcher : public Dispatcher { } void ms_handle_fast_connect(Connection *con) override { - lock.Lock(); + std::scoped_lock l{lock}; lderr(g_ceph_context) << __func__ << " " << con << dendl; auto s = con->get_priv(); if (!s) { @@ -136,8 +135,7 @@ class FakeDispatcher : public Dispatcher { << " count: " << session->count << dendl; } got_connect = true; - cond.Signal(); - lock.Unlock(); + cond.notify_all(); } void ms_handle_fast_accept(Connection *con) override { last_accept = con->get_peer_addrs(); @@ -158,14 +156,14 @@ class FakeDispatcher : public Dispatcher { if (is_server) { reply_message(m); } - Mutex::Locker l(lock); + std::lock_guard l{lock}; got_new = true; - cond.Signal(); + cond.notify_all(); m->put(); return true; } bool ms_handle_reset(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; lderr(g_ceph_context) << __func__ << " " << con << dendl; auto priv = con->get_priv(); if (auto s = static_cast(priv.get()); s) { @@ -175,7 +173,7 @@ class FakeDispatcher : public Dispatcher { return true; } void ms_handle_remote_reset(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; lderr(g_ceph_context) << __func__ << " " << con << dendl; auto priv = con->get_priv(); if (auto s = static_cast(priv.get()); s) { @@ -183,7 +181,7 @@ class FakeDispatcher : public Dispatcher { con->set_priv(nullptr); // break ref <-> session cycle, if any } got_remote_reset = true; - cond.Signal(); + cond.notify_all(); } bool ms_handle_refused(Connection *con) override { return false; @@ -207,9 +205,9 @@ class FakeDispatcher : public Dispatcher { ceph_assert(m->get_source().is_client()); } m->put(); - Mutex::Locker l(lock); + std::lock_guard l{lock}; got_new = true; - cond.Signal(); + cond.notify_all(); } int ms_handle_authentication(Connection *con) override { @@ -284,9 +282,7 @@ struct TestInterceptor : public Interceptor { return *(decisions[step]); } waiting = true; - while(waiting) { - cond_var.wait(l); - } + cond_var.wait(l, [this] { return !waiting; }); return *(decisions[step]); } @@ -386,16 +382,14 @@ TEST_P(MessengerTest, ConnectionRaceTest) { srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } { - Mutex::Locker l(srv_dispatcher.lock); - while (!srv_dispatcher.got_new) - srv_dispatcher.cond.Wait(srv_dispatcher.lock); + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); srv_dispatcher.got_new = false; } @@ -465,16 +459,14 @@ TEST_P(MessengerTest, MissingServerIdenTest) { srv_interceptor->proceed(12, Interceptor::ACTION::FAIL); { - Mutex::Locker l(srv_dispatcher.lock); - while (!srv_dispatcher.got_new) - srv_dispatcher.cond.Wait(srv_dispatcher.lock); + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); srv_dispatcher.got_new = false; } { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } @@ -543,9 +535,8 @@ TEST_P(MessengerTest, MissingServerIdenTest2) { srv_interceptor->proceed(12, Interceptor::ACTION::FAIL); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } @@ -604,9 +595,8 @@ TEST_P(MessengerTest, ReconnectTest) { ASSERT_EQ(c2s->send_message(m1), 0); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } @@ -650,9 +640,8 @@ TEST_P(MessengerTest, ReconnectTest) { srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } @@ -702,9 +691,8 @@ TEST_P(MessengerTest, ReconnectRaceTest) { ASSERT_EQ(c2s->send_message(m1), 0); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } @@ -776,9 +764,8 @@ TEST_P(MessengerTest, ReconnectRaceTest) { srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } @@ -808,9 +795,8 @@ TEST_P(MessengerTest, SimpleTest) { server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); @@ -834,9 +820,8 @@ TEST_P(MessengerTest, SimpleTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -860,9 +845,8 @@ TEST_P(MessengerTest, SimpleTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } srv_dispatcher.loopback = false; @@ -896,9 +880,8 @@ TEST_P(MessengerTest, SimpleMsgr2Test) { server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); @@ -923,9 +906,8 @@ TEST_P(MessengerTest, SimpleMsgr2Test) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -949,9 +931,8 @@ TEST_P(MessengerTest, SimpleMsgr2Test) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } srv_dispatcher.loopback = false; @@ -978,9 +959,8 @@ TEST_P(MessengerTest, NameAddrTest) { server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1044,9 +1024,8 @@ TEST_P(MessengerTest, FeatureTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1075,9 +1054,8 @@ TEST_P(MessengerTest, TimeoutTest) { server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); @@ -1118,9 +1096,8 @@ TEST_P(MessengerTest, StatefulTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1137,18 +1114,16 @@ TEST_P(MessengerTest, StatefulTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->connect_to(client_msgr->get_mytype(), srv_dispatcher.last_accept); { - Mutex::Locker l(srv_dispatcher.lock); - while (!srv_dispatcher.got_remote_reset) - srv_dispatcher.cond.Wait(srv_dispatcher.lock); + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; }); } // 2. test for client reconnect @@ -1160,15 +1135,13 @@ TEST_P(MessengerTest, StatefulTest) { ASSERT_FALSE(server_conn->is_connected()); // ensure client detect server socket closed { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_remote_reset) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); cli_dispatcher.got_remote_reset = false; } { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_connect) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); cli_dispatcher.got_connect = false; } CHECK_AND_WAIT_TRUE(conn->is_connected()); @@ -1178,9 +1151,8 @@ TEST_P(MessengerTest, StatefulTest) { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); ASSERT_TRUE(conn->is_connected()); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } // resetcheck happen @@ -1218,9 +1190,8 @@ TEST_P(MessengerTest, StatelessTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1233,9 +1204,8 @@ TEST_P(MessengerTest, StatelessTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1243,9 +1213,8 @@ TEST_P(MessengerTest, StatelessTest) { srv_dispatcher.last_accept); // server lose state { - Mutex::Locker l(srv_dispatcher.lock); - while (!srv_dispatcher.got_new) - srv_dispatcher.cond.Wait(srv_dispatcher.lock); + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); } ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); @@ -1260,9 +1229,8 @@ TEST_P(MessengerTest, StatelessTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1295,9 +1263,8 @@ TEST_P(MessengerTest, ClientStandbyTest) { { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1316,21 +1283,18 @@ TEST_P(MessengerTest, ClientStandbyTest) { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); { - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_remote_reset) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); cli_dispatcher.got_remote_reset = false; - while (!cli_dispatcher.got_connect) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); cli_dispatcher.got_connect = false; } CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -1364,9 +1328,8 @@ TEST_P(MessengerTest, AuthTest) { server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); @@ -1383,9 +1346,8 @@ TEST_P(MessengerTest, AuthTest) { { MPing *m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); @@ -1427,11 +1389,8 @@ TEST_P(MessengerTest, MessageTest) { MCommand *m = new MCommand(uuid); m->cmd = cmds; conn->send_message(m); - utime_t t; - t += 1000*1000*500; - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; }); ASSERT_TRUE(cli_dispatcher.got_new); cli_dispatcher.got_new = false; } @@ -1447,9 +1406,8 @@ TEST_P(MessengerTest, MessageTest) { conn->send_message(m); utime_t t; t += 1000*1000*500; - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); ASSERT_TRUE(cli_dispatcher.got_new); cli_dispatcher.got_new = false; } @@ -1492,8 +1450,8 @@ ostream& operator<<(ostream& out, const Payload &pl) class SyntheticDispatcher : public Dispatcher { public: - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock"); + ceph::condition_variable cond; bool is_server; bool got_new; bool got_remote_reset; @@ -1504,7 +1462,7 @@ class SyntheticDispatcher : public Dispatcher { SyntheticWorkload *workload; SyntheticDispatcher(bool s, SyntheticWorkload *wl): - Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false), + Dispatcher(g_ceph_context), is_server(s), got_new(false), got_remote_reset(false), got_connect(false), index(0), workload(wl) { } bool ms_can_fast_dispatch_any() const override { return true; } @@ -1519,30 +1477,30 @@ class SyntheticDispatcher : public Dispatcher { } void ms_handle_fast_connect(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; list c = conn_sent[con]; for (list::iterator it = c.begin(); it != c.end(); ++it) sent.erase(*it); conn_sent.erase(con); got_connect = true; - cond.Signal(); + cond.notify_all(); } void ms_handle_fast_accept(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; list c = conn_sent[con]; for (list::iterator it = c.begin(); it != c.end(); ++it) sent.erase(*it); conn_sent.erase(con); - cond.Signal(); + cond.notify_all(); } bool ms_dispatch(Message *m) override { ceph_abort(); } bool ms_handle_reset(Connection *con) override; void ms_handle_remote_reset(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; list c = conn_sent[con]; for (list::iterator it = c.begin(); it != c.end(); ++it) @@ -1567,11 +1525,11 @@ class SyntheticDispatcher : public Dispatcher { lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; reply_message(m, pl); m->put(); - Mutex::Locker l(lock); + std::lock_guard l{lock}; got_new = true; - cond.Signal(); + cond.notify_all(); } else { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (sent.count(pl.seq)) { lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); @@ -1581,7 +1539,7 @@ class SyntheticDispatcher : public Dispatcher { } m->put(); got_new = true; - cond.Signal(); + cond.notify_all(); } } @@ -1606,7 +1564,7 @@ class SyntheticDispatcher : public Dispatcher { encode(pl, bl); m->set_data(bl); if (!con->get_messenger()->get_default_policy().lossy) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; sent[pl.seq] = pl.data; conn_sent[con].push_back(pl.seq); } @@ -1615,12 +1573,12 @@ class SyntheticDispatcher : public Dispatcher { } uint64_t get_pending() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return sent.size(); } void clear_pending(ConnectionRef con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; for (list::iterator it = conn_sent[con].begin(); it != conn_sent[con].end(); ++it) @@ -1639,8 +1597,8 @@ class SyntheticDispatcher : public Dispatcher { class SyntheticWorkload { - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock"); + ceph::condition_variable cond; set available_servers; set available_clients; Messenger::Policy client_policy; @@ -1657,8 +1615,7 @@ class SyntheticWorkload { SyntheticWorkload(int servers, int clients, string type, int random_num, Messenger::Policy srv_policy, Messenger::Policy cli_policy) - : lock("SyntheticWorkload::lock"), - client_policy(cli_policy), + : client_policy(cli_policy), dispatcher(false, this), rng(time(NULL)), dummy_auth(g_ceph_context) { @@ -1721,11 +1678,11 @@ class SyntheticWorkload { ConnectionRef _get_random_connection() { while (dispatcher.get_pending() > max_in_flight) { - lock.Unlock(); + lock.unlock(); usleep(500); - lock.Lock(); + lock.lock(); } - ceph_assert(lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(lock)); boost::uniform_int<> choose(0, available_connections.size() - 1); int index = choose(rng); map >::iterator i = available_connections.begin(); @@ -1738,7 +1695,7 @@ class SyntheticWorkload { } void generate_connection() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (!can_create_connection()) return ; @@ -1776,7 +1733,7 @@ class SyntheticWorkload { } void send_message() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; ConnectionRef conn = _get_random_connection(); boost::uniform_int<> true_false(0, 99); int val = true_false(rng); @@ -1796,7 +1753,7 @@ class SyntheticWorkload { } void drop_connection() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (available_connections.size() < 10) return; ConnectionRef conn = _get_random_connection(); @@ -1820,7 +1777,7 @@ class SyntheticWorkload { } void print_internal_state(bool detail=false) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; lderr(g_ceph_context) << "available_connections: " << available_connections.size() << " inflight messages: " << dispatcher.get_pending() << dendl; if (detail && !available_connections.empty()) { @@ -1860,7 +1817,7 @@ class SyntheticWorkload { } void handle_reset(Connection *con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; available_connections.erase(con); dispatcher.clear_pending(con); } @@ -2075,12 +2032,12 @@ TEST_P(MessengerTest, SyntheticInjectTest4) { class MarkdownDispatcher : public Dispatcher { - Mutex lock; + ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock"); set conns; bool last_mark; public: std::atomic count = { 0 }; - explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"), + explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), last_mark(false) { } bool ms_can_fast_dispatch_any() const override { return false; } @@ -2095,16 +2052,16 @@ class MarkdownDispatcher : public Dispatcher { void ms_handle_fast_connect(Connection *con) override { lderr(g_ceph_context) << __func__ << " " << con << dendl; - Mutex::Locker l(lock); + std::lock_guard l{lock}; conns.insert(con); } void ms_handle_fast_accept(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; conns.insert(con); } bool ms_dispatch(Message *m) override { lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl; - Mutex::Locker l(lock); + std::lock_guard l{lock}; count++; conns.insert(m->get_connection()); if (conns.size() < 2 && !last_mark) { @@ -2128,13 +2085,13 @@ class MarkdownDispatcher : public Dispatcher { } bool ms_handle_reset(Connection *con) override { lderr(g_ceph_context) << __func__ << " " << con << dendl; - Mutex::Locker l(lock); + std::lock_guard l{lock}; conns.erase(con); usleep(rand() % 500); return true; } void ms_handle_remote_reset(Connection *con) override { - Mutex::Locker l(lock); + std::lock_guard l{lock}; conns.erase(con); lderr(g_ceph_context) << __func__ << " " << con << dendl; } -- 2.39.5