]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
TestMsgr: Make SyntheticWorkload support policy passed in
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 25 Feb 2015 07:14:31 +0000 (15:14 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 25 Feb 2015 15:43:32 +0000 (23:43 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/test/msgr/test_msgr.cc

index ae0218d65b21ac6a86deca1a82cf619dbae74549..68ef3f4c826e2c256490cc4e637a510d5955a834 100644 (file)
@@ -665,6 +665,11 @@ class SyntheticDispatcher : public Dispatcher {
   }
   void ms_handle_remote_reset(Connection *con) {
     Mutex::Locker l(lock);
+    list<uint64_t> c = conn_sent[con];
+    for (list<uint64_t>::iterator it = c.begin();
+         it != c.end(); it++)
+      sent.erase(*it);
+    conn_sent.erase(con);
     got_remote_reset = true;
   }
   void ms_fast_dispatch(Message *m) {
@@ -814,23 +819,24 @@ class SyntheticWorkload {
   static const unsigned max_connections = 128;
   static const unsigned max_message_len = 1024 * 1024 * 4;
 
-  SyntheticWorkload(int servers, int clients, string type, int random_num):
+  SyntheticWorkload(int servers, int clients, string type, int random_num,
+                    Messenger::Policy srv_policy, Messenger::Policy cli_policy):
       lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
       rng(time(NULL)) {
     Messenger *msgr;
     int base_port = 16800;
+    entity_addr_t bind_addr;
+    char addr[64];
     for (int i = 0; i < servers; ++i) {
-      entity_addr_t bind_addr;
-      char addr[64];
-      snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
       msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
                                "server", getpid()+i);
+      snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
       bind_addr.parse(addr);
       msgr->bind(bind_addr);
       msgr->add_dispatcher_head(&srv_dispatcher);
 
       assert(msgr);
-      msgr->set_default_policy(Messenger::Policy::stateful_server(0, 0));
+      msgr->set_default_policy(srv_policy);
       available_servers.insert(msgr);
       msgr->start();
     }
@@ -838,9 +844,13 @@ class SyntheticWorkload {
     for (int i = 0; i < clients; ++i) {
       msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
                                "client", getpid()+i+servers);
-      assert(msgr);
-      msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
+      snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
+      bind_addr.parse(addr);
+      msgr->bind(bind_addr);
       msgr->add_dispatcher_head(&cli_dispatcher);
+
+      assert(msgr);
+      msgr->set_default_policy(cli_policy);
       available_clients.insert(msgr);
       msgr->start();
     }
@@ -960,7 +970,9 @@ class SyntheticWorkload {
 };
 
 TEST_P(MessengerTest, SyntheticStressTest) {
-  SyntheticWorkload test_msg(32, 128, GetParam(), 100);
+  SyntheticWorkload test_msg(32, 128, GetParam(), 100,
+                             Messenger::Policy::stateful_server(0, 0),
+                             Messenger::Policy::lossless_client(0, 0));
   for (int i = 0; i < 100; ++i) {
     if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
     test_msg.generate_connection();
@@ -990,7 +1002,9 @@ TEST_P(MessengerTest, SyntheticStressTest) {
 TEST_P(MessengerTest, SyntheticInjectTest) {
   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
-  SyntheticWorkload test_msg(4, 16, GetParam(), 100);
+  SyntheticWorkload test_msg(4, 16, GetParam(), 100,
+                             Messenger::Policy::stateful_server(0, 0),
+                             Messenger::Policy::lossless_client(0, 0));
   for (int i = 0; i < 100; ++i) {
     if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
     test_msg.generate_connection();