From 9783a5cb64508003972923ac37dca8bb4c6e62c6 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 7 Dec 2014 02:51:47 +0800 Subject: [PATCH] test/msgr/test_msgr: Fix potential unsafe cond wakeup and wrap check Signed-off-by: Haomai Wang --- src/test/Makefile.am | 10 +-- src/test/msgr/test_msgr.cc | 123 +++++++++++++++++++++++++++++-------- 2 files changed, 102 insertions(+), 31 deletions(-) diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 93a7866bd3c56..ebfbecbac013b 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -26,11 +26,6 @@ ceph_test_rewrite_latency_SOURCES = test/test_rewrite_latency.cc ceph_test_rewrite_latency_LDADD = $(LIBCOMMON) $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) bin_DEBUGPROGRAMS += ceph_test_rewrite_latency -ceph_test_msgr_SOURCES = test/msgr/test_msgr.cc -ceph_test_msgr_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) -ceph_test_msgr_CXXFLAGS = $(UNITTEST_CXXFLAGS) -bin_DEBUGPROGRAMS += ceph_test_msgr - ceph_test_async_driver_SOURCES = test/msgr/test_async_driver.cc ceph_test_async_driver_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) ceph_test_async_driver_CXXFLAGS = $(UNITTEST_CXXFLAGS) @@ -626,6 +621,11 @@ unittest_readahead_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) unittest_readahead_CXXFLAGS = $(UNITTEST_CXXFLAGS) -O2 check_PROGRAMS += unittest_readahead +unittest_msgr_SOURCES = test/msgr/test_msgr.cc +unittest_msgr_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) +unittest_msgr_CXXFLAGS = $(UNITTEST_CXXFLAGS) +check_PROGRAMS += unittest_msgr + check_SCRIPTS += test/pybind/test_ceph_argparse.py diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index a42943fb6a92d..5996d213a5c72 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -32,6 +32,15 @@ #if GTEST_HAS_PARAM_TEST +#define CHECK_AND_WAIT_TRUE(expr) do { \ + int n = 10; \ + while (--n) { \ + if (expr) \ + break; \ + usleep(100); \ + } \ +} while(0); + class MessengerTest : public ::testing::TestWithParam { public: Messenger *server_msgr; @@ -49,6 +58,7 @@ class MessengerTest : public ::testing::TestWithParam { delete server_msgr; delete client_msgr; } + }; @@ -67,8 +77,13 @@ class FakeDispatcher : public Dispatcher { Mutex lock; Cond cond; bool is_server; + bool got_new; + bool got_remote_reset; + bool got_connect; - FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), is_server(s) {} + FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), + is_server(s), got_new(false), got_remote_reset(false), + got_connect(false) {} bool ms_can_fast_dispatch_any() const { return true; } bool ms_can_fast_dispatch(Message *m) const { switch (m->get_type()) { @@ -84,25 +99,37 @@ class FakeDispatcher : public Dispatcher { Session *s = static_cast(con->get_priv()); if (!s) { s = new Session(con); - con->set_priv(s); + con->set_priv(s->get()); cerr << __func__ << " con: " << con << " count: " << s->count << std::endl; } + s->put(); + lock.Lock(); + got_connect = true; + cond.Signal(); + lock.Unlock(); } void ms_handle_fast_accept(Connection *con) { Session *s = static_cast(con->get_priv()); if (!s) { s = new Session(con); - con->set_priv(s); + con->set_priv(s->get()); } + s->put(); } bool ms_dispatch(Message *m) { Session *s = static_cast(m->get_connection()->get_priv()); + if (!s) { + s = new Session(m->get_connection()); + m->get_connection()->set_priv(s->get()); + } + s->put(); Mutex::Locker l(s->lock); s->count++; cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl; if (is_server) reply_message(m); lock.Lock(); + got_new = true; cond.Signal(); lock.Unlock(); return true; @@ -126,15 +153,22 @@ class FakeDispatcher : public Dispatcher { con->set_priv(NULL); // break ref <-> session cycle, if any s->put(); } + got_remote_reset = true; } void ms_fast_dispatch(Message *m) { Session *s = static_cast(m->get_connection()->get_priv()); + if (!s) { + s = new Session(m->get_connection()); + m->get_connection()->set_priv(s->get()); + } + s->put(); Mutex::Locker (s->lock); s->count++; cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl; if (is_server) reply_message(m); lock.Lock(); + got_new = true; cond.Signal(); lock.Unlock(); } @@ -171,7 +205,9 @@ TEST_P(MessengerTest, SimpleTest) { { Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); @@ -189,7 +225,9 @@ TEST_P(MessengerTest, SimpleTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); @@ -203,8 +241,7 @@ TEST_P(MessengerTest, SimpleTest) { m = new MPing(); conn->send_message(m); - // sleep 0.3s is enough to judge connection failed? - usleep(300*1000); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); // 5. loopback connection @@ -213,7 +250,9 @@ TEST_P(MessengerTest, SimpleTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); client_msgr->shutdown(); @@ -236,7 +275,9 @@ TEST_P(MessengerTest, NameAddrTest) { { Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr()); @@ -277,7 +318,7 @@ TEST_P(MessengerTest, FeatureTest) { MPing *m = new MPing(); ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); conn->send_message(m); - usleep(300*1000); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); // should failed build a connection ASSERT_FALSE(conn->is_connected()); @@ -295,7 +336,9 @@ TEST_P(MessengerTest, FeatureTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); @@ -327,7 +370,9 @@ TEST_P(MessengerTest, StatefulTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); conn->mark_down(); @@ -341,31 +386,43 @@ TEST_P(MessengerTest, StatefulTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } - // resetcheck happen ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); // 2. test for client reconnect + ASSERT_FALSE(cli_dispatcher.got_remote_reset); + cli_dispatcher.got_connect = false; server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); - // enough for client reconnect? - usleep(300*1000); + // ensure client detect server socket closed + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_connect) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_connect = false; + } + CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); - conn = client_msgr->get_connection(server_msgr->get_myinst()); { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_TRUE(conn->is_connected()); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } // resetcheck happen ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + ASSERT_TRUE(cli_dispatcher.got_remote_reset); + cli_dispatcher.got_remote_reset = false; server_msgr->shutdown(); client_msgr->shutdown(); @@ -395,7 +452,9 @@ TEST_P(MessengerTest, StatelessTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); conn->mark_down(); @@ -406,7 +465,9 @@ TEST_P(MessengerTest, StatelessTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); @@ -416,14 +477,16 @@ TEST_P(MessengerTest, StatelessTest) { // 2. test for client lossy server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); - usleep(300*1000); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); conn = client_msgr->get_connection(server_msgr->get_myinst()); { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); @@ -455,10 +518,13 @@ TEST_P(MessengerTest, ClientStandbyTest) { m = new MPing(); Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + ASSERT_FALSE(cli_dispatcher.got_remote_reset); server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); // client should be standby @@ -466,13 +532,18 @@ TEST_P(MessengerTest, ClientStandbyTest) { // client should be standby, so we use original connection { m = new MPing(); - Mutex::Locker l(cli_dispatcher.lock); conn->send_keepalive(); - usleep(300*1000); + CHECK_AND_WAIT_TRUE(conn->is_connected()); + ASSERT_TRUE(conn->is_connected()); + Mutex::Locker l(cli_dispatcher.lock); ASSERT_EQ(conn->send_message(m), 0); - cli_dispatcher.cond.Wait(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; } // resetcheck for client, so it discard state previously + ASSERT_TRUE(cli_dispatcher.got_remote_reset); + cli_dispatcher.got_remote_reset = false; ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); -- 2.39.5