From: Kefu Chai Date: Sun, 7 Jul 2019 03:15:22 +0000 (+0800) Subject: msg: s/Mutex/ceph::mutex/ X-Git-Tag: v15.1.0~1971^2~56 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=079fc7b3faf24caf7d9e5cf0689fab4a34c11788;p=ceph-ci.git msg: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- diff --git a/src/msg/Connection.h b/src/msg/Connection.h index a98e0e12a53..9ea229c1742 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -24,7 +24,7 @@ #include "common/RefCountedObj.h" #include "common/config.h" #include "common/debug.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert... #include "include/buffer.h" #include "include/types.h" @@ -43,7 +43,7 @@ class Interceptor; #endif struct Connection : public RefCountedObject { - mutable Mutex lock; + mutable ceph::mutex lock = ceph::make_mutex("Connection::lock"); Messenger *msgr; RefCountedPtr priv; int peer_type; @@ -78,7 +78,6 @@ public: // we are managed exclusively by ConnectionRef; make it so you can // ConnectionRef foo = new Connection; : RefCountedObject(cct, 0), - lock("Connection::lock"), msgr(m), peer_type(-1), features(0), @@ -92,12 +91,12 @@ public: } void set_priv(const RefCountedPtr& o) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; priv = o; } RefCountedPtr get_priv() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return priv; } @@ -221,7 +220,7 @@ public: void post_rx_buffer(ceph_tid_t tid, ceph::buffer::list& bl) { #if 0 - Mutex::Locker l(lock); + std::lock_guard l{lock}; ++rx_buffers_version; rx_buffers[tid] = pair(bl, rx_buffers_version); #endif @@ -229,25 +228,25 @@ public: void revoke_rx_buffer(ceph_tid_t tid) { #if 0 - Mutex::Locker l(lock); + std::lock_guard l{lock}; rx_buffers.erase(tid); #endif } utime_t get_last_keepalive() const { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return last_keepalive; } void set_last_keepalive(utime_t t) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; last_keepalive = t; } utime_t get_last_keepalive_ack() const { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return last_keepalive_ack; } void set_last_keepalive_ack(utime_t t) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; last_keepalive_ack = t; } diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 3de1f00a503..ad543fb6301 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -29,7 +29,7 @@ #define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " " double DispatchQueue::get_max_age(utime_t now) const { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (marrival.empty()) return 0; else @@ -80,7 +80,7 @@ void DispatchQueue::fast_preprocess(const ref_t& m) void DispatchQueue::enqueue(const ref_t& m, int priority, uint64_t id) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (stop) { return; } @@ -91,32 +91,32 @@ void DispatchQueue::enqueue(const ref_t& m, int priority, uint64_t id) } else { mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m)); } - cond.Signal(); + cond.notify_all(); } void DispatchQueue::local_delivery(const ref_t& m, int priority) { m->set_recv_stamp(ceph_clock_now()); - Mutex::Locker l(local_delivery_lock); + std::lock_guard l{local_delivery_lock}; if (local_messages.empty()) - local_delivery_cond.Signal(); + local_delivery_cond.notify_all(); local_messages.emplace(m, priority); return; } void DispatchQueue::run_local_delivery() { - local_delivery_lock.Lock(); + std::unique_lock l{local_delivery_lock}; while (true) 
{ if (stop_local_delivery) break; if (local_messages.empty()) { - local_delivery_cond.Wait(local_delivery_lock); + local_delivery_cond.wait(l); continue; } auto p = std::move(local_messages.front()); local_messages.pop(); - local_delivery_lock.Unlock(); + l.unlock(); const ref_t& m = p.first; int priority = p.second; fast_preprocess(m); @@ -125,9 +125,8 @@ void DispatchQueue::run_local_delivery() } else { enqueue(m, priority, 0); } - local_delivery_lock.Lock(); + l.lock(); } - local_delivery_lock.Unlock(); } void DispatchQueue::dispatch_throttle_release(uint64_t msize) @@ -151,13 +150,13 @@ void DispatchQueue::dispatch_throttle_release(uint64_t msize) */ void DispatchQueue::entry() { - lock.Lock(); + std::unique_lock l{lock}; while (true) { while (!mqueue.empty()) { QueueItem qitem = mqueue.dequeue(); if (!qitem.is_code()) remove_arrival(qitem.get_message()); - lock.Unlock(); + l.unlock(); if (qitem.is_code()) { if (cct->_conf->ms_inject_internal_delays && @@ -199,19 +198,18 @@ void DispatchQueue::entry() } } - lock.Lock(); + l.lock(); } if (stop) break; // wait for something to be put on queue - cond.Wait(lock); + cond.wait(l); } - lock.Unlock(); } void DispatchQueue::discard_queue(uint64_t id) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; list removed; mqueue.remove_by_class(id, &removed); for (list::iterator i = removed.begin(); @@ -246,14 +244,15 @@ void DispatchQueue::discard_local() void DispatchQueue::shutdown() { // stop my local delivery thread - local_delivery_lock.Lock(); - stop_local_delivery = true; - local_delivery_cond.Signal(); - local_delivery_lock.Unlock(); - + { + std::scoped_lock l{local_delivery_lock}; + stop_local_delivery = true; + local_delivery_cond.notify_all(); + } // stop my dispatch thread - lock.Lock(); - stop = true; - cond.Signal(); - lock.Unlock(); + { + std::scoped_lock l{lock}; + stop = true; + cond.notify_all(); + } } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 8244538219f..97969de8abe 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -21,8 +21,7 @@ #include #include "include/ceph_assert.h" #include "common/Throttle.h" -#include "common/Mutex.h" -#include "common/Cond.h" +#include "common/ceph_mutex.h" #include "common/Thread.h" #include "common/PrioritizedQueue.h" @@ -65,8 +64,8 @@ class DispatchQueue { CephContext *cct; Messenger *msgr; - mutable Mutex lock; - Cond cond; + mutable ceph::mutex lock; + ceph::condition_variable cond; PrioritizedQueue mqueue; @@ -104,8 +103,8 @@ class DispatchQueue { } } dispatch_thread; - Mutex local_delivery_lock; - Cond local_delivery_cond; + ceph::mutex local_delivery_lock; + ceph::condition_variable local_delivery_cond; bool stop_local_delivery; std::queue, int>> local_messages; class LocalDeliveryThread : public Thread { @@ -136,7 +135,7 @@ class DispatchQueue { double get_max_age(utime_t now) const; int get_queue_len() const { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return mqueue.length(); } @@ -148,54 +147,54 @@ class DispatchQueue { void dispatch_throttle_release(uint64_t msize); void queue_connect(Connection *con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (stop) return; mqueue.enqueue_strict( 0, CEPH_MSG_PRIO_HIGHEST, QueueItem(D_CONNECT, con)); - cond.Signal(); + cond.notify_all(); } void queue_accept(Connection *con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (stop) return; mqueue.enqueue_strict( 0, CEPH_MSG_PRIO_HIGHEST, QueueItem(D_ACCEPT, con)); - cond.Signal(); + cond.notify_all(); } void 
queue_remote_reset(Connection *con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (stop) return; mqueue.enqueue_strict( 0, CEPH_MSG_PRIO_HIGHEST, QueueItem(D_BAD_REMOTE_RESET, con)); - cond.Signal(); + cond.notify_all(); } void queue_reset(Connection *con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (stop) return; mqueue.enqueue_strict( 0, CEPH_MSG_PRIO_HIGHEST, QueueItem(D_BAD_RESET, con)); - cond.Signal(); + cond.notify_all(); } void queue_refused(Connection *con) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (stop) return; mqueue.enqueue_strict( 0, CEPH_MSG_PRIO_HIGHEST, QueueItem(D_CONN_REFUSED, con)); - cond.Signal(); + cond.notify_all(); } bool can_fast_dispatch(const cref_t &m) const; @@ -221,12 +220,12 @@ class DispatchQueue { DispatchQueue(CephContext *cct, Messenger *msgr, string &name) : cct(cct), msgr(msgr), - lock("Messenger::DispatchQueue::lock" + name), + lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)), mqueue(cct->_conf->ms_pq_max_tokens_per_priority, cct->_conf->ms_pq_min_cost), next_id(1), dispatch_thread(this), - local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name), + local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)), stop_local_delivery(false), local_delivery_thread(this), dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name, diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc index a6db0b96de6..85b0a11e602 100644 --- a/src/msg/QueueStrategy.cc +++ b/src/msg/QueueStrategy.cc @@ -17,8 +17,7 @@ #include "common/debug.h" QueueStrategy::QueueStrategy(int _n_threads) - : lock("QueueStrategy::lock"), - n_threads(_n_threads), + : n_threads(_n_threads), stop(false), mqueue(), disp_threads() @@ -31,23 +30,22 @@ void QueueStrategy::ds_dispatch(Message *m) { msgr->ms_fast_dispatch(m); return; } - lock.Lock(); + std::lock_guard l{lock}; mqueue.push_back(*m); if (disp_threads.size()) { if (! disp_threads.empty()) { QSThread *thrd = &disp_threads.front(); disp_threads.pop_front(); - thrd->cond.Signal(); + thrd->cond.notify_all(); } } - lock.Unlock(); } void QueueStrategy::entry(QSThread *thrd) { for (;;) { ref_t m; - lock.Lock(); + std::unique_lock l{lock}; for (;;) { if (! 
mqueue.empty()) { m = ref_t(&mqueue.front(), false); @@ -57,9 +55,9 @@ void QueueStrategy::entry(QSThread *thrd) if (stop) break; disp_threads.push_front(*thrd); - thrd->cond.Wait(lock); + thrd->cond.wait(l); } - lock.Unlock(); + l.unlock(); if (stop) { if (!m) break; continue; @@ -71,35 +69,33 @@ void QueueStrategy::entry(QSThread *thrd) void QueueStrategy::shutdown() { QSThread *thrd; - lock.Lock(); + std::lock_guard l{lock}; stop = true; while (disp_threads.size()) { thrd = &(disp_threads.front()); disp_threads.pop_front(); - thrd->cond.Signal(); + thrd->cond.notify_all(); } - lock.Unlock(); } void QueueStrategy::wait() { - lock.Lock(); + std::unique_lock l{lock}; ceph_assert(stop); for (auto& thread : threads) { - lock.Unlock(); + l.unlock(); // join outside of lock thread->join(); - lock.Lock(); + l.lock(); } - lock.Unlock(); } void QueueStrategy::start() { ceph_assert(!stop); - lock.Lock(); + std::lock_guard l{lock}; threads.reserve(n_threads); for (int ix = 0; ix < n_threads; ++ix) { string thread_name = "ms_qs_"; @@ -108,5 +104,4 @@ void QueueStrategy::start() thrd->create(thread_name.c_str()); threads.emplace_back(std::move(thrd)); } - lock.Unlock(); } diff --git a/src/msg/QueueStrategy.h b/src/msg/QueueStrategy.h index a531cd77743..b7f6df85d7c 100644 --- a/src/msg/QueueStrategy.h +++ b/src/msg/QueueStrategy.h @@ -25,7 +25,7 @@ namespace bi = boost::intrusive; class QueueStrategy : public DispatchStrategy { - Mutex lock; + ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock"); const int n_threads; bool stop; @@ -35,8 +35,8 @@ class QueueStrategy : public DispatchStrategy { public: bi::list_member_hook<> thread_q; QueueStrategy *dq; - Cond cond; - explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq), cond() {} + ceph::condition_variable cond; + explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {} void* entry() { dq->entry(this); return NULL; diff --git a/src/msg/SimplePolicyMessenger.h b/src/msg/SimplePolicyMessenger.h index 429b1dfabf9..dc2b3c79222 100644 --- a/src/msg/SimplePolicyMessenger.h +++ b/src/msg/SimplePolicyMessenger.h @@ -23,7 +23,8 @@ class SimplePolicyMessenger : public Messenger { private: /// lock protecting policy - Mutex policy_lock; + ceph::mutex policy_lock = + ceph::make_mutex("SimplePolicyMessenger::policy_lock"); // entity_name_t::type -> Policy ceph::net::PolicySet policy_set; @@ -31,8 +32,7 @@ public: SimplePolicyMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce) - : Messenger(cct, name), - policy_lock("SimplePolicyMessenger::policy_lock") + : Messenger(cct, name) { } @@ -43,12 +43,12 @@ public: * @return A const Policy reference. */ Policy get_policy(int t) override { - Mutex::Locker l(policy_lock); + std::lock_guard l{policy_lock}; return policy_set.get(t); } Policy get_default_policy() override { - Mutex::Locker l(policy_lock); + std::lock_guard l{policy_lock}; return policy_set.get_default(); } @@ -61,7 +61,7 @@ public: * @param p The Policy to apply. */ void set_default_policy(Policy p) override { - Mutex::Locker l(policy_lock); + std::lock_guard l{policy_lock}; policy_set.set_default(p); } /** @@ -73,7 +73,7 @@ public: * @param p The policy to apply. 
*/ void set_policy(int type, Policy p) override { - Mutex::Locker l(policy_lock); + std::lock_guard l{policy_lock}; policy_set.set(type, p); } @@ -91,7 +91,7 @@ public: void set_policy_throttlers(int type, Throttle* byte_throttle, Throttle* msg_throttle) override { - Mutex::Locker l(policy_lock); + std::lock_guard l{policy_lock}; policy_set.set_throttlers(type, byte_throttle, msg_throttle); } diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 6b2fb1c77f2..6d2a4251cb1 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -278,10 +278,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), dispatch_queue(cct, this, mname), - lock("AsyncMessenger::lock"), - nonce(_nonce), need_addr(true), did_bind(false), - global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), - cluster_protocol(0), stopped(true) + nonce(_nonce) { std::string transport_type = "posix"; if (type.find("rdma") != std::string::npos) @@ -332,7 +329,7 @@ void AsyncMessenger::ready() } } - Mutex::Locker l(lock); + std::lock_guard l{lock}; for (auto &&p : processors) p->start(); dispatch_queue.start(); @@ -349,10 +346,10 @@ int AsyncMessenger::shutdown() // break ref cycles on the loopback connection local_connection->set_priv(NULL); did_bind = false; - lock.Lock(); - stop_cond.Signal(); + lock.lock(); + stop_cond.notify_all(); stopped = true; - lock.Unlock(); + lock.unlock(); stack->drain(); return 0; } @@ -376,11 +373,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) { - lock.Lock(); + lock.lock(); if (!pending_bind && started) { ldout(cct,10) << __func__ << " already started" << dendl; - lock.Unlock(); + lock.unlock(); return -1; } @@ -390,11 +387,11 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl; pending_bind_addrs = bind_addrs; pending_bind = true; - lock.Unlock(); + lock.unlock(); return 0; } - lock.Unlock(); + lock.unlock(); // bind to a socket set avoid_ports; @@ -464,7 +461,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) { if (!cct->_conf->ms_bind_before_connect) return 0; - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (did_bind) { return 0; } @@ -505,7 +502,7 @@ void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs, int AsyncMessenger::start() { - lock.Lock(); + std::scoped_lock l{lock}; ldout(cct,1) << __func__ << " start" << dendl; // register at least one entity, first! 
@@ -524,22 +521,19 @@ int AsyncMessenger::start() _init_local_connection(); } - lock.Unlock(); return 0; } void AsyncMessenger::wait() { - lock.Lock(); - if (!started) { - lock.Unlock(); - return; + { + std::unique_lock locker{lock}; + if (!started) { + return; + } + if (!stopped) + stop_cond.wait(locker); } - if (!stopped) - stop_cond.Wait(lock); - - lock.Unlock(); - dispatch_queue.shutdown(); if (dispatch_queue.is_started()) { ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl; @@ -561,18 +555,17 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, const entity_addr_t &listen_addr, const entity_addr_t &peer_addr) { - lock.Lock(); + std::lock_guard l{lock}; AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, listen_addr.is_msgr2(), false); conn->accept(std::move(cli_socket), listen_addr, peer_addr); accepting_conns.insert(conn); - lock.Unlock(); } AsyncConnectionRef AsyncMessenger::create_connect( const entity_addrvec_t& addrs, int type) { - ceph_assert(lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(lock)); ldout(cct, 10) << __func__ << " " << addrs << ", creating connection and registering" << dendl; @@ -641,7 +634,7 @@ entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs) int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; FUNCTRACE(cct); ceph_assert(m); @@ -672,7 +665,7 @@ int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs) ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (*my_addrs == addrs || (addrs.v.size() == 1 && my_addrs->contains(addrs.front()))) { @@ -747,7 +740,7 @@ bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) { ldout(cct,1) << __func__ << " " << addrs << dendl; bool ret = false; - Mutex::Locker l(lock); + std::lock_guard l{lock}; entity_addrvec_t newaddrs = *my_addrs; for (auto& a : newaddrs.v) { @@ -779,7 +772,7 @@ bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; auto t = addrs; for (auto& a : t.v) { a.set_nonce(nonce); @@ -791,7 +784,7 @@ void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs) void AsyncMessenger::shutdown_connections(bool queue_reset) { ldout(cct,1) << __func__ << " " << dendl; - lock.Lock(); + std::lock_guard l{lock}; for (const auto& c : accepting_conns) { ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl; c->stop(queue_reset); @@ -806,7 +799,7 @@ void AsyncMessenger::shutdown_connections(bool queue_reset) conns.clear(); { - Mutex::Locker l(deleted_lock); + std::lock_guard l{deleted_lock}; if (cct->_conf->subsys.should_gather()) { for (const auto& c : deleted_conns) { ldout(cct, 5) << __func__ << " delete " << c << dendl; @@ -814,12 +807,11 @@ void AsyncMessenger::shutdown_connections(bool queue_reset) } deleted_conns.clear(); } - lock.Unlock(); } void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs) { - lock.Lock(); + std::lock_guard l{lock}; const AsyncConnectionRef& conn = _lookup_conn(addrs); if (conn) { ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl; @@ -827,7 +819,6 @@ void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs) } else { ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl; } - lock.Unlock(); } int 
AsyncMessenger::get_proto_version(int peer_type, bool connect) const @@ -851,14 +842,14 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; auto it = conns.find(*conn->peer_addrs); if (it != conns.end()) { auto& existing = it->second; // lazy delete, see "deleted_conns" // If conn already in, we will return 0 - Mutex::Locker l(deleted_lock); + std::lock_guard l{deleted_lock}; if (deleted_conns.erase(existing)) { conns.erase(it); } else if (conn != existing) { @@ -929,10 +920,10 @@ int AsyncMessenger::reap_dead() ldout(cct, 1) << __func__ << " start" << dendl; int num = 0; - Mutex::Locker l1(lock); + std::lock_guard l1{lock}; { - Mutex::Locker l2(deleted_lock); + std::lock_guard l2{deleted_lock}; for (auto& c : deleted_conns) { ldout(cct, 5) << __func__ << " delete " << c << dendl; auto conns_it = conns.find(*c->peer_addrs); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 79e82b4264c..5c1c6ba397e 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -18,7 +18,6 @@ #define CEPH_ASYNCMESSENGER_H #include -#include #include "include/types.h" #include "include/xlist.h" @@ -26,7 +25,7 @@ #include "include/unordered_map.h" #include "include/unordered_set.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Cond.h" #include "common/Thread.h" @@ -233,14 +232,14 @@ private: std::string ms_type; /// overall lock used for AsyncMessenger data structures - Mutex lock; + ceph::mutex lock = ceph::make_mutex("AsyncMessenger::lock"); // AsyncMessenger stuff /// approximately unique ID set by the Constructor for use in entity_addr_t uint64_t nonce; /// true, specifying we haven't learned our addr; set false when we find it. // maybe this should be protected by the lock? - bool need_addr; + bool need_addr = true; /** * set to bind addresses if bind was called before NetworkStack was ready to @@ -262,9 +261,9 @@ private: * false; set to true if the AsyncMessenger bound to a specific address; * and set false again by Accepter::stop(). */ - bool did_bind; + bool did_bind = false; /// counter for the global seq our connection protocol uses - __u32 global_seq; + __u32 global_seq = 0; /// lock to protect the global_seq ceph::spinlock global_seq_lock; @@ -294,28 +293,28 @@ private: * deleted for AsyncConnection. "_lookup_conn" must ensure not return a * AsyncConnection in this set. */ - Mutex deleted_lock; + ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock"); set deleted_conns; EventCallbackRef reap_handler; /// internal cluster protocol version, if any, for talking to entities of the same type. - int cluster_protocol; + int cluster_protocol = 0; - Cond stop_cond; - bool stopped; + ceph::condition_variable stop_cond; + bool stopped = true; /* You must hold this->lock for the duration of use! 
*/ const auto& _lookup_conn(const entity_addrvec_t& k) { static const AsyncConnectionRef nullref; - ceph_assert(lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(lock)); auto p = conns.find(k); if (p == conns.end()) { return nullref; } // lazy delete, see "deleted_conns" - Mutex::Locker l(deleted_lock); + std::lock_guard l{deleted_lock}; if (deleted_conns.erase(p->second)) { conns.erase(p); return nullref; @@ -325,7 +324,7 @@ private: } void _init_local_connection() { - ceph_assert(lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(lock)); local_connection->peer_addrs = *my_addrs; local_connection->peer_type = my_name.type(); local_connection->set_features(CEPH_FEATURES_ALL); @@ -347,7 +346,7 @@ public: * This wraps _lookup_conn. */ AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return _lookup_conn(k); /* make new ref! */ } @@ -392,7 +391,7 @@ public: * is used for delivering messages back to ourself. */ void init_local_connection() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; local_connection->is_loopback = true; _init_local_connection(); } @@ -403,7 +402,7 @@ public: * See "deleted_conns" */ void unregister_conn(const AsyncConnectionRef& conn) { - Mutex::Locker l(deleted_lock); + std::lock_guard l{deleted_lock}; conn->get_perf_counter()->dec(l_msgr_active_connections); deleted_conns.emplace(std::move(conn)); diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 8976c3cc43a..5db63006c89 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -171,7 +171,7 @@ Worker* NetworkStack::get_worker() void NetworkStack::stop() { - std::lock_guard lk(pool_spin); + std::lock_guard lk(pool_spin); for (unsigned i = 0; i < num_workers; ++i) { workers[i]->done = true; workers[i]->center.wakeup(); @@ -181,23 +181,21 @@ void NetworkStack::stop() } class C_drain : public EventCallback { - Mutex drain_lock; - Cond drain_cond; + ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock"); + ceph::condition_variable drain_cond; unsigned drain_count; public: explicit C_drain(size_t c) - : drain_lock("C_drain::drain_lock"), - drain_count(c) {} + : drain_count(c) {} void do_request(uint64_t id) override { - Mutex::Locker l(drain_lock); + std::lock_guard l{drain_lock}; drain_count--; - if (drain_count == 0) drain_cond.Signal(); + if (drain_count == 0) drain_cond.notify_all(); } void wait() { - Mutex::Locker l(drain_lock); - while (drain_count) - drain_cond.Wait(drain_lock); + std::unique_lock l{drain_lock}; + drain_cond.wait(l, [this] { return drain_count == 0; }); } }; diff --git a/src/msg/async/dpdk/DPDKStack.cc b/src/msg/async/dpdk/DPDKStack.cc index 1db97c289ad..65022344de3 100644 --- a/src/msg/async/dpdk/DPDKStack.cc +++ b/src/msg/async/dpdk/DPDKStack.cc @@ -68,8 +68,8 @@ void DPDKWorker::initialize() WAIT_PORT_FIN_STAGE, DONE } create_stage = WAIT_DEVICE_STAGE; - static Mutex lock("DPDKStack::lock"); - static Cond cond; + static ceph::mutex lock = ceph::make_mutex("DPDKStack::lock"); + static ceph::condition_variable cond; static unsigned queue_init_done = 0; static unsigned cores = 0; static std::shared_ptr sdev; @@ -87,13 +87,12 @@ void DPDKWorker::initialize() sdev->workers.resize(cores); ldout(cct, 1) << __func__ << " using " << cores << " cores " << dendl; - Mutex::Locker l(lock); + std::lock_guard l{lock}; create_stage = WAIT_PORT_FIN_STAGE; - cond.Signal(); + cond.notify_all(); } else { - Mutex::Locker l(lock); - while (create_stage <= WAIT_DEVICE_STAGE) - cond.Wait(lock); + std::unique_lock 
l{lock}; + cond.wait(l, [] { return create_stage > WAIT_DEVICE_STAGE; }); } ceph_assert(sdev); if (i < sdev->hw_queues_count()) { @@ -105,7 +104,7 @@ void DPDKWorker::initialize() cpu_weights[i] = cct->_conf->ms_dpdk_hw_queue_weight; qp->configure_proxies(cpu_weights); sdev->set_local_queue(i, std::move(qp)); - Mutex::Locker l(lock); + std::lock_guard l{lock}; ++queue_init_done; cond.Signal(); } else { @@ -115,29 +114,27 @@ void DPDKWorker::initialize() } if (i == 0) { { - Mutex::Locker l(lock); - while (queue_init_done < cores) - cond.Wait(lock); + std::unique_lock l{lock}; + cond.wait(l, [] { return queue_init_done >= cores; }); } if (sdev->init_port_fini() < 0) { lderr(cct) << __func__ << " init_port_fini failed " << dendl; ceph_abort(); } - Mutex::Locker l(lock); + std::lock_guard l{lock}; create_stage = DONE; - cond.Signal(); + cond.notify_all(); } else { - Mutex::Locker l(lock); - while (create_stage <= WAIT_PORT_FIN_STAGE) - cond.Wait(lock); + std::unique_lock l{lock}; + cond.wait(l, [&] { return create_stage > WAIT_PORT_FIN_STAGE; }); } sdev->workers[i] = this; _impl = std::unique_ptr( new DPDKWorker::Impl(cct, i, ¢er, sdev)); { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (!--queue_init_done) { create_stage = WAIT_DEVICE_STAGE; sdev.reset(); diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 167d11eb468..38851c300ac 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -580,7 +580,7 @@ void Infiniband::MemoryManager::Chunk::clear() } Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s) - : manager(m), buffer_size(s), lock("cluster_lock") + : manager(m), buffer_size(s) { } @@ -622,7 +622,7 @@ int Infiniband::MemoryManager::Cluster::fill(uint32_t num) void Infiniband::MemoryManager::Cluster::take_back(std::vector &ck) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; for (auto c : ck) { c->clear(); free_chunks.push_back(c); @@ -635,7 +635,7 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector &chunks, if (bytes % buffer_size == 0) --num; int r = num; - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (free_chunks.empty()) return 0; if (!bytes) { @@ -696,7 +696,7 @@ void *Infiniband::MemoryManager::mem_pool::slow_malloc() { void *p; - Mutex::Locker l(PoolAllocator::lock); + std::lock_guard l{PoolAllocator::lock}; PoolAllocator::g_ctx = ctx; // this will trigger pool expansion via PoolAllocator::malloc() p = boost::pool::malloc(); @@ -705,7 +705,8 @@ void *Infiniband::MemoryManager::mem_pool::slow_malloc() } Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr; -Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock"); +ceph::mutex Infiniband::MemoryManager::PoolAllocator::lock = + ceph::make_mutex("pool-alloc-lock"); // lock is taken by mem_pool::slow_malloc() char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes) @@ -765,7 +766,7 @@ char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes) void Infiniband::MemoryManager::PoolAllocator::free(char * const block) { mem_info *m; - Mutex::Locker l(lock); + std::lock_guard l{lock}; m = reinterpret_cast(block) - 1; m->ctx->update_stats(-m->nbufs); @@ -887,7 +888,7 @@ void Infiniband::verify_prereq(CephContext *cct) { } Infiniband::Infiniband(CephContext *cct) - : cct(cct), lock("IB lock"), + : cct(cct), device_name(cct->_conf->ms_async_rdma_device_name), port_num( cct->_conf->ms_async_rdma_port_num) { @@ -898,7 
+899,7 @@ Infiniband::Infiniband(CephContext *cct) void Infiniband::init() { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (initialized) return; diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 9096aaeca93..b1aec88400d 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -32,7 +32,7 @@ #include "include/page.h" #include "common/debug.h" #include "common/errno.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/perf_counters.h" #include "msg/msg_types.h" #include "msg/async/net_handler.h" @@ -236,7 +236,7 @@ class Infiniband { MemoryManager& manager; uint32_t buffer_size; uint32_t num_chunk = 0; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("cluster_lock"); std::vector free_chunks; char *base = nullptr; char *end = nullptr; @@ -275,7 +275,7 @@ class Infiniband { static void free(char * const block); static MemPoolContext *g_ctx; - static Mutex lock; + static ceph::mutex lock; }; /** @@ -366,7 +366,7 @@ class Infiniband { void wire_gid_to_gid(const char *wgid, IBSYNMsg* im); void gid_to_wire_gid(const IBSYNMsg& im, char wgid[]); CephContext *cct; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("IB lock"); bool initialized = false; const std::string &device_name; uint8_t port_num; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index bf9d072ef9a..ec10aa82044 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -22,7 +22,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) : cct(cct), connected(0), error(0), infiniband(ib), - dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), + dispatcher(s), worker(w), is_server(false), con_handler(new C_handle_connection(this)), active(false), pending(false) { @@ -54,7 +54,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() dispatcher->post_chunk_to_pool(buffers[i]); } - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (notify_fd >= 0) ::close(notify_fd); if (tcp_fd >= 0) @@ -64,7 +64,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (wc.empty()) wc = std::move(v); else @@ -74,7 +74,7 @@ void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) void RDMAConnectedSocketImpl::get_wc(std::vector &w) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (wc.empty()) return ; w.swap(wc); @@ -421,7 +421,7 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) if (!bytes) return 0; { - Mutex::Locker l(lock); + std::lock_guard l{lock}; pending_bl.claim_append(bl); if (!connected) { ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; @@ -439,7 +439,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) { if (error) return -error; - Mutex::Locker l(lock); + std::lock_guard l{lock}; size_t bytes = pending_bl.length(); ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. 
iov size: " << pending_bl.buffers().size() << dendl; diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index f63a8e7d1f2..5c11a8c8afc 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -44,8 +44,8 @@ RDMADispatcher::~RDMADispatcher() } RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) - : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), - w_lock("RDMADispatcher::for worker pending list"), stack(s) + : cct(c), async_handler(new C_handle_cq_async(this)), + stack(s) { PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); @@ -80,7 +80,7 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) void RDMADispatcher::polling_start() { // take lock because listen/connect can happen from different worker threads - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (t.joinable()) return; // dispatcher thread already running @@ -103,7 +103,7 @@ void RDMADispatcher::polling_start() void RDMADispatcher::polling_stop() { { - Mutex::Locker l(lock); + std::lock_guard l{lock}; done = true; } @@ -139,7 +139,7 @@ void RDMADispatcher::handle_async_event() uint64_t qpn = async_event.element.qp->qp_num; ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; - Mutex::Locker l(lock); + std::lock_guard l{lock}; RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn); if (!conn) { ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl; @@ -160,14 +160,14 @@ void RDMADispatcher::handle_async_event() void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; get_stack()->get_infiniband().post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); } int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; return get_stack()->get_infiniband().post_chunks_to_rq(num, qp); } @@ -199,7 +199,7 @@ void RDMADispatcher::polling() perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret); perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret); - Mutex::Locker l(lock);//make sure connected socket alive when pass wc + std::lock_guard l{lock};//make sure connected socket alive when pass wc for (int i = 0; i < rx_ret; ++i) { ibv_wc* response = &wc[i]; @@ -243,7 +243,7 @@ void RDMADispatcher::polling() // because we need to check qp's state before sending perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight); if (num_dead_queue_pair) { - Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms + std::lock_guard l{lock}; // FIXME reuse dead qp because creating one qp costs 1 ms auto it = dead_queue_pairs.begin(); while (it != dead_queue_pairs.end()) { auto i = *it; @@ -308,7 +308,7 @@ void RDMADispatcher::notify_pending_workers() { if (num_pending_workers) { RDMAWorker *w = nullptr; { - Mutex::Locker l(w_lock); + std::lock_guard l{w_lock}; if (!pending_workers.empty()) { w = pending_workers.front(); pending_workers.pop_front(); @@ -322,7 +322,7 @@ void RDMADispatcher::notify_pending_workers() { void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; ceph_assert(!qp_conns.count(qp->get_local_qp_number())); qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); ++num_qp_conn; @@ -340,7 +340,7 @@ 
RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp) Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; // Try to find the QP in qp_conns firstly. auto it = qp_conns.find(qp); if (it != qp_conns.end()) @@ -367,7 +367,7 @@ void RDMADispatcher::erase_qpn_lockless(uint32_t qpn) void RDMADispatcher::erase_qpn(uint32_t qpn) { - Mutex::Locker l(lock); + std::lock_guard l{lock}; erase_qpn_lockless(qpn); } @@ -400,7 +400,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) ldout(cct, 1) << __func__ << " send work request returned error for buffer(" << response->wr_id << ") status(" << response->status << "): " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl; - Mutex::Locker l(lock);//make sure connected socket alive when pass wc + std::lock_guard l{lock};//make sure connected socket alive when pass wc RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num); if (conn && conn->is_connected()) { @@ -451,7 +451,7 @@ void RDMADispatcher::post_tx_buffer(std::vector &chunks) RDMAWorker::RDMAWorker(CephContext *c, unsigned i) : Worker(c, i), stack(nullptr), - tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock") + tx_handler(new C_handle_cq_tx(this)) { // initialize perf_logger char name[128]; diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index e038d362598..948d1fce2a0 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -47,7 +47,8 @@ class RDMADispatcher { bool done = false; std::atomic num_dead_queue_pair = {0}; std::atomic num_qp_conn = {0}; - Mutex lock; // protect `qp_conns`, `dead_queue_pairs` + // protect `qp_conns`, `dead_queue_pairs` + ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock"); // qp_num -> InfRcConnection // The main usage of `qp_conns` is looking up connection by qp_num, // so the lifecycle of element in `qp_conns` is the lifecycle of qp. 
@@ -71,7 +72,9 @@ class RDMADispatcher { std::vector dead_queue_pairs; std::atomic num_pending_workers = {0}; - Mutex w_lock; // protect pending workers + // protect pending workers + ceph::mutex w_lock = + ceph::make_mutex("RDMADispatcher::for worker pending list"); // fixme: lockfree std::list pending_workers; RDMAStack* stack; @@ -98,7 +101,7 @@ class RDMADispatcher { void polling(); void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); void make_pending_worker(RDMAWorker* w) { - Mutex::Locker l(w_lock); + std::lock_guard l{w_lock}; auto it = std::find(pending_workers.begin(), pending_workers.end(), w); if (it != pending_workers.end()) return; @@ -132,7 +135,7 @@ class RDMAWorker : public Worker { EventCallbackRef tx_handler; std::list pending_sent_conns; RDMADispatcher* dispatcher = nullptr; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock"); class C_handle_cq_tx : public EventCallback { RDMAWorker *worker; @@ -193,7 +196,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { int notify_fd = -1; bufferlist pending_bl; - Mutex lock; + ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock"); std::vector wc; bool is_server; EventCallbackRef con_handler; diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc index 9a02b3485a8..f9f3b617ea3 100644 --- a/src/test/msgr/perf_msgr_client.cc +++ b/src/test/msgr/perf_msgr_client.cc @@ -76,23 +76,23 @@ class MessengerClient { ClientDispatcher dispatcher; public: - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock"); + ceph::condition_variable cond; uint64_t inflight; ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us): msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops), - dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) { + dispatcher(think_time_us, this), inflight(0) { m->add_dispatcher_head(&dispatcher); bufferptr ptr(msg_len); memset(ptr.c_str(), 0, msg_len); data.append(ptr); } void *entry() override { - lock.Lock(); + std::unique_lock locker{lock}; for (int i = 0; i < ops; ++i) { if (inflight > uint64_t(concurrent)) { - cond.Wait(lock); + cond.wait(locker); } hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), oloc.nspace); @@ -104,7 +104,7 @@ class MessengerClient { conn->send_message(m); //cerr << __func__ << " send m=" << m << std::endl; } - lock.Unlock(); + locker.unlock(); msgr->shutdown(); return 0; } @@ -159,9 +159,9 @@ class MessengerClient { void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) { usleep(think_time); m->put(); - Mutex::Locker l(thread->lock); + std::lock_guard l{thread->lock}; thread->inflight--; - thread->cond.Signal(); + thread->cond.notify_all(); } diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc index 3077c1f13e6..fab2e33b390 100644 --- a/src/test/msgr/test_async_networkstack.cc +++ b/src/test/msgr/test_async_networkstack.cc @@ -976,8 +976,8 @@ class StressFactory { } void add_client(ThreadData *t_data) { - static Mutex lock("add_client_lock"); - Mutex::Locker l(lock); + static ceph::mutex lock = ceph::make_mutex("add_client_lock"); + std::lock_guard l{lock}; ConnectedSocket sock; int r = t_data->worker->connect(bind_addr, options, &sock); std::default_random_engine rng(rd());
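
The hunks above all follow one pattern: a Mutex member that used to be named in the constructor initializer list becomes a ceph::mutex initialized in-class via ceph::make_mutex(), and Mutex::Locker l(lock) becomes std::lock_guard l{lock} (or std::scoped_lock where a small critical section replaces explicit Lock()/Unlock() calls, as in DispatchQueue::shutdown()). The sketch below is illustrative only and is not part of the patch; it assumes, as in non-debug builds of Ceph, that ceph::mutex behaves like std::mutex and that the name passed to make_mutex() is only consumed by the debug/lockdep build, so the hypothetical make_named_mutex() helper here simply drops it.

    // Illustrative sketch (C++17), not part of the patch.
    #include <cstdint>
    #include <iostream>
    #include <mutex>
    #include <string>

    // Hypothetical stand-in for ceph::make_mutex(): the name is ignored here.
    // Returning std::mutex by value relies on C++17 guaranteed copy elision.
    inline std::mutex make_named_mutex(const std::string& /*name*/) {
      return std::mutex{};
    }

    class Connectionish {
      // Before: "mutable Mutex lock;" plus lock("Connection::lock") in the ctor.
      // After: in-class initialization, no constructor-initializer entry.
      mutable std::mutex lock = make_named_mutex("Connection::lock");
      uint64_t last_keepalive = 0;

    public:
      void set_last_keepalive(uint64_t t) {
        std::lock_guard l{lock};   // was: Mutex::Locker l(lock);
        last_keepalive = t;
      }
      uint64_t get_last_keepalive() const {
        std::lock_guard l{lock};   // works on the mutable mutex in a const method
        return last_keepalive;
      }
    };

    int main() {
      Connectionish c;
      c.set_last_keepalive(42);
      std::cout << c.get_last_keepalive() << "\n";
    }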
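
Cond waiters change in the same spirit: cond.Signal() becomes notify_all(), and a hand-written "while (pred) cond.Wait(lock);" loop becomes a predicate wait on a std::unique_lock, as in C_drain::wait() in Stack.cc above. A minimal, self-contained sketch of that conversion follows; the class and member names are illustrative, not taken from the patch, and std::mutex/std::condition_variable stand in for the ceph aliases.

    // Illustrative sketch (C++17), not part of the patch.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <vector>

    class Drainer {
      std::mutex drain_lock;
      std::condition_variable drain_cond;
      unsigned drain_count;

    public:
      explicit Drainer(unsigned c) : drain_count(c) {}

      void one_done() {
        std::lock_guard l{drain_lock};   // was: Mutex::Locker l(drain_lock);
        if (--drain_count == 0)
          drain_cond.notify_all();       // was: drain_cond.Signal();
      }

      void wait() {
        // Old form: while (drain_count) drain_cond.Wait(drain_lock);
        // New form: a predicate wait, re-checked under the lock and therefore
        // robust against spurious wakeups.
        std::unique_lock l{drain_lock};
        drain_cond.wait(l, [this] { return drain_count == 0; });
      }
    };

    int main() {
      Drainer d(4);
      std::vector<std::thread> workers;
      for (int i = 0; i < 4; ++i)
        workers.emplace_back([&d] { d.one_done(); });
      d.wait();
      for (auto& t : workers)
        t.join();
      std::cout << "drained\n";
    }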
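
Finally, the dispatch loops (DispatchQueue::entry(), run_local_delivery(), QueueStrategy::entry()) swap explicit lock.Lock()/lock.Unlock() pairs for a single std::unique_lock that is released around the slow dispatch call and re-taken afterwards; the trailing Unlock() at function exit disappears because the guard unlocks on destruction. Below is a rough stand-alone model of that shape with hypothetical names, not the Ceph API.

    // Illustrative sketch (C++17), not part of the patch.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    std::mutex lock;
    std::condition_variable cond;
    std::queue<int> mqueue;
    bool stop = false;

    void entry() {
      std::unique_lock l{lock};                      // was: lock.Lock();
      while (true) {
        while (!mqueue.empty()) {
          int item = mqueue.front();
          mqueue.pop();
          l.unlock();                                // was: lock.Unlock();
          std::cout << "dispatch " << item << "\n";  // dispatch outside the lock
          l.lock();                                  // was: lock.Lock();
        }
        if (stop)
          break;
        cond.wait(l);                                // was: cond.Wait(lock);
      }
    }                                                // was: lock.Unlock(); now RAII

    int main() {
      std::thread t(entry);
      {
        std::scoped_lock l{lock};   // producer side, cf. DispatchQueue::enqueue()
        for (int i = 0; i < 3; ++i)
          mqueue.push(i);
        cond.notify_all();          // was: cond.Signal();
      }
      {
        std::scoped_lock l{lock};   // cf. DispatchQueue::shutdown()
        stop = true;
        cond.notify_all();
      }
      t.join();
    }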