}
void ms_fast_dispatch(Message *m) {
Mutex::Locker l(lock);
- if (is_server) {
- reply_message(m);
- } else if (m->get_middle().length()) {
- bufferlist middle = m->get_middle();
- uint64_t i;
- ASSERT_EQ(sizeof(uint64_t), middle.length());
- memcpy(&i, middle.c_str(), middle.length());
- if (sent.count(i)) {
- ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
- ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
- conn_sent[m->get_connection()].pop_front();
- sent.erase(i);
- }
+ uint64_t i;
+ bool reply;
+ assert(m->get_middle().length());
+ bufferlist::iterator blp = m->get_middle().begin();
+ ::decode(i, blp);
+ ::decode(reply, blp);
+ if (reply) {
+ cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
+ reply_message(m, i);
+ } else if (sent.count(i)) {
+ cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
+ ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
+ ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+ conn_sent[m->get_connection()].pop_front();
+ sent.erase(i);
}
got_new = true;
cond.Signal();
return true;
}
- void reply_message(Message *m) {
+ void reply_message(Message *m, uint64_t i) {
+ bufferlist bl;
+ ::encode(i, bl);
+ ::encode(false, bl);
MPing *rm = new MPing();
if (m->get_data_len())
rm->set_data(m->get_data());
if (m->get_middle().length())
- rm->set_middle(m->get_middle());
+ rm->set_middle(bl);
m->get_connection()->send_message(rm);
}
bufferlist bl;
uint64_t i = index.read();
index.inc();
- bufferptr bp(sizeof(i));
- memcpy(bp.c_str(), (char*)&i, sizeof(i));
- bl.push_back(bp);
+ ::encode(i, bl);
+ ::encode(true, bl);
m->set_middle(bl);
- sent[i] = m->get_data();
- conn_sent[con].push_back(i);
+ if (!con->get_messenger()->get_default_policy().lossy) {
+ sent[i] = m->get_data();
+ conn_sent[con].push_back(i);
+ }
}
ASSERT_EQ(con->send_message(m), 0);
}
cmds.push_back(s);
MCommand *m = new MCommand(uuid);
m->cmd = cmds;
- ASSERT_EQ(conn->send_message(m), 0);
+ cli_dispatcher.send_message_wrap(conn, m);
utime_t t;
t += 1000*1000*500;
Mutex::Locker l(cli_dispatcher.lock);
set<Messenger*> available_servers;
set<Messenger*> available_clients;
map<pair<Messenger*, Messenger*>, ConnectionRef> available_connections;
- SyntheticDispatcher cli_dispatcher, srv_dispatcher;
+ SyntheticDispatcher dispatcher;
gen_type rng;
vector<bufferlist> rand_data;
SyntheticWorkload(int servers, int clients, string type, int random_num,
Messenger::Policy srv_policy, Messenger::Policy cli_policy):
- lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
- rng(time(NULL)) {
+ lock("SyntheticWorkload::lock"), dispatcher(false), rng(time(NULL)) {
Messenger *msgr;
int base_port = 16800;
entity_addr_t bind_addr;
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
bind_addr.parse(addr);
msgr->bind(bind_addr);
- msgr->add_dispatcher_head(&srv_dispatcher);
+ msgr->add_dispatcher_head(&dispatcher);
assert(msgr);
msgr->set_default_policy(srv_policy);
bind_addr.parse(addr);
msgr->bind(bind_addr);
}
- msgr->add_dispatcher_head(&cli_dispatcher);
+ msgr->add_dispatcher_head(&dispatcher);
assert(msgr);
msgr->set_default_policy(cli_policy);
}
ConnectionRef _get_random_connection(pair<Messenger*, Messenger*> *p) {
- while (cli_dispatcher.get_pending() > max_in_flight)
+ while (dispatcher.get_pending() > max_in_flight)
usleep(500);
assert(lock.is_locked());
boost::uniform_int<> choose(0, available_connections.size() - 1);
client = *i;
}
- if (!available_connections.count(make_pair(client, server))) {
- ConnectionRef conn = client->get_connection(server->get_myinst());
- available_connections[make_pair(client, server)] = conn;
+ pair<Messenger*, Messenger*> p;
+ {
+ boost::uniform_int<> choose(0, available_servers.size() - 1);
+ int index = choose(rng);
+ if (server->get_default_policy().server || index % 2) {
+ conn = client->get_connection(server->get_myinst());
+ p = make_pair(client, server);
+ } else {
+ conn = server->get_connection(client->get_myinst());
+ p = make_pair(server, client);
+ }
+ }
+ if (!available_connections.count(p)) {
+ ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
+ available_connections[p] = conn;
}
}
m->set_data(bl);
Mutex::Locker l(lock);
ConnectionRef conn = _get_random_connection(NULL);
- cli_dispatcher.send_message_wrap(conn, m);
+ dispatcher.send_message_wrap(conn, m);
}
void drop_connection() {
if (available_connections.size() < 10)
return;
ConnectionRef conn = _get_random_connection(&p);
- cli_dispatcher.clear_pending(conn);
+ dispatcher.clear_pending(conn);
conn->mark_down();
ASSERT_EQ(available_connections.erase(p), 1U);
}
void print_internal_state() {
Mutex::Locker l(lock);
cerr << "available_connections: " << available_connections.size()
- << " inflight messages: " << cli_dispatcher.get_pending() << std::endl;
+ << " inflight messages: " << dispatcher.get_pending() << std::endl;
}
void wait_for_done() {
uint64_t i = 0;
- while (cli_dispatcher.get_pending()) {
+ while (dispatcher.get_pending()) {
usleep(1000*100);
if (i++ % 50 == 0)
print_internal_state();
};
TEST_P(MessengerTest, SyntheticStressTest) {
- SyntheticWorkload test_msg(32, 128, GetParam(), 100,
+ SyntheticWorkload test_msg(8, 32, GetParam(), 100,
Messenger::Policy::stateful_server(0, 0),
Messenger::Policy::lossless_client(0, 0));
for (int i = 0; i < 100; ++i) {
test_msg.wait_for_done();
}
+TEST_P(MessengerTest, SyntheticStressTest1) {
+ SyntheticWorkload test_msg(16, 32, GetParam(), 100,
+ Messenger::Policy::lossless_peer_reuse(0, 0),
+ Messenger::Policy::lossless_peer_reuse(0, 0));
+ for (int i = 0; i < 10; ++i) {
+ if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 10000; ++i) {
+ if (!(i % 10)) {
+ cerr << "Op " << i << ": ";
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 80) {
+ test_msg.generate_connection();
+ } else if (val > 60) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 1000 + 500);
+ }
+ }
+ test_msg.wait_for_done();
+}
+
TEST_P(MessengerTest, SyntheticInjectTest) {
g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
- SyntheticWorkload test_msg(4, 16, GetParam(), 100,
+ SyntheticWorkload test_msg(8, 32, GetParam(), 100,
Messenger::Policy::stateful_server(0, 0),
Messenger::Policy::lossless_client(0, 0));
for (int i = 0; i < 100; ++i) {
TEST_P(MessengerTest, SyntheticInjectTest2) {
g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
- SyntheticWorkload test_msg(4, 16, GetParam(), 100,
+ SyntheticWorkload test_msg(8, 16, GetParam(), 100,
Messenger::Policy::lossless_peer_reuse(0, 0),
Messenger::Policy::lossless_peer_reuse(0, 0));
for (int i = 0; i < 100; ++i) {