From: Haomai Wang Date: Fri, 13 Mar 2015 11:18:14 +0000 (+0800) Subject: test_msgr: Add new inject test and add support for handle_reset X-Git-Tag: v9.0.0~137^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=10edd5d1f7f846794f3f56d2021d1bc739dcd2c3;p=ceph.git test_msgr: Add new inject test and add support for handle_reset Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index cbcb5dc04c52..fdc94a06d395 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2041,6 +2041,7 @@ void AsyncConnection::fault() ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half " << "accept state just closed, state=" << get_state_name(state) << dendl; + center->dispatch_event_external(reset_handler); _stop(); return ; } diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 859b3887b8fa..1d32b8f18eab 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -624,6 +624,71 @@ TEST_P(MessengerTest, AuthTest) { client_msgr->wait(); } +TEST_P(MessengerTest, MessageTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateful_server(0, 0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossless_peer(0, 0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + + // 1. A very large "front"(as well as "payload") + // Because a external message need to invade Messenger::decode_message, + // here we only use existing message class(MCommand) + ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + { + uuid_d uuid; + uuid.generate_random(); + vector cmds; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*30; i++) + cmds.push_back(s); + MCommand *m = new MCommand(uuid); + m->cmd = cmds; + conn->send_message(m); + utime_t t; + t += 1000*1000*500; + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t); + ASSERT_TRUE(cli_dispatcher.got_new); + cli_dispatcher.got_new = false; + } + + // 2. A very large "data" + { + bufferlist bl; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*30; i++) + bl.append(s); + MPing *m = new MPing(); + m->set_data(bl); + conn->send_message(m); + utime_t t; + t += 1000*1000*500; + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t); + ASSERT_TRUE(cli_dispatcher.got_new); + cli_dispatcher.got_new = false; + } + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + + +class SyntheticWorkload; + class SyntheticDispatcher : public Dispatcher { public: Mutex lock; @@ -635,10 +700,11 @@ class SyntheticDispatcher : public Dispatcher { map > conn_sent; map sent; atomic_t index; + SyntheticWorkload *workload; - SyntheticDispatcher(bool s): Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), - is_server(s), got_new(false), got_remote_reset(false), - got_connect(false), index(0) {} + SyntheticDispatcher(bool s, SyntheticWorkload *wl): + Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false), + got_remote_reset(false), got_connect(false), index(0), workload(wl) {} bool ms_can_fast_dispatch_any() const { return true; } bool ms_can_fast_dispatch(Message *m) const { switch (m->get_type()) { @@ -660,9 +726,7 @@ class SyntheticDispatcher : public Dispatcher { bool ms_dispatch(Message *m) { assert(0); } - bool ms_handle_reset(Connection *con) { - return true; - } + bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con) { Mutex::Locker l(lock); list c = conn_sent[con]; @@ -747,75 +811,12 @@ class SyntheticDispatcher : public Dispatcher { }; -TEST_P(MessengerTest, MessageTest) { - SyntheticDispatcher cli_dispatcher(false), srv_dispatcher(true); - entity_addr_t bind_addr; - bind_addr.parse("127.0.0.1"); - Messenger::Policy p = Messenger::Policy::stateful_server(0, 0); - server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); - p = Messenger::Policy::lossless_peer(0, 0); - client_msgr->set_policy(entity_name_t::TYPE_OSD, p); - - server_msgr->bind(bind_addr); - server_msgr->add_dispatcher_head(&srv_dispatcher); - server_msgr->start(); - client_msgr->add_dispatcher_head(&cli_dispatcher); - client_msgr->start(); - - - // 1. A very large "front"(as well as "payload") - // Because a external message need to invade Messenger::decode_message, - // here we only use existing message class(MCommand) - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); - { - uuid_d uuid; - uuid.generate_random(); - vector cmds; - string s("abcdefghijklmnopqrstuvwxyz"); - for (int i = 0; i < 1024*30; i++) - cmds.push_back(s); - MCommand *m = new MCommand(uuid); - m->cmd = cmds; - cli_dispatcher.send_message_wrap(conn, m); - utime_t t; - t += 1000*1000*500; - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t); - ASSERT_TRUE(cli_dispatcher.got_new); - cli_dispatcher.got_new = false; - } - - // 2. A very large "data" - { - bufferlist bl; - string s("abcdefghijklmnopqrstuvwxyz"); - for (int i = 0; i < 1024*30; i++) - bl.append(s); - MPing *m = new MPing(); - m->set_data(bl); - cli_dispatcher.send_message_wrap(conn, m); - utime_t t; - t += 1000*1000*500; - Mutex::Locker l(cli_dispatcher.lock); - while (!cli_dispatcher.got_new) - cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t); - ASSERT_TRUE(cli_dispatcher.got_new); - cli_dispatcher.got_new = false; - } - server_msgr->shutdown(); - client_msgr->shutdown(); - server_msgr->wait(); - client_msgr->wait(); -} - - class SyntheticWorkload { Mutex lock; Cond cond; set available_servers; set available_clients; - map, ConnectionRef> available_connections; + map > available_connections; SyntheticDispatcher dispatcher; gen_type rng; vector rand_data; @@ -827,7 +828,7 @@ class SyntheticWorkload { SyntheticWorkload(int servers, int clients, string type, int random_num, Messenger::Policy srv_policy, Messenger::Policy cli_policy): - lock("SyntheticWorkload::lock"), dispatcher(false), rng(time(NULL)) { + lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) { Messenger *msgr; int base_port = 16800; entity_addr_t bind_addr; @@ -878,17 +879,18 @@ class SyntheticWorkload { } } - ConnectionRef _get_random_connection(pair *p) { - while (dispatcher.get_pending() > max_in_flight) + ConnectionRef _get_random_connection() { + while (dispatcher.get_pending() > max_in_flight) { + lock.Unlock(); usleep(500); + lock.Lock(); + } assert(lock.is_locked()); boost::uniform_int<> choose(0, available_connections.size() - 1); int index = choose(rng); - map, ConnectionRef>::iterator i = available_connections.begin(); + map >::iterator i = available_connections.begin(); for (; index > 0; --index, ++i) ; - if (p) - *p = i->first; - return i->second; + return i->first; } bool can_create_connection() { @@ -924,10 +926,8 @@ class SyntheticWorkload { else p = make_pair(server, client); } - if (!available_connections.count(p)) { - ConnectionRef conn = p.first->get_connection(p.second->get_myinst()); - available_connections[p] = conn; - } + ConnectionRef conn = p.first->get_connection(p.second->get_myinst()); + available_connections[conn] = p; } void send_message() { @@ -938,19 +938,18 @@ class SyntheticWorkload { bl = rand_data[index]; m->set_data(bl); Mutex::Locker l(lock); - ConnectionRef conn = _get_random_connection(NULL); + ConnectionRef conn = _get_random_connection(); dispatcher.send_message_wrap(conn, m); } void drop_connection() { - pair p; Mutex::Locker l(lock); if (available_connections.size() < 10) return; - ConnectionRef conn = _get_random_connection(&p); + ConnectionRef conn = _get_random_connection(); dispatcher.clear_pending(conn); conn->mark_down(); - ASSERT_EQ(available_connections.erase(p), 1U); + ASSERT_EQ(available_connections.erase(conn), 1U); } void print_internal_state() { @@ -982,8 +981,19 @@ class SyntheticWorkload { } available_clients.clear(); } + + void handle_reset(Connection *con) { + Mutex::Locker l(lock); + available_connections.erase(con); + dispatcher.clear_pending(con); + } }; +bool SyntheticDispatcher::ms_handle_reset(Connection *con) { + workload->handle_reset(con); + return true; +} + TEST_P(MessengerTest, SyntheticStressTest) { SyntheticWorkload test_msg(8, 32, GetParam(), 100, Messenger::Policy::stateful_server(0, 0), @@ -1143,6 +1153,40 @@ TEST_P(MessengerTest, SyntheticInjectTest3) { } +TEST_P(MessengerTest, SyntheticInjectTest4) { + g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30"); + g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); + SyntheticWorkload test_msg(16, 32, GetParam(), 100, + Messenger::Policy::lossless_peer(0, 0), + Messenger::Policy::lossless_peer(0, 0)); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) cerr << "seeding connection " << i << std::endl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 1000; ++i) { + if (!(i % 10)) { + cerr << "Op " << i << ": "; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 95) { + test_msg.generate_connection(); + } else if (val > 80) { + // test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 500 + 100); + } + } + test_msg.wait_for_done(); + g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0"); + g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0"); +} + + class MarkdownDispatcher : public Dispatcher { Mutex lock; set conns;