From: Haomai Wang Date: Wed, 25 Feb 2015 07:14:31 +0000 (+0800) Subject: TestMsgr: Make SyntheticWorkload support policy passed in X-Git-Tag: v9.0.0~241^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9f24a8c75c8fe48ae0161990f919af82c41d994e;p=ceph.git TestMsgr: Make SyntheticWorkload support policy passed in Signed-off-by: Haomai Wang --- diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index ae0218d65b21..68ef3f4c826e 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -665,6 +665,11 @@ class SyntheticDispatcher : public Dispatcher { } void ms_handle_remote_reset(Connection *con) { Mutex::Locker l(lock); + list c = conn_sent[con]; + for (list::iterator it = c.begin(); + it != c.end(); it++) + sent.erase(*it); + conn_sent.erase(con); got_remote_reset = true; } void ms_fast_dispatch(Message *m) { @@ -814,23 +819,24 @@ class SyntheticWorkload { static const unsigned max_connections = 128; static const unsigned max_message_len = 1024 * 1024 * 4; - SyntheticWorkload(int servers, int clients, string type, int random_num): + SyntheticWorkload(int servers, int clients, string type, int random_num, + Messenger::Policy srv_policy, Messenger::Policy cli_policy): lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true), rng(time(NULL)) { Messenger *msgr; int base_port = 16800; + entity_addr_t bind_addr; + char addr[64]; for (int i = 0; i < servers; ++i) { - entity_addr_t bind_addr; - char addr[64]; - snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i); msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", getpid()+i); + snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i); bind_addr.parse(addr); msgr->bind(bind_addr); msgr->add_dispatcher_head(&srv_dispatcher); assert(msgr); - msgr->set_default_policy(Messenger::Policy::stateful_server(0, 0)); + msgr->set_default_policy(srv_policy); available_servers.insert(msgr); msgr->start(); } @@ -838,9 +844,13 @@ class SyntheticWorkload { for (int i = 0; i < clients; ++i) { msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), "client", getpid()+i+servers); - assert(msgr); - msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0)); + snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers); + bind_addr.parse(addr); + msgr->bind(bind_addr); msgr->add_dispatcher_head(&cli_dispatcher); + + assert(msgr); + msgr->set_default_policy(cli_policy); available_clients.insert(msgr); msgr->start(); } @@ -960,7 +970,9 @@ class SyntheticWorkload { }; TEST_P(MessengerTest, SyntheticStressTest) { - SyntheticWorkload test_msg(32, 128, GetParam(), 100); + SyntheticWorkload test_msg(32, 128, GetParam(), 100, + Messenger::Policy::stateful_server(0, 0), + Messenger::Policy::lossless_client(0, 0)); for (int i = 0; i < 100; ++i) { if (!(i % 10)) cerr << "seeding connection " << i << std::endl; test_msg.generate_connection(); @@ -990,7 +1002,9 @@ TEST_P(MessengerTest, SyntheticStressTest) { TEST_P(MessengerTest, SyntheticInjectTest) { g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30"); g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); - SyntheticWorkload test_msg(4, 16, GetParam(), 100); + SyntheticWorkload test_msg(4, 16, GetParam(), 100, + Messenger::Policy::stateful_server(0, 0), + Messenger::Policy::lossless_client(0, 0)); for (int i = 0; i < 100; ++i) { if (!(i % 10)) cerr << "seeding connection " << i << std::endl; test_msg.generate_connection();