]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test_msgr: Make each side can initialize connection when !policy.server
authorHaomai Wang <haomaiwang@gmail.com>
Thu, 12 Mar 2015 10:22:09 +0000 (18:22 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Fri, 13 Mar 2015 06:10:06 +0000 (14:10 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/test/msgr/test_msgr.cc

index 4dcf756cd95c628fd237bb28a8f960eee8f2a3fb..3e2fadbddfa1ac5b6ed1ac3c41397b047c99b08f 100644 (file)
@@ -674,19 +674,21 @@ class SyntheticDispatcher : public Dispatcher {
   }
   void ms_fast_dispatch(Message *m) {
     Mutex::Locker l(lock);
-    if (is_server) {
-      reply_message(m);
-    } else if (m->get_middle().length()) {
-      bufferlist middle = m->get_middle();
-      uint64_t i;
-      ASSERT_EQ(sizeof(uint64_t), middle.length());
-      memcpy(&i, middle.c_str(), middle.length());
-      if (sent.count(i)) {
-        ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
-        ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
-        conn_sent[m->get_connection()].pop_front();
-        sent.erase(i);
-      }
+    uint64_t i;
+    bool reply;
+    assert(m->get_middle().length());
+    bufferlist::iterator blp = m->get_middle().begin();
+    ::decode(i, blp);
+    ::decode(reply, blp);
+    if (reply) {
+      cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
+      reply_message(m, i);
+    } else if (sent.count(i)) {
+      cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
+      ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
+      ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+      conn_sent[m->get_connection()].pop_front();
+      sent.erase(i);
     }
     got_new = true;
     cond.Signal();
@@ -700,12 +702,15 @@ class SyntheticDispatcher : public Dispatcher {
     return true;
   }
 
-  void reply_message(Message *m) {
+  void reply_message(Message *m, uint64_t i) {
+    bufferlist bl;
+    ::encode(i, bl);
+    ::encode(false, bl);
     MPing *rm = new MPing();
     if (m->get_data_len())
       rm->set_data(m->get_data());
     if (m->get_middle().length())
-      rm->set_middle(m->get_middle());
+      rm->set_middle(bl);
     m->get_connection()->send_message(rm);
   }
 
@@ -715,12 +720,13 @@ class SyntheticDispatcher : public Dispatcher {
       bufferlist bl;
       uint64_t i = index.read();
       index.inc();
-      bufferptr bp(sizeof(i));
-      memcpy(bp.c_str(), (char*)&i, sizeof(i));
-      bl.push_back(bp);
+      ::encode(i, bl);
+      ::encode(true, bl);
       m->set_middle(bl);
-      sent[i] = m->get_data();
-      conn_sent[con].push_back(i);
+      if (!con->get_messenger()->get_default_policy().lossy) {
+        sent[i] = m->get_data();
+        conn_sent[con].push_back(i);
+      }
     }
     ASSERT_EQ(con->send_message(m), 0);
   }
@@ -770,7 +776,7 @@ TEST_P(MessengerTest, MessageTest) {
       cmds.push_back(s);
     MCommand *m = new MCommand(uuid);
     m->cmd = cmds;
-    ASSERT_EQ(conn->send_message(m), 0);
+    cli_dispatcher.send_message_wrap(conn, m);
     utime_t t;
     t += 1000*1000*500;
     Mutex::Locker l(cli_dispatcher.lock);
@@ -810,7 +816,7 @@ class SyntheticWorkload {
   set<Messenger*> available_servers;
   set<Messenger*> available_clients;
   map<pair<Messenger*, Messenger*>, ConnectionRef> available_connections;
-  SyntheticDispatcher cli_dispatcher, srv_dispatcher;
+  SyntheticDispatcher dispatcher;
   gen_type rng;
   vector<bufferlist> rand_data;
 
@@ -821,8 +827,7 @@ class SyntheticWorkload {
 
   SyntheticWorkload(int servers, int clients, string type, int random_num,
                     Messenger::Policy srv_policy, Messenger::Policy cli_policy):
-      lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
-      rng(time(NULL)) {
+      lock("SyntheticWorkload::lock"), dispatcher(false), rng(time(NULL)) {
     Messenger *msgr;
     int base_port = 16800;
     entity_addr_t bind_addr;
@@ -833,7 +838,7 @@ class SyntheticWorkload {
       snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
       bind_addr.parse(addr);
       msgr->bind(bind_addr);
-      msgr->add_dispatcher_head(&srv_dispatcher);
+      msgr->add_dispatcher_head(&dispatcher);
 
       assert(msgr);
       msgr->set_default_policy(srv_policy);
@@ -849,7 +854,7 @@ class SyntheticWorkload {
         bind_addr.parse(addr);
         msgr->bind(bind_addr);
       }
-      msgr->add_dispatcher_head(&cli_dispatcher);
+      msgr->add_dispatcher_head(&dispatcher);
 
       assert(msgr);
       msgr->set_default_policy(cli_policy);
@@ -874,7 +879,7 @@ class SyntheticWorkload {
   }
 
   ConnectionRef _get_random_connection(pair<Messenger*, Messenger*> *p) {
-    while (cli_dispatcher.get_pending() > max_in_flight)
+    while (dispatcher.get_pending() > max_in_flight)
       usleep(500);
     assert(lock.is_locked());
     boost::uniform_int<> choose(0, available_connections.size() - 1);
@@ -911,9 +916,21 @@ class SyntheticWorkload {
       client = *i;
     }
 
-    if (!available_connections.count(make_pair(client, server))) {
-      ConnectionRef conn = client->get_connection(server->get_myinst());
-      available_connections[make_pair(client, server)] = conn;
+    pair<Messenger*, Messenger*> p;
+    {
+      boost::uniform_int<> choose(0, available_servers.size() - 1);
+      int index = choose(rng);
+      if (server->get_default_policy().server || index % 2) {
+        conn = client->get_connection(server->get_myinst());
+        p = make_pair(client, server);
+      } else {
+        conn = server->get_connection(client->get_myinst());
+        p = make_pair(server, client);
+      }
+    }
+    if (!available_connections.count(p)) {
+      ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
+      available_connections[p] = conn;
     }
   }
 
@@ -926,7 +943,7 @@ class SyntheticWorkload {
     m->set_data(bl);
     Mutex::Locker l(lock);
     ConnectionRef conn = _get_random_connection(NULL);
-    cli_dispatcher.send_message_wrap(conn, m);
+    dispatcher.send_message_wrap(conn, m);
   }
 
   void drop_connection() {
@@ -935,7 +952,7 @@ class SyntheticWorkload {
     if (available_connections.size() < 10)
       return;
     ConnectionRef conn = _get_random_connection(&p);
-    cli_dispatcher.clear_pending(conn);
+    dispatcher.clear_pending(conn);
     conn->mark_down();
     ASSERT_EQ(available_connections.erase(p), 1U);
   }
@@ -943,12 +960,12 @@ class SyntheticWorkload {
   void print_internal_state() {
     Mutex::Locker l(lock);
     cerr << "available_connections: " << available_connections.size()
-         << " inflight messages: " << cli_dispatcher.get_pending() << std::endl;
+         << " inflight messages: " << dispatcher.get_pending() << std::endl;
   }
 
   void wait_for_done() {
     uint64_t i = 0;
-    while (cli_dispatcher.get_pending()) {
+    while (dispatcher.get_pending()) {
       usleep(1000*100);
       if (i++ % 50 == 0)
         print_internal_state();
@@ -972,7 +989,7 @@ class SyntheticWorkload {
 };
 
 TEST_P(MessengerTest, SyntheticStressTest) {
-  SyntheticWorkload test_msg(32, 128, GetParam(), 100,
+  SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                              Messenger::Policy::stateful_server(0, 0),
                              Messenger::Policy::lossless_client(0, 0));
   for (int i = 0; i < 100; ++i) {
@@ -1000,11 +1017,40 @@ TEST_P(MessengerTest, SyntheticStressTest) {
   test_msg.wait_for_done();
 }
 
+TEST_P(MessengerTest, SyntheticStressTest1) {
+  SyntheticWorkload test_msg(16, 32, GetParam(), 100,
+                             Messenger::Policy::lossless_peer_reuse(0, 0),
+                             Messenger::Policy::lossless_peer_reuse(0, 0));
+  for (int i = 0; i < 10; ++i) {
+    if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+    test_msg.generate_connection();
+  }
+  gen_type rng(time(NULL));
+  for (int i = 0; i < 10000; ++i) {
+    if (!(i % 10)) {
+      cerr << "Op " << i << ": ";
+      test_msg.print_internal_state();
+    }
+    boost::uniform_int<> true_false(0, 99);
+    int val = true_false(rng);
+    if (val > 80) {
+      test_msg.generate_connection();
+    } else if (val > 60) {
+      test_msg.drop_connection();
+    } else if (val > 10) {
+      test_msg.send_message();
+    } else {
+      usleep(rand() % 1000 + 500);
+    }
+  }
+  test_msg.wait_for_done();
+}
+
 
 TEST_P(MessengerTest, SyntheticInjectTest) {
   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
-  SyntheticWorkload test_msg(4, 16, GetParam(), 100,
+  SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                              Messenger::Policy::stateful_server(0, 0),
                              Messenger::Policy::lossless_client(0, 0));
   for (int i = 0; i < 100; ++i) {
@@ -1037,7 +1083,7 @@ TEST_P(MessengerTest, SyntheticInjectTest) {
 TEST_P(MessengerTest, SyntheticInjectTest2) {
   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
-  SyntheticWorkload test_msg(4, 16, GetParam(), 100,
+  SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                              Messenger::Policy::lossless_peer_reuse(0, 0),
                              Messenger::Policy::lossless_peer_reuse(0, 0));
   for (int i = 0; i < 100; ++i) {