}
void ms_handle_remote_reset(Connection *con) {
Mutex::Locker l(lock);
+ list<uint64_t> c = conn_sent[con];
+ for (list<uint64_t>::iterator it = c.begin();
+ it != c.end(); it++)
+ sent.erase(*it);
+ conn_sent.erase(con);
got_remote_reset = true;
}
void ms_fast_dispatch(Message *m) {
static const unsigned max_connections = 128;
static const unsigned max_message_len = 1024 * 1024 * 4;
- SyntheticWorkload(int servers, int clients, string type, int random_num):
+ SyntheticWorkload(int servers, int clients, string type, int random_num,
+ Messenger::Policy srv_policy, Messenger::Policy cli_policy):
lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
rng(time(NULL)) {
Messenger *msgr;
int base_port = 16800;
+ entity_addr_t bind_addr;
+ char addr[64];
for (int i = 0; i < servers; ++i) {
- entity_addr_t bind_addr;
- char addr[64];
- snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
"server", getpid()+i);
+ snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
bind_addr.parse(addr);
msgr->bind(bind_addr);
msgr->add_dispatcher_head(&srv_dispatcher);
assert(msgr);
- msgr->set_default_policy(Messenger::Policy::stateful_server(0, 0));
+ msgr->set_default_policy(srv_policy);
available_servers.insert(msgr);
msgr->start();
}
for (int i = 0; i < clients; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
"client", getpid()+i+servers);
- assert(msgr);
- msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
+ snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
+ bind_addr.parse(addr);
+ msgr->bind(bind_addr);
msgr->add_dispatcher_head(&cli_dispatcher);
+
+ assert(msgr);
+ msgr->set_default_policy(cli_policy);
available_clients.insert(msgr);
msgr->start();
}
};
TEST_P(MessengerTest, SyntheticStressTest) {
- SyntheticWorkload test_msg(32, 128, GetParam(), 100);
+ SyntheticWorkload test_msg(32, 128, GetParam(), 100,
+ Messenger::Policy::stateful_server(0, 0),
+ Messenger::Policy::lossless_client(0, 0));
for (int i = 0; i < 100; ++i) {
if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
test_msg.generate_connection();
TEST_P(MessengerTest, SyntheticInjectTest) {
g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
- SyntheticWorkload test_msg(4, 16, GetParam(), 100);
+ SyntheticWorkload test_msg(4, 16, GetParam(), 100,
+ Messenger::Policy::stateful_server(0, 0),
+ Messenger::Policy::lossless_client(0, 0));
for (int i = 0; i < 100; ++i) {
if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
test_msg.generate_connection();