]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
MessengerTest: Add markdown with caller lock tests
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 6 Jan 2015 03:32:54 +0000 (11:32 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:10 +0000 (03:07 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/test/msgr/test_msgr.cc

index 5a0200bfbc71e3c9c9effd2675545b97be562bea..ef8c28cab2500b6b60d74805ae9c2944f1c60999 100644 (file)
@@ -554,6 +554,123 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   client_msgr->wait();
 }
 
+class MarkdownDispatcher : public Dispatcher {
+  Mutex lock;
+  set<Connection*> conns;
+  bool last_mark;
+ public:
+  atomic_t count;
+  MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
+                              last_mark(false), count(0) {}
+  bool ms_can_fast_dispatch_any() const { return false; }
+  bool ms_can_fast_dispatch(Message *m) const {
+    switch (m->get_type()) {
+    case CEPH_MSG_PING:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  void ms_handle_fast_connect(Connection *con) {
+    cerr << __func__ << con << std::endl;
+    conns.insert(con);
+  }
+  void ms_handle_fast_accept(Connection *con) {
+    conns.insert(con);
+  }
+  bool ms_dispatch(Message *m) {
+    cerr << __func__ << " conn: " << m->get_connection() << std::endl;
+    Mutex::Locker l(lock);
+    count.inc();
+    conns.insert(m->get_connection().get());
+    if (conns.size() < 2 && !last_mark)
+      return true;
+
+    last_mark = true;
+    for (set<Connection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
+      if ((*it) != m->get_connection().get()) {
+        (*it)->mark_down();
+        conns.erase(it);
+        break;
+      }
+    }
+    if (conns.empty())
+      last_mark = false;
+    return true;
+  }
+  bool ms_handle_reset(Connection *con) {
+    cerr << __func__ << con << std::endl;
+    conns.erase(con);
+    return true;
+  }
+  void ms_handle_remote_reset(Connection *con) {
+    conns.erase(con);
+    cerr << __func__ << con << std::endl;
+  }
+  void ms_fast_dispatch(Message *m) {
+    assert(0);
+  }
+  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
+                            bufferlist& authorizer, bufferlist& authorizer_reply,
+                            bool& isvalid, CryptoKey& session_key) {
+    isvalid = true;
+    return true;
+  }
+};
+
+
+// Markdown with external lock
+TEST_P(MessengerTest, MarkdownTest) {
+  Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+  MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t bind_addr;
+  bind_addr.parse("127.0.0.1:16800");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+  bind_addr.parse("127.0.0.1:16801");
+  server_msgr2->bind(bind_addr);
+  server_msgr2->add_dispatcher_head(&srv_dispatcher);
+  server_msgr2->start();
+
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  int i = 1000;
+  uint64_t last = 0;
+  bool equal = false;
+  uint64_t equal_count = 0;
+  while (i--) {
+    ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
+    ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
+    MPing *m = new MPing();
+    ASSERT_EQ(conn1->send_message(m), 0);
+    m = new MPing();
+    ASSERT_EQ(conn2->send_message(m), 0);
+    CHECK_AND_WAIT_TRUE(srv_dispatcher.count.read() > last + 1);
+    if (srv_dispatcher.count.read() == last) {
+      cerr << __func__ << " last is " << last << std::endl;
+      equal = true;
+      equal_count++;
+    } else {
+      equal = false;
+      equal_count = 0;
+    }
+    last = srv_dispatcher.count.read();
+    if (equal_count)
+      sleep(0.5);
+    ASSERT_FALSE(equal && equal_count > 3);
+  }
+  server_msgr->shutdown();
+  client_msgr->shutdown();
+  server_msgr2->shutdown();
+  server_msgr->wait();
+  client_msgr->wait();
+  server_msgr2->wait();
+  delete server_msgr2;
+}
+
 INSTANTIATE_TEST_CASE_P(
   Messenger,
   MessengerTest,