From: Ricardo Dias Date: Mon, 18 Feb 2019 10:44:20 +0000 (+0000) Subject: test/msgr: msgr2 unit tests using the protocol interceptor WIP X-Git-Tag: v14.1.0~14^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f22d58318490587a3d37ab1a48db26445593849e;p=ceph.git test/msgr: msgr2 unit tests using the protocol interceptor WIP Signed-off-by: Ricardo Dias --- diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 81910b4f6162..18cfb8e994af 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include "common/Mutex.h" #include "common/Cond.h" #include "common/ceph_argparse.h" @@ -225,6 +227,573 @@ class FakeDispatcher : public Dispatcher { typedef FakeDispatcher::Session Session; +struct TestInterceptor : public Interceptor { + + bool step_waiting = false; + bool waiting = true; + std::map current_step; + std::map> step_history; + std::map> decisions; + std::set breakpoints; + + uint32_t count_step(Connection *conn, uint32_t step) { + uint32_t count = 0; + for (auto s : step_history[conn]) { + if (s == step) { + count++; + } + } + return count; + } + + void breakpoint(uint32_t step) { + breakpoints.insert(step); + } + + void remove_bp(uint32_t step) { + breakpoints.erase(step); + } + + Connection *wait(uint32_t step, Connection *conn=nullptr) { + std::unique_lock l(lock); + while(true) { + if (conn) { + auto it = current_step.find(conn); + if (it != current_step.end()) { + if (it->second == step) { + break; + } + } + } else { + for (auto it : current_step) { + if (it.second == step) { + conn = it.first; + break; + } + } + if (conn) { + break; + } + } + step_waiting = true; + cond_var.wait(l); + } + step_waiting = false; + return conn; + } + + ACTION wait_for_decision(uint32_t step, std::unique_lock &l) { + if (decisions[step]) { + return *(decisions[step]); + } + waiting = true; + while(waiting) { + cond_var.wait(l); + } + return *(decisions[step]); + } + + void proceed(uint32_t step, ACTION decision) { + std::unique_lock l(lock); + decisions[step] = decision; + if (waiting) { + waiting = false; + cond_var.notify_one(); + } + } + + ACTION intercept(Connection *conn, uint32_t step) override { + lderr(g_ceph_context) << __func__ << " conn(" << conn + << ") intercept called on step=" << step << dendl; + + { + std::unique_lock l(lock); + step_history[conn].push_back(step); + current_step[conn] = step; + if (step_waiting) { + cond_var.notify_one(); + } + } + + std::unique_lock l(lock); + ACTION decision = ACTION::CONTINUE; + if (breakpoints.find(step) != breakpoints.end()) { + lderr(g_ceph_context) << __func__ << " conn(" << conn + << ") pausing on step=" << step << dendl; + decision = wait_for_decision(step, l); + } else { + if (decisions[step]) { + decision = *(decisions[step]); + } + } + lderr(g_ceph_context) << __func__ << " conn(" << conn + << ") resuming step=" << step << " with decision=" + << decision << dendl; + decisions[step].reset(); + return decision; + } + +}; + +/** + * Scenario: A connects to B, and B connects to A at the same time. + */ +TEST_P(MessengerTest, ConnectionRaceTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending client_ident message + cli_interceptor->breakpoint(11); + // pause before sending client_ident message + srv_interceptor->breakpoint(11); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); + MPing *m2 = new MPing(); + ASSERT_EQ(s2c->send_message(m2), 0); + + cli_interceptor->wait(11, c2s.get()); + srv_interceptor->wait(11, s2c.get()); + + // at this point both connections (A->B, B->A) are paused just before sending + // the client_ident message. + + cli_interceptor->remove_bp(11); + srv_interceptor->remove_bp(11); + + cli_interceptor->proceed(11, Interceptor::ACTION::CONTINUE); + srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE); + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + { + Mutex::Locker l(srv_dispatcher.lock); + while (!srv_dispatcher.got_new) + srv_dispatcher.cond.Wait(srv_dispatcher.lock); + srv_dispatcher.got_new = false; + } + + ASSERT_TRUE(s2c->is_connected()); + ASSERT_EQ(1u, static_cast(s2c->get_priv().get())->get_count()); + ASSERT_TRUE(s2c->peer_is_client()); + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A sends client_ident to B + * - B fails before sending server_ident to A + * - A reconnects + */ +TEST_P(MessengerTest, MissingServerIdenTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending client_ident message + srv_interceptor->breakpoint(12); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + Connection *c2s_accepter = srv_interceptor->wait(12); + srv_interceptor->remove_bp(12); + + // We inject a message from this side of the connection to force it to be + // in standby when we inject the failure below + MPing *m2 = new MPing(); + ASSERT_EQ(c2s_accepter->send_message(m2), 0); + + srv_interceptor->proceed(12, Interceptor::ACTION::FAIL); + + { + Mutex::Locker l(srv_dispatcher.lock); + while (!srv_dispatcher.got_new) + srv_dispatcher.cond.Wait(srv_dispatcher.lock); + srv_dispatcher.got_new = false; + } + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + ASSERT_TRUE(c2s_accepter->is_connected()); + ASSERT_EQ(1u, static_cast(c2s_accepter->get_priv().get())->get_count()); + ASSERT_TRUE(c2s_accepter->peer_is_client()); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A sends client_ident to B + * - B fails before sending server_ident to A + * - A goes to standby + * - B reconnects to A + */ +TEST_P(MessengerTest, MissingServerIdenTest2) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending client_ident message + srv_interceptor->breakpoint(12); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + + Connection *c2s_accepter = srv_interceptor->wait(12); + srv_interceptor->remove_bp(12); + + // We inject a message from this side of the connection to force it to be + // in standby when we inject the failure below + MPing *m2 = new MPing(); + ASSERT_EQ(c2s_accepter->send_message(m2), 0); + + srv_interceptor->proceed(12, Interceptor::ACTION::FAIL); + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + ASSERT_TRUE(c2s_accepter->is_connected()); + ASSERT_EQ(0u, static_cast(c2s_accepter->get_priv().get())->get_count()); + ASSERT_TRUE(c2s_accepter->peer_is_client()); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A and B exchange messages + * - A fails + * - B goes into standby + * - A reconnects + */ +TEST_P(MessengerTest, ReconnectTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + cli_interceptor->breakpoint(16); + + MPing *m2 = new MPing(); + ASSERT_EQ(c2s->send_message(m2), 0); + + cli_interceptor->wait(16, c2s.get()); + cli_interceptor->remove_bp(16); + + // at this point client and server are connected together + + srv_interceptor->breakpoint(15); + + // failing client + cli_interceptor->proceed(16, Interceptor::ACTION::FAIL); + + MPing *m3 = new MPing(); + ASSERT_EQ(c2s->send_message(m3), 0); + + Connection *c2s_accepter = srv_interceptor->wait(15); + // the srv end of theconnection is now paused at ready + // this means that the reconnect was successful + srv_interceptor->remove_bp(15); + + ASSERT_TRUE(c2s_accepter->peer_is_client()); + // c2s_accepter sent 0 reconnect messages + ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 13), 0u); + // c2s_accepter sent 1 reconnect_ok messages + ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 1u); + // c2s sent 1 reconnect messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u); + // c2s sent 0 reconnect_ok messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 0u); + + srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A and B exchange messages + * - A fails + * - A reconnects // B reconnects + */ +TEST_P(MessengerTest, ReconnectRaceTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + cli_interceptor->breakpoint(16); + + MPing *m2 = new MPing(); + ASSERT_EQ(c2s->send_message(m2), 0); + + cli_interceptor->wait(16, c2s.get()); + cli_interceptor->remove_bp(16); + + // at this point client and server are connected together + + // force both client and server to race on reconnect + cli_interceptor->breakpoint(13); + srv_interceptor->breakpoint(13); + + // failing client + // this will cause both client and server to reconnect at the same time + cli_interceptor->proceed(16, Interceptor::ACTION::FAIL); + + MPing *m3 = new MPing(); + ASSERT_EQ(c2s->send_message(m3), 0); + + cli_interceptor->wait(13, c2s.get()); + srv_interceptor->wait(13); + + cli_interceptor->remove_bp(13); + srv_interceptor->remove_bp(13); + + // pause on "ready" + srv_interceptor->breakpoint(15); + + cli_interceptor->proceed(13, Interceptor::ACTION::CONTINUE); + srv_interceptor->proceed(13, Interceptor::ACTION::CONTINUE); + + Connection *c2s_accepter = srv_interceptor->wait(15); + + // the server has reconnected and is "ready" + srv_interceptor->remove_bp(15); + + ASSERT_TRUE(c2s_accepter->peer_is_client()); + ASSERT_TRUE(c2s->peer_is_osd()); + + // the server should win the reconnect race + + // c2s_accepter sent 1 or 2 reconnect messages + ASSERT_LT(srv_interceptor->count_step(c2s_accepter, 13), 3u); + ASSERT_GT(srv_interceptor->count_step(c2s_accepter, 13), 0u); + // c2s_accepter sent 0 reconnect_ok messages + ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 0u); + // c2s sent 1 reconnect messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u); + // c2s sent 1 reconnect_ok messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 1u); + + if (srv_interceptor->count_step(c2s_accepter, 13) == 2) { + // if the server send the reconnect message two times then + // the client must have sent a session retry message to the server + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 1u); + } else { + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 0u); + } + + srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); + + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + TEST_P(MessengerTest, SimpleTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr;