]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg: s/Mutex/ceph::mutex/
authorKefu Chai <kchai@redhat.com>
Sun, 7 Jul 2019 03:15:22 +0000 (11:15 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 3 Aug 2019 01:34:50 +0000 (09:34 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
17 files changed:
src/msg/Connection.h
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/QueueStrategy.cc
src/msg/QueueStrategy.h
src/msg/SimplePolicyMessenger.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Stack.cc
src/msg/async/dpdk/DPDKStack.cc
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h
src/test/msgr/perf_msgr_client.cc
src/test/msgr/test_async_networkstack.cc

index a98e0e12a5347797384dc50543dd04c562dc58f5..9ea229c174267e381cd554570110649d5e17fff0 100644 (file)
@@ -24,7 +24,7 @@
 #include "common/RefCountedObj.h"
 #include "common/config.h"
 #include "common/debug.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
 #include "include/buffer.h"
 #include "include/types.h"
@@ -43,7 +43,7 @@ class Interceptor;
 #endif
 
 struct Connection : public RefCountedObject {
-  mutable Mutex lock;
+  mutable ceph::mutex lock = ceph::make_mutex("Connection::lock");
   Messenger *msgr;
   RefCountedPtr priv;
   int peer_type;
@@ -78,7 +78,6 @@ public:
     // we are managed exclusively by ConnectionRef; make it so you can
     //   ConnectionRef foo = new Connection;
     : RefCountedObject(cct, 0),
-      lock("Connection::lock"),
       msgr(m),
       peer_type(-1),
       features(0),
@@ -92,12 +91,12 @@ public:
   }
 
   void set_priv(const RefCountedPtr& o) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     priv = o;
   }
 
   RefCountedPtr get_priv() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     return priv;
   }
 
@@ -221,7 +220,7 @@ public:
 
   void post_rx_buffer(ceph_tid_t tid, ceph::buffer::list& bl) {
 #if 0
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     ++rx_buffers_version;
     rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
 #endif
@@ -229,25 +228,25 @@ public:
 
   void revoke_rx_buffer(ceph_tid_t tid) {
 #if 0
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     rx_buffers.erase(tid);
 #endif
   }
 
   utime_t get_last_keepalive() const {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     return last_keepalive;
   }
   void set_last_keepalive(utime_t t) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     last_keepalive = t;
   }
   utime_t get_last_keepalive_ack() const {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     return last_keepalive_ack;
   }
   void set_last_keepalive_ack(utime_t t) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     last_keepalive_ack = t;
   }
 
index 3de1f00a503a47c784c9a5ab882054bfd73c9c97..ad543fb63011c2f26a659ba79a0a23c0808ca4c0 100644 (file)
@@ -29,7 +29,7 @@
 #define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " "
 
 double DispatchQueue::get_max_age(utime_t now) const {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (marrival.empty())
     return 0;
   else
@@ -80,7 +80,7 @@ void DispatchQueue::fast_preprocess(const ref_t<Message>& m)
 
 void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (stop) {
     return;
   }
@@ -91,32 +91,32 @@ void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
   } else {
     mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
   }
-  cond.Signal();
+  cond.notify_all();
 }
 
 void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
 {
   m->set_recv_stamp(ceph_clock_now());
-  Mutex::Locker l(local_delivery_lock);
+  std::lock_guard l{local_delivery_lock};
   if (local_messages.empty())
-    local_delivery_cond.Signal();
+    local_delivery_cond.notify_all();
   local_messages.emplace(m, priority);
   return;
 }
 
 void DispatchQueue::run_local_delivery()
 {
-  local_delivery_lock.Lock();
+  std::unique_lock l{local_delivery_lock};
   while (true) {
     if (stop_local_delivery)
       break;
     if (local_messages.empty()) {
-      local_delivery_cond.Wait(local_delivery_lock);
+      local_delivery_cond.wait(l);
       continue;
     }
     auto p = std::move(local_messages.front());
     local_messages.pop();
-    local_delivery_lock.Unlock();
+    l.unlock();
     const ref_t<Message>& m = p.first;
     int priority = p.second;
     fast_preprocess(m);
@@ -125,9 +125,8 @@ void DispatchQueue::run_local_delivery()
     } else {
       enqueue(m, priority, 0);
     }
-    local_delivery_lock.Lock();
+    l.lock();
   }
-  local_delivery_lock.Unlock();
 }
 
 void DispatchQueue::dispatch_throttle_release(uint64_t msize)
@@ -151,13 +150,13 @@ void DispatchQueue::dispatch_throttle_release(uint64_t msize)
  */
 void DispatchQueue::entry()
 {
-  lock.Lock();
+  std::unique_lock l{lock};
   while (true) {
     while (!mqueue.empty()) {
       QueueItem qitem = mqueue.dequeue();
       if (!qitem.is_code())
        remove_arrival(qitem.get_message());
-      lock.Unlock();
+      l.unlock();
 
       if (qitem.is_code()) {
        if (cct->_conf->ms_inject_internal_delays &&
@@ -199,19 +198,18 @@ void DispatchQueue::entry()
        }
       }
 
-      lock.Lock();
+      l.lock();
     }
     if (stop)
       break;
 
     // wait for something to be put on queue
-    cond.Wait(lock);
+    cond.wait(l);
   }
-  lock.Unlock();
 }
 
 void DispatchQueue::discard_queue(uint64_t id) {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   list<QueueItem> removed;
   mqueue.remove_by_class(id, &removed);
   for (list<QueueItem>::iterator i = removed.begin();
@@ -246,14 +244,15 @@ void DispatchQueue::discard_local()
 void DispatchQueue::shutdown()
 {
   // stop my local delivery thread
-  local_delivery_lock.Lock();
-  stop_local_delivery = true;
-  local_delivery_cond.Signal();
-  local_delivery_lock.Unlock();
-
+  {
+    std::scoped_lock l{local_delivery_lock};
+    stop_local_delivery = true;
+    local_delivery_cond.notify_all();
+  }
   // stop my dispatch thread
-  lock.Lock();
-  stop = true;
-  cond.Signal();
-  lock.Unlock();
+  {
+    std::scoped_lock l{lock};
+    stop = true;
+    cond.notify_all();
+  }
 }
index 8244538219fa1e08782a0256615d31f9a5588655..97969de8abe4a49354a562854afc9a1cce26726c 100644 (file)
@@ -21,8 +21,7 @@
 #include <boost/intrusive_ptr.hpp>
 #include "include/ceph_assert.h"
 #include "common/Throttle.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
 #include "common/Thread.h"
 #include "common/PrioritizedQueue.h"
 
@@ -65,8 +64,8 @@ class DispatchQueue {
     
   CephContext *cct;
   Messenger *msgr;
-  mutable Mutex lock;
-  Cond cond;
+  mutable ceph::mutex lock;
+  ceph::condition_variable cond;
 
   PrioritizedQueue<QueueItem, uint64_t> mqueue;
 
@@ -104,8 +103,8 @@ class DispatchQueue {
     }
   } dispatch_thread;
 
-  Mutex local_delivery_lock;
-  Cond local_delivery_cond;
+  ceph::mutex local_delivery_lock;
+  ceph::condition_variable local_delivery_cond;
   bool stop_local_delivery;
   std::queue<pair<ref_t<Message>, int>> local_messages;
   class LocalDeliveryThread : public Thread {
@@ -136,7 +135,7 @@ class DispatchQueue {
   double get_max_age(utime_t now) const;
 
   int get_queue_len() const {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     return mqueue.length();
   }
 
@@ -148,54 +147,54 @@ class DispatchQueue {
   void dispatch_throttle_release(uint64_t msize);
 
   void queue_connect(Connection *con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (stop)
       return;
     mqueue.enqueue_strict(
       0,
       CEPH_MSG_PRIO_HIGHEST,
       QueueItem(D_CONNECT, con));
-    cond.Signal();
+    cond.notify_all();
   }
   void queue_accept(Connection *con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (stop)
       return;
     mqueue.enqueue_strict(
       0,
       CEPH_MSG_PRIO_HIGHEST,
       QueueItem(D_ACCEPT, con));
-    cond.Signal();
+    cond.notify_all();
   }
   void queue_remote_reset(Connection *con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (stop)
       return;
     mqueue.enqueue_strict(
       0,
       CEPH_MSG_PRIO_HIGHEST,
       QueueItem(D_BAD_REMOTE_RESET, con));
-    cond.Signal();
+    cond.notify_all();
   }
   void queue_reset(Connection *con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (stop)
       return;
     mqueue.enqueue_strict(
       0,
       CEPH_MSG_PRIO_HIGHEST,
       QueueItem(D_BAD_RESET, con));
-    cond.Signal();
+    cond.notify_all();
   }
   void queue_refused(Connection *con) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (stop)
       return;
     mqueue.enqueue_strict(
       0,
       CEPH_MSG_PRIO_HIGHEST,
       QueueItem(D_CONN_REFUSED, con));
-    cond.Signal();
+    cond.notify_all();
   }
 
   bool can_fast_dispatch(const cref_t<Message> &m) const;
@@ -221,12 +220,12 @@ class DispatchQueue {
 
   DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
     : cct(cct), msgr(msgr),
-      lock("Messenger::DispatchQueue::lock" + name),
+      lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)),
       mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
             cct->_conf->ms_pq_min_cost),
       next_id(1),
       dispatch_thread(this),
-      local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name),
+      local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)),
       stop_local_delivery(false),
       local_delivery_thread(this),
       dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
index a6db0b96de67a4599d8413de99e5dbd10bddfc53..85b0a11e6021f4c9d6a13480efe3629b54ea89d9 100644 (file)
@@ -17,8 +17,7 @@
 #include "common/debug.h"
 
 QueueStrategy::QueueStrategy(int _n_threads)
-  : lock("QueueStrategy::lock"),
-    n_threads(_n_threads),
+  : n_threads(_n_threads),
     stop(false),
     mqueue(),
     disp_threads()
@@ -31,23 +30,22 @@ void QueueStrategy::ds_dispatch(Message *m) {
     msgr->ms_fast_dispatch(m);
     return;
   }
-  lock.Lock();
+  std::lock_guard l{lock};
   mqueue.push_back(*m);
   if (disp_threads.size()) {
     if (! disp_threads.empty()) {
       QSThread *thrd = &disp_threads.front();
       disp_threads.pop_front();
-      thrd->cond.Signal();
+      thrd->cond.notify_all();
     }
   }
-  lock.Unlock();
 }
 
 void QueueStrategy::entry(QSThread *thrd)
 {
   for (;;) {
     ref_t<Message> m;
-    lock.Lock();
+    std::unique_lock l{lock};
     for (;;) {
       if (! mqueue.empty()) {
        m = ref_t<Message>(&mqueue.front(), false);
@@ -57,9 +55,9 @@ void QueueStrategy::entry(QSThread *thrd)
       if (stop)
        break;
       disp_threads.push_front(*thrd);
-      thrd->cond.Wait(lock);
+      thrd->cond.wait(l);
     }
-    lock.Unlock();
+    l.unlock();
     if (stop) {
        if (!m) break;
        continue;
@@ -71,35 +69,33 @@ void QueueStrategy::entry(QSThread *thrd)
 void QueueStrategy::shutdown()
 {
   QSThread *thrd;
-  lock.Lock();
+  std::lock_guard l{lock};
   stop = true;
   while (disp_threads.size()) {
     thrd = &(disp_threads.front());
     disp_threads.pop_front();
-    thrd->cond.Signal();
+    thrd->cond.notify_all();
   }
-  lock.Unlock();
 }
 
 void QueueStrategy::wait()
 {
-  lock.Lock();
+  std::unique_lock l{lock};
   ceph_assert(stop);
   for (auto& thread : threads) {
-    lock.Unlock();
+    l.unlock();
 
     // join outside of lock
     thread->join();
 
-    lock.Lock();
+    l.lock();
   }
-  lock.Unlock();
 }
 
 void QueueStrategy::start()
 {
   ceph_assert(!stop);
-  lock.Lock();
+  std::lock_guard l{lock};
   threads.reserve(n_threads);
   for (int ix = 0; ix < n_threads; ++ix) {
     string thread_name = "ms_qs_";
@@ -108,5 +104,4 @@ void QueueStrategy::start()
     thrd->create(thread_name.c_str());
     threads.emplace_back(std::move(thrd));
   }
-  lock.Unlock();
 }
index a531cd7774327a601478ff77f725a33199af6fd4..b7f6df85d7c9f22e8ab4fa96055f73d179e0eabd 100644 (file)
@@ -25,7 +25,7 @@
 namespace bi = boost::intrusive;
 
 class QueueStrategy : public DispatchStrategy {
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock");
   const int n_threads;
   bool stop;
 
@@ -35,8 +35,8 @@ class QueueStrategy : public DispatchStrategy {
   public:
     bi::list_member_hook<> thread_q;
     QueueStrategy *dq;
-    Cond cond;
-    explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq), cond() {}
+    ceph::condition_variable cond;
+    explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {}
     void* entry() {
       dq->entry(this);
       return NULL;
index 429b1dfabf96602f01c578b8706b303e1cffd7c2..dc2b3c79222d2482de7229c1e9c269dda39080c6 100644 (file)
@@ -23,7 +23,8 @@ class SimplePolicyMessenger : public Messenger
 {
 private:
   /// lock protecting policy
-  Mutex policy_lock;
+  ceph::mutex policy_lock =
+    ceph::make_mutex("SimplePolicyMessenger::policy_lock");
   // entity_name_t::type -> Policy
   ceph::net::PolicySet<Throttle> policy_set;
 
@@ -31,8 +32,7 @@ public:
 
   SimplePolicyMessenger(CephContext *cct, entity_name_t name,
                        string mname, uint64_t _nonce)
-    : Messenger(cct, name),
-      policy_lock("SimplePolicyMessenger::policy_lock")
+    : Messenger(cct, name)
     {
     }
 
@@ -43,12 +43,12 @@ public:
    * @return A const Policy reference.
    */
   Policy get_policy(int t) override {
-    Mutex::Locker l(policy_lock);
+    std::lock_guard l{policy_lock};
     return policy_set.get(t);
   }
 
   Policy get_default_policy() override {
-    Mutex::Locker l(policy_lock);
+    std::lock_guard l{policy_lock};
     return policy_set.get_default();
   }
 
@@ -61,7 +61,7 @@ public:
    * @param p The Policy to apply.
    */
   void set_default_policy(Policy p) override {
-    Mutex::Locker l(policy_lock);
+    std::lock_guard l{policy_lock};
     policy_set.set_default(p);
   }
   /**
@@ -73,7 +73,7 @@ public:
    * @param p The policy to apply.
    */
   void set_policy(int type, Policy p) override {
-    Mutex::Locker l(policy_lock);
+    std::lock_guard l{policy_lock};
     policy_set.set(type, p);
   }
 
@@ -91,7 +91,7 @@ public:
   void set_policy_throttlers(int type,
                             Throttle* byte_throttle,
                             Throttle* msg_throttle) override {
-    Mutex::Locker l(policy_lock);
+    std::lock_guard l{policy_lock};
     policy_set.set_throttlers(type, byte_throttle, msg_throttle);
   }
 
index 6b2fb1c77f2ed9646c5a9097eb0d17981b71b033..6d2a4251cb1ee8d7ad6543b7e9c026e2e6dcd432 100644 (file)
@@ -278,10 +278,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                const std::string &type, string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
     dispatch_queue(cct, this, mname),
-    lock("AsyncMessenger::lock"),
-    nonce(_nonce), need_addr(true), did_bind(false),
-    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
-    cluster_protocol(0), stopped(true)
+    nonce(_nonce)
 {
   std::string transport_type = "posix";
   if (type.find("rdma") != std::string::npos)
@@ -332,7 +329,7 @@ void AsyncMessenger::ready()
     }
   }
 
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   for (auto &&p : processors)
     p->start();
   dispatch_queue.start();
@@ -349,10 +346,10 @@ int AsyncMessenger::shutdown()
   // break ref cycles on the loopback connection
   local_connection->set_priv(NULL);
   did_bind = false;
-  lock.Lock();
-  stop_cond.Signal();
+  lock.lock();
+  stop_cond.notify_all();
   stopped = true;
-  lock.Unlock();
+  lock.unlock();
   stack->drain();
   return 0;
 }
@@ -376,11 +373,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
 
 int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
 {
-  lock.Lock();
+  lock.lock();
 
   if (!pending_bind && started) {
     ldout(cct,10) << __func__ << " already started" << dendl;
-    lock.Unlock();
+    lock.unlock();
     return -1;
   }
 
@@ -390,11 +387,11 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
     ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
     pending_bind_addrs = bind_addrs;
     pending_bind = true;
-    lock.Unlock();
+    lock.unlock();
     return 0;
   }
 
-  lock.Unlock();
+  lock.unlock();
 
   // bind to a socket
   set<int> avoid_ports;
@@ -464,7 +461,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
 {
   if (!cct->_conf->ms_bind_before_connect)
     return 0;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (did_bind) {
     return 0;
   }
@@ -505,7 +502,7 @@ void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
 
 int AsyncMessenger::start()
 {
-  lock.Lock();
+  std::scoped_lock l{lock};
   ldout(cct,1) << __func__ << " start" << dendl;
 
   // register at least one entity, first!
@@ -524,22 +521,19 @@ int AsyncMessenger::start()
     _init_local_connection();
   }
 
-  lock.Unlock();
   return 0;
 }
 
 void AsyncMessenger::wait()
 {
-  lock.Lock();
-  if (!started) {
-    lock.Unlock();
-    return;
+  {
+    std::unique_lock locker{lock};
+    if (!started) {
+      return;
+    }
+    if (!stopped)
+      stop_cond.wait(locker);
   }
-  if (!stopped)
-    stop_cond.Wait(lock);
-
-  lock.Unlock();
-
   dispatch_queue.shutdown();
   if (dispatch_queue.is_started()) {
     ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
@@ -561,18 +555,17 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
                                const entity_addr_t &listen_addr,
                                const entity_addr_t &peer_addr)
 {
-  lock.Lock();
+  std::lock_guard l{lock};
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
                                                listen_addr.is_msgr2(), false);
   conn->accept(std::move(cli_socket), listen_addr, peer_addr);
   accepting_conns.insert(conn);
-  lock.Unlock();
 }
 
 AsyncConnectionRef AsyncMessenger::create_connect(
   const entity_addrvec_t& addrs, int type)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
 
   ldout(cct, 10) << __func__ << " " << addrs
       << ", creating connection and registering" << dendl;
@@ -641,7 +634,7 @@ entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs)
 
 int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
 
   FUNCTRACE(cct);
   ceph_assert(m);
@@ -672,7 +665,7 @@ int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
 
 ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (*my_addrs == addrs ||
       (addrs.v.size() == 1 &&
        my_addrs->contains(addrs.front()))) {
@@ -747,7 +740,7 @@ bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 {
   ldout(cct,1) << __func__ << " " << addrs << dendl;
   bool ret = false;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
 
   entity_addrvec_t newaddrs = *my_addrs;
   for (auto& a : newaddrs.v) {
@@ -779,7 +772,7 @@ bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 
 void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   auto t = addrs;
   for (auto& a : t.v) {
     a.set_nonce(nonce);
@@ -791,7 +784,7 @@ void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
 void AsyncMessenger::shutdown_connections(bool queue_reset)
 {
   ldout(cct,1) << __func__ << " " << dendl;
-  lock.Lock();
+  std::lock_guard l{lock};
   for (const auto& c : accepting_conns) {
     ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl;
     c->stop(queue_reset);
@@ -806,7 +799,7 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
   conns.clear();
 
   {
-    Mutex::Locker l(deleted_lock);
+    std::lock_guard l{deleted_lock};
     if (cct->_conf->subsys.should_gather<ceph_subsys_ms, 5>()) {
       for (const auto& c : deleted_conns) {
         ldout(cct, 5) << __func__ << " delete " << c << dendl;
@@ -814,12 +807,11 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
     }
     deleted_conns.clear();
   }
-  lock.Unlock();
 }
 
 void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
 {
-  lock.Lock();
+  std::lock_guard l{lock};
   const AsyncConnectionRef& conn = _lookup_conn(addrs);
   if (conn) {
     ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
@@ -827,7 +819,6 @@ void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
   } else {
     ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
   }
-  lock.Unlock();
 }
 
 int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
@@ -851,14 +842,14 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
 
 int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   auto it = conns.find(*conn->peer_addrs);
   if (it != conns.end()) {
     auto& existing = it->second;
 
     // lazy delete, see "deleted_conns"
     // If conn already in, we will return 0
-    Mutex::Locker l(deleted_lock);
+    std::lock_guard l{deleted_lock};
     if (deleted_conns.erase(existing)) {
       conns.erase(it);
     } else if (conn != existing) {
@@ -929,10 +920,10 @@ int AsyncMessenger::reap_dead()
   ldout(cct, 1) << __func__ << " start" << dendl;
   int num = 0;
 
-  Mutex::Locker l1(lock);
+  std::lock_guard l1{lock};
 
   {
-    Mutex::Locker l2(deleted_lock);
+    std::lock_guard l2{deleted_lock};
     for (auto& c : deleted_conns) {
       ldout(cct, 5) << __func__ << " delete " << c << dendl;
       auto conns_it = conns.find(*c->peer_addrs);
index 79e82b4264c01af846139c7b9b84315cd6aeaa15..5c1c6ba397ec97ddb2c694f00a187e5c68f7f404 100644 (file)
@@ -18,7 +18,6 @@
 #define CEPH_ASYNCMESSENGER_H
 
 #include <map>
-#include <mutex>
 
 #include "include/types.h"
 #include "include/xlist.h"
@@ -26,7 +25,7 @@
 #include "include/unordered_map.h"
 #include "include/unordered_set.h"
 
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Cond.h"
 #include "common/Thread.h"
 
@@ -233,14 +232,14 @@ private:
   std::string ms_type;
 
   /// overall lock used for AsyncMessenger data structures
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("AsyncMessenger::lock");
   // AsyncMessenger stuff
   /// approximately unique ID set by the Constructor for use in entity_addr_t
   uint64_t nonce;
 
   /// true, specifying we haven't learned our addr; set false when we find it.
   // maybe this should be protected by the lock?
-  bool need_addr;
+  bool need_addr = true;
 
   /**
    * set to bind addresses if bind was called before NetworkStack was ready to
@@ -262,9 +261,9 @@ private:
    *  false; set to true if the AsyncMessenger bound to a specific address;
    *  and set false again by Accepter::stop().
    */
-  bool did_bind;
+  bool did_bind = false;
   /// counter for the global seq our connection protocol uses
-  __u32 global_seq;
+  __u32 global_seq = 0;
   /// lock to protect the global_seq
   ceph::spinlock global_seq_lock;
 
@@ -294,28 +293,28 @@ private:
    * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
    * AsyncConnection in this set.
    */
-  Mutex deleted_lock;
+  ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock");
   set<AsyncConnectionRef> deleted_conns;
 
   EventCallbackRef reap_handler;
 
   /// internal cluster protocol version, if any, for talking to entities of the same type.
-  int cluster_protocol;
+  int cluster_protocol = 0;
 
-  Cond  stop_cond;
-  bool stopped;
+  ceph::condition_variable  stop_cond;
+  bool stopped = true;
 
   /* You must hold this->lock for the duration of use! */
   const auto& _lookup_conn(const entity_addrvec_t& k) {
     static const AsyncConnectionRef nullref;
-    ceph_assert(lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(lock));
     auto p = conns.find(k);
     if (p == conns.end()) {
       return nullref;
     }
 
     // lazy delete, see "deleted_conns"
-    Mutex::Locker l(deleted_lock);
+    std::lock_guard l{deleted_lock};
     if (deleted_conns.erase(p->second)) {
       conns.erase(p);
       return nullref;
@@ -325,7 +324,7 @@ private:
   }
 
   void _init_local_connection() {
-    ceph_assert(lock.is_locked());
+    ceph_assert(ceph_mutex_is_locked(lock));
     local_connection->peer_addrs = *my_addrs;
     local_connection->peer_type = my_name.type();
     local_connection->set_features(CEPH_FEATURES_ALL);
@@ -347,7 +346,7 @@ public:
    * This wraps _lookup_conn.
    */
   AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     return _lookup_conn(k); /* make new ref! */
   }
 
@@ -392,7 +391,7 @@ public:
    * is used for delivering messages back to ourself.
    */
   void init_local_connection() {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     local_connection->is_loopback = true;
     _init_local_connection();
   }
@@ -403,7 +402,7 @@ public:
    * See "deleted_conns"
    */
   void unregister_conn(const AsyncConnectionRef& conn) {
-    Mutex::Locker l(deleted_lock);
+    std::lock_guard l{deleted_lock};
     conn->get_perf_counter()->dec(l_msgr_active_connections);
     deleted_conns.emplace(std::move(conn));
 
index 8976c3cc43a85addd16eee72a8e0df3167a1ea16..5db63006c895f1f53bccbc1c3fcae52e30883495 100644 (file)
@@ -171,7 +171,7 @@ Worker* NetworkStack::get_worker()
 
 void NetworkStack::stop()
 {
-  std::lock_guard<decltype(pool_spin)> lk(pool_spin);
+  std::lock_guard lk(pool_spin);
   for (unsigned i = 0; i < num_workers; ++i) {
     workers[i]->done = true;
     workers[i]->center.wakeup();
@@ -181,23 +181,21 @@ void NetworkStack::stop()
 }
 
 class C_drain : public EventCallback {
-  Mutex drain_lock;
-  Cond drain_cond;
+  ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock");
+  ceph::condition_variable drain_cond;
   unsigned drain_count;
 
  public:
   explicit C_drain(size_t c)
-      : drain_lock("C_drain::drain_lock"),
-        drain_count(c) {}
+      : drain_count(c) {}
   void do_request(uint64_t id) override {
-    Mutex::Locker l(drain_lock);
+    std::lock_guard l{drain_lock};
     drain_count--;
-    if (drain_count == 0) drain_cond.Signal();
+    if (drain_count == 0) drain_cond.notify_all();
   }
   void wait() {
-    Mutex::Locker l(drain_lock);
-    while (drain_count)
-      drain_cond.Wait(drain_lock);
+    std::unique_lock l{drain_lock};
+    drain_cond.wait(l, [this] { return drain_count == 0; });
   }
 };
 
index 1db97c289ad453eb13d8da82750476cf2ac1d706..65022344de30c5251dc6ed09da3b4429c4f416cd 100644 (file)
@@ -68,8 +68,8 @@ void DPDKWorker::initialize()
     WAIT_PORT_FIN_STAGE,
     DONE
   } create_stage = WAIT_DEVICE_STAGE;
-  static Mutex lock("DPDKStack::lock");
-  static Cond cond;
+  static ceph::mutex lock = ceph::make_mutex("DPDKStack::lock");
+  static ceph::condition_variable cond;
   static unsigned queue_init_done = 0;
   static unsigned cores = 0;
   static std::shared_ptr<DPDKDevice> sdev;
@@ -87,13 +87,12 @@ void DPDKWorker::initialize()
     sdev->workers.resize(cores);
     ldout(cct, 1) << __func__ << " using " << cores << " cores " << dendl;
 
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     create_stage = WAIT_PORT_FIN_STAGE;
-    cond.Signal();
+    cond.notify_all();
   } else {
-    Mutex::Locker l(lock);
-    while (create_stage <= WAIT_DEVICE_STAGE)
-      cond.Wait(lock);
+    std::unique_lock l{lock};
+    cond.wait(l, [] { return create_stage > WAIT_DEVICE_STAGE; });
   }
   ceph_assert(sdev);
   if (i < sdev->hw_queues_count()) {
@@ -105,7 +104,7 @@ void DPDKWorker::initialize()
     cpu_weights[i] = cct->_conf->ms_dpdk_hw_queue_weight;
     qp->configure_proxies(cpu_weights);
     sdev->set_local_queue(i, std::move(qp));
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     ++queue_init_done;
     cond.Signal();
   } else {
@@ -115,29 +114,27 @@ void DPDKWorker::initialize()
   }
   if (i == 0) {
     {
-      Mutex::Locker l(lock);
-      while (queue_init_done < cores)
-        cond.Wait(lock);
+      std::unique_lock l{lock};
+      cond.wait(l, [] { return queue_init_done >= cores; });
     }
 
     if (sdev->init_port_fini() < 0) {
       lderr(cct) << __func__ << " init_port_fini failed " << dendl;
       ceph_abort();
     }
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     create_stage = DONE;
-    cond.Signal();
+    cond.notify_all();
   } else {
-    Mutex::Locker l(lock);
-    while (create_stage <= WAIT_PORT_FIN_STAGE)
-      cond.Wait(lock);
+    std::unique_lock  l{lock};
+    cond.wait(l, [&] { return create_stage > WAIT_PORT_FIN_STAGE; });
   }
 
   sdev->workers[i] = this;
   _impl = std::unique_ptr<DPDKWorker::Impl>(
           new DPDKWorker::Impl(cct, i, &center, sdev));
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     if (!--queue_init_done) {
       create_stage = WAIT_DEVICE_STAGE;
       sdev.reset();
index 167d11eb468740ea8a0354976b8a0fe5203d8538..38851c300ac6985a1c3280a07c2ec80f5e53383a 100644 (file)
@@ -580,7 +580,7 @@ void Infiniband::MemoryManager::Chunk::clear()
 }
 
 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
-  : manager(m), buffer_size(s), lock("cluster_lock")
+  : manager(m), buffer_size(s)
 {
 }
 
@@ -622,7 +622,7 @@ int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
 
 void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   for (auto c : ck) {
     c->clear();
     free_chunks.push_back(c);
@@ -635,7 +635,7 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks,
   if (bytes % buffer_size == 0)
     --num;
   int r = num;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (free_chunks.empty())
     return 0;
   if (!bytes) {
@@ -696,7 +696,7 @@ void *Infiniband::MemoryManager::mem_pool::slow_malloc()
 {
   void *p;
 
-  Mutex::Locker l(PoolAllocator::lock);
+  std::lock_guard l{PoolAllocator::lock};
   PoolAllocator::g_ctx = ctx;
   // this will trigger pool expansion via PoolAllocator::malloc()
   p = boost::pool<PoolAllocator>::malloc();
@@ -705,7 +705,8 @@ void *Infiniband::MemoryManager::mem_pool::slow_malloc()
 }
 
 Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
-Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");
+ceph::mutex Infiniband::MemoryManager::PoolAllocator::lock =
+                           ceph::make_mutex("pool-alloc-lock");
 
 // lock is taken by mem_pool::slow_malloc()
 char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
@@ -765,7 +766,7 @@ char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
 void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
 {
   mem_info *m;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
     
   m = reinterpret_cast<mem_info *>(block) - 1;
   m->ctx->update_stats(-m->nbufs);
@@ -887,7 +888,7 @@ void Infiniband::verify_prereq(CephContext *cct) {
 }
 
 Infiniband::Infiniband(CephContext *cct)
-  : cct(cct), lock("IB lock"),
+  : cct(cct),
     device_name(cct->_conf->ms_async_rdma_device_name),
     port_num( cct->_conf->ms_async_rdma_port_num)
 {
@@ -898,7 +899,7 @@ Infiniband::Infiniband(CephContext *cct)
 
 void Infiniband::init()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
 
   if (initialized)
     return;
index 9096aaeca93ae48562c4050f02c06446a85672bc..b1aec88400d7e088efb233f71c787b6546b1e3f2 100644 (file)
@@ -32,7 +32,7 @@
 #include "include/page.h"
 #include "common/debug.h"
 #include "common/errno.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/perf_counters.h"
 #include "msg/msg_types.h"
 #include "msg/async/net_handler.h"
@@ -236,7 +236,7 @@ class Infiniband {
       MemoryManager& manager;
       uint32_t buffer_size;
       uint32_t num_chunk = 0;
-      Mutex lock;
+      ceph::mutex lock = ceph::make_mutex("cluster_lock");
       std::vector<Chunk*> free_chunks;
       char *base = nullptr;
       char *end = nullptr;
@@ -275,7 +275,7 @@ class Infiniband {
       static void free(char * const block);
 
       static MemPoolContext  *g_ctx;
-      static Mutex lock;
+      static ceph::mutex lock;
     };
 
     /**
@@ -366,7 +366,7 @@ class Infiniband {
   void wire_gid_to_gid(const char *wgid, IBSYNMsg* im);
   void gid_to_wire_gid(const IBSYNMsg& im, char wgid[]);
   CephContext *cct;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("IB lock");
   bool initialized = false;
   const std::string &device_name;
   uint8_t port_num;
index bf9d072ef9a55019ef557e6e119b1fc3f587f58c..ec10aa8204420aa3588019a2c7340fd315fb2239 100644 (file)
@@ -22,7 +22,7 @@
 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                                                 RDMAWorker *w)
   : cct(cct), connected(0), error(0), infiniband(ib),
-    dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+    dispatcher(s), worker(w),
     is_server(false), con_handler(new C_handle_connection(this)),
     active(false), pending(false)
 {
@@ -54,7 +54,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
     dispatcher->post_chunk_to_pool(buffers[i]);
   }
 
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (notify_fd >= 0)
     ::close(notify_fd);
   if (tcp_fd >= 0)
@@ -64,7 +64,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 
 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (wc.empty())
     wc = std::move(v);
   else
@@ -74,7 +74,7 @@ void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
 
 void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (wc.empty())
     return ;
   w.swap(wc);
@@ -421,7 +421,7 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
   if (!bytes)
     return 0;
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     pending_bl.claim_append(bl);
     if (!connected) {
       ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
@@ -439,7 +439,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
 {
   if (error)
     return -error;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   size_t bytes = pending_bl.length();
   ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
                  << pending_bl.buffers().size() << dendl;
index f63a8e7d1f2b4ea37a30d22106bede264e5f8d41..5c11a8c8afc9b17e12f50f7107be9217852c51c4 100644 (file)
@@ -44,8 +44,8 @@ RDMADispatcher::~RDMADispatcher()
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
-  w_lock("RDMADispatcher::for worker pending list"), stack(s)
+  : cct(c), async_handler(new C_handle_cq_async(this)),
+  stack(s)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
 
@@ -80,7 +80,7 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 void RDMADispatcher::polling_start()
 {
   // take lock because listen/connect can happen from different worker threads
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
 
   if (t.joinable()) 
     return; // dispatcher thread already running 
@@ -103,7 +103,7 @@ void RDMADispatcher::polling_start()
 void RDMADispatcher::polling_stop()
 {
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     done = true;
   }
 
@@ -139,7 +139,7 @@ void RDMADispatcher::handle_async_event()
       uint64_t qpn = async_event.element.qp->qp_num;
       ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
                      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
-      Mutex::Locker l(lock);
+      std::lock_guard l{lock};
       RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
       if (!conn) {
         ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
@@ -160,14 +160,14 @@ void RDMADispatcher::handle_async_event()
 
 void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   get_stack()->get_infiniband().post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
 }
 
 int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
 }
 
@@ -199,7 +199,7 @@ void RDMADispatcher::polling()
       perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
       perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
 
-      Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+      std::lock_guard l{lock};//make sure connected socket alive when pass wc
 
       for (int i = 0; i < rx_ret; ++i) {
         ibv_wc* response = &wc[i];
@@ -243,7 +243,7 @@ void RDMADispatcher::polling()
       // because we need to check qp's state before sending
       perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
       if (num_dead_queue_pair) {
-        Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
+       std::lock_guard l{lock}; // FIXME reuse dead qp because creating one qp costs 1 ms
         auto it = dead_queue_pairs.begin();
         while (it != dead_queue_pairs.end()) {
           auto i = *it;
@@ -308,7 +308,7 @@ void RDMADispatcher::notify_pending_workers() {
   if (num_pending_workers) {
     RDMAWorker *w = nullptr;
     {
-      Mutex::Locker l(w_lock);
+      std::lock_guard l{w_lock};
       if (!pending_workers.empty()) {
         w = pending_workers.front();
         pending_workers.pop_front();
@@ -322,7 +322,7 @@ void RDMADispatcher::notify_pending_workers() {
 
 void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
   qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
   ++num_qp_conn;
@@ -340,7 +340,7 @@ RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
 
 Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   // Try to find the QP in qp_conns firstly.
   auto it = qp_conns.find(qp);
   if (it != qp_conns.end())
@@ -367,7 +367,7 @@ void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
 
 void RDMADispatcher::erase_qpn(uint32_t qpn)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   erase_qpn_lockless(qpn);
 }
 
@@ -400,7 +400,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
                       << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
-        Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+       std::lock_guard l{lock};//make sure connected socket alive when pass wc
         RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
 
         if (conn && conn->is_connected()) {
@@ -451,7 +451,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
 
 RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   : Worker(c, i), stack(nullptr),
-    tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
+    tx_handler(new C_handle_cq_tx(this))
 {
   // initialize perf_logger
   char name[128];
index e038d3625986aa173d9c588ec8532c55467613eb..948d1fce2a0e37a2a0b2c03eee556a3dcc8702f9 100644 (file)
@@ -47,7 +47,8 @@ class RDMADispatcher {
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
-  Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
+  // protect `qp_conns`, `dead_queue_pairs`
+  ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
   // qp_num -> InfRcConnection
   // The main usage of `qp_conns` is looking up connection by qp_num,
   // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
@@ -71,7 +72,9 @@ class RDMADispatcher {
   std::vector<QueuePair*> dead_queue_pairs;
 
   std::atomic<uint64_t> num_pending_workers = {0};
-  Mutex w_lock; // protect pending workers
+  // protect pending workers
+  ceph::mutex w_lock =
+    ceph::make_mutex("RDMADispatcher::for worker pending list");
   // fixme: lockfree
   std::list<RDMAWorker*> pending_workers;
   RDMAStack* stack;
@@ -98,7 +101,7 @@ class RDMADispatcher {
   void polling();
   void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
   void make_pending_worker(RDMAWorker* w) {
-    Mutex::Locker l(w_lock);
+    std::lock_guard l{w_lock};
     auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
     if (it != pending_workers.end())
       return;
@@ -132,7 +135,7 @@ class RDMAWorker : public Worker {
   EventCallbackRef tx_handler;
   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
   RDMADispatcher* dispatcher = nullptr;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
 
   class C_handle_cq_tx : public EventCallback {
     RDMAWorker *worker;
@@ -193,7 +196,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   int notify_fd = -1;
   bufferlist pending_bl;
 
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
   std::vector<ibv_wc> wc;
   bool is_server;
   EventCallbackRef con_handler;
index 9a02b3485a830a7454bcd94d0b781e92a7503779..f9f3b617ea385f4a27e4c0f960e401200d4b44f5 100644 (file)
@@ -76,23 +76,23 @@ class MessengerClient {
     ClientDispatcher dispatcher;
 
    public:
-    Mutex lock;
-    Cond cond;
+    ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock");
+    ceph::condition_variable cond;
     uint64_t inflight;
 
     ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
         msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
-        dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) {
+        dispatcher(think_time_us, this), inflight(0) {
       m->add_dispatcher_head(&dispatcher);
       bufferptr ptr(msg_len);
       memset(ptr.c_str(), 0, msg_len);
       data.append(ptr);
     }
     void *entry() override {
-      lock.Lock();
+      std::unique_lock locker{lock};
       for (int i = 0; i < ops; ++i) {
         if (inflight > uint64_t(concurrent)) {
-          cond.Wait(lock);
+         cond.wait(locker);
         }
        hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
                       oloc.nspace);
@@ -104,7 +104,7 @@ class MessengerClient {
         conn->send_message(m);
         //cerr << __func__ << " send m=" << m << std::endl;
       }
-      lock.Unlock();
+      locker.unlock();
       msgr->shutdown();
       return 0;
     }
@@ -159,9 +159,9 @@ class MessengerClient {
 void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
   usleep(think_time);
   m->put();
-  Mutex::Locker l(thread->lock);
+  std::lock_guard l{thread->lock};
   thread->inflight--;
-  thread->cond.Signal();
+  thread->cond.notify_all();
 }
 
 
index 3077c1f13e663cabb87483ac748eb5d10d11f27b..fab2e33b3904d79bb49a7ff485f81eb52e6bb3f0 100644 (file)
@@ -976,8 +976,8 @@ class StressFactory {
   }
 
   void add_client(ThreadData *t_data) {
-    static Mutex lock("add_client_lock");
-    Mutex::Locker l(lock);
+    static ceph::mutex lock = ceph::make_mutex("add_client_lock");
+    std::lock_guard l{lock};
     ConnectedSocket sock;
     int r = t_data->worker->connect(bind_addr, options, &sock);
     std::default_random_engine rng(rd());