#include <time.h>
#include <set>
#include <list>
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
#include "msg/Dispatcher.h"
uint64_t get_count() { return count; }
};
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
+ ceph::condition_variable cond;
bool is_server;
bool got_new;
bool got_remote_reset;
bool loopback;
entity_addrvec_t last_accept;
- explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
+ explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
is_server(s), got_new(false), got_remote_reset(false),
got_connect(false), loopback(false) {
}
}
void ms_handle_fast_connect(Connection *con) override {
- lock.Lock();
+ std::scoped_lock l{lock};
lderr(g_ceph_context) << __func__ << " " << con << dendl;
auto s = con->get_priv();
if (!s) {
<< " count: " << session->count << dendl;
}
got_connect = true;
- cond.Signal();
- lock.Unlock();
+ cond.notify_all();
}
void ms_handle_fast_accept(Connection *con) override {
last_accept = con->get_peer_addrs();
if (is_server) {
reply_message(m);
}
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
got_new = true;
- cond.Signal();
+ cond.notify_all();
m->put();
return true;
}
bool ms_handle_reset(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
lderr(g_ceph_context) << __func__ << " " << con << dendl;
auto priv = con->get_priv();
if (auto s = static_cast<Session*>(priv.get()); s) {
return true;
}
void ms_handle_remote_reset(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
lderr(g_ceph_context) << __func__ << " " << con << dendl;
auto priv = con->get_priv();
if (auto s = static_cast<Session*>(priv.get()); s) {
con->set_priv(nullptr); // break ref <-> session cycle, if any
}
got_remote_reset = true;
- cond.Signal();
+ cond.notify_all();
}
bool ms_handle_refused(Connection *con) override {
return false;
ceph_assert(m->get_source().is_client());
}
m->put();
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
got_new = true;
- cond.Signal();
+ cond.notify_all();
}
int ms_handle_authentication(Connection *con) override {
return *(decisions[step]);
}
waiting = true;
- while(waiting) {
- cond_var.wait(l);
- }
+ cond_var.wait(l, [this] { return !waiting; });
return *(decisions[step]);
}
srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
{
- Mutex::Locker l(srv_dispatcher.lock);
- while (!srv_dispatcher.got_new)
- srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
srv_dispatcher.got_new = false;
}
srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
{
- Mutex::Locker l(srv_dispatcher.lock);
- while (!srv_dispatcher.got_new)
- srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
srv_dispatcher.got_new = false;
}
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(c2s->send_message(m1), 0);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(c2s->send_message(m1), 0);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
srv_dispatcher.loopback = false;
server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
srv_dispatcher.loopback = false;
server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
srv_dispatcher.last_accept);
{
- Mutex::Locker l(srv_dispatcher.lock);
- while (!srv_dispatcher.got_remote_reset)
- srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
}
// 2. test for client reconnect
ASSERT_FALSE(server_conn->is_connected());
// ensure client detect server socket closed
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_remote_reset)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
cli_dispatcher.got_remote_reset = false;
}
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_connect)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
cli_dispatcher.got_connect = false;
}
CHECK_AND_WAIT_TRUE(conn->is_connected());
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
ASSERT_TRUE(conn->is_connected());
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
// resetcheck happen
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
srv_dispatcher.last_accept);
// server lose state
{
- Mutex::Locker l(srv_dispatcher.lock);
- while (!srv_dispatcher.got_new)
- srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
}
ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
{
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_remote_reset)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
cli_dispatcher.got_remote_reset = false;
- while (!cli_dispatcher.got_connect)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
cli_dispatcher.got_connect = false;
}
CHECK_AND_WAIT_TRUE(conn->is_connected());
ASSERT_TRUE(conn->is_connected());
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
{
MPing *m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
MCommand *m = new MCommand(uuid);
m->cmd = cmds;
conn->send_message(m);
- utime_t t;
- t += 1000*1000*500;
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
ASSERT_TRUE(cli_dispatcher.got_new);
cli_dispatcher.got_new = false;
}
conn->send_message(m);
utime_t t;
t += 1000*1000*500;
- Mutex::Locker l(cli_dispatcher.lock);
- while (!cli_dispatcher.got_new)
- cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
ASSERT_TRUE(cli_dispatcher.got_new);
cli_dispatcher.got_new = false;
}
class SyntheticDispatcher : public Dispatcher {
public:
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
+ ceph::condition_variable cond;
bool is_server;
bool got_new;
bool got_remote_reset;
SyntheticWorkload *workload;
SyntheticDispatcher(bool s, SyntheticWorkload *wl):
- Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
+ Dispatcher(g_ceph_context), is_server(s), got_new(false),
got_remote_reset(false), got_connect(false), index(0), workload(wl) {
}
bool ms_can_fast_dispatch_any() const override { return true; }
}
void ms_handle_fast_connect(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
list<uint64_t> c = conn_sent[con];
for (list<uint64_t>::iterator it = c.begin();
it != c.end(); ++it)
sent.erase(*it);
conn_sent.erase(con);
got_connect = true;
- cond.Signal();
+ cond.notify_all();
}
void ms_handle_fast_accept(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
list<uint64_t> c = conn_sent[con];
for (list<uint64_t>::iterator it = c.begin();
it != c.end(); ++it)
sent.erase(*it);
conn_sent.erase(con);
- cond.Signal();
+ cond.notify_all();
}
bool ms_dispatch(Message *m) override {
ceph_abort();
}
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
list<uint64_t> c = conn_sent[con];
for (list<uint64_t>::iterator it = c.begin();
it != c.end(); ++it)
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
reply_message(m, pl);
m->put();
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
got_new = true;
- cond.Signal();
+ cond.notify_all();
} else {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (sent.count(pl.seq)) {
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
}
m->put();
got_new = true;
- cond.Signal();
+ cond.notify_all();
}
}
encode(pl, bl);
m->set_data(bl);
if (!con->get_messenger()->get_default_policy().lossy) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
sent[pl.seq] = pl.data;
conn_sent[con].push_back(pl.seq);
}
}
uint64_t get_pending() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return sent.size();
}
void clear_pending(ConnectionRef con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (list<uint64_t>::iterator it = conn_sent[con].begin();
it != conn_sent[con].end(); ++it)
class SyntheticWorkload {
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
+ ceph::condition_variable cond;
set<Messenger*> available_servers;
set<Messenger*> available_clients;
Messenger::Policy client_policy;
SyntheticWorkload(int servers, int clients, string type, int random_num,
Messenger::Policy srv_policy, Messenger::Policy cli_policy)
- : lock("SyntheticWorkload::lock"),
- client_policy(cli_policy),
+ : client_policy(cli_policy),
dispatcher(false, this),
rng(time(NULL)),
dummy_auth(g_ceph_context) {
ConnectionRef _get_random_connection() {
while (dispatcher.get_pending() > max_in_flight) {
- lock.Unlock();
+ lock.unlock();
usleep(500);
- lock.Lock();
+ lock.lock();
}
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
boost::uniform_int<> choose(0, available_connections.size() - 1);
int index = choose(rng);
map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
}
void generate_connection() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (!can_create_connection())
return ;
}
void send_message() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ConnectionRef conn = _get_random_connection();
boost::uniform_int<> true_false(0, 99);
int val = true_false(rng);
}
void drop_connection() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (available_connections.size() < 10)
return;
ConnectionRef conn = _get_random_connection();
}
void print_internal_state(bool detail=false) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
lderr(g_ceph_context) << "available_connections: " << available_connections.size()
<< " inflight messages: " << dispatcher.get_pending() << dendl;
if (detail && !available_connections.empty()) {
}
void handle_reset(Connection *con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
available_connections.erase(con);
dispatcher.clear_pending(con);
}
class MarkdownDispatcher : public Dispatcher {
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
set<ConnectionRef> conns;
bool last_mark;
public:
std::atomic<uint64_t> count = { 0 };
- explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
+ explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
last_mark(false) {
}
bool ms_can_fast_dispatch_any() const override { return false; }
void ms_handle_fast_connect(Connection *con) override {
lderr(g_ceph_context) << __func__ << " " << con << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
conns.insert(con);
}
void ms_handle_fast_accept(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
conns.insert(con);
}
bool ms_dispatch(Message *m) override {
lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
count++;
conns.insert(m->get_connection());
if (conns.size() < 2 && !last_mark) {
}
bool ms_handle_reset(Connection *con) override {
lderr(g_ceph_context) << __func__ << " " << con << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
conns.erase(con);
usleep(rand() % 500);
return true;
}
void ms_handle_remote_reset(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
conns.erase(con);
lderr(g_ceph_context) << __func__ << " " << con << dendl;
}