From: Sage Weil
Date: Fri, 19 Oct 2018 18:31:07 +0000 (-0500)
Subject: common/WorkQueue: Mutex -> ceph::mutex
X-Git-Tag: v14.1.0~820^2~17
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b4cf8aaf07b072004f577190dec2bd4f95b17f16;p=ceph.git

common/WorkQueue: Mutex -> ceph::mutex

Lots of #include push-downs as a result.

Signed-off-by: Sage Weil
---

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 0c64af848fc2..ba7579002553 100644
--- a/src/common/WorkQueue.cc
+++ b/src/common/WorkQueue.cc
@@ -24,7 +24,7 @@ ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
   : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
     lockname(name + "::lock"),
-    _lock(lockname.c_str()),  // this should be safe due to declaration order
+    _lock(ceph::make_mutex(lockname)),  // this should be safe due to declaration order
     _stop(false),
     _pause(0),
     _draining(0),
@@ -73,7 +73,7 @@ void ThreadPool::handle_conf_change(const ConfigProxy& conf,
     _lock.lock();
     _num_threads = v;
     start_threads();
-    _cond.SignalAll();
+    _cond.notify_all();
     _lock.unlock();
   }
 }
@@ -81,7 +81,7 @@
 
 void ThreadPool::worker(WorkThread *wt)
 {
-  _lock.lock();
+  std::unique_lock ul(_lock);
   ldout(cct,10) << "worker start" << dendl;
 
   std::stringstream ss;
@@ -114,15 +114,15 @@ void ThreadPool::worker(WorkThread *wt)
 		   << " (" << processing << " active)" << dendl;
 	TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
 	tp_handle.reset_tp_timeout();
-	_lock.unlock();
+	ul.unlock();
 	wq->_void_process(item, tp_handle);
-	_lock.lock();
+	ul.lock();
 	wq->_void_process_finish(item);
 	processing--;
 	ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
 		      << " (" << processing << " active)" << dendl;
 	if (_pause || _draining)
-	  _wait_cond.Signal();
+	  _wait_cond.notify_all();
 	did = true;
 	break;
       }
@@ -136,20 +136,18 @@ void ThreadPool::worker(WorkThread *wt)
 	hb,
 	cct->_conf->threadpool_default_timeout,
 	0);
-      _cond.WaitInterval(_lock,
-			 utime_t(
-			   cct->_conf->threadpool_empty_queue_max_wait, 0));
+      auto wait = std::chrono::seconds(
+	cct->_conf->threadpool_empty_queue_max_wait);
+      _cond.wait_for(ul, wait);
     }
   }
   ldout(cct,1) << "worker finish" << dendl;
   cct->get_heartbeat_map()->remove_worker(hb);
-
-  _lock.unlock();
 }
 
 void ThreadPool::start_threads()
 {
-  ceph_assert(_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(_lock));
   while (_threads.size() < _num_threads) {
     WorkThread *wt = new WorkThread(this);
     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
@@ -161,7 +159,7 @@
 
 void ThreadPool::join_old_threads()
 {
-  ceph_assert(_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(_lock));
   while (!_old_threads.empty()) {
     ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
     _old_threads.front()->join();
@@ -196,7 +194,7 @@ void ThreadPool::stop(bool clear_after)
   _lock.lock();
   _stop = true;
-  _cond.Signal();
+  _cond.notify_all();
   join_old_threads();
   _lock.unlock();
   for (set<WorkThread*>::iterator p = _threads.begin();
@@ -216,12 +214,12 @@
 
 void ThreadPool::pause()
 {
+  std::unique_lock ul(_lock);
   ldout(cct,10) << "pause" << dendl;
-  _lock.lock();
   _pause++;
-  while (processing)
-    _wait_cond.Wait(_lock);
-  _lock.unlock();
+  while (processing) {
+    _wait_cond.wait(ul);
+  }
   ldout(cct,15) << "paused" << dendl;
 }
 
@@ -239,19 +237,19 @@ void ThreadPool::unpause()
   _lock.lock();
   ceph_assert(_pause > 0);
   _pause--;
-  _cond.Signal();
+  _cond.notify_all();
   _lock.unlock();
 }
 
 void ThreadPool::drain(WorkQueue_* wq)
 {
+  std::unique_lock ul(_lock);
   ldout(cct,10) << "drain" << dendl;
-  _lock.lock();
   _draining++;
-  while (processing || (wq != NULL && !wq->_empty()))
-    _wait_cond.Wait(_lock);
+  while (processing || (wq != NULL && !wq->_empty())) {
+    _wait_cond.wait(ul);
+  }
   _draining--;
-  _lock.unlock();
 }
 
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
@@ -260,7 +258,7 @@ ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
     name(std::move(nm)),
     thread_name(std::move(tn)),
     lockname(name + "::lock"),
-    shardedpool_lock(lockname.c_str()),
+    shardedpool_lock(ceph::make_mutex(lockname)),
     num_threads(pnum_threads),
     num_paused(0),
     num_drained(0),
@@ -277,36 +275,34 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 
   while (!stop_threads) {
     if (pause_threads) {
-      shardedpool_lock.lock();
+      std::unique_lock ul(shardedpool_lock);
       ++num_paused;
-      wait_cond.Signal();
+      wait_cond.notify_all();
       while (pause_threads) {
 	cct->get_heartbeat_map()->reset_timeout(
 	  hb,
 	  wq->timeout_interval, wq->suicide_interval);
-	shardedpool_cond.WaitInterval(shardedpool_lock,
-				      utime_t(
-					cct->_conf->threadpool_empty_queue_max_wait, 0));
+	shardedpool_cond.wait_for(
+	  ul,
+	  std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
       }
       --num_paused;
-      shardedpool_lock.unlock();
     }
     if (drain_threads) {
-      shardedpool_lock.lock();
+      std::unique_lock ul(shardedpool_lock);
       if (wq->is_shard_empty(thread_index)) {
 	++num_drained;
-	wait_cond.Signal();
+	wait_cond.notify_all();
 	while (drain_threads) {
 	  cct->get_heartbeat_map()->reset_timeout(
 	    hb,
 	    wq->timeout_interval, wq->suicide_interval);
-	  shardedpool_cond.WaitInterval(shardedpool_lock,
-					utime_t(
-					  cct->_conf->threadpool_empty_queue_max_wait, 0));
+	  shardedpool_cond.wait_for(
+	    ul,
+	    std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
 	}
 	--num_drained;
       }
-      shardedpool_lock.unlock();
     }
     cct->get_heartbeat_map()->reset_timeout(
@@ -324,7 +320,7 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 
 void ShardedThreadPool::start_threads()
 {
-  ceph_assert(shardedpool_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
   int32_t thread_index = 0;
   while (threads_shardedpool.size() < num_threads) {
@@ -364,15 +360,14 @@
 
 void ShardedThreadPool::pause()
 {
+  std::unique_lock ul(shardedpool_lock);
   ldout(cct,10) << "pause" << dendl;
-  shardedpool_lock.lock();
   pause_threads = true;
   ceph_assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_paused){
-    wait_cond.Wait(shardedpool_lock);
+    wait_cond.wait(ul);
   }
-  shardedpool_lock.unlock();
   ldout(cct,10) << "paused" << dendl;
 }
 
@@ -393,25 +388,24 @@ void ShardedThreadPool::unpause()
   shardedpool_lock.lock();
   pause_threads = false;
   wq->stop_return_waiting_threads();
-  shardedpool_cond.Signal();
+  shardedpool_cond.notify_all();
   shardedpool_lock.unlock();
   ldout(cct,10) << "unpaused" << dendl;
 }
 
 void ShardedThreadPool::drain()
 {
+  std::unique_lock ul(shardedpool_lock);
   ldout(cct,10) << "drain" << dendl;
-  shardedpool_lock.lock();
   drain_threads = true;
   ceph_assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_drained) {
-    wait_cond.Wait(shardedpool_lock);
+    wait_cond.wait(ul);
   }
   drain_threads = false;
   wq->stop_return_waiting_threads();
-  shardedpool_cond.Signal();
-  shardedpool_lock.unlock();
+  shardedpool_cond.notify_all();
   ldout(cct,10) << "drained" << dendl;
 }
 
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index f1b902c73c50..f3424f5fa329 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -15,27 +15,33 @@
 #ifndef CEPH_WORKQUEUE_H
 #define CEPH_WORKQUEUE_H
 
-#include "Cond.h"
+#include <atomic>
+#include <list>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "common/ceph_mutex.h"
 #include "include/unordered_map.h"
 #include "common/config_obs.h"
 #include "common/HeartbeatMap.h"
-
-#include <atomic>
+#include "common/Thread.h"
+#include "include/Context.h"
 
 class CephContext;
 
 /// Pool of threads that share work submitted to multiple work queues.
 class ThreadPool : public md_config_obs_t {
   CephContext *cct;
-  string name;
-  string thread_name;
-  string lockname;
-  Mutex _lock;
-  Cond _cond;
+  std::string name;
+  std::string thread_name;
+  std::string lockname;
+  ceph::mutex _lock;
+  ceph::condition_variable _cond;
   bool _stop;
   int _pause;
   int _draining;
-  Cond _wait_cond;
+  ceph::condition_variable _wait_cond;
 
 public:
   class TPHandle {
@@ -58,9 +64,9 @@ private:
 
   /// Basic interface to a work queue used by the worker threads.
   struct WorkQueue_ {
-    string name;
+    std::string name;
     time_t timeout_interval, suicide_interval;
-    WorkQueue_(string n, time_t ti, time_t sti)
+    WorkQueue_(std::string n, time_t ti, time_t sti)
       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
     { }
     virtual ~WorkQueue_() {}
@@ -83,7 +89,7 @@ private:
   // track thread pool size changes
   unsigned _num_threads;
-  string _thread_num_option;
+  std::string _thread_num_option;
   const char **_conf_keys;
 
   const char **get_tracked_conf_keys() const override {
@@ -102,12 +108,12 @@ public:
    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
-    virtual void _dequeue(list<T*> *) = 0;
-    virtual void _process_finish(const list<T*> &) {}
+    virtual void _dequeue(std::list<T*> *) = 0;
+    virtual void _process_finish(const std::list<T*> &) {}
 
    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
-      list<T*> *out(new list<T*>);
+      std::list<T*> *out(new std::list<T*>);
      _dequeue(out);
      if (!out->empty()) {
	return (void *)out;
@@ -117,18 +123,18 @@ public:
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
-      _process(*((list<T*>*)p), handle);
+      _process(*((std::list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
-      _process_finish(*(list<T*>*)p);
-      delete (list<T*> *)p;
+      _process_finish(*(std::list<T*>*)p);
+      delete (std::list<T*> *)p;
    }
 
  protected:
-    virtual void _process(const list<T*> &items, TPHandle &handle) = 0;
+    virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
 
  public:
-    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+    BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
@@ -139,7 +145,7 @@
    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
@@ -179,10 +185,10 @@ public:
   * construction and remove itself on destruction.
   */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
-    Mutex _lock;
+    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
    ThreadPool *pool;
-    list<U> to_process;
-    list<U> to_finish;
+    std::list<U> to_process;
+    std::list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
@@ -226,8 +232,8 @@ public:
    void _clear() override {}
 
  public:
-    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
-      : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
+    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
@@ -236,12 +242,12 @@ public:
    void queue(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
    }
    void queue_front(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue_front(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
    }
    void drain() {
      pool->drain(this);
    }
@@ -289,7 +295,7 @@ public:
    virtual void _process(T *t, TPHandle &) = 0;
 
  public:
-    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
@@ -300,7 +306,7 @@ public:
    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
-      pool->_cond.SignalOne();
+      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
@@ -359,29 +365,29 @@ public:
    void queue(T *item) {
      std::lock_guard l(m_pool->_lock);
      m_items.push_back(item);
-      m_pool->_cond.SignalOne();
+      m_pool->_cond.notify_one();
    }
    bool empty() {
      std::lock_guard l(m_pool->_lock);
      return _empty();
    }
  protected:
-    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
+    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      m_items.clear();
    }
    bool _empty() override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      return m_items.empty();
    }
    void *_void_dequeue() override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
	return NULL;
      }
@@ -395,7 +401,7 @@
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      ceph_assert(m_processing > 0);
      --m_processing;
    }
@@ -407,7 +413,7 @@
    }
    T *front() {
-      ceph_assert(m_pool->_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
	return NULL;
      }
@@ -420,9 +426,9 @@
    }
    void signal() {
      std::lock_guard pool_locker(m_pool->_lock);
-      m_pool->_cond.SignalOne();
+      m_pool->_cond.notify_one();
    }
-    Mutex &get_pool_lock() {
+    ceph::mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
@@ -431,7 +437,7 @@ private:
    uint32_t m_processing;
  };
 private:
-  vector<WorkQueue_*> work_queues;
+  std::vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;
 
@@ -446,8 +452,8 @@ private:
    }
  };
 
-  set<WorkThread*> _threads;
-  list<WorkThread*> _old_threads;  ///< need to be joined
+  std::set<WorkThread*> _threads;
+  std::list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;
 
  void start_threads();
@@ -455,7 +461,7 @@ private:
  void worker(WorkThread *wt);
 
 public:
-  ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
+  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
  ~ThreadPool() override;
 
  /// return number of threads currently running
@@ -491,21 +497,23 @@
  }
 
  /// wait for a kick on this thread pool
-  void wait(Cond &c) {
-    c.Wait(_lock);
+  void wait(ceph::condition_variable &c) {
+    std::unique_lock l(_lock, std::adopt_lock);
+    c.wait(l);
  }
 
  /// wake up a waiter (with lock already held)
  void _wake() {
-    _cond.Signal();
+    _cond.notify_all();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    std::lock_guard l(_lock);
-    _cond.Signal();
+    _cond.notify_all();
  }
  void _wait() {
-    _cond.Wait(_lock);
+    std::unique_lock l(_lock, std::adopt_lock);
+    _cond.wait(l);
  }
 
  /// start thread pool thread
@@ -527,9 +535,9 @@
 
 class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
-  list<GenContext<ThreadPool::TPHandle&>*> _queue;
+  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
 public:
-  GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
+  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
	GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
@@ -569,9 +577,8 @@
 
 /// @see Finisher
 class ContextWQ : public ThreadPool::PointerWQ<Context> {
 public:
-  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
-    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
-      m_lock("ContextWQ::m_lock") {
+  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
    this->register_work_queue();
  }
@@ -604,19 +611,19 @@ protected:
    ctx->complete(result);
  }
 private:
-  Mutex m_lock;
+  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
  ceph::unordered_map<Context*, int> m_context_results;
 };
 
 class ShardedThreadPool {
 
  CephContext *cct;
-  string name;
-  string thread_name;
-  string lockname;
-  Mutex shardedpool_lock;
-  Cond shardedpool_cond;
-  Cond wait_cond;
+  std::string name;
+  std::string thread_name;
+  std::string lockname;
+  ceph::mutex shardedpool_lock;
+  ceph::condition_variable shardedpool_cond;
+  ceph::condition_variable wait_cond;
  uint32_t num_threads;
 
  std::atomic<bool> stop_threads = { false };
@@ -685,7 +692,7 @@ private:
    }
  };
 
-  vector<WorkThreadSharded*> threads_shardedpool;
+  std::vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
@@ -696,7 +703,7 @@
 
 public:
 
-  ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);
+  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
 
  ~ShardedThreadPool(){};
 
diff --git a/src/librbd/ManagedLock.cc b/src/librbd/ManagedLock.cc
index 1e4ced6095c1..6118cd09cfa6 100644
--- a/src/librbd/ManagedLock.cc
+++ b/src/librbd/ManagedLock.cc
@@ -14,6 +14,7 @@
 #include "cls/lock/cls_lock_client.h"
 #include "common/dout.h"
 #include "common/errno.h"
+#include "common/Cond.h"
 #include "common/WorkQueue.h"
 #include "librbd/Utils.h"
 
diff --git a/src/librbd/MirroringWatcher.cc b/src/librbd/MirroringWatcher.cc
index 757f56912ff8..f22dc149b6b9 100644
--- a/src/librbd/MirroringWatcher.cc
+++ b/src/librbd/MirroringWatcher.cc
@@ -5,6 +5,7 @@
 #include "include/rbd_types.h"
 #include "include/rados/librados.hpp"
 #include "common/errno.h"
+#include "common/Cond.h"
 #include "librbd/Utils.h"
 #include "librbd/watcher/Utils.h"
 
diff --git a/src/librbd/api/Image.cc b/src/librbd/api/Image.cc
index c2d887e48f9d..af2274d13c03 100644
--- a/src/librbd/api/Image.cc
+++ b/src/librbd/api/Image.cc
@@ -5,6 +5,7 @@
 #include "include/rados/librados.hpp"
#include "common/dout.h" #include "common/errno.h" +#include "common/Cond.h" #include "cls/rbd/cls_rbd_client.h" #include "librbd/DeepCopyRequest.h" #include "librbd/ExclusiveLock.h" diff --git a/src/librbd/api/Trash.cc b/src/librbd/api/Trash.cc index b6f4cafdcf34..c8c40a1eefeb 100644 --- a/src/librbd/api/Trash.cc +++ b/src/librbd/api/Trash.cc @@ -5,6 +5,7 @@ #include "include/rados/librados.hpp" #include "common/dout.h" #include "common/errno.h" +#include "common/Cond.h" #include "cls/rbd/cls_rbd_client.h" #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc index ee07e10f740f..9aef36cc4009 100644 --- a/src/librbd/io/ImageRequestWQ.cc +++ b/src/librbd/io/ImageRequestWQ.cc @@ -4,6 +4,7 @@ #include "librbd/io/ImageRequestWQ.h" #include "common/errno.h" #include "common/zipkin_trace.h" +#include "common/Cond.h" #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" #include "librbd/ImageState.h" @@ -738,7 +739,7 @@ void *ImageRequestWQ::_void_dequeue() { ceph_assert(peek_item == item); if (lock_required) { - this->get_pool_lock().Unlock(); + this->get_pool_lock().unlock(); m_image_ctx.owner_lock.get_read(); if (m_image_ctx.exclusive_lock != nullptr) { ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl; @@ -759,7 +760,7 @@ void *ImageRequestWQ::_void_dequeue() { lock_required = false; } m_image_ctx.owner_lock.put_read(); - this->get_pool_lock().Lock(); + this->get_pool_lock().lock(); if (lock_required) { return nullptr; @@ -772,9 +773,9 @@ void *ImageRequestWQ::_void_dequeue() { // stall IO until the refresh completes ++m_io_blockers; - this->get_pool_lock().Unlock(); + this->get_pool_lock().unlock(); m_image_ctx.state->refresh(new C_RefreshFinish(this, item)); - this->get_pool_lock().Lock(); + this->get_pool_lock().lock(); return nullptr; } diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h index e2a52797458d..9ceef9ff0664 100644 --- a/src/osd/OSDMapMapping.h +++ b/src/osd/OSDMapMapping.h @@ -10,6 +10,7 @@ #include "osd/osd_types.h" #include "common/WorkQueue.h" +#include "common/Cond.h" class OSDMap; diff --git a/src/test/librbd/test_MirroringWatcher.cc b/src/test/librbd/test_MirroringWatcher.cc index 7b49b71a8d8b..6038b5540ad3 100644 --- a/src/test/librbd/test_MirroringWatcher.cc +++ b/src/test/librbd/test_MirroringWatcher.cc @@ -5,6 +5,7 @@ #include "test/librbd/test_support.h" #include "include/rbd_types.h" #include "librbd/MirroringWatcher.h" +#include "common/Cond.h" #include "gtest/gtest.h" #include "gmock/gmock.h" #include diff --git a/src/test/librbd/test_ObjectMap.cc b/src/test/librbd/test_ObjectMap.cc index f5e34c860b23..de23f76aab39 100644 --- a/src/test/librbd/test_ObjectMap.cc +++ b/src/test/librbd/test_ObjectMap.cc @@ -8,6 +8,7 @@ #include "librbd/ImageWatcher.h" #include "librbd/internal.h" #include "librbd/ObjectMap.h" +#include "common/Cond.h" #include "cls/rbd/cls_rbd_client.h" #include diff --git a/src/test/objectstore/TestObjectStoreState.h b/src/test/objectstore/TestObjectStoreState.h index 0d07bc2249b0..4383808891c4 100644 --- a/src/test/objectstore/TestObjectStoreState.h +++ b/src/test/objectstore/TestObjectStoreState.h @@ -13,13 +13,15 @@ #ifndef TEST_OBJECTSTORE_STATE_H_ #define TEST_OBJECTSTORE_STATE_H_ -#include "os/ObjectStore.h" #include #include #include #include #include +#include "os/ObjectStore.h" +#include "common/Cond.h" + typedef boost::mt11213b rngen_t; class TestObjectStoreState { diff --git 
index d495a9ea5c4e..92a8f94360b3 100644
--- a/src/test/rbd_mirror/test_InstanceWatcher.cc
+++ b/src/test/rbd_mirror/test_InstanceWatcher.cc
@@ -10,6 +10,7 @@
 #include "test/rbd_mirror/test_fixture.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
 #include "test/librados/test_cxx.h"
 #include "gtest/gtest.h"
 
diff --git a/src/test/rbd_mirror/test_Instances.cc b/src/test/rbd_mirror/test_Instances.cc
index 72a75b2f8c38..c4e8bd30437d 100644
--- a/src/test/rbd_mirror/test_Instances.cc
+++ b/src/test/rbd_mirror/test_Instances.cc
@@ -7,6 +7,7 @@
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Instances.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
 #include "test/librados/test.h"
 #include "gtest/gtest.h"
 
diff --git a/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/test/rbd_mirror/test_LeaderWatcher.cc
index 86dc6d904b7b..8a5cd89078c1 100644
--- a/src/test/rbd_mirror/test_LeaderWatcher.cc
+++ b/src/test/rbd_mirror/test_LeaderWatcher.cc
@@ -9,6 +9,7 @@
 #include "test/rbd_mirror/test_fixture.h"
 #include "tools/rbd_mirror/LeaderWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
 #include "test/librados/test_cxx.h"
 #include "gtest/gtest.h"
 
diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc
index c1f990018e80..d9feefc5491b 100644
--- a/src/tools/rbd_mirror/InstanceWatcher.cc
+++ b/src/tools/rbd_mirror/InstanceWatcher.cc
@@ -10,6 +10,7 @@
 #include "librbd/Utils.h"
 #include "InstanceReplayer.h"
 #include "ImageSyncThrottler.h"
+#include "common/Cond.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
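
Note on the conversion pattern, reduced to a standalone sketch. ceph::mutex and
ceph::condition_variable alias std::mutex and std::condition_variable in release
builds, so the idiom the diff converts to -- std::unique_lock in place of bare
lock()/unlock() pairs, notify_all() for Cond::Signal(), and wait_for() with
std::chrono for Cond::WaitInterval() -- can be illustrated with the std:: types
directly. PauseDemo and its members below are illustrative names, not code from
this commit:

// Minimal sketch of the locking idiom adopted by this commit,
// using plain std:: primitives (PauseDemo is a hypothetical class).
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class PauseDemo {
  std::mutex _lock;               // was: Mutex _lock("name");
  std::condition_variable _cond;  // was: Cond _cond;
  int processing = 0;

public:
  void worker_step() {
    std::unique_lock ul(_lock);   // was: _lock.lock(); ... _lock.unlock();
    ++processing;
    ul.unlock();                  // drop the lock while doing the work
    // ... process the item outside the lock ...
    ul.lock();
    --processing;
    _cond.notify_all();           // was: _cond.Signal();
    // ul releases _lock on destruction; no explicit unlock needed
  }

  void drain() {
    std::unique_lock ul(_lock);
    // was: _wait_cond.Wait(_lock) / WaitInterval(_lock, utime_t(secs, 0))
    while (processing > 0) {
      _cond.wait_for(ul, std::chrono::seconds(1));
    }
  }
};

int main() {
  PauseDemo demo;
  std::thread t([&] { demo.worker_step(); });
  demo.drain();
  t.join();
  std::cout << "drained" << std::endl;
  return 0;
}

The same property -- std::unique_lock releasing the mutex when it goes out of
scope -- is what lets the converted ThreadPool::worker() drop the explicit
_lock.unlock() at the end of the function.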