#if GTEST_HAS_PARAM_TEST
+#define CHECK_AND_WAIT_TRUE(expr) do { \
+ int n = 10; \
+ while (--n) { \
+ if (expr) \
+ break; \
+ usleep(100); \
+ } \
+} while(0);
+
class MessengerTest : public ::testing::TestWithParam<const char*> {
public:
Messenger *server_msgr;
delete server_msgr;
delete client_msgr;
}
+
};
Mutex lock;
Cond cond;
bool is_server;
+ bool got_new;
+ bool got_remote_reset;
+ bool got_connect;
- FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), is_server(s) {}
+ FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
+ is_server(s), got_new(false), got_remote_reset(false),
+ got_connect(false) {}
bool ms_can_fast_dispatch_any() const { return true; }
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(con);
- con->set_priv(s);
+ con->set_priv(s->get());
cerr << __func__ << " con: " << con << " count: " << s->count << std::endl;
}
+ s->put();
+ lock.Lock();
+ got_connect = true;
+ cond.Signal();
+ lock.Unlock();
}
void ms_handle_fast_accept(Connection *con) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(con);
- con->set_priv(s);
+ con->set_priv(s->get());
}
+ s->put();
}
bool ms_dispatch(Message *m) {
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+ if (!s) {
+ s = new Session(m->get_connection());
+ m->get_connection()->set_priv(s->get());
+ }
+ s->put();
Mutex::Locker l(s->lock);
s->count++;
cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
if (is_server)
reply_message(m);
lock.Lock();
+ got_new = true;
cond.Signal();
lock.Unlock();
return true;
con->set_priv(NULL); // break ref <-> session cycle, if any
s->put();
}
+ got_remote_reset = true;
}
void ms_fast_dispatch(Message *m) {
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+ if (!s) {
+ s = new Session(m->get_connection());
+ m->get_connection()->set_priv(s->get());
+ }
+ s->put();
Mutex::Locker (s->lock);
s->count++;
cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
if (is_server)
reply_message(m);
lock.Lock();
+ got_new = true;
cond.Signal();
lock.Unlock();
}
{
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
m = new MPing();
conn->send_message(m);
- // sleep 0.3s is enough to judge connection failed?
- usleep(300*1000);
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
ASSERT_FALSE(conn->is_connected());
// 5. loopback connection
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
client_msgr->shutdown();
{
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
MPing *m = new MPing();
ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
conn->send_message(m);
- usleep(300*1000);
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
// should failed build a connection
ASSERT_FALSE(conn->is_connected());
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
conn->mark_down();
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
- // resetcheck happen
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
// 2. test for client reconnect
+ ASSERT_FALSE(cli_dispatcher.got_remote_reset);
+ cli_dispatcher.got_connect = false;
server_conn->mark_down();
ASSERT_FALSE(server_conn->is_connected());
- // enough for client reconnect?
- usleep(300*1000);
+ // ensure client detect server socket closed
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_connect)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_connect = false;
+ }
+ CHECK_AND_WAIT_TRUE(conn->is_connected());
ASSERT_TRUE(conn->is_connected());
- conn = client_msgr->get_connection(server_msgr->get_myinst());
{
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_TRUE(conn->is_connected());
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
// resetcheck happen
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+ ASSERT_TRUE(cli_dispatcher.got_remote_reset);
+ cli_dispatcher.got_remote_reset = false;
server_msgr->shutdown();
client_msgr->shutdown();
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
conn->mark_down();
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
// 2. test for client lossy
server_conn->mark_down();
ASSERT_FALSE(server_conn->is_connected());
- usleep(300*1000);
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
ASSERT_FALSE(conn->is_connected());
conn = client_msgr->get_connection(server_msgr->get_myinst());
{
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
m = new MPing();
Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ ASSERT_FALSE(cli_dispatcher.got_remote_reset);
server_conn->mark_down();
ASSERT_FALSE(server_conn->is_connected());
// client should be standby
// client should be standby, so we use original connection
{
m = new MPing();
- Mutex::Locker l(cli_dispatcher.lock);
conn->send_keepalive();
- usleep(300*1000);
+ CHECK_AND_WAIT_TRUE(conn->is_connected());
+ ASSERT_TRUE(conn->is_connected());
+ Mutex::Locker l(cli_dispatcher.lock);
ASSERT_EQ(conn->send_message(m), 0);
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_new = false;
}
// resetcheck for client, so it discard state previously
+ ASSERT_TRUE(cli_dispatcher.got_remote_reset);
+ cli_dispatcher.got_remote_reset = false;
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);