Cond cond;
set<Messenger*> available_servers;
set<Messenger*> available_clients;
+ Messenger::Policy client_policy;
map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
SyntheticDispatcher dispatcher;
gen_type rng;
static const unsigned max_message_len = 1024 * 1024 * 4;
SyntheticWorkload(int servers, int clients, string type, int random_num,
- Messenger::Policy srv_policy, Messenger::Policy cli_policy):
- lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
+ Messenger::Policy srv_policy, Messenger::Policy cli_policy)
+ : lock("SyntheticWorkload::lock"),
+ client_policy(cli_policy),
+ dispatcher(false, this),
+ rng(time(NULL)) {
Messenger *msgr;
int base_port = 16800;
entity_addr_t bind_addr;
boost::uniform_int<> choose(0, available_servers.size() - 1);
if (server->get_default_policy().server) {
p = make_pair(client, server);
+ ConnectionRef conn = client->connect_to(server->get_mytype(),
+ server->get_myaddrs());
+ available_connections[conn] = p;
} else {
ConnectionRef conn = client->connect_to(server->get_mytype(),
- server->get_myaddrs());
- if (available_connections.count(conn) || choose(rng) % 2)
- p = make_pair(client, server);
- else
- p = make_pair(server, client);
+ server->get_myaddrs());
+ p = make_pair(client, server);
+ available_connections[conn] = p;
}
}
- ConnectionRef conn = p.first->connect_to(p.second->get_mytype(),
- p.second->get_myaddrs());
- available_connections[conn] = p;
}
void send_message() {
ConnectionRef conn = _get_random_connection();
dispatcher.clear_pending(conn);
conn->mark_down();
- pair<Messenger*, Messenger*> &p = available_connections[conn];
- // it's a lossless policy, so we need to mark down each side
- if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
- ASSERT_EQ(conn->get_messenger(), p.first);
- ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
- p.first->get_myaddrs());
- peer->mark_down();
- dispatcher.clear_pending(peer);
- available_connections.erase(peer);
+ if (!client_policy.server &&
+ !client_policy.lossy &&
+ client_policy.standby) {
+ // it's a lossless policy, so we need to mark down each side
+ pair<Messenger*, Messenger*> &p = available_connections[conn];
+ if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
+ ASSERT_EQ(conn->get_messenger(), p.first);
+ ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
+ p.first->get_myaddrs());
+ peer->mark_down();
+ dispatcher.clear_pending(peer);
+ available_connections.erase(peer);
+ }
}
ASSERT_EQ(available_connections.erase(conn), 1U);
}