client_msgr->wait();
}
+class MarkdownDispatcher : public Dispatcher {
+ Mutex lock;
+ set<Connection*> conns;
+ bool last_mark;
+ public:
+ atomic_t count;
+ MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
+ last_mark(false), count(0) {}
+ bool ms_can_fast_dispatch_any() const { return false; }
+ bool ms_can_fast_dispatch(Message *m) const {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) {
+ cerr << __func__ << con << std::endl;
+ conns.insert(con);
+ }
+ void ms_handle_fast_accept(Connection *con) {
+ conns.insert(con);
+ }
+ bool ms_dispatch(Message *m) {
+ cerr << __func__ << " conn: " << m->get_connection() << std::endl;
+ Mutex::Locker l(lock);
+ count.inc();
+ conns.insert(m->get_connection().get());
+ if (conns.size() < 2 && !last_mark)
+ return true;
+
+ last_mark = true;
+ for (set<Connection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
+ if ((*it) != m->get_connection().get()) {
+ (*it)->mark_down();
+ conns.erase(it);
+ break;
+ }
+ }
+ if (conns.empty())
+ last_mark = false;
+ return true;
+ }
+ bool ms_handle_reset(Connection *con) {
+ cerr << __func__ << con << std::endl;
+ conns.erase(con);
+ return true;
+ }
+ void ms_handle_remote_reset(Connection *con) {
+ conns.erase(con);
+ cerr << __func__ << con << std::endl;
+ }
+ void ms_fast_dispatch(Message *m) {
+ assert(0);
+ }
+ bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
+ bufferlist& authorizer, bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key) {
+ isvalid = true;
+ return true;
+ }
+};
+
+
+// Markdown with external lock
+TEST_P(MessengerTest, MarkdownTest) {
+ Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+ MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("127.0.0.1:16800");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ bind_addr.parse("127.0.0.1:16801");
+ server_msgr2->bind(bind_addr);
+ server_msgr2->add_dispatcher_head(&srv_dispatcher);
+ server_msgr2->start();
+
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ int i = 1000;
+ uint64_t last = 0;
+ bool equal = false;
+ uint64_t equal_count = 0;
+ while (i--) {
+ ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
+ MPing *m = new MPing();
+ ASSERT_EQ(conn1->send_message(m), 0);
+ m = new MPing();
+ ASSERT_EQ(conn2->send_message(m), 0);
+ CHECK_AND_WAIT_TRUE(srv_dispatcher.count.read() > last + 1);
+ if (srv_dispatcher.count.read() == last) {
+ cerr << __func__ << " last is " << last << std::endl;
+ equal = true;
+ equal_count++;
+ } else {
+ equal = false;
+ equal_count = 0;
+ }
+ last = srv_dispatcher.count.read();
+ if (equal_count)
+ sleep(0.5);
+ ASSERT_FALSE(equal && equal_count > 3);
+ }
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr2->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+ server_msgr2->wait();
+ delete server_msgr2;
+}
+
INSTANTIATE_TEST_CASE_P(
Messenger,
MessengerTest,