git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/msgr/test_msgr: Fix potential unsafe cond wakeup and wrap check
author Haomai Wang <haomaiwang@gmail.com>
Sat, 6 Dec 2014 18:51:47 +0000 (02:51 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
Thu, 18 Dec 2014 14:50:57 +0000 (22:50 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/test/Makefile.am
src/test/msgr/test_msgr.cc

index 93a7866bd3c569b00e0544a03c33f2d50798d3fe..ebfbecbac013bc8ae8c32af384f9b51e1862d761 100644 (file)
@@ -26,11 +26,6 @@ ceph_test_rewrite_latency_SOURCES = test/test_rewrite_latency.cc
 ceph_test_rewrite_latency_LDADD = $(LIBCOMMON) $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
 bin_DEBUGPROGRAMS += ceph_test_rewrite_latency
 
-ceph_test_msgr_SOURCES = test/msgr/test_msgr.cc
-ceph_test_msgr_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
-ceph_test_msgr_CXXFLAGS = $(UNITTEST_CXXFLAGS)
-bin_DEBUGPROGRAMS += ceph_test_msgr
-
 ceph_test_async_driver_SOURCES = test/msgr/test_async_driver.cc
 ceph_test_async_driver_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
 ceph_test_async_driver_CXXFLAGS = $(UNITTEST_CXXFLAGS)
@@ -626,6 +621,11 @@ unittest_readahead_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL)
 unittest_readahead_CXXFLAGS = $(UNITTEST_CXXFLAGS) -O2
 check_PROGRAMS += unittest_readahead
 
+unittest_msgr_SOURCES = test/msgr/test_msgr.cc
+unittest_msgr_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL)
+unittest_msgr_CXXFLAGS = $(UNITTEST_CXXFLAGS)
+check_PROGRAMS += unittest_msgr
+
 
 check_SCRIPTS += test/pybind/test_ceph_argparse.py
 
index a42943fb6a92df97193666cb2cccfcec1ccbfa9b..5996d213a5c72985917c6526d4bf3818b6f9db10 100644 (file)
 
 #if GTEST_HAS_PARAM_TEST
 
+#define CHECK_AND_WAIT_TRUE(expr) do {  \
+  int n = 10;                           \
+  while (--n) {                         \
+    if (expr)                           \
+      break;                            \
+    usleep(100);                        \
+  }                                     \
+} while(0);
+
 class MessengerTest : public ::testing::TestWithParam<const char*> {
  public:
   Messenger *server_msgr;
@@ -49,6 +58,7 @@ class MessengerTest : public ::testing::TestWithParam<const char*> {
     delete server_msgr;
     delete client_msgr;
   }
+
 };
 
 
@@ -67,8 +77,13 @@ class FakeDispatcher : public Dispatcher {
   Mutex lock;
   Cond cond;
   bool is_server;
+  bool got_new;
+  bool got_remote_reset;
+  bool got_connect;
 
-  FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), is_server(s) {}
+  FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
+                          is_server(s), got_new(false), got_remote_reset(false),
+                          got_connect(false) {}
   bool ms_can_fast_dispatch_any() const { return true; }
   bool ms_can_fast_dispatch(Message *m) const {
     switch (m->get_type()) {
@@ -84,25 +99,37 @@ class FakeDispatcher : public Dispatcher {
     Session *s = static_cast<Session*>(con->get_priv());
     if (!s) {
       s = new Session(con);
-      con->set_priv(s);
+      con->set_priv(s->get());
       cerr << __func__ << " con: " << con << " count: " << s->count << std::endl;
     }
+    s->put();
+    lock.Lock();
+    got_connect = true;
+    cond.Signal();
+    lock.Unlock();
   }
   void ms_handle_fast_accept(Connection *con) {
     Session *s = static_cast<Session*>(con->get_priv());
     if (!s) {
       s = new Session(con);
-      con->set_priv(s);
+      con->set_priv(s->get());
     }
+    s->put();
   }
   bool ms_dispatch(Message *m) {
     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    if (!s) {
+      s = new Session(m->get_connection());
+      m->get_connection()->set_priv(s->get());
+    }
+    s->put();
     Mutex::Locker l(s->lock);
     s->count++;
     cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
     if (is_server)
       reply_message(m);
     lock.Lock();
+    got_new = true;
     cond.Signal();
     lock.Unlock();
     return true;
@@ -126,15 +153,22 @@ class FakeDispatcher : public Dispatcher {
       con->set_priv(NULL);   // break ref <-> session cycle, if any
       s->put();
     }
+    got_remote_reset = true;
   }
   void ms_fast_dispatch(Message *m) {
     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    if (!s) {
+      s = new Session(m->get_connection());
+      m->get_connection()->set_priv(s->get());
+    }
+    s->put();
     Mutex::Locker (s->lock);
     s->count++;
     cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
     if (is_server)
       reply_message(m);
     lock.Lock();
+    got_new = true;
     cond.Signal();
     lock.Unlock();
   }
@@ -171,7 +205,9 @@ TEST_P(MessengerTest, SimpleTest) {
   {
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
   ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
@@ -189,7 +225,9 @@ TEST_P(MessengerTest, SimpleTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
 
@@ -203,8 +241,7 @@ TEST_P(MessengerTest, SimpleTest) {
 
   m = new MPing();
   conn->send_message(m);
-  // sleep 0.3s is enough to judge connection failed?
-  usleep(300*1000);
+  CHECK_AND_WAIT_TRUE(!conn->is_connected());
   ASSERT_FALSE(conn->is_connected());
 
   // 5. loopback connection
@@ -213,7 +250,9 @@ TEST_P(MessengerTest, SimpleTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   client_msgr->shutdown();
@@ -236,7 +275,9 @@ TEST_P(MessengerTest, NameAddrTest) {
   {
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
@@ -277,7 +318,7 @@ TEST_P(MessengerTest, FeatureTest) {
   MPing *m = new MPing();
   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
   conn->send_message(m);
-  usleep(300*1000);
+  CHECK_AND_WAIT_TRUE(!conn->is_connected());
   // should failed build a connection
   ASSERT_FALSE(conn->is_connected());
 
@@ -295,7 +336,9 @@ TEST_P(MessengerTest, FeatureTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
 
@@ -327,7 +370,9 @@ TEST_P(MessengerTest, StatefulTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   conn->mark_down();
@@ -341,31 +386,43 @@ TEST_P(MessengerTest, StatefulTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
-  // resetcheck happen
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
 
   // 2. test for client reconnect
+  ASSERT_FALSE(cli_dispatcher.got_remote_reset);
+  cli_dispatcher.got_connect = false;
   server_conn->mark_down();
   ASSERT_FALSE(server_conn->is_connected());
-  // enough for client reconnect?
-  usleep(300*1000);
+  // ensure client detect server socket closed
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_connect)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_connect = false;
+  }
+  CHECK_AND_WAIT_TRUE(conn->is_connected());
   ASSERT_TRUE(conn->is_connected());
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
   {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_TRUE(conn->is_connected());
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   // resetcheck happen
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_TRUE(cli_dispatcher.got_remote_reset);
+  cli_dispatcher.got_remote_reset = false;
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -395,7 +452,9 @@ TEST_P(MessengerTest, StatelessTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   conn->mark_down();
@@ -406,7 +465,9 @@ TEST_P(MessengerTest, StatelessTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
@@ -416,14 +477,16 @@ TEST_P(MessengerTest, StatelessTest) {
   // 2. test for client lossy
   server_conn->mark_down();
   ASSERT_FALSE(server_conn->is_connected());
-  usleep(300*1000);
+  CHECK_AND_WAIT_TRUE(!conn->is_connected());
   ASSERT_FALSE(conn->is_connected());
   conn = client_msgr->get_connection(server_msgr->get_myinst());
   {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
 
@@ -455,10 +518,13 @@ TEST_P(MessengerTest, ClientStandbyTest) {
     m = new MPing();
     Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ASSERT_FALSE(cli_dispatcher.got_remote_reset);
   server_conn->mark_down();
   ASSERT_FALSE(server_conn->is_connected());
   // client should be standby
@@ -466,13 +532,18 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   // client should be standby, so we use original connection
   {
     m = new MPing();
-    Mutex::Locker l(cli_dispatcher.lock);
     conn->send_keepalive();
-    usleep(300*1000);
+    CHECK_AND_WAIT_TRUE(conn->is_connected());
+    ASSERT_TRUE(conn->is_connected());
+    Mutex::Locker l(cli_dispatcher.lock);
     ASSERT_EQ(conn->send_message(m), 0);
-    cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
   }
   // resetcheck for client, so it discard state previously
+  ASSERT_TRUE(cli_dispatcher.got_remote_reset);
+  cli_dispatcher.got_remote_reset = false;
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);