#include "messages/MPing.h"
#include "messages/MCommand.h"
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/binomial_distribution.hpp>
#include <gtest/gtest.h>
+typedef boost::mt11213b gen_type;
+
#if GTEST_HAS_PARAM_TEST
#define CHECK_AND_WAIT_TRUE(expr) do { \
Session(ConnectionRef c): RefCountedObject(g_ceph_context), lock("FakeDispatcher::Session::lock"), count(0), con(c) {
}
- uint64_t get_count() {return count;}
+ uint64_t get_count() { return count; }
};
Mutex lock;
}
void ms_handle_fast_connect(Connection *con) {
+ lock.Lock();
cerr << __func__ << con << std::endl;
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
cerr << __func__ << " con: " << con << " count: " << s->count << std::endl;
}
s->put();
- lock.Lock();
got_connect = true;
cond.Signal();
lock.Unlock();
}
void ms_handle_fast_accept(Connection *con) {
+ Mutex::Locker l(lock);
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(con);
s->put();
}
bool ms_dispatch(Message *m) {
+ Mutex::Locker l(lock);
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (!s) {
s = new Session(m->get_connection());
m->get_connection()->set_priv(s->get());
}
s->put();
- Mutex::Locker l(s->lock);
+ Mutex::Locker l1(s->lock);
s->count++;
cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
- if (is_server)
+ if (is_server) {
reply_message(m);
- lock.Lock();
+ }
got_new = true;
cond.Signal();
- lock.Unlock();
return true;
}
bool ms_handle_reset(Connection *con) {
+ Mutex::Locker l(lock);
cerr << __func__ << con << std::endl;
Session *s = static_cast<Session*>(con->get_priv());
if (s) {
return true;
}
void ms_handle_remote_reset(Connection *con) {
+ Mutex::Locker l(lock);
cerr << __func__ << con << std::endl;
Session *s = static_cast<Session*>(con->get_priv());
if (s) {
got_remote_reset = true;
}
void ms_fast_dispatch(Message *m) {
+ Mutex::Locker l(lock);
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (!s) {
s = new Session(m->get_connection());
m->get_connection()->set_priv(s->get());
}
s->put();
- Mutex::Locker (s->lock);
+ Mutex::Locker l1(s->lock);
s->count++;
cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
- if (is_server)
+ if (is_server) {
reply_message(m);
- lock.Lock();
+ }
got_new = true;
cond.Signal();
- lock.Unlock();
}
+
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
bufferlist& authorizer, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key) {
return true;
}
-
void reply_message(Message *m) {
MPing *rm = new MPing();
m->get_connection()->send_message(rm);
client_msgr->wait();
}
+class SyntheticDispatcher : public Dispatcher {
+ public:
+ Mutex lock;
+ Cond cond;
+ bool is_server;
+ bool got_new;
+ bool got_remote_reset;
+ bool got_connect;
+ map<ConnectionRef, list<uint64_t> > conn_sent;
+ map<uint64_t, bufferlist> sent;
+ atomic_t index;
+
+ SyntheticDispatcher(bool s): Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"),
+ is_server(s), got_new(false), got_remote_reset(false),
+ got_connect(false), index(0) {}
+ bool ms_can_fast_dispatch_any() const { return true; }
+ bool ms_can_fast_dispatch(Message *m) const {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ case MSG_COMMAND:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) {
+ lock.Lock();
+ got_connect = true;
+ cond.Signal();
+ lock.Unlock();
+ }
+ void ms_handle_fast_accept(Connection *con) { }
+ bool ms_dispatch(Message *m) {
+ assert(0);
+ }
+ bool ms_handle_reset(Connection *con) {
+ return true;
+ }
+ void ms_handle_remote_reset(Connection *con) {
+ Mutex::Locker l(lock);
+ got_remote_reset = true;
+ }
+ void ms_fast_dispatch(Message *m) {
+ Mutex::Locker l(lock);
+ if (is_server) {
+ reply_message(m);
+ } else if (m->get_middle().length()) {
+ bufferlist middle = m->get_middle();
+ uint64_t i;
+ ASSERT_EQ(sizeof(uint64_t), middle.length());
+ memcpy(&i, middle.c_str(), middle.length());
+ if (sent.count(i)) {
+ ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
+ ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+ conn_sent[m->get_connection()].pop_front();
+ sent.erase(i);
+ }
+ }
+ got_new = true;
+ cond.Signal();
+ }
+
+ bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
+ bufferlist& authorizer, bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key) {
+ isvalid = true;
+ return true;
+ }
+
+ void reply_message(Message *m) {
+ MPing *rm = new MPing();
+ if (m->get_data_len())
+ rm->set_data(m->get_data());
+ if (m->get_middle().length())
+ rm->set_middle(m->get_middle());
+ m->get_connection()->send_message(rm);
+ }
+
+ void send_message_wrap(ConnectionRef con, Message *m) {
+ {
+ Mutex::Locker l(lock);
+ bufferlist bl;
+ uint64_t i = index.read();
+ index.inc();
+ bufferptr bp(sizeof(i));
+ memcpy(bp.c_str(), (char*)&i, sizeof(i));
+ bl.push_back(bp);
+ m->set_middle(bl);
+ sent[i] = m->get_data();
+ conn_sent[con].push_back(i);
+ }
+ ASSERT_EQ(con->send_message(m), 0);
+ }
+
+ uint64_t get_pending() {
+ Mutex::Locker l(lock);
+ return sent.size();
+ }
+
+ void clear_pending(ConnectionRef con) {
+ Mutex::Locker l(lock);
+
+ for (list<uint64_t>::iterator it = conn_sent[con].begin();
+ it != conn_sent[con].end(); ++it)
+ sent.erase(*it);
+ conn_sent.erase(con);
+ }
+};
+
+
TEST_P(MessengerTest, MessageTest) {
- FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ SyntheticDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;
bind_addr.parse("127.0.0.1");
Messenger::Policy p = Messenger::Policy::stateful_server(0, 0);
cli_dispatcher.got_new = false;
}
+ // 2. A very large "data"
+ {
+ bufferlist bl;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*30; i++)
+ bl.append(s);
+ MPing *m = new MPing();
+ m->set_data(bl);
+ cli_dispatcher.send_message_wrap(conn, m);
+ utime_t t;
+ t += 1000*1000*500;
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
+ ASSERT_TRUE(cli_dispatcher.got_new);
+ cli_dispatcher.got_new = false;
+ }
server_msgr->shutdown();
client_msgr->shutdown();
server_msgr->wait();
client_msgr->wait();
}
+
+class SyntheticWorkload {
+ Mutex lock;
+ Cond cond;
+ set<Messenger*> available_servers;
+ set<Messenger*> available_clients;
+ map<pair<Messenger*, Messenger*>, ConnectionRef> available_connections;
+ SyntheticDispatcher cli_dispatcher, srv_dispatcher;
+ gen_type rng;
+ vector<bufferlist> rand_data;
+
+ public:
+ static const unsigned max_in_flight = 512;
+ static const unsigned max_connections = 128;
+ static const unsigned max_messege_len = 1024 * 1024 * 4;
+
+ SyntheticWorkload(int servers, int clients, string type, int random_num):
+ lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
+ rng(time(NULL)) {
+ Messenger *msgr;
+ int base_port = 16800;
+ for (int i = 0; i < servers; ++i) {
+ entity_addr_t bind_addr;
+ char addr[64];
+ snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
+ "server", getpid()+i);
+ bind_addr.parse(addr);
+ msgr->bind(bind_addr);
+ msgr->add_dispatcher_head(&srv_dispatcher);
+
+ assert(msgr);
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
+ available_servers.insert(msgr);
+ msgr->start();
+ }
+
+ for (int i = 0; i < clients; ++i) {
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
+ "client", getpid()+i+servers);
+ assert(msgr);
+ msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
+ msgr->add_dispatcher_head(&cli_dispatcher);
+ available_clients.insert(msgr);
+ msgr->start();
+ }
+
+ static const char alphanum[] = "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ for (int i = 0; i < random_num; i++) {
+ bufferlist bl;
+ boost::uniform_int<> u(1, max_messege_len);
+ uint64_t value_len = u(rng);
+ bufferptr bp(value_len);
+ for (uint64_t j = 0; j < value_len; j++)
+ bp[j] = alphanum[rand() % (sizeof(alphanum) - 1)];
+
+ bp[value_len - 1] = '\0';
+ bl.append(bp);
+ rand_data.push_back(bl);
+ }
+ }
+
+ ConnectionRef _get_random_connection(pair<Messenger*, Messenger*> *p) {
+ while (cli_dispatcher.get_pending() > max_in_flight)
+ usleep(500);
+ assert(lock.is_locked());
+ boost::uniform_int<> choose(0, available_connections.size() - 1);
+ int index = choose(rng);
+ map<pair<Messenger*, Messenger*>, ConnectionRef>::iterator i = available_connections.begin();
+ for (; index > 0; --index, ++i) ;
+ if (p)
+ *p = i->first;
+ return i->second;
+ }
+
+ bool can_create_connection() {
+ return available_connections.size() < max_connections;
+ }
+
+ void generate_connection() {
+ Mutex::Locker l(lock);
+ if (!can_create_connection())
+ return ;
+
+ Messenger *server, *client;
+ {
+ boost::uniform_int<> choose(0, available_servers.size() - 1);
+ int index = choose(rng);
+ set<Messenger*>::iterator i = available_servers.begin();
+ for (; index > 0; --index, ++i) ;
+ server = *i;
+ }
+ {
+ boost::uniform_int<> choose(0, available_clients.size() - 1);
+ int index = choose(rng);
+ set<Messenger*>::iterator i = available_clients.begin();
+ for (; index > 0; --index, ++i) ;
+ client = *i;
+ }
+
+ if (!available_connections.count(make_pair(client, server))) {
+ ConnectionRef conn = client->get_connection(server->get_myinst());
+ available_connections[make_pair(client, server)] = conn;
+ }
+ }
+
+ void send_message() {
+ Message *m = new MPing();
+ bufferlist bl;
+ boost::uniform_int<> u(0, rand_data.size()-1);
+ uint64_t index = u(rng);
+ bl = rand_data[index];
+ m->set_data(bl);
+ Mutex::Locker l(lock);
+ ConnectionRef conn = _get_random_connection(NULL);
+ cli_dispatcher.send_message_wrap(conn, m);
+ }
+
+ void drop_connection() {
+ pair<Messenger*, Messenger*> p;
+ Mutex::Locker l(lock);
+ if (available_connections.size() < 10)
+ return;
+ ConnectionRef conn = _get_random_connection(&p);
+ cli_dispatcher.clear_pending(conn);
+ conn->mark_down();
+ ASSERT_EQ(available_connections.erase(p), 1U);
+ }
+
+ void print_internal_state() {
+ Mutex::Locker l(lock);
+ cerr << "available_connections: " << available_connections.size()
+ << " inflight messages: " << cli_dispatcher.get_pending() << std::endl;
+ }
+
+ void wait_for_done() {
+ while (cli_dispatcher.get_pending())
+ usleep(500);
+ for (set<Messenger*>::iterator it = available_servers.begin();
+ it != available_servers.end(); ++it) {
+ (*it)->shutdown();
+ (*it)->wait();
+ }
+ for (set<Messenger*>::iterator it = available_clients.begin();
+ it != available_clients.end(); ++it) {
+ (*it)->shutdown();
+ (*it)->wait();
+ }
+ }
+};
+
+TEST_P(MessengerTest, SyntheticTest) {
+ SyntheticWorkload test_msg(32, 128, GetParam(), 100);
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 10000; ++i) {
+ if (!(i % 10)) {
+ cerr << "Op " << i << ": ";
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 85) {
+ test_msg.generate_connection();
+ } else if (val > 70) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+}
+
+
class MarkdownDispatcher : public Dispatcher {
Mutex lock;
set<Connection*> conns;