From: Haomai Wang Date: Mon, 12 Jan 2015 04:14:39 +0000 (+0800) Subject: test_msgr: Add SyntheticWorkload to do message measurement X-Git-Tag: v0.93~247^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4b900a6f829c07fb338ac9358ecb837c5d54ac73;p=ceph.git test_msgr: Add SyntheticWorkload to do message measurement Signed-off-by: Haomai Wang --- diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 639db808689..5df34f3790e 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -30,8 +30,13 @@ #include "messages/MPing.h" #include "messages/MCommand.h" +#include +#include +#include #include +typedef boost::mt11213b gen_type; + #if GTEST_HAS_PARAM_TEST #define CHECK_AND_WAIT_TRUE(expr) do { \ @@ -73,7 +78,7 @@ class FakeDispatcher : public Dispatcher { Session(ConnectionRef c): RefCountedObject(g_ceph_context), lock("FakeDispatcher::Session::lock"), count(0), con(c) { } - uint64_t get_count() {return count;} + uint64_t get_count() { return count; } }; Mutex lock; @@ -97,6 +102,7 @@ class FakeDispatcher : public Dispatcher { } void ms_handle_fast_connect(Connection *con) { + lock.Lock(); cerr << __func__ << con << std::endl; Session *s = static_cast(con->get_priv()); if (!s) { @@ -105,12 +111,12 @@ class FakeDispatcher : public Dispatcher { cerr << __func__ << " con: " << con << " count: " << s->count << std::endl; } s->put(); - lock.Lock(); got_connect = true; cond.Signal(); lock.Unlock(); } void ms_handle_fast_accept(Connection *con) { + Mutex::Locker l(lock); Session *s = static_cast(con->get_priv()); if (!s) { s = new Session(con); @@ -119,24 +125,25 @@ class FakeDispatcher : public Dispatcher { s->put(); } bool ms_dispatch(Message *m) { + Mutex::Locker l(lock); Session *s = static_cast(m->get_connection()->get_priv()); if (!s) { s = new Session(m->get_connection()); m->get_connection()->set_priv(s->get()); } s->put(); - Mutex::Locker l(s->lock); + Mutex::Locker l1(s->lock); s->count++; cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl; - if (is_server) + if (is_server) { reply_message(m); - lock.Lock(); + } got_new = true; cond.Signal(); - lock.Unlock(); return true; } bool ms_handle_reset(Connection *con) { + Mutex::Locker l(lock); cerr << __func__ << con << std::endl; Session *s = static_cast(con->get_priv()); if (s) { @@ -147,6 +154,7 @@ class FakeDispatcher : public Dispatcher { return true; } void ms_handle_remote_reset(Connection *con) { + Mutex::Locker l(lock); cerr << __func__ << con << std::endl; Session *s = static_cast(con->get_priv()); if (s) { @@ -157,22 +165,23 @@ class FakeDispatcher : public Dispatcher { got_remote_reset = true; } void ms_fast_dispatch(Message *m) { + Mutex::Locker l(lock); Session *s = static_cast(m->get_connection()->get_priv()); if (!s) { s = new Session(m->get_connection()); m->get_connection()->set_priv(s->get()); } s->put(); - Mutex::Locker (s->lock); + Mutex::Locker l1(s->lock); s->count++; cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl; - if (is_server) + if (is_server) { reply_message(m); - lock.Lock(); + } got_new = true; cond.Signal(); - lock.Unlock(); } + bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, bool& isvalid, CryptoKey& session_key) { @@ -180,7 +189,6 @@ class FakeDispatcher : public Dispatcher { return true; } - void reply_message(Message *m) { MPing *rm = new MPing(); m->get_connection()->send_message(rm); @@ -555,8 +563,119 @@ TEST_P(MessengerTest, ClientStandbyTest) { client_msgr->wait(); } +class SyntheticDispatcher : public Dispatcher { + public: + Mutex lock; + Cond cond; + bool is_server; + bool got_new; + bool got_remote_reset; + bool got_connect; + map > conn_sent; + map sent; + atomic_t index; + + SyntheticDispatcher(bool s): Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), + is_server(s), got_new(false), got_remote_reset(false), + got_connect(false), index(0) {} + bool ms_can_fast_dispatch_any() const { return true; } + bool ms_can_fast_dispatch(Message *m) const { + switch (m->get_type()) { + case CEPH_MSG_PING: + case MSG_COMMAND: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) { + lock.Lock(); + got_connect = true; + cond.Signal(); + lock.Unlock(); + } + void ms_handle_fast_accept(Connection *con) { } + bool ms_dispatch(Message *m) { + assert(0); + } + bool ms_handle_reset(Connection *con) { + return true; + } + void ms_handle_remote_reset(Connection *con) { + Mutex::Locker l(lock); + got_remote_reset = true; + } + void ms_fast_dispatch(Message *m) { + Mutex::Locker l(lock); + if (is_server) { + reply_message(m); + } else if (m->get_middle().length()) { + bufferlist middle = m->get_middle(); + uint64_t i; + ASSERT_EQ(sizeof(uint64_t), middle.length()); + memcpy(&i, middle.c_str(), middle.length()); + if (sent.count(i)) { + ASSERT_EQ(conn_sent[m->get_connection()].front(), i); + ASSERT_TRUE(m->get_data().contents_equal(sent[i])); + conn_sent[m->get_connection()].pop_front(); + sent.erase(i); + } + } + got_new = true; + cond.Signal(); + } + + bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, + bufferlist& authorizer, bufferlist& authorizer_reply, + bool& isvalid, CryptoKey& session_key) { + isvalid = true; + return true; + } + + void reply_message(Message *m) { + MPing *rm = new MPing(); + if (m->get_data_len()) + rm->set_data(m->get_data()); + if (m->get_middle().length()) + rm->set_middle(m->get_middle()); + m->get_connection()->send_message(rm); + } + + void send_message_wrap(ConnectionRef con, Message *m) { + { + Mutex::Locker l(lock); + bufferlist bl; + uint64_t i = index.read(); + index.inc(); + bufferptr bp(sizeof(i)); + memcpy(bp.c_str(), (char*)&i, sizeof(i)); + bl.push_back(bp); + m->set_middle(bl); + sent[i] = m->get_data(); + conn_sent[con].push_back(i); + } + ASSERT_EQ(con->send_message(m), 0); + } + + uint64_t get_pending() { + Mutex::Locker l(lock); + return sent.size(); + } + + void clear_pending(ConnectionRef con) { + Mutex::Locker l(lock); + + for (list::iterator it = conn_sent[con].begin(); + it != conn_sent[con].end(); ++it) + sent.erase(*it); + conn_sent.erase(con); + } +}; + + TEST_P(MessengerTest, MessageTest) { - FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + SyntheticDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; bind_addr.parse("127.0.0.1"); Messenger::Policy p = Messenger::Policy::stateful_server(0, 0); @@ -594,12 +713,211 @@ TEST_P(MessengerTest, MessageTest) { cli_dispatcher.got_new = false; } + // 2. A very large "data" + { + bufferlist bl; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*30; i++) + bl.append(s); + MPing *m = new MPing(); + m->set_data(bl); + cli_dispatcher.send_message_wrap(conn, m); + utime_t t; + t += 1000*1000*500; + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t); + ASSERT_TRUE(cli_dispatcher.got_new); + cli_dispatcher.got_new = false; + } server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); client_msgr->wait(); } + +class SyntheticWorkload { + Mutex lock; + Cond cond; + set available_servers; + set available_clients; + map, ConnectionRef> available_connections; + SyntheticDispatcher cli_dispatcher, srv_dispatcher; + gen_type rng; + vector rand_data; + + public: + static const unsigned max_in_flight = 512; + static const unsigned max_connections = 128; + static const unsigned max_messege_len = 1024 * 1024 * 4; + + SyntheticWorkload(int servers, int clients, string type, int random_num): + lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true), + rng(time(NULL)) { + Messenger *msgr; + int base_port = 16800; + for (int i = 0; i < servers; ++i) { + entity_addr_t bind_addr; + char addr[64]; + snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i); + msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), + "server", getpid()+i); + bind_addr.parse(addr); + msgr->bind(bind_addr); + msgr->add_dispatcher_head(&srv_dispatcher); + + assert(msgr); + msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0)); + available_servers.insert(msgr); + msgr->start(); + } + + for (int i = 0; i < clients; ++i) { + msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), + "client", getpid()+i+servers); + assert(msgr); + msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0)); + msgr->add_dispatcher_head(&cli_dispatcher); + available_clients.insert(msgr); + msgr->start(); + } + + static const char alphanum[] = "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + for (int i = 0; i < random_num; i++) { + bufferlist bl; + boost::uniform_int<> u(1, max_messege_len); + uint64_t value_len = u(rng); + bufferptr bp(value_len); + for (uint64_t j = 0; j < value_len; j++) + bp[j] = alphanum[rand() % (sizeof(alphanum) - 1)]; + + bp[value_len - 1] = '\0'; + bl.append(bp); + rand_data.push_back(bl); + } + } + + ConnectionRef _get_random_connection(pair *p) { + while (cli_dispatcher.get_pending() > max_in_flight) + usleep(500); + assert(lock.is_locked()); + boost::uniform_int<> choose(0, available_connections.size() - 1); + int index = choose(rng); + map, ConnectionRef>::iterator i = available_connections.begin(); + for (; index > 0; --index, ++i) ; + if (p) + *p = i->first; + return i->second; + } + + bool can_create_connection() { + return available_connections.size() < max_connections; + } + + void generate_connection() { + Mutex::Locker l(lock); + if (!can_create_connection()) + return ; + + Messenger *server, *client; + { + boost::uniform_int<> choose(0, available_servers.size() - 1); + int index = choose(rng); + set::iterator i = available_servers.begin(); + for (; index > 0; --index, ++i) ; + server = *i; + } + { + boost::uniform_int<> choose(0, available_clients.size() - 1); + int index = choose(rng); + set::iterator i = available_clients.begin(); + for (; index > 0; --index, ++i) ; + client = *i; + } + + if (!available_connections.count(make_pair(client, server))) { + ConnectionRef conn = client->get_connection(server->get_myinst()); + available_connections[make_pair(client, server)] = conn; + } + } + + void send_message() { + Message *m = new MPing(); + bufferlist bl; + boost::uniform_int<> u(0, rand_data.size()-1); + uint64_t index = u(rng); + bl = rand_data[index]; + m->set_data(bl); + Mutex::Locker l(lock); + ConnectionRef conn = _get_random_connection(NULL); + cli_dispatcher.send_message_wrap(conn, m); + } + + void drop_connection() { + pair p; + Mutex::Locker l(lock); + if (available_connections.size() < 10) + return; + ConnectionRef conn = _get_random_connection(&p); + cli_dispatcher.clear_pending(conn); + conn->mark_down(); + ASSERT_EQ(available_connections.erase(p), 1U); + } + + void print_internal_state() { + Mutex::Locker l(lock); + cerr << "available_connections: " << available_connections.size() + << " inflight messages: " << cli_dispatcher.get_pending() << std::endl; + } + + void wait_for_done() { + while (cli_dispatcher.get_pending()) + usleep(500); + for (set::iterator it = available_servers.begin(); + it != available_servers.end(); ++it) { + (*it)->shutdown(); + (*it)->wait(); + } + for (set::iterator it = available_clients.begin(); + it != available_clients.end(); ++it) { + (*it)->shutdown(); + (*it)->wait(); + } + } +}; + +TEST_P(MessengerTest, SyntheticTest) { + SyntheticWorkload test_msg(32, 128, GetParam(), 100); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) cerr << "seeding connection " << i << std::endl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 10000; ++i) { + if (!(i % 10)) { + cerr << "Op " << i << ": "; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 85) { + test_msg.generate_connection(); + } else if (val > 70) { + test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 500 + 100); + } + } + test_msg.wait_for_done(); +} + + class MarkdownDispatcher : public Dispatcher { Mutex lock; set conns;