]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/msgr: msgr2 unit tests using the protocol interceptor WIP
author Ricardo Dias <rdias@suse.com>
Mon, 18 Feb 2019 10:44:20 +0000 (10:44 +0000)
committer Ricardo Dias <rdias@suse.com>
Wed, 20 Feb 2019 13:36:13 +0000 (13:36 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/test/msgr/test_msgr.cc

index 81910b4f6162a00a2569ac48fd173c20e9aa9bac..18cfb8e994af27bd0c2212a4f21cfb02eee99e30 100644 (file)
@@ -19,6 +19,8 @@
 #include <unistd.h>
 #include <stdlib.h>
 #include <time.h>
+#include <set>
+#include <list>
 #include "common/Mutex.h"
 #include "common/Cond.h"
 #include "common/ceph_argparse.h"
@@ -225,6 +227,573 @@ class FakeDispatcher : public Dispatcher {
 
 typedef FakeDispatcher::Session Session;
 
+/**
+ * Interceptor used by the tests below to observe and steer connection steps.
+ *
+ * intercept() records every (connection, step) pair it is called with. The
+ * test thread can register breakpoints, block until a connection reaches a
+ * given step (wait()), and release a paused step with a chosen ACTION
+ * (proceed()).
+ *
+ * Every member below is accessed with `lock` held. `lock` and `cond_var` are
+ * not declared in this struct (presumably inherited from Interceptor --
+ * verify). `lock` is used as a non-recursive std::mutex, so none of the
+ * public methods may be called while it is already held by the caller,
+ * except wait_for_decision(), which requires it.
+ */
+struct TestInterceptor : public Interceptor {
+
+  // true while a test thread is blocked inside wait()
+  bool step_waiting = false;
+  // set while an intercept() call is blocked waiting for proceed(); kept for
+  // compatibility only -- the wake-up condition is the decision itself
+  bool waiting = true;
+  // most recent step reported for each connection
+  std::map<Connection *, uint32_t> current_step;
+  // every step reported for each connection, in arrival order
+  std::map<Connection *, std::list<uint32_t>> step_history;
+  // pending decision per step; consumed (reset) at the end of intercept()
+  std::map<uint32_t, std::optional<ACTION>> decisions;
+  // steps at which intercept() blocks until proceed() supplies a decision
+  std::set<uint32_t> breakpoints;
+
+  /**
+   * Number of times `conn` has reported `step` so far.
+   * Returns 0 for a connection that was never seen, without creating an
+   * empty history entry for it.
+   */
+  uint32_t count_step(Connection *conn, uint32_t step) {
+    std::unique_lock<std::mutex> l(lock);
+    auto hist = step_history.find(conn);
+    if (hist == step_history.end()) {
+      return 0;
+    }
+    uint32_t count = 0;
+    for (auto s : hist->second) {
+      if (s == step) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /** Make intercept() pause whenever it is called with `step`. */
+  void breakpoint(uint32_t step) {
+    // intercept() reads `breakpoints` on another thread, so mutate it
+    // under the same lock
+    std::unique_lock<std::mutex> l(lock);
+    breakpoints.insert(step);
+  }
+
+  /** Stop pausing on `step`; an already paused call stays paused. */
+  void remove_bp(uint32_t step) {
+    std::unique_lock<std::mutex> l(lock);
+    breakpoints.erase(step);
+  }
+
+  /**
+   * Block until a connection's current step equals `step`.
+   *
+   * @param step step number to wait for
+   * @param conn connection to watch; when null, any connection whose current
+   *             step equals `step` satisfies the wait
+   * @return the connection that matched (`conn` itself when it was given)
+   */
+  Connection *wait(uint32_t step, Connection *conn=nullptr) {
+    std::unique_lock<std::mutex> l(lock);
+    while (true) {
+      if (conn) {
+        auto it = current_step.find(conn);
+        if (it != current_step.end() && it->second == step) {
+          break;
+        }
+      } else {
+        for (const auto &entry : current_step) {
+          if (entry.second == step) {
+            conn = entry.first;
+            break;
+          }
+        }
+        if (conn) {
+          break;
+        }
+      }
+      // re-arm on every iteration: the state is re-checked after each
+      // wake-up, so spurious or unrelated notifications are harmless
+      step_waiting = true;
+      cond_var.wait(l);
+    }
+    step_waiting = false;
+    return conn;
+  }
+
+  /**
+   * Block until a decision for `step` is available and return it.
+   *
+   * @param step step whose decision is awaited
+   * @param l    lock guard that already owns `lock`; released while blocked
+   *
+   * The loop waits on the decision for this particular step rather than on
+   * the shared `waiting` flag, so a proceed() aimed at a different step can
+   * never make this function dereference an empty optional.
+   */
+  ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
+    // references into std::map stay valid across later insertions
+    std::optional<ACTION> &slot = decisions[step];
+    while (!slot) {
+      waiting = true;
+      cond_var.wait(l);
+    }
+    return *slot;
+  }
+
+  /**
+   * Record `decision` for `step` and wake every waiter so the one paused on
+   * that step can resume. The decision is stored even when nobody is paused
+   * yet; the next intercept() for that step picks it up.
+   */
+  void proceed(uint32_t step, ACTION decision) {
+    std::unique_lock<std::mutex> l(lock);
+    decisions[step] = decision;
+    waiting = false;
+    // notify_all: the condition variable is shared by wait() and by every
+    // paused intercept() call, so notify_one could wake the wrong waiter
+    cond_var.notify_all();
+  }
+
+  /**
+   * Interceptor hook: record the step, pause if it is a breakpoint, and
+   * return the action to take.
+   *
+   * @return the decision given through proceed() for this step, or
+   *         ACTION::CONTINUE when none is pending and the step is not a
+   *         breakpoint. The stored decision is cleared before returning.
+   */
+  ACTION intercept(Connection *conn, uint32_t step) override {
+    lderr(g_ceph_context) << __func__ << " conn(" << conn
+                          << ") intercept called on step=" << step << dendl;
+
+    // a single critical section: recording the step and checking the
+    // breakpoint happen atomically with respect to breakpoint()/remove_bp()
+    std::unique_lock<std::mutex> l(lock);
+    step_history[conn].push_back(step);
+    current_step[conn] = step;
+    if (step_waiting) {
+      cond_var.notify_all();
+    }
+
+    ACTION decision = ACTION::CONTINUE;
+    if (breakpoints.find(step) != breakpoints.end()) {
+      lderr(g_ceph_context) << __func__ << " conn(" << conn
+                            << ") pausing on step=" << step << dendl;
+      decision = wait_for_decision(step, l);
+    } else if (decisions[step]) {
+      decision = *(decisions[step]);
+    }
+    lderr(g_ceph_context) << __func__ << " conn(" << conn
+                          << ") resuming step=" << step << " with decision="
+                          << decision << dendl;
+    decisions[step].reset();
+    return decision;
+  }
+
+};
+
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ *
+ * Both sides are held at interceptor step 11 until each of them has reached
+ * it, so both connections are paused there at the same time. They are then
+ * released with CONTINUE and the test checks that each ConnectionRef ends up
+ * connected with a session count of 1.
+ */
+TEST_P(MessengerTest, ConnectionRaceTest) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  // One interceptor per messenger; both are deleted manually at the end of
+  // the test.
+  // NOTE(review): a failing ASSERT_* returns from the test body immediately,
+  // skipping the shutdown and delete calls at the bottom.
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  // Both messengers bind to fixed loopback ports.
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause before sending client_ident message
+  cli_interceptor->breakpoint(11);
+  // same breakpoint on the server messenger, which also initiates a
+  // connection below
+  srv_interceptor->breakpoint(11);
+
+  // A -> B
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  // B -> A
+  ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+                                                                           client_msgr->get_myaddrs());
+  MPing *m2 = new MPing();
+  ASSERT_EQ(s2c->send_message(m2), 0);
+
+  // block until each initiating connection has reported step 11
+  cli_interceptor->wait(11, c2s.get());
+  srv_interceptor->wait(11, s2c.get());
+
+  // at this point both connections (A->B, B->A) are paused just before sending
+  // the client_ident message.
+
+  // drop the breakpoints first so later passes through step 11 do not pause
+  cli_interceptor->remove_bp(11);
+  srv_interceptor->remove_bp(11);
+
+  // release both paused connections
+  cli_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+  srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+
+  // Wait until the client dispatcher flags got_new, then clear the flag.
+  // (got_new is presumably set by FakeDispatcher on delivery -- its
+  // definition is outside this hunk.)
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  // same for the server dispatcher
+  {
+    Mutex::Locker l(srv_dispatcher.lock);
+    while (!srv_dispatcher.got_new)
+      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    srv_dispatcher.got_new = false;
+  }
+  
+  // both directions must be connected, each with a session count of 1
+  ASSERT_TRUE(s2c->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+  ASSERT_TRUE(s2c->peer_is_client());
+
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  // interceptors are released only after both messengers have stopped
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A sends client_ident to B
+ *    - B fails before sending server_ident to A
+ *    - A reconnects
+ *
+ * The failure is injected by pausing the server side at interceptor step 12
+ * and releasing it with ACTION::FAIL.
+ */
+TEST_P(MessengerTest, MissingServerIdenTest) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  // One interceptor per messenger; both are deleted manually at the end of
+  // the test.
+  // NOTE(review): a failing ASSERT_* returns from the test body immediately,
+  // skipping the shutdown and delete calls at the bottom.
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  // Both messengers bind to fixed loopback ports.
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause the server side at step 12; per the scenario above this is
+  // presumably the point just before server_ident is sent -- TODO confirm
+  srv_interceptor->breakpoint(12);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  // No connection is passed, so wait() returns whichever connection the
+  // server interceptor sees at step 12 (assumed to be the accepting end of
+  // c2s, the only connection in this test).
+  Connection *c2s_accepter = srv_interceptor->wait(12);
+  srv_interceptor->remove_bp(12);
+
+  // We inject a message from this side of the connection to force it to be
+  // in standby when we inject the failure below
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+  // release the paused server connection with a failure
+  srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
+
+  // Wait until the server dispatcher flags got_new, then clear the flag.
+  // (got_new is presumably set by FakeDispatcher on delivery -- its
+  // definition is outside this hunk.)
+  {
+    Mutex::Locker l(srv_dispatcher.lock);
+    while (!srv_dispatcher.got_new)
+      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    srv_dispatcher.got_new = false;
+  }
+
+  // same for the client dispatcher
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  
+  // both ends must be connected again, each with a session count of 1
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  ASSERT_TRUE(c2s_accepter->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  // interceptors are released only after both messengers have stopped
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A sends client_ident to B
+ *    - B fails before sending server_ident to A
+ *    - A goes to standby
+ *    - B reconnects to A
+ *
+ * Unlike MissingServerIdenTest, both sides use the lossless_peer policy and
+ * the client sends no message of its own; only the server side injects one.
+ */
+TEST_P(MessengerTest, MissingServerIdenTest2) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  // One interceptor per messenger; both are deleted manually at the end of
+  // the test.
+  // NOTE(review): a failing ASSERT_* returns from the test body immediately,
+  // skipping the shutdown and delete calls at the bottom.
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  // Both messengers bind to fixed loopback ports.
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause the server side at step 12; per the scenario above this is
+  // presumably the point just before server_ident is sent -- TODO confirm
+  srv_interceptor->breakpoint(12);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+
+  // No connection is passed, so wait() returns whichever connection the
+  // server interceptor sees at step 12 (assumed to be the accepting end of
+  // c2s, the only connection in this test).
+  Connection *c2s_accepter = srv_interceptor->wait(12);
+  srv_interceptor->remove_bp(12);
+
+  // We inject a message from this side of the connection to force it to be
+  // in standby when we inject the failure below
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+  // release the paused server connection with a failure
+  srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
+
+  // Wait until the client dispatcher flags got_new, then clear the flag.
+  // (got_new is presumably set by FakeDispatcher on delivery -- its
+  // definition is outside this hunk.)
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  
+  // the client end has a session count of 1 ...
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  // ... while the accepting end is connected with a session count of 0
+  ASSERT_TRUE(c2s_accepter->is_connected());
+  ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  // interceptors are released only after both messengers have stopped
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A and B exchange messages
+ *    - A fails
+ *    - B goes into standby
+ *    - A reconnects
+ *
+ * The client failure is injected at interceptor step 16. The reconnect is
+ * verified by counting how often each side reported steps 13 and 14 before
+ * the server side reaches step 15.
+ */
+TEST_P(MessengerTest, ReconnectTest) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+  // One interceptor per messenger; both are deleted manually at the end of
+  // the test.
+  // NOTE(review): a failing ASSERT_* returns from the test body immediately,
+  // skipping the shutdown and delete calls at the bottom.
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  // Both messengers bind to fixed loopback ports.
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+
+  // first message: establishes the connection with no breakpoints set
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  // Wait until the client dispatcher flags got_new, then clear the flag.
+  // (got_new is presumably set by FakeDispatcher on delivery -- its
+  // definition is outside this hunk.)
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  // NOTE(review): step 16 is not described in this file; it is waited for
+  // right after send_message(m2) on the established connection -- confirm
+  // its meaning against the protocol's interceptor step list.
+  cli_interceptor->breakpoint(16);
+  
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s->send_message(m2), 0);
+
+  cli_interceptor->wait(16, c2s.get());
+  cli_interceptor->remove_bp(16);
+
+  // at this point client and server are connected together
+
+  // pause the server side at step 15 ("ready", see below)
+  srv_interceptor->breakpoint(15);
+
+  // failing client
+  cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
+
+  MPing *m3 = new MPing();
+  ASSERT_EQ(c2s->send_message(m3), 0);
+
+  // no connection is passed, so wait() returns whichever server-side
+  // connection is at step 15
+  Connection *c2s_accepter = srv_interceptor->wait(15);
+  // the srv end of the connection is now paused at ready
+  // this means that the reconnect was successful
+  srv_interceptor->remove_bp(15);
+
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+  // c2s_accepter sent 0 reconnect messages
+  ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 13), 0u);
+  // c2s_accepter sent 1 reconnect_ok messages
+  ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 1u);
+  // c2s sent 1 reconnect messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
+  // c2s sent 0 reconnect_ok messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 0u);
+
+  // release the server connection paused at step 15
+  srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+  // wait once more for the client dispatcher to flag got_new
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  // interceptors are released only after both messengers have stopped
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A and B exchange messages
+ *    - A fails
+ *    - A reconnects // B reconnects
+ *
+ * Both sides use the lossless_peer policy and are paused at interceptor
+ * step 13 (reconnect) so that the two reconnect attempts race. The outcome
+ * is checked through the per-connection step counts once the server side
+ * reaches step 15.
+ */
+TEST_P(MessengerTest, ReconnectRaceTest) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+  // One interceptor per messenger; both are deleted manually at the end of
+  // the test.
+  // NOTE(review): a failing ASSERT_* returns from the test body immediately,
+  // skipping the shutdown and delete calls at the bottom.
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  // Both messengers bind to fixed loopback ports.
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+
+  // first message: establishes the connection with no breakpoints set
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  // Wait until the client dispatcher flags got_new, then clear the flag.
+  // (got_new is presumably set by FakeDispatcher on delivery -- its
+  // definition is outside this hunk.)
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  // NOTE(review): step 16 is not described in this file; it is waited for
+  // right after send_message(m2) on the established connection -- confirm
+  // its meaning against the protocol's interceptor step list.
+  cli_interceptor->breakpoint(16);
+  
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s->send_message(m2), 0);
+
+  cli_interceptor->wait(16, c2s.get());
+  cli_interceptor->remove_bp(16);
+
+  // at this point client and server are connected together
+
+  // force both client and server to race on reconnect
+  cli_interceptor->breakpoint(13);
+  srv_interceptor->breakpoint(13);
+
+  // failing client
+  // this will cause both client and server to reconnect at the same time
+  cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
+
+  MPing *m3 = new MPing();
+  ASSERT_EQ(c2s->send_message(m3), 0);
+
+  // block until both sides have reported step 13; the server-side wait
+  // accepts any connection
+  cli_interceptor->wait(13, c2s.get());
+  srv_interceptor->wait(13);
+
+  cli_interceptor->remove_bp(13);
+  srv_interceptor->remove_bp(13);
+
+  // pause on "ready"
+  srv_interceptor->breakpoint(15);
+
+  // release both paused reconnect attempts
+  cli_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
+  srv_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
+
+  // no connection is passed, so wait() returns whichever server-side
+  // connection is at step 15
+  Connection *c2s_accepter = srv_interceptor->wait(15);
+
+  // the server has reconnected and is "ready"
+  srv_interceptor->remove_bp(15);
+
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+  ASSERT_TRUE(c2s->peer_is_osd());
+  
+  // the server should win the reconnect race
+
+  // c2s_accepter sent 1 or 2 reconnect messages
+  ASSERT_LT(srv_interceptor->count_step(c2s_accepter, 13), 3u);
+  ASSERT_GT(srv_interceptor->count_step(c2s_accepter, 13), 0u);
+  // c2s_accepter sent 0 reconnect_ok messages
+  ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 0u);
+  // c2s sent 1 reconnect messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
+  // c2s sent 1 reconnect_ok messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 1u);
+
+  if (srv_interceptor->count_step(c2s_accepter, 13) == 2) {
+    // if the server sent the reconnect message two times then
+    // the client must have sent a session retry message to the server
+    ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 1u);
+  } else {
+    ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 0u);
+  }
+
+  // release the server connection paused at step 15
+  srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+  // wait once more for the client dispatcher to flag got_new
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  // interceptors are released only after both messengers have stopped
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
 TEST_P(MessengerTest, SimpleTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;