test_msgr: Add new inject test and add support for handle_reset
author     Haomai Wang <haomaiwang@gmail.com>
           Fri, 13 Mar 2015 11:18:14 +0000 (19:18 +0800)
committer  Haomai Wang <haomaiwang@gmail.com>
           Fri, 13 Mar 2015 11:28:04 +0000 (19:28 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/test/msgr/test_msgr.cc

index cbcb5dc04c5221089a45c8dea16c169798d0b0af..fdc94a06d395d7d505105d4f32558c86f8079ba0 100644 (file)
@@ -2041,6 +2041,7 @@ void AsyncConnection::fault()
     ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
                               << "accept state just closed, state="
                               << get_state_name(state) << dendl;
+    center->dispatch_event_external(reset_handler);
     _stop();
     return ;
   }
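
The added dispatch_event_external(reset_handler) call closes a gap: when fault() found nothing to send in the half-accept state, it tore the connection down silently, so dispatchers never saw a reset for the session. With the event queued, each registered dispatcher now gets its ms_handle_reset() callback, which the test changes below exercise. A minimal sketch of such a callback, assuming the Dispatcher interface of this era; the class below is illustrative, not part of the commit:

    class ResetLoggingDispatcher : public Dispatcher {
     public:
      explicit ResetLoggingDispatcher(CephContext *cct) : Dispatcher(cct) {}
      bool ms_dispatch(Message *m) { m->put(); return true; }
      bool ms_handle_reset(Connection *con) {
        // Invoked by the messenger once the peer session is torn down;
        // per-connection bookkeeping for `con` should be dropped here.
        ldout(cct, 1) << "connection reset: " << con << dendl;
        return true;
      }
      void ms_handle_remote_reset(Connection *con) {}
    };
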
index 859b3887b8fa2d30edc4fac3f10e84044448da47..1d32b8f18eaba09f6a3e1e2d0f9d2eb27fc19aa3 100644 (file)
@@ -624,6 +624,71 @@ TEST_P(MessengerTest, AuthTest) {
   client_msgr->wait();
 }
 
+TEST_P(MessengerTest, MessageTest) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t bind_addr;
+  bind_addr.parse("127.0.0.1");
+  Messenger::Policy p = Messenger::Policy::stateful_server(0, 0);
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+  p = Messenger::Policy::lossless_peer(0, 0);
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+
+  // 1. A very large "front" (as well as "payload")
+  // Because an external message type would need to hook into
+  // Messenger::decode_message, we only use an existing class (MCommand) here.
+  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  {
+    uuid_d uuid;
+    uuid.generate_random();
+    vector<string> cmds;
+    string s("abcdefghijklmnopqrstuvwxyz");
+    for (int i = 0; i < 1024*30; i++)
+      cmds.push_back(s);
+    MCommand *m = new MCommand(uuid);
+    m->cmd = cmds;
+    conn->send_message(m);
+    utime_t t;
+    t += 1000*1000*500;
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
+    ASSERT_TRUE(cli_dispatcher.got_new);
+    cli_dispatcher.got_new = false;
+  }
+
+  // 2. A very large "data"
+  {
+    bufferlist bl;
+    string s("abcdefghijklmnopqrstuvwxyz");
+    for (int i = 0; i < 1024*30; i++)
+      bl.append(s);
+    MPing *m = new MPing();
+    m->set_data(bl);
+    conn->send_message(m);
+    utime_t t;
+    t += 1000*1000*500;
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
+    ASSERT_TRUE(cli_dispatcher.got_new);
+    cli_dispatcher.got_new = false;
+  }
+  server_msgr->shutdown();
+  client_msgr->shutdown();
+  server_msgr->wait();
+  client_msgr->wait();
+}
+
+
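
The MessageTest body above blocks on cli_dispatcher.got_new under cli_dispatcher.lock until the echoed message arrives. FakeDispatcher itself is defined earlier in test_msgr.cc and is not part of this diff; a condensed sketch of the handshake the test depends on (only the lock/cond/got_new interplay is shown, everything else is abridged):

    class FakeDispatcherSketch : public Dispatcher {  // abridged stand-in
     public:
      Mutex lock;
      Cond cond;
      bool is_server, got_new;
      FakeDispatcherSketch(bool s)
          : Dispatcher(g_ceph_context), lock("FakeDispatcherSketch::lock"),
            is_server(s), got_new(false) {}
      bool ms_dispatch(Message *m) {
        // The real dispatcher also echoes the message back on the server
        // side; both ends finish by flagging got_new and waking the test
        // thread parked in cond.WaitInterval().
        Mutex::Locker l(lock);
        got_new = true;
        cond.Signal();
        m->put();
        return true;
      }
      bool ms_handle_reset(Connection *con) { return true; }
      void ms_handle_remote_reset(Connection *con) {}
    };
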
+class SyntheticWorkload;
+
 class SyntheticDispatcher : public Dispatcher {
  public:
   Mutex lock;
@@ -635,10 +700,11 @@ class SyntheticDispatcher : public Dispatcher {
   map<ConnectionRef, list<uint64_t> > conn_sent;
   map<uint64_t, bufferlist> sent;
   atomic_t index;
+  SyntheticWorkload *workload;
 
-  SyntheticDispatcher(bool s): Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"),
-                          is_server(s), got_new(false), got_remote_reset(false),
-                          got_connect(false), index(0) {}
+  SyntheticDispatcher(bool s, SyntheticWorkload *wl):
+      Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
+      got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
   bool ms_can_fast_dispatch_any() const { return true; }
   bool ms_can_fast_dispatch(Message *m) const {
     switch (m->get_type()) {
@@ -660,9 +726,7 @@ class SyntheticDispatcher : public Dispatcher {
   bool ms_dispatch(Message *m) {
     assert(0);
   }
-  bool ms_handle_reset(Connection *con) {
-    return true;
-  }
+  bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {
     Mutex::Locker l(lock);
     list<uint64_t> c = conn_sent[con];
@@ -747,75 +811,12 @@ class SyntheticDispatcher : public Dispatcher {
 };
 
 
-TEST_P(MessengerTest, MessageTest) {
-  SyntheticDispatcher cli_dispatcher(false), srv_dispatcher(true);
-  entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
-  Messenger::Policy p = Messenger::Policy::stateful_server(0, 0);
-  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
-  p = Messenger::Policy::lossless_peer(0, 0);
-  client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
-
-  server_msgr->bind(bind_addr);
-  server_msgr->add_dispatcher_head(&srv_dispatcher);
-  server_msgr->start();
-  client_msgr->add_dispatcher_head(&cli_dispatcher);
-  client_msgr->start();
-
-
-  // 1. A very large "front"(as well as "payload")
-  // Because a external message need to invade Messenger::decode_message,
-  // here we only use existing message class(MCommand)
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
-  {
-    uuid_d uuid;
-    uuid.generate_random();
-    vector<string> cmds;
-    string s("abcdefghijklmnopqrstuvwxyz");
-    for (int i = 0; i < 1024*30; i++)
-      cmds.push_back(s);
-    MCommand *m = new MCommand(uuid);
-    m->cmd = cmds;
-    cli_dispatcher.send_message_wrap(conn, m);
-    utime_t t;
-    t += 1000*1000*500;
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
-    ASSERT_TRUE(cli_dispatcher.got_new);
-    cli_dispatcher.got_new = false;
-  }
-
-  // 2. A very large "data"
-  {
-    bufferlist bl;
-    string s("abcdefghijklmnopqrstuvwxyz");
-    for (int i = 0; i < 1024*30; i++)
-      bl.append(s);
-    MPing *m = new MPing();
-    m->set_data(bl);
-    cli_dispatcher.send_message_wrap(conn, m);
-    utime_t t;
-    t += 1000*1000*500;
-    Mutex::Locker l(cli_dispatcher.lock);
-    while (!cli_dispatcher.got_new)
-      cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
-    ASSERT_TRUE(cli_dispatcher.got_new);
-    cli_dispatcher.got_new = false;
-  }
-  server_msgr->shutdown();
-  client_msgr->shutdown();
-  server_msgr->wait();
-  client_msgr->wait();
-}
-
-
 class SyntheticWorkload {
   Mutex lock;
   Cond cond;
   set<Messenger*> available_servers;
   set<Messenger*> available_clients;
-  map<pair<Messenger*, Messenger*>, ConnectionRef> available_connections;
+  map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
   SyntheticDispatcher dispatcher;
   gen_type rng;
   vector<bufferlist> rand_data;
@@ -827,7 +828,7 @@ class SyntheticWorkload {
 
   SyntheticWorkload(int servers, int clients, string type, int random_num,
                     Messenger::Policy srv_policy, Messenger::Policy cli_policy):
-      lock("SyntheticWorkload::lock"), dispatcher(false), rng(time(NULL)) {
+      lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
     Messenger *msgr;
     int base_port = 16800;
     entity_addr_t bind_addr;
@@ -878,17 +879,18 @@ class SyntheticWorkload {
     }
   }
 
-  ConnectionRef _get_random_connection(pair<Messenger*, Messenger*> *p) {
-    while (dispatcher.get_pending() > max_in_flight)
+  ConnectionRef _get_random_connection() {
+    while (dispatcher.get_pending() > max_in_flight) {
+      lock.Unlock();
       usleep(500);
+      lock.Lock();
+    }
     assert(lock.is_locked());
     boost::uniform_int<> choose(0, available_connections.size() - 1);
     int index = choose(rng);
-    map<pair<Messenger*, Messenger*>, ConnectionRef>::iterator i = available_connections.begin();
+    map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
     for (; index > 0; --index, ++i) ;
-    if (p)
-      *p = i->first;
-    return i->second;
+    return i->first;
   }
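
The Unlock()/Lock() pair added around the backpressure wait is load-bearing: SyntheticWorkload::handle_reset() (added at the end of this class) runs on a dispatcher thread and takes the same lock, and pending messages only drain once dispatch threads make progress, so sleeping while holding the lock could deadlock under injected failures. The shape of the pattern, as a standalone sketch (hypothetical helper, not in the tree):

    void wait_for_drain(Mutex &lock, SyntheticDispatcher &disp,
                        uint64_t max_in_flight) {
      // `lock` is held on entry and re-held on return.
      while (disp.get_pending() > max_in_flight) {
        lock.Unlock();  // let ms_handle_reset()/fast dispatch take the lock
        usleep(500);
        lock.Lock();    // re-acquire before touching shared containers
      }
    }
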
 
   bool can_create_connection() {
@@ -924,10 +926,8 @@ class SyntheticWorkload {
       else
         p = make_pair(server, client);
     }
-    if (!available_connections.count(p)) {
-      ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
-      available_connections[p] = conn;
-    }
+    ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
+    available_connections[conn] = p;
   }
 
   void send_message() {
@@ -938,19 +938,18 @@ class SyntheticWorkload {
     bl = rand_data[index];
     m->set_data(bl);
     Mutex::Locker l(lock);
-    ConnectionRef conn = _get_random_connection(NULL);
+    ConnectionRef conn = _get_random_connection();
     dispatcher.send_message_wrap(conn, m);
   }
 
   void drop_connection() {
-    pair<Messenger*, Messenger*> p;
     Mutex::Locker l(lock);
     if (available_connections.size() < 10)
       return;
-    ConnectionRef conn = _get_random_connection(&p);
+    ConnectionRef conn = _get_random_connection();
     dispatcher.clear_pending(conn);
     conn->mark_down();
-    ASSERT_EQ(available_connections.erase(p), 1U);
+    ASSERT_EQ(available_connections.erase(conn), 1U);
   }
 
   void print_internal_state() {
@@ -982,8 +981,19 @@ class SyntheticWorkload {
     }
     available_clients.clear();
   }
+
+  void handle_reset(Connection *con) {
+    Mutex::Locker l(lock);
+    available_connections.erase(con);
+    dispatcher.clear_pending(con);
+  }
 };
 
+bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
+  workload->handle_reset(con);
+  return true;
+}
+
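
Re-keying available_connections by ConnectionRef rather than by the (Messenger*, Messenger*) pair is what makes the reset path above cheap: ms_handle_reset() only receives a Connection*, so under the old layout handle_reset() would have had to scan the map for the matching value. A sketch of the lookup this enables (illustrative only):

    void erase_by_connection(
        map<ConnectionRef, pair<Messenger*, Messenger*> > &conns,
        Connection *con) {
      // ConnectionRef is an intrusive_ptr<Connection> and converts from a
      // raw Connection*, so this is a single tree lookup, not a scan.
      conns.erase(con);
    }
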
 TEST_P(MessengerTest, SyntheticStressTest) {
   SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                              Messenger::Policy::stateful_server(0, 0),
@@ -1143,6 +1153,40 @@ TEST_P(MessengerTest, SyntheticInjectTest3) {
 }
 
 
+TEST_P(MessengerTest, SyntheticInjectTest4) {
+  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
+  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
+  SyntheticWorkload test_msg(16, 32, GetParam(), 100,
+                             Messenger::Policy::lossless_peer(0, 0),
+                             Messenger::Policy::lossless_peer(0, 0));
+  for (int i = 0; i < 100; ++i) {
+    if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+    test_msg.generate_connection();
+  }
+  gen_type rng(time(NULL));
+  for (int i = 0; i < 1000; ++i) {
+    if (!(i % 10)) {
+      cerr << "Op " << i << ": ";
+      test_msg.print_internal_state();
+    }
+    boost::uniform_int<> true_false(0, 99);
+    int val = true_false(rng);
+    if (val > 95) {
+      test_msg.generate_connection();
+    } else if (val > 80) {
+      // test_msg.drop_connection();
+    } else if (val > 10) {
+      test_msg.send_message();
+    } else {
+      usleep(rand() % 500 + 100);
+    }
+  }
+  test_msg.wait_for_done();
+  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
+  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
+}
+
+
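
SyntheticInjectTest4 restores the injection knobs by hand on its last two lines; a fatal gtest assertion anywhere in the loop returns from the test body early and would leave the elevated failure rates set for every later test in the process. A small RAII guard, hypothetical and not part of this commit (it only reuses the set_val() call the test already makes), would make the cleanup unconditional:

    struct InjectGuard {  // hypothetical helper, not in the tree
      ~InjectGuard() {
        // Runs even when an ASSERT_*() returns from the test body early.
        g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
        g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
      }
    };

    // Usage, at the top of the test body:
    //   InjectGuard guard;
    //   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
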
 class MarkdownDispatcher : public Dispatcher {
   Mutex lock;
   set<ConnectionRef> conns;