public:
static const unsigned max_in_flight = 512;
static const unsigned max_connections = 128;
- static const unsigned max_messege_len = 1024 * 1024 * 4;
+ static const unsigned max_message_len = 1024 * 1024 * 4;
SyntheticWorkload(int servers, int clients, string type, int random_num):
lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
msgr->add_dispatcher_head(&srv_dispatcher);
assert(msgr);
- msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
+ msgr->set_default_policy(Messenger::Policy::stateful_server(0, 0));
available_servers.insert(msgr);
msgr->start();
}
msgr->start();
}
- static const char alphanum[] = "0123456789"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz";
-
for (int i = 0; i < random_num; i++) {
bufferlist bl;
- boost::uniform_int<> u(1, max_messege_len);
+ boost::uniform_int<> u(32, max_message_len);
uint64_t value_len = u(rng);
bufferptr bp(value_len);
- for (uint64_t j = 0; j < value_len; j++)
- bp[j] = alphanum[rand() % (sizeof(alphanum) - 1)];
+ for (uint64_t j = 0; j < value_len; ) {
+ memcpy(bp.c_str()+j, &i, sizeof(i));
+ j += 4096;
+ }
- bp[value_len - 1] = '\0';
bl.append(bp);
rand_data.push_back(bl);
}
}
void wait_for_done() {
- while (cli_dispatcher.get_pending())
- usleep(500);
+ uint64_t i = 0;
+ while (cli_dispatcher.get_pending()) {
+ usleep(1000*100);
+ if (i++ % 50 == 0)
+ print_internal_state();
+ }
for (set<Messenger*>::iterator it = available_servers.begin();
it != available_servers.end(); ++it) {
(*it)->shutdown();
}
};
-TEST_P(MessengerTest, SyntheticTest) {
+TEST_P(MessengerTest, SyntheticStressTest) {
SyntheticWorkload test_msg(32, 128, GetParam(), 100);
for (int i = 0; i < 100; ++i) {
if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
}
+TEST_P(MessengerTest, SyntheticInjectTest) {
+ g_ceph_context->_conf->set_val("ms_inject_socket_failures", "10");
+ g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
+ SyntheticWorkload test_msg(4, 16, GetParam(), 100);
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 1000; ++i) {
+ if (!(i % 10)) {
+ cerr << "Op " << i << ": ";
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 85) {
+ test_msg.generate_connection();
+ } else if (val > 70) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+ g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
+ g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
+}
+
+
class MarkdownDispatcher : public Dispatcher {
Mutex lock;
set<Connection*> conns;
}
last = srv_dispatcher.count.read();
if (equal_count)
- sleep(0.5);
+ usleep(1000*500);
ASSERT_FALSE(equal && equal_count > 3);
}
server_msgr->shutdown();