#include <unistd.h>
#include <stdlib.h>
#include <time.h>
+#include <set>
+#include <list>
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/ceph_argparse.h"
typedef FakeDispatcher::Session Session;
+struct TestInterceptor : public Interceptor {
+
+ bool step_waiting = false;
+ bool waiting = true;
+ std::map<Connection *, uint32_t> current_step;
+ std::map<Connection *, std::list<uint32_t>> step_history;
+ std::map<uint32_t, std::optional<ACTION>> decisions;
+ std::set<uint32_t> breakpoints;
+
+ uint32_t count_step(Connection *conn, uint32_t step) {
+ uint32_t count = 0;
+ for (auto s : step_history[conn]) {
+ if (s == step) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ void breakpoint(uint32_t step) {
+ breakpoints.insert(step);
+ }
+
+ void remove_bp(uint32_t step) {
+ breakpoints.erase(step);
+ }
+
+ Connection *wait(uint32_t step, Connection *conn=nullptr) {
+ std::unique_lock<std::mutex> l(lock);
+ while(true) {
+ if (conn) {
+ auto it = current_step.find(conn);
+ if (it != current_step.end()) {
+ if (it->second == step) {
+ break;
+ }
+ }
+ } else {
+ for (auto it : current_step) {
+ if (it.second == step) {
+ conn = it.first;
+ break;
+ }
+ }
+ if (conn) {
+ break;
+ }
+ }
+ step_waiting = true;
+ cond_var.wait(l);
+ }
+ step_waiting = false;
+ return conn;
+ }
+
+ ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
+ if (decisions[step]) {
+ return *(decisions[step]);
+ }
+ waiting = true;
+ while(waiting) {
+ cond_var.wait(l);
+ }
+ return *(decisions[step]);
+ }
+
+ void proceed(uint32_t step, ACTION decision) {
+ std::unique_lock<std::mutex> l(lock);
+ decisions[step] = decision;
+ if (waiting) {
+ waiting = false;
+ cond_var.notify_one();
+ }
+ }
+
+ ACTION intercept(Connection *conn, uint32_t step) override {
+ lderr(g_ceph_context) << __func__ << " conn(" << conn
+ << ") intercept called on step=" << step << dendl;
+
+ {
+ std::unique_lock<std::mutex> l(lock);
+ step_history[conn].push_back(step);
+ current_step[conn] = step;
+ if (step_waiting) {
+ cond_var.notify_one();
+ }
+ }
+
+ std::unique_lock<std::mutex> l(lock);
+ ACTION decision = ACTION::CONTINUE;
+ if (breakpoints.find(step) != breakpoints.end()) {
+ lderr(g_ceph_context) << __func__ << " conn(" << conn
+ << ") pausing on step=" << step << dendl;
+ decision = wait_for_decision(step, l);
+ } else {
+ if (decisions[step]) {
+ decision = *(decisions[step]);
+ }
+ }
+ lderr(g_ceph_context) << __func__ << " conn(" << conn
+ << ") resuming step=" << step << " with decision="
+ << decision << dendl;
+ decisions[step].reset();
+ return decision;
+ }
+
+};
+
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ */
+TEST_P(MessengerTest, ConnectionRaceTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending client_ident message
+ cli_interceptor->breakpoint(11);
+ // pause before sending client_ident message
+ srv_interceptor->breakpoint(11);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
+ MPing *m2 = new MPing();
+ ASSERT_EQ(s2c->send_message(m2), 0);
+
+ cli_interceptor->wait(11, c2s.get());
+ srv_interceptor->wait(11, s2c.get());
+
+ // at this point both connections (A->B, B->A) are paused just before sending
+ // the client_ident message.
+
+ cli_interceptor->remove_bp(11);
+ srv_interceptor->remove_bp(11);
+
+ cli_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+ srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ {
+ Mutex::Locker l(srv_dispatcher.lock);
+ while (!srv_dispatcher.got_new)
+ srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+ srv_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(s2c->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+ ASSERT_TRUE(s2c->peer_is_client());
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A sends client_ident to B
+ * - B fails before sending server_ident to A
+ * - A reconnects
+ */
+TEST_P(MessengerTest, MissingServerIdenTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending client_ident message
+ srv_interceptor->breakpoint(12);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ Connection *c2s_accepter = srv_interceptor->wait(12);
+ srv_interceptor->remove_bp(12);
+
+ // We inject a message from this side of the connection to force it to be
+ // in standby when we inject the failure below
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+ srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
+
+ {
+ Mutex::Locker l(srv_dispatcher.lock);
+ while (!srv_dispatcher.got_new)
+ srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+ srv_dispatcher.got_new = false;
+ }
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ ASSERT_TRUE(c2s_accepter->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A sends client_ident to B
+ * - B fails before sending server_ident to A
+ * - A goes to standby
+ * - B reconnects to A
+ */
+TEST_P(MessengerTest, MissingServerIdenTest2) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending client_ident message
+ srv_interceptor->breakpoint(12);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+
+ Connection *c2s_accepter = srv_interceptor->wait(12);
+ srv_interceptor->remove_bp(12);
+
+ // We inject a message from this side of the connection to force it to be
+ // in standby when we inject the failure below
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+ srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ ASSERT_TRUE(c2s_accepter->is_connected());
+ ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A and B exchange messages
+ * - A fails
+ * - B goes into standby
+ * - A reconnects
+ */
+TEST_P(MessengerTest, ReconnectTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ cli_interceptor->breakpoint(16);
+
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s->send_message(m2), 0);
+
+ cli_interceptor->wait(16, c2s.get());
+ cli_interceptor->remove_bp(16);
+
+ // at this point client and server are connected together
+
+ srv_interceptor->breakpoint(15);
+
+ // failing client
+ cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
+
+ MPing *m3 = new MPing();
+ ASSERT_EQ(c2s->send_message(m3), 0);
+
+ Connection *c2s_accepter = srv_interceptor->wait(15);
+ // the srv end of theconnection is now paused at ready
+ // this means that the reconnect was successful
+ srv_interceptor->remove_bp(15);
+
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+ // c2s_accepter sent 0 reconnect messages
+ ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 13), 0u);
+ // c2s_accepter sent 1 reconnect_ok messages
+ ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 1u);
+ // c2s sent 1 reconnect messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
+ // c2s sent 0 reconnect_ok messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 0u);
+
+ srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A and B exchange messages
+ * - A fails
+ * - A reconnects // B reconnects
+ */
+TEST_P(MessengerTest, ReconnectRaceTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ cli_interceptor->breakpoint(16);
+
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s->send_message(m2), 0);
+
+ cli_interceptor->wait(16, c2s.get());
+ cli_interceptor->remove_bp(16);
+
+ // at this point client and server are connected together
+
+ // force both client and server to race on reconnect
+ cli_interceptor->breakpoint(13);
+ srv_interceptor->breakpoint(13);
+
+ // failing client
+ // this will cause both client and server to reconnect at the same time
+ cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
+
+ MPing *m3 = new MPing();
+ ASSERT_EQ(c2s->send_message(m3), 0);
+
+ cli_interceptor->wait(13, c2s.get());
+ srv_interceptor->wait(13);
+
+ cli_interceptor->remove_bp(13);
+ srv_interceptor->remove_bp(13);
+
+ // pause on "ready"
+ srv_interceptor->breakpoint(15);
+
+ cli_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
+ srv_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
+
+ Connection *c2s_accepter = srv_interceptor->wait(15);
+
+ // the server has reconnected and is "ready"
+ srv_interceptor->remove_bp(15);
+
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ // the server should win the reconnect race
+
+ // c2s_accepter sent 1 or 2 reconnect messages
+ ASSERT_LT(srv_interceptor->count_step(c2s_accepter, 13), 3u);
+ ASSERT_GT(srv_interceptor->count_step(c2s_accepter, 13), 0u);
+ // c2s_accepter sent 0 reconnect_ok messages
+ ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 0u);
+ // c2s sent 1 reconnect messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
+ // c2s sent 1 reconnect_ok messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 1u);
+
+ if (srv_interceptor->count_step(c2s_accepter, 13) == 2) {
+ // if the server send the reconnect message two times then
+ // the client must have sent a session retry message to the server
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 1u);
+ } else {
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 0u);
+ }
+
+ srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
+ }
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
TEST_P(MessengerTest, SimpleTest) {
FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;