client_msgr->wait();
}
+TEST_P(MessengerTest, MessageTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateful_server(0, 0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossless_peer(0, 0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+
+ // 1. A very large "front"(as well as "payload")
+ // Because a external message need to invade Messenger::decode_message,
+ // here we only use existing message class(MCommand)
+ ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ {
+ uuid_d uuid;
+ uuid.generate_random();
+ vector<string> cmds;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*30; i++)
+ cmds.push_back(s);
+ MCommand *m = new MCommand(uuid);
+ m->cmd = cmds;
+ conn->send_message(m);
+ utime_t t;
+ t += 1000*1000*500;
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
+ ASSERT_TRUE(cli_dispatcher.got_new);
+ cli_dispatcher.got_new = false;
+ }
+
+ // 2. A very large "data"
+ {
+ bufferlist bl;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*30; i++)
+ bl.append(s);
+ MPing *m = new MPing();
+ m->set_data(bl);
+ conn->send_message(m);
+ utime_t t;
+ t += 1000*1000*500;
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_new)
+ cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
+ ASSERT_TRUE(cli_dispatcher.got_new);
+ cli_dispatcher.got_new = false;
+ }
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+
+class SyntheticWorkload;
+
class SyntheticDispatcher : public Dispatcher {
public:
Mutex lock;
map<ConnectionRef, list<uint64_t> > conn_sent;
map<uint64_t, bufferlist> sent;
atomic_t index;
+ SyntheticWorkload *workload;
- SyntheticDispatcher(bool s): Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"),
- is_server(s), got_new(false), got_remote_reset(false),
- got_connect(false), index(0) {}
+ SyntheticDispatcher(bool s, SyntheticWorkload *wl):
+ Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
+ got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
bool ms_can_fast_dispatch_any() const { return true; }
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
bool ms_dispatch(Message *m) {
assert(0);
}
- bool ms_handle_reset(Connection *con) {
- return true;
- }
+ bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {
Mutex::Locker l(lock);
list<uint64_t> c = conn_sent[con];
};
-TEST_P(MessengerTest, MessageTest) {
- SyntheticDispatcher cli_dispatcher(false), srv_dispatcher(true);
- entity_addr_t bind_addr;
- bind_addr.parse("127.0.0.1");
- Messenger::Policy p = Messenger::Policy::stateful_server(0, 0);
- server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
- p = Messenger::Policy::lossless_peer(0, 0);
- client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
-
- server_msgr->bind(bind_addr);
- server_msgr->add_dispatcher_head(&srv_dispatcher);
- server_msgr->start();
- client_msgr->add_dispatcher_head(&cli_dispatcher);
- client_msgr->start();
-
-
- // 1. A very large "front"(as well as "payload")
- // Because a external message need to invade Messenger::decode_message,
- // here we only use existing message class(MCommand)
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
- {
- uuid_d uuid;
- uuid.generate_random();
- vector<string> cmds;
- string s("abcdefghijklmnopqrstuvwxyz");
- for (int i = 0; i < 1024*30; i++)
- cmds.push_back(s);
- MCommand *m = new MCommand(uuid);
- m->cmd = cmds;
- cli_dispatcher.send_message_wrap(conn, m);
- utime_t t;
- t += 1000*1000*500;
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
- ASSERT_TRUE(cli_dispatcher.got_new);
- cli_dispatcher.got_new = false;
- }
-
- // 2. A very large "data"
- {
- bufferlist bl;
- string s("abcdefghijklmnopqrstuvwxyz");
- for (int i = 0; i < 1024*30; i++)
- bl.append(s);
- MPing *m = new MPing();
- m->set_data(bl);
- cli_dispatcher.send_message_wrap(conn, m);
- utime_t t;
- t += 1000*1000*500;
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
- ASSERT_TRUE(cli_dispatcher.got_new);
- cli_dispatcher.got_new = false;
- }
- server_msgr->shutdown();
- client_msgr->shutdown();
- server_msgr->wait();
- client_msgr->wait();
-}
-
-
class SyntheticWorkload {
Mutex lock;
Cond cond;
set<Messenger*> available_servers;
set<Messenger*> available_clients;
- map<pair<Messenger*, Messenger*>, ConnectionRef> available_connections;
+ map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
SyntheticDispatcher dispatcher;
gen_type rng;
vector<bufferlist> rand_data;
SyntheticWorkload(int servers, int clients, string type, int random_num,
Messenger::Policy srv_policy, Messenger::Policy cli_policy):
- lock("SyntheticWorkload::lock"), dispatcher(false), rng(time(NULL)) {
+ lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
Messenger *msgr;
int base_port = 16800;
entity_addr_t bind_addr;
}
}
- ConnectionRef _get_random_connection(pair<Messenger*, Messenger*> *p) {
- while (dispatcher.get_pending() > max_in_flight)
+ ConnectionRef _get_random_connection() {
+ while (dispatcher.get_pending() > max_in_flight) {
+ lock.Unlock();
usleep(500);
+ lock.Lock();
+ }
assert(lock.is_locked());
boost::uniform_int<> choose(0, available_connections.size() - 1);
int index = choose(rng);
- map<pair<Messenger*, Messenger*>, ConnectionRef>::iterator i = available_connections.begin();
+ map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
for (; index > 0; --index, ++i) ;
- if (p)
- *p = i->first;
- return i->second;
+ return i->first;
}
bool can_create_connection() {
else
p = make_pair(server, client);
}
- if (!available_connections.count(p)) {
- ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
- available_connections[p] = conn;
- }
+ ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
+ available_connections[conn] = p;
}
void send_message() {
bl = rand_data[index];
m->set_data(bl);
Mutex::Locker l(lock);
- ConnectionRef conn = _get_random_connection(NULL);
+ ConnectionRef conn = _get_random_connection();
dispatcher.send_message_wrap(conn, m);
}
void drop_connection() {
- pair<Messenger*, Messenger*> p;
Mutex::Locker l(lock);
if (available_connections.size() < 10)
return;
- ConnectionRef conn = _get_random_connection(&p);
+ ConnectionRef conn = _get_random_connection();
dispatcher.clear_pending(conn);
conn->mark_down();
- ASSERT_EQ(available_connections.erase(p), 1U);
+ ASSERT_EQ(available_connections.erase(conn), 1U);
}
void print_internal_state() {
}
available_clients.clear();
}
+
+ void handle_reset(Connection *con) {
+ Mutex::Locker l(lock);
+ available_connections.erase(con);
+ dispatcher.clear_pending(con);
+ }
};
+bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
+ workload->handle_reset(con);
+ return true;
+}
+
TEST_P(MessengerTest, SyntheticStressTest) {
SyntheticWorkload test_msg(8, 32, GetParam(), 100,
Messenger::Policy::stateful_server(0, 0),
}
+TEST_P(MessengerTest, SyntheticInjectTest4) {
+ g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
+ g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
+ SyntheticWorkload test_msg(16, 32, GetParam(), 100,
+ Messenger::Policy::lossless_peer(0, 0),
+ Messenger::Policy::lossless_peer(0, 0));
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 1000; ++i) {
+ if (!(i % 10)) {
+ cerr << "Op " << i << ": ";
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 95) {
+ test_msg.generate_connection();
+ } else if (val > 80) {
+ // test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+ g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
+ g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
+}
+
+
class MarkdownDispatcher : public Dispatcher {
Mutex lock;
set<ConnectionRef> conns;