From: Haomai Wang Date: Tue, 6 Jan 2015 03:32:54 +0000 (+0800) Subject: MessengerTest: Add markdown with caller lock tests X-Git-Tag: v0.93~247^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=24fd12f48d9086d20a3a74845f055fe10efe59f2;p=ceph.git MessengerTest: Add markdown with caller lock tests Signed-off-by: Haomai Wang --- diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 5a0200bfbc71..ef8c28cab250 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -554,6 +554,123 @@ TEST_P(MessengerTest, ClientStandbyTest) { client_msgr->wait(); } +class MarkdownDispatcher : public Dispatcher { + Mutex lock; + set conns; + bool last_mark; + public: + atomic_t count; + MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"), + last_mark(false), count(0) {} + bool ms_can_fast_dispatch_any() const { return false; } + bool ms_can_fast_dispatch(Message *m) const { + switch (m->get_type()) { + case CEPH_MSG_PING: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) { + cerr << __func__ << con << std::endl; + conns.insert(con); + } + void ms_handle_fast_accept(Connection *con) { + conns.insert(con); + } + bool ms_dispatch(Message *m) { + cerr << __func__ << " conn: " << m->get_connection() << std::endl; + Mutex::Locker l(lock); + count.inc(); + conns.insert(m->get_connection().get()); + if (conns.size() < 2 && !last_mark) + return true; + + last_mark = true; + for (set::iterator it = conns.begin(); it != conns.end(); ++it) { + if ((*it) != m->get_connection().get()) { + (*it)->mark_down(); + conns.erase(it); + break; + } + } + if (conns.empty()) + last_mark = false; + return true; + } + bool ms_handle_reset(Connection *con) { + cerr << __func__ << con << std::endl; + conns.erase(con); + return true; + } + void ms_handle_remote_reset(Connection *con) { + conns.erase(con); + cerr << __func__ << con << std::endl; + } + void ms_fast_dispatch(Message *m) { + assert(0); + } + bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, + bufferlist& authorizer, bufferlist& authorizer_reply, + bool& isvalid, CryptoKey& session_key) { + isvalid = true; + return true; + } +}; + + +// Markdown with external lock +TEST_P(MessengerTest, MarkdownTest) { + Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); + MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("127.0.0.1:16800"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + bind_addr.parse("127.0.0.1:16801"); + server_msgr2->bind(bind_addr); + server_msgr2->add_dispatcher_head(&srv_dispatcher); + server_msgr2->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + int i = 1000; + uint64_t last = 0; + bool equal = false; + uint64_t equal_count = 0; + while (i--) { + ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst()); + MPing *m = new MPing(); + ASSERT_EQ(conn1->send_message(m), 0); + m = new MPing(); + ASSERT_EQ(conn2->send_message(m), 0); + CHECK_AND_WAIT_TRUE(srv_dispatcher.count.read() > last + 1); + if (srv_dispatcher.count.read() == last) { + cerr << __func__ << " last is " << last << std::endl; + equal = true; + equal_count++; + } else { + equal = false; + equal_count = 0; + } + last = srv_dispatcher.count.read(); + if (equal_count) + sleep(0.5); + ASSERT_FALSE(equal && equal_count > 3); + } + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr2->shutdown(); + server_msgr->wait(); + client_msgr->wait(); + server_msgr2->wait(); + delete server_msgr2; +} + INSTANTIATE_TEST_CASE_P( Messenger, MessengerTest,