#include "common/RefCountedObj.h"
#include "common/config.h"
#include "common/debug.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
#endif
struct Connection : public RefCountedObject {
- mutable Mutex lock;
+ mutable ceph::mutex lock = ceph::make_mutex("Connection::lock");
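+ // ceph::make_mutex() attaches the name for lockdep in debug builds; in non-debug builds ceph::mutex is a plain std::mutex and the name is discarded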
Messenger *msgr;
RefCountedPtr priv;
int peer_type;
// we are managed exclusively by ConnectionRef; make it so you can
// ConnectionRef foo = new Connection;
: RefCountedObject(cct, 0),
- lock("Connection::lock"),
msgr(m),
peer_type(-1),
features(0),
}
void set_priv(const RefCountedPtr& o) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
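+ // std::lock_guard gives the same RAII scope as the old Mutex::Locker: the lock is released when l goes out of scope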
priv = o;
}
RefCountedPtr get_priv() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return priv;
}
void post_rx_buffer(ceph_tid_t tid, ceph::buffer::list& bl) {
#if 0
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
++rx_buffers_version;
rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
#endif
void revoke_rx_buffer(ceph_tid_t tid) {
#if 0
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
rx_buffers.erase(tid);
#endif
}
utime_t get_last_keepalive() const {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return last_keepalive;
}
void set_last_keepalive(utime_t t) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
last_keepalive = t;
}
utime_t get_last_keepalive_ack() const {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return last_keepalive_ack;
}
void set_last_keepalive_ack(utime_t t) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
last_keepalive_ack = t;
}
#define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " "
double DispatchQueue::get_max_age(utime_t now) const {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (marrival.empty())
return 0;
else
void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (stop) {
return;
}
} else {
mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
}
- cond.Signal();
+ cond.notify_all();
}
void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
{
m->set_recv_stamp(ceph_clock_now());
- Mutex::Locker l(local_delivery_lock);
+ std::lock_guard l{local_delivery_lock};
if (local_messages.empty())
- local_delivery_cond.Signal();
+ local_delivery_cond.notify_all();
local_messages.emplace(m, priority);
return;
}
void DispatchQueue::run_local_delivery()
{
- local_delivery_lock.Lock();
+ std::unique_lock l{local_delivery_lock};
while (true) {
if (stop_local_delivery)
break;
if (local_messages.empty()) {
- local_delivery_cond.Wait(local_delivery_lock);
+ local_delivery_cond.wait(l);
continue;
}
auto p = std::move(local_messages.front());
local_messages.pop();
- local_delivery_lock.Unlock();
+ l.unlock();
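+ // deliver outside the lock so preprocessing and dispatch below never run while holding local_delivery_lock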
const ref_t<Message>& m = p.first;
int priority = p.second;
fast_preprocess(m);
} else {
enqueue(m, priority, 0);
}
- local_delivery_lock.Lock();
+ l.lock();
}
- local_delivery_lock.Unlock();
}
void DispatchQueue::dispatch_throttle_release(uint64_t msize)
*/
void DispatchQueue::entry()
{
- lock.Lock();
+ std::unique_lock l{lock};
while (true) {
while (!mqueue.empty()) {
QueueItem qitem = mqueue.dequeue();
if (!qitem.is_code())
remove_arrival(qitem.get_message());
- lock.Unlock();
+ l.unlock();
if (qitem.is_code()) {
if (cct->_conf->ms_inject_internal_delays &&
}
}
- lock.Lock();
+ l.lock();
}
if (stop)
break;
// wait for something to be put on queue
- cond.Wait(lock);
+ cond.wait(l);
}
- lock.Unlock();
}
void DispatchQueue::discard_queue(uint64_t id) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
list<QueueItem> removed;
mqueue.remove_by_class(id, &removed);
for (list<QueueItem>::iterator i = removed.begin();
void DispatchQueue::shutdown()
{
// stop my local delivery thread
- local_delivery_lock.Lock();
- stop_local_delivery = true;
- local_delivery_cond.Signal();
- local_delivery_lock.Unlock();
-
+ {
+ std::scoped_lock l{local_delivery_lock};
+ stop_local_delivery = true;
+ local_delivery_cond.notify_all();
+ }
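+ // the scoped block above releases local_delivery_lock before the dispatch thread's lock is taken below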
// stop my dispatch thread
- lock.Lock();
- stop = true;
- cond.Signal();
- lock.Unlock();
+ {
+ std::scoped_lock l{lock};
+ stop = true;
+ cond.notify_all();
+ }
}
#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"
#include "common/Throttle.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
#include "common/Thread.h"
#include "common/PrioritizedQueue.h"
CephContext *cct;
Messenger *msgr;
- mutable Mutex lock;
- Cond cond;
+ mutable ceph::mutex lock;
+ ceph::condition_variable cond;
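+ // ceph::condition_variable pairs with ceph::mutex through std::unique_lock; in non-debug builds it is just std::condition_variable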
PrioritizedQueue<QueueItem, uint64_t> mqueue;
}
} dispatch_thread;
- Mutex local_delivery_lock;
- Cond local_delivery_cond;
+ ceph::mutex local_delivery_lock;
+ ceph::condition_variable local_delivery_cond;
bool stop_local_delivery;
std::queue<pair<ref_t<Message>, int>> local_messages;
class LocalDeliveryThread : public Thread {
double get_max_age(utime_t now) const;
int get_queue_len() const {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return mqueue.length();
}
void dispatch_throttle_release(uint64_t msize);
void queue_connect(Connection *con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (stop)
return;
mqueue.enqueue_strict(
0,
CEPH_MSG_PRIO_HIGHEST,
QueueItem(D_CONNECT, con));
- cond.Signal();
+ cond.notify_all();
}
void queue_accept(Connection *con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (stop)
return;
mqueue.enqueue_strict(
0,
CEPH_MSG_PRIO_HIGHEST,
QueueItem(D_ACCEPT, con));
- cond.Signal();
+ cond.notify_all();
}
void queue_remote_reset(Connection *con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (stop)
return;
mqueue.enqueue_strict(
0,
CEPH_MSG_PRIO_HIGHEST,
QueueItem(D_BAD_REMOTE_RESET, con));
- cond.Signal();
+ cond.notify_all();
}
void queue_reset(Connection *con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (stop)
return;
mqueue.enqueue_strict(
0,
CEPH_MSG_PRIO_HIGHEST,
QueueItem(D_BAD_RESET, con));
- cond.Signal();
+ cond.notify_all();
}
void queue_refused(Connection *con) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (stop)
return;
mqueue.enqueue_strict(
0,
CEPH_MSG_PRIO_HIGHEST,
QueueItem(D_CONN_REFUSED, con));
- cond.Signal();
+ cond.notify_all();
}
bool can_fast_dispatch(const cref_t<Message> &m) const;
DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
: cct(cct), msgr(msgr),
- lock("Messenger::DispatchQueue::lock" + name),
+ lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)),
mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
cct->_conf->ms_pq_min_cost),
next_id(1),
dispatch_thread(this),
- local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name),
+ local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)),
stop_local_delivery(false),
local_delivery_thread(this),
dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
#include "common/debug.h"
QueueStrategy::QueueStrategy(int _n_threads)
- : lock("QueueStrategy::lock"),
- n_threads(_n_threads),
+ : n_threads(_n_threads),
stop(false),
mqueue(),
disp_threads()
msgr->ms_fast_dispatch(m);
return;
}
- lock.Lock();
+ std::lock_guard l{lock};
mqueue.push_back(*m);
if (disp_threads.size()) {
if (! disp_threads.empty()) {
QSThread *thrd = &disp_threads.front();
disp_threads.pop_front();
- thrd->cond.Signal();
+ thrd->cond.notify_all();
}
}
- lock.Unlock();
}
void QueueStrategy::entry(QSThread *thrd)
{
for (;;) {
ref_t<Message> m;
- lock.Lock();
+ std::unique_lock l{lock};
for (;;) {
if (! mqueue.empty()) {
m = ref_t<Message>(&mqueue.front(), false);
if (stop)
break;
disp_threads.push_front(*thrd);
- thrd->cond.Wait(lock);
+ thrd->cond.wait(l);
}
- lock.Unlock();
+ l.unlock();
if (stop) {
if (!m) break;
continue;
void QueueStrategy::shutdown()
{
QSThread *thrd;
- lock.Lock();
+ std::lock_guard l{lock};
stop = true;
while (disp_threads.size()) {
thrd = &(disp_threads.front());
disp_threads.pop_front();
- thrd->cond.Signal();
+ thrd->cond.notify_all();
}
- lock.Unlock();
}
void QueueStrategy::wait()
{
- lock.Lock();
+ std::unique_lock l{lock};
ceph_assert(stop);
for (auto& thread : threads) {
- lock.Unlock();
+ l.unlock();
// join outside of lock
thread->join();
- lock.Lock();
+ l.lock();
}
- lock.Unlock();
}
void QueueStrategy::start()
{
ceph_assert(!stop);
- lock.Lock();
+ std::lock_guard l{lock};
threads.reserve(n_threads);
for (int ix = 0; ix < n_threads; ++ix) {
string thread_name = "ms_qs_";
thrd->create(thread_name.c_str());
threads.emplace_back(std::move(thrd));
}
- lock.Unlock();
}
namespace bi = boost::intrusive;
class QueueStrategy : public DispatchStrategy {
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock");
const int n_threads;
bool stop;
public:
bi::list_member_hook<> thread_q;
QueueStrategy *dq;
- Cond cond;
- explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq), cond() {}
+ ceph::condition_variable cond;
+ explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {}
void* entry() {
dq->entry(this);
return NULL;
{
private:
/// lock protecting policy
- Mutex policy_lock;
+ ceph::mutex policy_lock =
+ ceph::make_mutex("SimplePolicyMessenger::policy_lock");
// entity_name_t::type -> Policy
ceph::net::PolicySet<Throttle> policy_set;
SimplePolicyMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
- : Messenger(cct, name),
- policy_lock("SimplePolicyMessenger::policy_lock")
+ : Messenger(cct, name)
{
}
* @return A const Policy reference.
*/
Policy get_policy(int t) override {
- Mutex::Locker l(policy_lock);
+ std::lock_guard l{policy_lock};
return policy_set.get(t);
}
Policy get_default_policy() override {
- Mutex::Locker l(policy_lock);
+ std::lock_guard l{policy_lock};
return policy_set.get_default();
}
* @param p The Policy to apply.
*/
void set_default_policy(Policy p) override {
- Mutex::Locker l(policy_lock);
+ std::lock_guard l{policy_lock};
policy_set.set_default(p);
}
/**
* @param p The policy to apply.
*/
void set_policy(int type, Policy p) override {
- Mutex::Locker l(policy_lock);
+ std::lock_guard l{policy_lock};
policy_set.set(type, p);
}
void set_policy_throttlers(int type,
Throttle* byte_throttle,
Throttle* msg_throttle) override {
- Mutex::Locker l(policy_lock);
+ std::lock_guard l{policy_lock};
policy_set.set_throttlers(type, byte_throttle, msg_throttle);
}
const std::string &type, string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
dispatch_queue(cct, this, mname),
- lock("AsyncMessenger::lock"),
- nonce(_nonce), need_addr(true), did_bind(false),
- global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
- cluster_protocol(0), stopped(true)
+ nonce(_nonce)
{
std::string transport_type = "posix";
if (type.find("rdma") != std::string::npos)
}
}
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (auto &&p : processors)
p->start();
dispatch_queue.start();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
did_bind = false;
- lock.Lock();
- stop_cond.Signal();
+ lock.lock();
+ stop_cond.notify_all();
stopped = true;
- lock.Unlock();
+ lock.unlock();
stack->drain();
return 0;
}
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
- lock.Lock();
+ lock.lock();
if (!pending_bind && started) {
ldout(cct,10) << __func__ << " already started" << dendl;
- lock.Unlock();
+ lock.unlock();
return -1;
}
ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
pending_bind_addrs = bind_addrs;
pending_bind = true;
- lock.Unlock();
+ lock.unlock();
return 0;
}
- lock.Unlock();
+ lock.unlock();
// bind to a socket
set<int> avoid_ports;
{
if (!cct->_conf->ms_bind_before_connect)
return 0;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (did_bind) {
return 0;
}
int AsyncMessenger::start()
{
- lock.Lock();
+ std::scoped_lock l{lock};
ldout(cct,1) << __func__ << " start" << dendl;
// register at least one entity, first!
_init_local_connection();
}
- lock.Unlock();
return 0;
}
void AsyncMessenger::wait()
{
- lock.Lock();
- if (!started) {
- lock.Unlock();
- return;
+ {
+ std::unique_lock locker{lock};
+ if (!started) {
+ return;
+ }
+ if (!stopped)
+ stop_cond.wait(locker);
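+ // locker is released at the end of this block, so the messenger lock is not held across dispatch_queue.shutdown() below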
}
- if (!stopped)
- stop_cond.Wait(lock);
-
- lock.Unlock();
-
dispatch_queue.shutdown();
if (dispatch_queue.is_started()) {
ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
const entity_addr_t &listen_addr,
const entity_addr_t &peer_addr)
{
- lock.Lock();
+ std::lock_guard l{lock};
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
listen_addr.is_msgr2(), false);
conn->accept(std::move(cli_socket), listen_addr, peer_addr);
accepting_conns.insert(conn);
- lock.Unlock();
}
AsyncConnectionRef AsyncMessenger::create_connect(
const entity_addrvec_t& addrs, int type)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
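+ // ceph_mutex_is_locked() only checks ownership in debug builds; with a plain std::mutex it evaluates to true, so this is a debug-only guard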
ldout(cct, 10) << __func__ << " " << addrs
<< ", creating connection and registering" << dendl;
int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
FUNCTRACE(cct);
ceph_assert(m);
ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (*my_addrs == addrs ||
(addrs.v.size() == 1 &&
my_addrs->contains(addrs.front()))) {
{
ldout(cct,1) << __func__ << " " << addrs << dendl;
bool ret = false;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
entity_addrvec_t newaddrs = *my_addrs;
for (auto& a : newaddrs.v) {
void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
auto t = addrs;
for (auto& a : t.v) {
a.set_nonce(nonce);
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
ldout(cct,1) << __func__ << " " << dendl;
- lock.Lock();
+ std::lock_guard l{lock};
for (const auto& c : accepting_conns) {
ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl;
c->stop(queue_reset);
conns.clear();
{
- Mutex::Locker l(deleted_lock);
+ std::lock_guard l{deleted_lock};
if (cct->_conf->subsys.should_gather<ceph_subsys_ms, 5>()) {
for (const auto& c : deleted_conns) {
ldout(cct, 5) << __func__ << " delete " << c << dendl;
}
deleted_conns.clear();
}
- lock.Unlock();
}
void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
{
- lock.Lock();
+ std::lock_guard l{lock};
const AsyncConnectionRef& conn = _lookup_conn(addrs);
if (conn) {
ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
} else {
ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
}
- lock.Unlock();
}
int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
auto it = conns.find(*conn->peer_addrs);
if (it != conns.end()) {
auto& existing = it->second;
// lazy delete, see "deleted_conns"
// If conn already in, we will return 0
- Mutex::Locker l(deleted_lock);
+ std::lock_guard l{deleted_lock};
if (deleted_conns.erase(existing)) {
conns.erase(it);
} else if (conn != existing) {
ldout(cct, 1) << __func__ << " start" << dendl;
int num = 0;
- Mutex::Locker l1(lock);
+ std::lock_guard l1{lock};
{
- Mutex::Locker l2(deleted_lock);
+ std::lock_guard l2{deleted_lock};
for (auto& c : deleted_conns) {
ldout(cct, 5) << __func__ << " delete " << c << dendl;
auto conns_it = conns.find(*c->peer_addrs);
#define CEPH_ASYNCMESSENGER_H
#include <map>
-#include <mutex>
#include "include/types.h"
#include "include/xlist.h"
#include "include/unordered_map.h"
#include "include/unordered_set.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/Thread.h"
std::string ms_type;
/// overall lock used for AsyncMessenger data structures
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("AsyncMessenger::lock");
// AsyncMessenger stuff
/// approximately unique ID set by the Constructor for use in entity_addr_t
uint64_t nonce;
/// true, specifying we haven't learned our addr; set false when we find it.
// maybe this should be protected by the lock?
- bool need_addr;
+ bool need_addr = true;
/**
* set to bind addresses if bind was called before NetworkStack was ready to
* false; set to true if the AsyncMessenger bound to a specific address;
* and set false again by Accepter::stop().
*/
- bool did_bind;
+ bool did_bind = false;
/// counter for the global seq our connection protocol uses
- __u32 global_seq;
+ __u32 global_seq = 0;
/// lock to protect the global_seq
ceph::spinlock global_seq_lock;
* deleted for AsyncConnection. "_lookup_conn" must ensure not return a
* AsyncConnection in this set.
*/
- Mutex deleted_lock;
+ ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock");
set<AsyncConnectionRef> deleted_conns;
EventCallbackRef reap_handler;
/// internal cluster protocol version, if any, for talking to entities of the same type.
- int cluster_protocol;
+ int cluster_protocol = 0;
- Cond stop_cond;
- bool stopped;
+ ceph::condition_variable stop_cond;
+ bool stopped = true;
/* You must hold this->lock for the duration of use! */
const auto& _lookup_conn(const entity_addrvec_t& k) {
static const AsyncConnectionRef nullref;
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
auto p = conns.find(k);
if (p == conns.end()) {
return nullref;
}
// lazy delete, see "deleted_conns"
- Mutex::Locker l(deleted_lock);
+ std::lock_guard l{deleted_lock};
if (deleted_conns.erase(p->second)) {
conns.erase(p);
return nullref;
}
void _init_local_connection() {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
local_connection->peer_addrs = *my_addrs;
local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
* This wraps _lookup_conn.
*/
AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return _lookup_conn(k); /* make new ref! */
}
* is used for delivering messages back to ourself.
*/
void init_local_connection() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
local_connection->is_loopback = true;
_init_local_connection();
}
* See "deleted_conns"
*/
void unregister_conn(const AsyncConnectionRef& conn) {
- Mutex::Locker l(deleted_lock);
+ std::lock_guard l{deleted_lock};
conn->get_perf_counter()->dec(l_msgr_active_connections);
deleted_conns.emplace(std::move(conn));
void NetworkStack::stop()
{
- std::lock_guard<decltype(pool_spin)> lk(pool_spin);
+ std::lock_guard lk(pool_spin);
for (unsigned i = 0; i < num_workers; ++i) {
workers[i]->done = true;
workers[i]->center.wakeup();
}
class C_drain : public EventCallback {
- Mutex drain_lock;
- Cond drain_cond;
+ ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock");
+ ceph::condition_variable drain_cond;
unsigned drain_count;
public:
explicit C_drain(size_t c)
- : drain_lock("C_drain::drain_lock"),
- drain_count(c) {}
+ : drain_count(c) {}
void do_request(uint64_t id) override {
- Mutex::Locker l(drain_lock);
+ std::lock_guard l{drain_lock};
drain_count--;
- if (drain_count == 0) drain_cond.Signal();
+ if (drain_count == 0) drain_cond.notify_all();
}
void wait() {
- Mutex::Locker l(drain_lock);
- while (drain_count)
- drain_cond.Wait(drain_lock);
+ std::unique_lock l{drain_lock};
+ drain_cond.wait(l, [this] { return drain_count == 0; });
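+ // the predicate overload of wait() re-checks drain_count on every wakeup, replacing the old explicit while loop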
}
};
WAIT_PORT_FIN_STAGE,
DONE
} create_stage = WAIT_DEVICE_STAGE;
- static Mutex lock("DPDKStack::lock");
- static Cond cond;
+ static ceph::mutex lock = ceph::make_mutex("DPDKStack::lock");
+ static ceph::condition_variable cond;
static unsigned queue_init_done = 0;
static unsigned cores = 0;
static std::shared_ptr<DPDKDevice> sdev;
sdev->workers.resize(cores);
ldout(cct, 1) << __func__ << " using " << cores << " cores " << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
create_stage = WAIT_PORT_FIN_STAGE;
- cond.Signal();
+ cond.notify_all();
} else {
- Mutex::Locker l(lock);
- while (create_stage <= WAIT_DEVICE_STAGE)
- cond.Wait(lock);
+ std::unique_lock l{lock};
+ cond.wait(l, [] { return create_stage > WAIT_DEVICE_STAGE; });
}
ceph_assert(sdev);
if (i < sdev->hw_queues_count()) {
cpu_weights[i] = cct->_conf->ms_dpdk_hw_queue_weight;
qp->configure_proxies(cpu_weights);
sdev->set_local_queue(i, std::move(qp));
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
++queue_init_done;
- cond.Signal();
+ cond.notify_all();
} else {
}
if (i == 0) {
{
- Mutex::Locker l(lock);
- while (queue_init_done < cores)
- cond.Wait(lock);
+ std::unique_lock l{lock};
+ cond.wait(l, [] { return queue_init_done >= cores; });
}
if (sdev->init_port_fini() < 0) {
lderr(cct) << __func__ << " init_port_fini failed " << dendl;
ceph_abort();
}
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
create_stage = DONE;
- cond.Signal();
+ cond.notify_all();
} else {
- Mutex::Locker l(lock);
- while (create_stage <= WAIT_PORT_FIN_STAGE)
- cond.Wait(lock);
+ std::unique_lock l{lock};
+ cond.wait(l, [&] { return create_stage > WAIT_PORT_FIN_STAGE; });
}
sdev->workers[i] = this;
_impl = std::unique_ptr<DPDKWorker::Impl>(
new DPDKWorker::Impl(cct, i, &center, sdev));
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (!--queue_init_done) {
create_stage = WAIT_DEVICE_STAGE;
sdev.reset();
}
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
- : manager(m), buffer_size(s), lock("cluster_lock")
+ : manager(m), buffer_size(s)
{
}
void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (auto c : ck) {
c->clear();
free_chunks.push_back(c);
if (bytes % buffer_size == 0)
--num;
int r = num;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (free_chunks.empty())
return 0;
if (!bytes) {
{
void *p;
- Mutex::Locker l(PoolAllocator::lock);
+ std::lock_guard l{PoolAllocator::lock};
PoolAllocator::g_ctx = ctx;
// this will trigger pool expansion via PoolAllocator::malloc()
p = boost::pool<PoolAllocator>::malloc();
}
Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
-Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");
+ceph::mutex Infiniband::MemoryManager::PoolAllocator::lock =
+ ceph::make_mutex("pool-alloc-lock");
// lock is taken by mem_pool::slow_malloc()
char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
{
mem_info *m;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
m = reinterpret_cast<mem_info *>(block) - 1;
m->ctx->update_stats(-m->nbufs);
}
Infiniband::Infiniband(CephContext *cct)
- : cct(cct), lock("IB lock"),
+ : cct(cct),
device_name(cct->_conf->ms_async_rdma_device_name),
port_num( cct->_conf->ms_async_rdma_port_num)
{
void Infiniband::init()
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (initialized)
return;
#include "include/page.h"
#include "common/debug.h"
#include "common/errno.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
MemoryManager& manager;
uint32_t buffer_size;
uint32_t num_chunk = 0;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("cluster_lock");
std::vector<Chunk*> free_chunks;
char *base = nullptr;
char *end = nullptr;
static void free(char * const block);
static MemPoolContext *g_ctx;
- static Mutex lock;
+ static ceph::mutex lock;
};
/**
void wire_gid_to_gid(const char *wgid, IBSYNMsg* im);
void gid_to_wire_gid(const IBSYNMsg& im, char wgid[]);
CephContext *cct;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("IB lock");
bool initialized = false;
const std::string &device_name;
uint8_t port_num;
RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w)
: cct(cct), connected(0), error(0), infiniband(ib),
- dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+ dispatcher(s), worker(w),
is_server(false), con_handler(new C_handle_connection(this)),
active(false), pending(false)
{
dispatcher->post_chunk_to_pool(buffers[i]);
}
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (notify_fd >= 0)
::close(notify_fd);
if (tcp_fd >= 0)
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (wc.empty())
wc = std::move(v);
else
void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (wc.empty())
return ;
w.swap(wc);
if (!bytes)
return 0;
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
pending_bl.claim_append(bl);
if (!connected) {
ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
{
if (error)
return -error;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
size_t bytes = pending_bl.length();
ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
<< pending_bl.buffers().size() << dendl;
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
- : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
- w_lock("RDMADispatcher::for worker pending list"), stack(s)
+ : cct(c), async_handler(new C_handle_cq_async(this)),
+ stack(s)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
void RDMADispatcher::polling_start()
{
// take lock because listen/connect can happen from different worker threads
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (t.joinable())
return; // dispatcher thread already running
void RDMADispatcher::polling_stop()
{
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
done = true;
}
uint64_t qpn = async_event.element.qp->qp_num;
ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
if (!conn) {
ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
}
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
- Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ std::lock_guard l{lock};//make sure connected socket alive when pass wc
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
// because we need to check qp's state before sending
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
if (num_dead_queue_pair) {
- Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
+ std::lock_guard l{lock}; // FIXME reuse dead qp because creating one qp costs 1 ms
auto it = dead_queue_pairs.begin();
while (it != dead_queue_pairs.end()) {
auto i = *it;
if (num_pending_workers) {
RDMAWorker *w = nullptr;
{
- Mutex::Locker l(w_lock);
+ std::lock_guard l{w_lock};
if (!pending_workers.empty()) {
w = pending_workers.front();
pending_workers.pop_front();
void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
++num_qp_conn;
Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
// Try to find the QP in qp_conns firstly.
auto it = qp_conns.find(qp);
if (it != qp_conns.end())
void RDMADispatcher::erase_qpn(uint32_t qpn)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
erase_qpn_lockless(qpn);
}
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
- Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ std::lock_guard l{lock};//make sure connected socket alive when pass wc
RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected()) {
RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
: Worker(c, i), stack(nullptr),
- tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
+ tx_handler(new C_handle_cq_tx(this))
{
// initialize perf_logger
char name[128];
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
- Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
+ // protect `qp_conns`, `dead_queue_pairs`
+ ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
// qp_num -> InfRcConnection
// The main usage of `qp_conns` is looking up connection by qp_num,
// so the lifecycle of element in `qp_conns` is the lifecycle of qp.
std::vector<QueuePair*> dead_queue_pairs;
std::atomic<uint64_t> num_pending_workers = {0};
- Mutex w_lock; // protect pending workers
+ // protect pending workers
+ ceph::mutex w_lock =
+ ceph::make_mutex("RDMADispatcher::for worker pending list");
// fixme: lockfree
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
void polling();
void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
- Mutex::Locker l(w_lock);
+ std::lock_guard l{w_lock};
auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
if (it != pending_workers.end())
return;
EventCallbackRef tx_handler;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
RDMADispatcher* dispatcher = nullptr;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
class C_handle_cq_tx : public EventCallback {
RDMAWorker *worker;
int notify_fd = -1;
bufferlist pending_bl;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
std::vector<ibv_wc> wc;
bool is_server;
EventCallbackRef con_handler;
ClientDispatcher dispatcher;
public:
- Mutex lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock");
+ ceph::condition_variable cond;
uint64_t inflight;
ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
- dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) {
+ dispatcher(think_time_us, this), inflight(0) {
m->add_dispatcher_head(&dispatcher);
bufferptr ptr(msg_len);
memset(ptr.c_str(), 0, msg_len);
data.append(ptr);
}
void *entry() override {
- lock.Lock();
+ std::unique_lock locker{lock};
for (int i = 0; i < ops; ++i) {
if (inflight > uint64_t(concurrent)) {
- cond.Wait(lock);
+ cond.wait(locker);
}
hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
oloc.nspace);
conn->send_message(m);
//cerr << __func__ << " send m=" << m << std::endl;
}
- lock.Unlock();
+ locker.unlock();
msgr->shutdown();
return 0;
}
void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
usleep(think_time);
m->put();
- Mutex::Locker l(thread->lock);
+ std::lock_guard l{thread->lock};
thread->inflight--;
- thread->cond.Signal();
+ thread->cond.notify_all();
}
}
void add_client(ThreadData *t_data) {
- static Mutex lock("add_client_lock");
- Mutex::Locker l(lock);
+ static ceph::mutex lock = ceph::make_mutex("add_client_lock");
+ std::lock_guard l{lock};
ConnectedSocket sock;
int r = t_data->worker->connect(bind_addr, options, &sock);
std::default_random_engine rng(rd());