common/WorkQueue: Mutex -> ceph::mutex
author    Sage Weil <sage@redhat.com>
          Fri, 19 Oct 2018 18:31:07 +0000 (13:31 -0500)
committer Kefu Chai <kchai@redhat.com>
          Wed, 21 Nov 2018 03:56:33 +0000 (11:56 +0800)
Lots of #include push-downs as a result.

Signed-off-by: Sage Weil <sage@redhat.com>
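
For illustration only (not part of the commit): a minimal sketch of the
target pattern, assuming that with lockdep disabled ceph::mutex aliases
std::mutex, ceph::condition_variable aliases std::condition_variable, and
ceph::make_mutex discards the debug name. All names below are hypothetical.

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    struct MiniPool {                  // hypothetical stand-in for ThreadPool
      std::mutex lock;                 // was: Mutex _lock(lockname.c_str())
      std::condition_variable cond;    // was: Cond _cond
      bool stop = false;

      void worker() {
        std::unique_lock ul(lock);     // was: _lock.lock(); ... _lock.unlock();
        while (!stop) {
          // was: _cond.WaitInterval(_lock, utime_t(timeout, 0));
          cond.wait_for(ul, std::chrono::seconds(5));
        }
      }

      void shutdown() {
        {
          std::lock_guard l(lock);
          stop = true;
        }
        cond.notify_all();             // was: _cond.SignalAll()
      }
    };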
15 files changed:
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/librbd/ManagedLock.cc
src/librbd/MirroringWatcher.cc
src/librbd/api/Image.cc
src/librbd/api/Trash.cc
src/librbd/io/ImageRequestWQ.cc
src/osd/OSDMapMapping.h
src/test/librbd/test_MirroringWatcher.cc
src/test/librbd/test_ObjectMap.cc
src/test/objectstore/TestObjectStoreState.h
src/test/rbd_mirror/test_InstanceWatcher.cc
src/test/rbd_mirror/test_Instances.cc
src/test/rbd_mirror/test_LeaderWatcher.cc
src/tools/rbd_mirror/InstanceWatcher.cc

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 0c64af848fc2b85b762b03953d6614d45604b3b7..ba7579002553630ef39aef042596b7c1ea9d2a2e 100644
@@ -24,7 +24,7 @@
 ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
   : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
     lockname(name + "::lock"),
-    _lock(lockname.c_str()),  // this should be safe due to declaration order
+    _lock(ceph::make_mutex(lockname)),  // this should be safe due to declaration order
     _stop(false),
     _pause(0),
     _draining(0),
@@ -73,7 +73,7 @@ void ThreadPool::handle_conf_change(const ConfigProxy& conf,
       _lock.lock();
       _num_threads = v;
       start_threads();
-      _cond.SignalAll();
+      _cond.notify_all();
       _lock.unlock();
     }
   }
@@ -81,7 +81,7 @@ void ThreadPool::handle_conf_change(const ConfigProxy& conf,
 
 void ThreadPool::worker(WorkThread *wt)
 {
-  _lock.lock();
+  std::unique_lock ul(_lock);
   ldout(cct,10) << "worker start" << dendl;
   
   std::stringstream ss;
@@ -114,15 +114,15 @@ void ThreadPool::worker(WorkThread *wt)
                        << " (" << processing << " active)" << dendl;
          TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
          tp_handle.reset_tp_timeout();
-         _lock.unlock();
+         ul.unlock();
          wq->_void_process(item, tp_handle);
-         _lock.lock();
+         ul.lock();
          wq->_void_process_finish(item);
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
                        << " (" << processing << " active)" << dendl;
          if (_pause || _draining)
-           _wait_cond.Signal();
+           _wait_cond.notify_all();
          did = true;
          break;
        }
@@ -136,20 +136,18 @@ void ThreadPool::worker(WorkThread *wt)
       hb,
       cct->_conf->threadpool_default_timeout,
       0);
-    _cond.WaitInterval(_lock,
-      utime_t(
-       cct->_conf->threadpool_empty_queue_max_wait, 0));
+    auto wait = std::chrono::seconds(
+      cct->_conf->threadpool_empty_queue_max_wait);
+    _cond.wait_for(ul, wait);
   }
   ldout(cct,1) << "worker finish" << dendl;
 
   cct->get_heartbeat_map()->remove_worker(hb);
-
-  _lock.unlock();
 }
 
 void ThreadPool::start_threads()
 {
-  ceph_assert(_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(_lock));
   while (_threads.size() < _num_threads) {
     WorkThread *wt = new WorkThread(this);
     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
@@ -161,7 +159,7 @@ void ThreadPool::start_threads()
 
 void ThreadPool::join_old_threads()
 {
-  ceph_assert(_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(_lock));
   while (!_old_threads.empty()) {
     ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
     _old_threads.front()->join();
@@ -196,7 +194,7 @@ void ThreadPool::stop(bool clear_after)
 
   _lock.lock();
   _stop = true;
-  _cond.Signal();
+  _cond.notify_all();
   join_old_threads();
   _lock.unlock();
   for (set<WorkThread*>::iterator p = _threads.begin();
@@ -216,12 +214,12 @@ void ThreadPool::stop(bool clear_after)
 
 void ThreadPool::pause()
 {
+  std::unique_lock ul(_lock);
   ldout(cct,10) << "pause" << dendl;
-  _lock.lock();
   _pause++;
-  while (processing)
-    _wait_cond.Wait(_lock);
-  _lock.unlock();
+  while (processing) {
+    _wait_cond.wait(ul);
+  }
   ldout(cct,15) << "paused" << dendl;
 }
 
@@ -239,19 +237,19 @@ void ThreadPool::unpause()
   _lock.lock();
   ceph_assert(_pause > 0);
   _pause--;
-  _cond.Signal();
+  _cond.notify_all();
   _lock.unlock();
 }
 
 void ThreadPool::drain(WorkQueue_* wq)
 {
+  std::unique_lock ul(_lock);
   ldout(cct,10) << "drain" << dendl;
-  _lock.lock();
   _draining++;
-  while (processing || (wq != NULL && !wq->_empty()))
-    _wait_cond.Wait(_lock);
+  while (processing || (wq != NULL && !wq->_empty())) {
+    _wait_cond.wait(ul);
+  }
   _draining--;
-  _lock.unlock();
 }
 
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
@@ -260,7 +258,7 @@ ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
   name(std::move(nm)),
   thread_name(std::move(tn)),
   lockname(name + "::lock"),
-  shardedpool_lock(lockname.c_str()),
+  shardedpool_lock(ceph::make_mutex(lockname)),
   num_threads(pnum_threads),
   num_paused(0),
   num_drained(0),
@@ -277,36 +275,34 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 
   while (!stop_threads) {
     if (pause_threads) {
-      shardedpool_lock.lock();
+      std::unique_lock ul(shardedpool_lock);
       ++num_paused;
-      wait_cond.Signal();
+      wait_cond.notify_all();
       while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
                hb,
                wq->timeout_interval, wq->suicide_interval);
-       shardedpool_cond.WaitInterval(shardedpool_lock,
-          utime_t(
-          cct->_conf->threadpool_empty_queue_max_wait, 0));
+       shardedpool_cond.wait_for(
+        ul,
+        std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
       }
       --num_paused;
-      shardedpool_lock.unlock();
     }
     if (drain_threads) {
-      shardedpool_lock.lock();
+      std::unique_lock ul(shardedpool_lock);
       if (wq->is_shard_empty(thread_index)) {
         ++num_drained;
-        wait_cond.Signal();
+        wait_cond.notify_all();
         while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval, wq->suicide_interval);
-          shardedpool_cond.WaitInterval(shardedpool_lock,
-           utime_t(
-             cct->_conf->threadpool_empty_queue_max_wait, 0));
+          shardedpool_cond.wait_for(
+           ul,
+           std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
         }
         --num_drained;
       }
-      shardedpool_lock.unlock();
     }
 
     cct->get_heartbeat_map()->reset_timeout(
@@ -324,7 +320,7 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 
 void ShardedThreadPool::start_threads()
 {
-  ceph_assert(shardedpool_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
   int32_t thread_index = 0;
   while (threads_shardedpool.size() < num_threads) {
 
@@ -364,15 +360,14 @@ void ShardedThreadPool::stop()
 
 void ShardedThreadPool::pause()
 {
+  std::unique_lock ul(shardedpool_lock);
   ldout(cct,10) << "pause" << dendl;
-  shardedpool_lock.lock();
   pause_threads = true;
   ceph_assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_paused){
-    wait_cond.Wait(shardedpool_lock);
+    wait_cond.wait(ul);
   }
-  shardedpool_lock.unlock();
   ldout(cct,10) << "paused" << dendl; 
 }
 
@@ -393,25 +388,24 @@ void ShardedThreadPool::unpause()
   shardedpool_lock.lock();
   pause_threads = false;
   wq->stop_return_waiting_threads();
-  shardedpool_cond.Signal();
+  shardedpool_cond.notify_all();
   shardedpool_lock.unlock();
   ldout(cct,10) << "unpaused" << dendl;
 }
 
 void ShardedThreadPool::drain()
 {
+  std::unique_lock ul(shardedpool_lock);
   ldout(cct,10) << "drain" << dendl;
-  shardedpool_lock.lock();
   drain_threads = true;
   ceph_assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_drained) {
-    wait_cond.Wait(shardedpool_lock);
+    wait_cond.wait(ul);
   }
   drain_threads = false;
   wq->stop_return_waiting_threads();
-  shardedpool_cond.Signal();
-  shardedpool_lock.unlock();
+  shardedpool_cond.notify_all();
   ldout(cct,10) << "drained" << dendl;
 }
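
The worker(), pause(), and drain() hunks above share two idioms: dropping
a std::unique_lock around the work item, and waiting on a condition in a
loop. A self-contained sketch of both (hypothetical names, not the
commit's code; the predicate overload of wait() is equivalent to the
explicit while loop used in drain()):

    #include <condition_variable>
    #include <mutex>
    #include <queue>

    std::mutex mtx;
    std::condition_variable wait_cond;
    std::queue<int> items;
    int processing = 0;

    void do_work(int) {}               // hypothetical work item

    void worker_like() {
      std::unique_lock ul(mtx);
      while (!items.empty()) {
        int item = items.front();
        items.pop();
        ++processing;
        ul.unlock();                   // run the item without the pool lock held
        do_work(item);
        ul.lock();                     // re-acquire before touching shared state
        --processing;
        wait_cond.notify_all();        // wake any pauser/drainer
      }
    }

    void drain_like() {
      std::unique_lock ul(mtx);
      wait_cond.wait(ul, [] { return processing == 0 && items.empty(); });
    }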
 
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index f1b902c73c50ecefa272bd4dd5186f763df1c595..f3424f5fa3291e70e98f357778ae6ff5a8c5ca9d 100644
 #ifndef CEPH_WORKQUEUE_H
 #define CEPH_WORKQUEUE_H
 
-#include "Cond.h"
+#include <atomic>
+#include <list>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "common/ceph_mutex.h"
 #include "include/unordered_map.h"
 #include "common/config_obs.h"
 #include "common/HeartbeatMap.h"
-
-#include <atomic>
+#include "common/Thread.h"
+#include "include/Context.h"
 
 class CephContext;
 
 /// Pool of threads that share work submitted to multiple work queues.
 class ThreadPool : public md_config_obs_t {
   CephContext *cct;
-  string name;
-  string thread_name;
-  string lockname;
-  Mutex _lock;
-  Cond _cond;
+  std::string name;
+  std::string thread_name;
+  std::string lockname;
+  ceph::mutex _lock;
+  ceph::condition_variable _cond;
   bool _stop;
   int _pause;
   int _draining;
-  Cond _wait_cond;
+  ceph::condition_variable _wait_cond;
 
 public:
   class TPHandle {
@@ -58,9 +64,9 @@ private:
 
   /// Basic interface to a work queue used by the worker threads.
   struct WorkQueue_ {
-    string name;
+    std::string name;
     time_t timeout_interval, suicide_interval;
-    WorkQueue_(string n, time_t ti, time_t sti)
+    WorkQueue_(std::string n, time_t ti, time_t sti)
       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
     { }
     virtual ~WorkQueue_() {}
@@ -83,7 +89,7 @@ private:
 
   // track thread pool size changes
   unsigned _num_threads;
-  string _thread_num_option;
+  std::string _thread_num_option;
   const char **_conf_keys;
 
   const char **get_tracked_conf_keys() const override {
@@ -102,12 +108,12 @@ public:
 
     virtual bool _enqueue(T *) = 0;
     virtual void _dequeue(T *) = 0;
-    virtual void _dequeue(list<T*> *) = 0;
-    virtual void _process_finish(const list<T*> &) {}
+    virtual void _dequeue(std::list<T*> *) = 0;
+    virtual void _process_finish(const std::list<T*> &) {}
 
     // virtual methods from WorkQueue_ below
     void *_void_dequeue() override {
-      list<T*> *out(new list<T*>);
+      std::list<T*> *out(new std::list<T*>);
       _dequeue(out);
       if (!out->empty()) {
        return (void *)out;
@@ -117,18 +123,18 @@ public:
       }
     }
     void _void_process(void *p, TPHandle &handle) override {
-      _process(*((list<T*>*)p), handle);
+      _process(*((std::list<T*>*)p), handle);
     }
     void _void_process_finish(void *p) override {
-      _process_finish(*(list<T*>*)p);
-      delete (list<T*> *)p;
+      _process_finish(*(std::list<T*>*)p);
+      delete (std::list<T*> *)p;
     }
 
   protected:
-    virtual void _process(const list<T*> &items, TPHandle &handle) = 0;
+    virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
 
   public:
-    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+    BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -139,7 +145,7 @@ public:
     bool queue(T *item) {
       pool->_lock.lock();
       bool r = _enqueue(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
       pool->_lock.unlock();
       return r;
     }
@@ -179,10 +185,10 @@ public:
    * construction and remove itself on destruction. */
   template<typename T, typename U = T>
   class WorkQueueVal : public WorkQueue_ {
-    Mutex _lock;
+    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
     ThreadPool *pool;
-    list<U> to_process;
-    list<U> to_finish;
+    std::list<U> to_process;
+    std::list<U> to_finish;
     virtual void _enqueue(T) = 0;
     virtual void _enqueue_front(T) = 0;
     bool _empty() override = 0;
@@ -226,8 +232,8 @@ public:
     void _clear() override {}
 
   public:
-    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
-      : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
+    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+      : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
     ~WorkQueueVal() override {
@@ -236,12 +242,12 @@ public:
     void queue(T item) {
       std::lock_guard l(pool->_lock);
       _enqueue(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
     }
     void queue_front(T item) {
       std::lock_guard l(pool->_lock);
       _enqueue_front(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
     }
     void drain() {
       pool->drain(this);
@@ -289,7 +295,7 @@ public:
     virtual void _process(T *t, TPHandle &) = 0;
 
   public:
-    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -300,7 +306,7 @@ public:
     bool queue(T *item) {
       pool->_lock.lock();
       bool r = _enqueue(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
       pool->_lock.unlock();
       return r;
     }
@@ -359,29 +365,29 @@ public:
     void queue(T *item) {
       std::lock_guard l(m_pool->_lock);
       m_items.push_back(item);
-      m_pool->_cond.SignalOne();
+      m_pool->_cond.notify_one();
     }
     bool empty() {
       std::lock_guard l(m_pool->_lock);
       return _empty();
     }
   protected:
-    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
+    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
     }
     void register_work_queue() {
       m_pool->add_work_queue(this);
     }
     void _clear() override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
       m_items.clear();
     }
     bool _empty() override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
       return m_items.empty();
     }
     void *_void_dequeue() override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
       if (m_items.empty()) {
         return NULL;
       }
@@ -395,7 +401,7 @@ public:
       process(reinterpret_cast<T *>(item));
     }
     void _void_process_finish(void *item) override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
       ceph_assert(m_processing > 0);
       --m_processing;
     }
@@ -407,7 +413,7 @@ public:
     }
 
     T *front() {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
       if (m_items.empty()) {
         return NULL;
       }
@@ -420,9 +426,9 @@ public:
     }
     void signal() {
       std::lock_guard pool_locker(m_pool->_lock);
-      m_pool->_cond.SignalOne();
+      m_pool->_cond.notify_one();
     }
-    Mutex &get_pool_lock() {
+    ceph::mutex &get_pool_lock() {
       return m_pool->_lock;
     }
   private:
@@ -431,7 +437,7 @@ public:
     uint32_t m_processing;
   };
 private:
-  vector<WorkQueue_*> work_queues;
+  std::vector<WorkQueue_*> work_queues;
   int next_work_queue = 0;
  
 
@@ -446,8 +452,8 @@ private:
     }
   };
   
-  set<WorkThread*> _threads;
-  list<WorkThread*> _old_threads;  ///< need to be joined
+  std::set<WorkThread*> _threads;
+  std::list<WorkThread*> _old_threads;  ///< need to be joined
   int processing;
 
   void start_threads();
@@ -455,7 +461,7 @@ private:
   void worker(WorkThread *wt);
 
 public:
-  ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
+  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
   ~ThreadPool() override;
 
   /// return number of threads currently running
@@ -491,21 +497,23 @@ public:
   }
 
   /// wait for a kick on this thread pool
-  void wait(Cond &c) {
-    c.Wait(_lock);
+  void wait(ceph::condition_variable &c) {
+    std::unique_lock l(_lock, std::adopt_lock);
+    c.wait(l);
   }
 
   /// wake up a waiter (with lock already held)
   void _wake() {
-    _cond.Signal();
+    _cond.notify_all();
   }
   /// wake up a waiter (without lock held)
   void wake() {
     std::lock_guard l(_lock);
-    _cond.Signal();
+    _cond.notify_all();
   }
   void _wait() {
-    _cond.Wait(_lock);
+    std::unique_lock l(_lock, std::adopt_lock);
+    _cond.wait(l);
   }
 
   /// start thread pool thread
@@ -527,9 +535,9 @@ public:
 
 class GenContextWQ :
   public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
-  list<GenContext<ThreadPool::TPHandle&>*> _queue;
+  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
 public:
-  GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
+  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
     : ThreadPool::WorkQueueVal<
       GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
   
@@ -569,9 +577,8 @@ public:
 /// @see Finisher
 class ContextWQ : public ThreadPool::PointerWQ<Context> {
 public:
-  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
-    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
-      m_lock("ContextWQ::m_lock") {
+  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
     this->register_work_queue();
   }
 
@@ -604,19 +611,19 @@ protected:
     ctx->complete(result);
   }
 private:
-  Mutex m_lock;
+  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
   ceph::unordered_map<Context*, int> m_context_results;
 };
 
 class ShardedThreadPool {
 
   CephContext *cct;
-  string name;
-  string thread_name;
-  string lockname;
-  Mutex shardedpool_lock;
-  Cond shardedpool_cond;
-  Cond wait_cond;
+  std::string name;
+  std::string thread_name;
+  std::string lockname;
+  ceph::mutex shardedpool_lock;
+  ceph::condition_variable shardedpool_cond;
+  ceph::condition_variable wait_cond;
   uint32_t num_threads;
 
   std::atomic<bool> stop_threads = { false };
@@ -685,7 +692,7 @@ private:
     }
   };
 
-  vector<WorkThreadSharded*> threads_shardedpool;
+  std::vector<WorkThreadSharded*> threads_shardedpool;
   void start_threads();
   void shardedthreadpool_worker(uint32_t thread_index);
   void set_wq(BaseShardedWQ* swq) {
@@ -696,7 +703,7 @@ private:
 
 public:
 
-  ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);
+  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
 
   ~ShardedThreadPool(){};
 
diff --git a/src/librbd/ManagedLock.cc b/src/librbd/ManagedLock.cc
index 1e4ced6095c1575b7667088f03c17d6c5d54327a..6118cd09cfa6eed920fb3c63d22ccad25543d721 100644
@@ -14,6 +14,7 @@
 #include "cls/lock/cls_lock_client.h"
 #include "common/dout.h"
 #include "common/errno.h"
+#include "common/Cond.h"
 #include "common/WorkQueue.h"
 #include "librbd/Utils.h"
 
diff --git a/src/librbd/MirroringWatcher.cc b/src/librbd/MirroringWatcher.cc
index 757f56912ff8c7f2db0c76eb4ddb9e1bac2a58cf..f22dc149b6b9c318d044c1bca2b0f0e70560c159 100644
@@ -5,6 +5,7 @@
 #include "include/rbd_types.h"
 #include "include/rados/librados.hpp"
 #include "common/errno.h"
+#include "common/Cond.h"
 #include "librbd/Utils.h"
 #include "librbd/watcher/Utils.h"
 
diff --git a/src/librbd/api/Image.cc b/src/librbd/api/Image.cc
index c2d887e48f9da617d89366ac915555b7a46364d0..af2274d13c038ad03a1562e009e514bac7c484ba 100644
@@ -5,6 +5,7 @@
 #include "include/rados/librados.hpp"
 #include "common/dout.h"
 #include "common/errno.h"
+#include "common/Cond.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "librbd/DeepCopyRequest.h"
 #include "librbd/ExclusiveLock.h"
diff --git a/src/librbd/api/Trash.cc b/src/librbd/api/Trash.cc
index b6f4cafdcf347718d509479ef478a783076c8482..c8c40a1eefebf61473f4d7fff8055e00f4ed11c4 100644
@@ -5,6 +5,7 @@
 #include "include/rados/librados.hpp"
 #include "common/dout.h"
 #include "common/errno.h"
+#include "common/Cond.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc
index ee07e10f740fc2f3dc26f54a504937cf6f24296c..9aef36cc40097e38d5b37cddcf10abd1683965e8 100644
@@ -4,6 +4,7 @@
 #include "librbd/io/ImageRequestWQ.h"
 #include "common/errno.h"
 #include "common/zipkin_trace.h"
+#include "common/Cond.h"
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageState.h"
@@ -738,7 +739,7 @@ void *ImageRequestWQ<I>::_void_dequeue() {
   ceph_assert(peek_item == item);
 
   if (lock_required) {
-    this->get_pool_lock().Unlock();
+    this->get_pool_lock().unlock();
     m_image_ctx.owner_lock.get_read();
     if (m_image_ctx.exclusive_lock != nullptr) {
       ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
@@ -759,7 +760,7 @@ void *ImageRequestWQ<I>::_void_dequeue() {
       lock_required = false;
     }
     m_image_ctx.owner_lock.put_read();
-    this->get_pool_lock().Lock();
+    this->get_pool_lock().lock();
 
     if (lock_required) {
       return nullptr;
@@ -772,9 +773,9 @@ void *ImageRequestWQ<I>::_void_dequeue() {
     // stall IO until the refresh completes
     ++m_io_blockers;
 
-    this->get_pool_lock().Unlock();
+    this->get_pool_lock().unlock();
     m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
-    this->get_pool_lock().Lock();
+    this->get_pool_lock().lock();
     return nullptr;
   }
 
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index e2a52797458d1f26f2398cf35db093dd5f9da5bc..9ceef9ff0664cae03172e6881af2ae14234767e8 100644
@@ -10,6 +10,7 @@
 
 #include "osd/osd_types.h"
 #include "common/WorkQueue.h"
+#include "common/Cond.h"
 
 class OSDMap;
 
diff --git a/src/test/librbd/test_MirroringWatcher.cc b/src/test/librbd/test_MirroringWatcher.cc
index 7b49b71a8d8bda79ca2cabffe64bd6ddaf3d24c4..6038b5540ad3d3930fd711952dffc12d18ad3eee 100644
@@ -5,6 +5,7 @@
 #include "test/librbd/test_support.h"
 #include "include/rbd_types.h"
 #include "librbd/MirroringWatcher.h"
+#include "common/Cond.h"
 #include "gtest/gtest.h"
 #include "gmock/gmock.h"
 #include <list>
diff --git a/src/test/librbd/test_ObjectMap.cc b/src/test/librbd/test_ObjectMap.cc
index f5e34c860b2374e3be3a14bb9953560208e193cd..de23f76aab390ac4016dc48d8e5b87295ae7d782 100644
@@ -8,6 +8,7 @@
 #include "librbd/ImageWatcher.h"
 #include "librbd/internal.h"
 #include "librbd/ObjectMap.h"
+#include "common/Cond.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include <list>
 
diff --git a/src/test/objectstore/TestObjectStoreState.h b/src/test/objectstore/TestObjectStoreState.h
index 0d07bc2249b0d0b7f1bf9b40cd5f664eb745e018..4383808891c4ba18a70a4d946d2aacc326dc8b79 100644
 #ifndef TEST_OBJECTSTORE_STATE_H_
 #define TEST_OBJECTSTORE_STATE_H_
 
-#include "os/ObjectStore.h"
 #include <boost/scoped_ptr.hpp>
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
 #include <map>
 #include <vector>
 
+#include "os/ObjectStore.h"
+#include "common/Cond.h"
+
 typedef boost::mt11213b rngen_t;
 
 class TestObjectStoreState {
diff --git a/src/test/rbd_mirror/test_InstanceWatcher.cc b/src/test/rbd_mirror/test_InstanceWatcher.cc
index d495a9ea5c4e9df79051b643730f1792f47933cb..92a8f94360b3b385c58847b8127dc4d648fdbc3d 100644
@@ -10,6 +10,7 @@
 #include "test/rbd_mirror/test_fixture.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
 
 #include "test/librados/test_cxx.h"
 #include "gtest/gtest.h"
diff --git a/src/test/rbd_mirror/test_Instances.cc b/src/test/rbd_mirror/test_Instances.cc
index 72a75b2f8c388f40d3d882d1d3c95a71feb70443..c4e8bd30437dda6d6b31b74663413c10ed5fc03e 100644
@@ -7,6 +7,7 @@
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Instances.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
 
 #include "test/librados/test.h"
 #include "gtest/gtest.h"
diff --git a/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/test/rbd_mirror/test_LeaderWatcher.cc
index 86dc6d904b7b85122d5402e1375aba55d78556fa..8a5cd89078c181273b3116e2d068c473cc302a10 100644
@@ -9,6 +9,7 @@
 #include "test/rbd_mirror/test_fixture.h"
 #include "tools/rbd_mirror/LeaderWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
 
 #include "test/librados/test_cxx.h"
 #include "gtest/gtest.h"
diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc
index c1f990018e80f60d7954b952be6f9010653cfdcc..d9feefc5491bab69ceac1a189dc4282b45bceb07 100644
@@ -10,6 +10,7 @@
 #include "librbd/Utils.h"
 #include "InstanceReplayer.h"
 #include "ImageSyncThrottler.h"
+#include "common/Cond.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror