Lots of #include push-downs as a result.
Signed-off-by: Sage Weil <sage@redhat.com>
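The "#include push-downs" are the flip side of dropping Cond.h from WorkQueue.h below: sources that only compiled because the header pulled in common/Cond.h transitively (for example anything using C_SaferCond) now have to spell out that dependency themselves. A hypothetical before/after sketch (the file and function are illustrative; C_SaferCond is the helper defined in common/Cond.h):

    // some_client.cc (hypothetical): waits on a Cond-based completion helper.
    // Before this change it compiled only because "common/WorkQueue.h"
    // dragged in Cond.h; after the push-down the include is explicit.
    #include "common/Cond.h"       // for C_SaferCond
    #include "common/WorkQueue.h"

    int wait_for_async_op() {
      C_SaferCond cond;            // completion defined in common/Cond.h
      // ... register &cond as the completion of some asynchronous request ...
      return cond.wait();          // blocks until complete(r) runs; returns r
    }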
ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
: cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
lockname(name + "::lock"),
- _lock(lockname.c_str()), // this should be safe due to declaration order
+ _lock(ceph::make_mutex(lockname)), // this should be safe due to declaration order
_stop(false),
_pause(0),
_draining(0),
_lock.lock();
_num_threads = v;
start_threads();
- _cond.SignalAll();
+ _cond.notify_all();
_lock.unlock();
}
}
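For orientation, the primitive swap applied throughout this change replaces the old Mutex/Cond API with ceph::mutex and ceph::condition_variable, which follow the std::mutex/std::condition_variable interface. A minimal, self-contained sketch using the standard types directly (assuming, as the rest of the change does, that the ceph:: aliases behave like their std counterparts):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Old API                              New API
    //   Mutex l("name");                     ceph::mutex l = ceph::make_mutex("name");
    //   Cond c;                              ceph::condition_variable c;
    //   c.Signal() / SignalAll()             c.notify_all();
    //   c.SignalOne()                        c.notify_one();
    //   c.Wait(l)                            c.wait(unique_lock);
    //   c.WaitInterval(l, utime_t(s, 0))     c.wait_for(unique_lock, std::chrono::seconds(s));
    //   ceph_assert(l.is_locked())           ceph_assert(ceph_mutex_is_locked(l));
    struct Example {
      std::mutex lock;                 // stands in for ceph::mutex
      std::condition_variable cond;    // stands in for ceph::condition_variable
      bool stop = false;

      void wait_for_stop() {
        std::unique_lock l(lock);
        while (!stop) {
          cond.wait_for(l, std::chrono::seconds(5));   // periodic timed wait
        }
      }
      void request_stop() {
        std::lock_guard l(lock);
        stop = true;
        cond.notify_all();
      }
    };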
void ThreadPool::worker(WorkThread *wt)
{
- _lock.lock();
+ std::unique_lock ul(_lock);
ldout(cct,10) << "worker start" << dendl;
std::stringstream ss;
<< " (" << processing << " active)" << dendl;
TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
tp_handle.reset_tp_timeout();
- _lock.unlock();
+ ul.unlock();
wq->_void_process(item, tp_handle);
- _lock.lock();
+ ul.lock();
wq->_void_process_finish(item);
processing--;
ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
<< " (" << processing << " active)" << dendl;
if (_pause || _draining)
- _wait_cond.Signal();
+ _wait_cond.notify_all();
did = true;
break;
}
hb,
cct->_conf->threadpool_default_timeout,
0);
- _cond.WaitInterval(_lock,
- utime_t(
- cct->_conf->threadpool_empty_queue_max_wait, 0));
+ auto wait = std::chrono::seconds(
+ cct->_conf->threadpool_empty_queue_max_wait);
+ _cond.wait_for(ul, wait);
}
ldout(cct,1) << "worker finish" << dendl;
cct->get_heartbeat_map()->remove_worker(hb);
-
- _lock.unlock();
}
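The unlock()/lock() calls on ul above keep the old behaviour of dropping the pool lock while an item is processed; std::unique_lock supports that explicitly. A minimal sketch of the pattern (names are illustrative, not the real worker loop):

    #include <mutex>

    // Drop the pool lock around the potentially long-running work item, then
    // take it back to update shared state, exactly as the worker hunk does.
    template <typename Queue, typename Item>
    void process_one(std::mutex& pool_lock, Queue& wq, Item* item, int& processing) {
      std::unique_lock ul(pool_lock);
      ++processing;
      ul.unlock();               // do not hold the pool lock while running user work
      wq.process(item);          // illustrative stand-in for _void_process()
      ul.lock();                 // reacquire before touching shared counters
      --processing;
    }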
void ThreadPool::start_threads()
{
- ceph_assert(_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(_lock));
while (_threads.size() < _num_threads) {
WorkThread *wt = new WorkThread(this);
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
void ThreadPool::join_old_threads()
{
- ceph_assert(_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(_lock));
while (!_old_threads.empty()) {
ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
_old_threads.front()->join();
_lock.lock();
_stop = true;
- _cond.Signal();
+ _cond.notify_all();
join_old_threads();
_lock.unlock();
for (set<WorkThread*>::iterator p = _threads.begin();
void ThreadPool::pause()
{
+ std::unique_lock ul(_lock);
ldout(cct,10) << "pause" << dendl;
- _lock.lock();
_pause++;
- while (processing)
- _wait_cond.Wait(_lock);
- _lock.unlock();
+ while (processing) {
+ _wait_cond.wait(ul);
+ }
ldout(cct,15) << "paused" << dendl;
}
_lock.lock();
ceph_assert(_pause > 0);
_pause--;
- _cond.Signal();
+ _cond.notify_all();
_lock.unlock();
}
void ThreadPool::drain(WorkQueue_* wq)
{
+ std::unique_lock ul(_lock);
ldout(cct,10) << "drain" << dendl;
- _lock.lock();
_draining++;
- while (processing || (wq != NULL && !wq->_empty()))
- _wait_cond.Wait(_lock);
+ while (processing || (wq != NULL && !wq->_empty())) {
+ _wait_cond.wait(ul);
+ }
_draining--;
- _lock.unlock();
}
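pause() and drain() rely on the worker notifying _wait_cond once per finished item while a pause or drain is pending, as shown in the worker hunk above. A simplified, self-contained sketch of that handshake (shapes follow the ThreadPool code; names are illustrative):

    #include <condition_variable>
    #include <mutex>

    struct PauseSketch {
      std::mutex lock;
      std::condition_variable wait_cond;
      int pause = 0;
      int processing = 0;

      void do_pause() {
        std::unique_lock ul(lock);
        ++pause;
        while (processing) {
          wait_cond.wait(ul);          // woken by worker_finished_item() below
        }
      }
      void worker_finished_item() {
        std::lock_guard l(lock);
        --processing;
        if (pause) {                   // the real code also checks _draining
          wait_cond.notify_all();
        }
      }
    };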
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
name(std::move(nm)),
thread_name(std::move(tn)),
lockname(name + "::lock"),
- shardedpool_lock(lockname.c_str()),
+ shardedpool_lock(ceph::make_mutex(lockname)),
num_threads(pnum_threads),
num_paused(0),
num_drained(0),
while (!stop_threads) {
if (pause_threads) {
- shardedpool_lock.lock();
+ std::unique_lock ul(shardedpool_lock);
++num_paused;
- wait_cond.Signal();
+ wait_cond.notify_all();
while (pause_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
wq->timeout_interval, wq->suicide_interval);
- shardedpool_cond.WaitInterval(shardedpool_lock,
- utime_t(
- cct->_conf->threadpool_empty_queue_max_wait, 0));
+ shardedpool_cond.wait_for(
+ ul,
+ std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
}
--num_paused;
- shardedpool_lock.unlock();
}
if (drain_threads) {
- shardedpool_lock.lock();
+ std::unique_lock ul(shardedpool_lock);
if (wq->is_shard_empty(thread_index)) {
++num_drained;
- wait_cond.Signal();
+ wait_cond.notify_all();
while (drain_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
wq->timeout_interval, wq->suicide_interval);
- shardedpool_cond.WaitInterval(shardedpool_lock,
- utime_t(
- cct->_conf->threadpool_empty_queue_max_wait, 0));
+ shardedpool_cond.wait_for(
+ ul,
+ std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
}
--num_drained;
}
- shardedpool_lock.unlock();
}
cct->get_heartbeat_map()->reset_timeout(
void ShardedThreadPool::start_threads()
{
- ceph_assert(shardedpool_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
int32_t thread_index = 0;
while (threads_shardedpool.size() < num_threads) {
void ShardedThreadPool::pause()
{
+ std::unique_lock ul(shardedpool_lock);
ldout(cct,10) << "pause" << dendl;
- shardedpool_lock.lock();
pause_threads = true;
ceph_assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_paused){
- wait_cond.Wait(shardedpool_lock);
+ wait_cond.wait(ul);
}
- shardedpool_lock.unlock();
ldout(cct,10) << "paused" << dendl;
}
shardedpool_lock.lock();
pause_threads = false;
wq->stop_return_waiting_threads();
- shardedpool_cond.Signal();
+ shardedpool_cond.notify_all();
shardedpool_lock.unlock();
ldout(cct,10) << "unpaused" << dendl;
}
void ShardedThreadPool::drain()
{
+ std::unique_lock ul(shardedpool_lock);
ldout(cct,10) << "drain" << dendl;
- shardedpool_lock.lock();
drain_threads = true;
ceph_assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_drained) {
- wait_cond.Wait(shardedpool_lock);
+ wait_cond.wait(ul);
}
drain_threads = false;
wq->stop_return_waiting_threads();
- shardedpool_cond.Signal();
- shardedpool_lock.unlock();
+ shardedpool_cond.notify_all();
ldout(cct,10) << "drained" << dendl;
}
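Note that the explicit shardedpool_lock.unlock() calls deleted in the worker and in drain() are not lost: the std::unique_lock constructed at the top of each block unlocks in its destructor on every exit path. A minimal sketch:

    #include <mutex>

    void sketch(std::mutex& shardedpool_lock, bool pause_threads) {
      if (pause_threads) {
        std::unique_lock ul(shardedpool_lock);
        // ... wait until unpaused ...
      }                                // ul goes out of scope here and unlocks
    }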
#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H
-#include "Cond.h"
+#include <atomic>
+#include <list>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "common/ceph_mutex.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
-
-#include <atomic>
+#include "common/Thread.h"
+#include "include/Context.h"
class CephContext;
/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
CephContext *cct;
- string name;
- string thread_name;
- string lockname;
- Mutex _lock;
- Cond _cond;
+ std::string name;
+ std::string thread_name;
+ std::string lockname;
+ ceph::mutex _lock;
+ ceph::condition_variable _cond;
bool _stop;
int _pause;
int _draining;
- Cond _wait_cond;
+ ceph::condition_variable _wait_cond;
public:
class TPHandle {
/// Basic interface to a work queue used by the worker threads.
struct WorkQueue_ {
- string name;
+ std::string name;
time_t timeout_interval, suicide_interval;
- WorkQueue_(string n, time_t ti, time_t sti)
+ WorkQueue_(std::string n, time_t ti, time_t sti)
: name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
{ }
virtual ~WorkQueue_() {}
// track thread pool size changes
unsigned _num_threads;
- string _thread_num_option;
+ std::string _thread_num_option;
const char **_conf_keys;
const char **get_tracked_conf_keys() const override {
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
- virtual void _dequeue(list<T*> *) = 0;
- virtual void _process_finish(const list<T*> &) {}
+ virtual void _dequeue(std::list<T*> *) = 0;
+ virtual void _process_finish(const std::list<T*> &) {}
// virtual methods from WorkQueue_ below
void *_void_dequeue() override {
- list<T*> *out(new list<T*>);
+ std::list<T*> *out(new std::list<T*>);
_dequeue(out);
if (!out->empty()) {
return (void *)out;
}
}
void _void_process(void *p, TPHandle &handle) override {
- _process(*((list<T*>*)p), handle);
+ _process(*((std::list<T*>*)p), handle);
}
void _void_process_finish(void *p) override {
- _process_finish(*(list<T*>*)p);
- delete (list<T*> *)p;
+ _process_finish(*(std::list<T*>*)p);
+ delete (std::list<T*> *)p;
}
protected:
- virtual void _process(const list<T*> &items, TPHandle &handle) = 0;
+ virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
public:
- BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+ BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
bool queue(T *item) {
pool->_lock.lock();
bool r = _enqueue(item);
- pool->_cond.SignalOne();
+ pool->_cond.notify_one();
pool->_lock.unlock();
return r;
}
* construction and remove itself on destruction. */
template<typename T, typename U = T>
class WorkQueueVal : public WorkQueue_ {
- Mutex _lock;
+ ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
ThreadPool *pool;
- list<U> to_process;
- list<U> to_finish;
+ std::list<U> to_process;
+ std::list<U> to_finish;
virtual void _enqueue(T) = 0;
virtual void _enqueue_front(T) = 0;
bool _empty() override = 0;
void _clear() override {}
public:
- WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
- : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
+ WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+ : WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
~WorkQueueVal() override {
void queue(T item) {
std::lock_guard l(pool->_lock);
_enqueue(item);
- pool->_cond.SignalOne();
+ pool->_cond.notify_one();
}
void queue_front(T item) {
std::lock_guard l(pool->_lock);
_enqueue_front(item);
- pool->_cond.SignalOne();
+ pool->_cond.notify_one();
}
void drain() {
pool->drain(this);
virtual void _process(T *t, TPHandle &) = 0;
public:
- WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+ WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
bool queue(T *item) {
pool->_lock.lock();
bool r = _enqueue(item);
- pool->_cond.SignalOne();
+ pool->_cond.notify_one();
pool->_lock.unlock();
return r;
}
void queue(T *item) {
std::lock_guard l(m_pool->_lock);
m_items.push_back(item);
- m_pool->_cond.SignalOne();
+ m_pool->_cond.notify_one();
}
bool empty() {
std::lock_guard l(m_pool->_lock);
return _empty();
}
protected:
- PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
+ PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
}
void register_work_queue() {
m_pool->add_work_queue(this);
}
void _clear() override {
- ceph_assert(m_pool->_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
m_items.clear();
}
bool _empty() override {
- ceph_assert(m_pool->_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
return m_items.empty();
}
void *_void_dequeue() override {
- ceph_assert(m_pool->_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
if (m_items.empty()) {
return NULL;
}
process(reinterpret_cast<T *>(item));
}
void _void_process_finish(void *item) override {
- ceph_assert(m_pool->_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
ceph_assert(m_processing > 0);
--m_processing;
}
}
T *front() {
- ceph_assert(m_pool->_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
if (m_items.empty()) {
return NULL;
}
}
void signal() {
std::lock_guard pool_locker(m_pool->_lock);
- m_pool->_cond.SignalOne();
+ m_pool->_cond.notify_one();
}
- Mutex &get_pool_lock() {
+ ceph::mutex &get_pool_lock() {
return m_pool->_lock;
}
private:
uint32_t m_processing;
};
private:
- vector<WorkQueue_*> work_queues;
+ std::vector<WorkQueue_*> work_queues;
int next_work_queue = 0;
}
};
- set<WorkThread*> _threads;
- list<WorkThread*> _old_threads; ///< need to be joined
+ std::set<WorkThread*> _threads;
+ std::list<WorkThread*> _old_threads; ///< need to be joined
int processing;
void start_threads();
void worker(WorkThread *wt);
public:
- ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
+ ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
~ThreadPool() override;
/// return number of threads currently running
}
/// wait for a kick on this thread pool
- void wait(Cond &c) {
- c.Wait(_lock);
+ void wait(ceph::condition_variable &c) {
+ std::unique_lock l(_lock, std::adopt_lock);
+ c.wait(l);
+ l.release(); // keep _lock held on return, matching the old Cond::Wait contract
}
/// wake up a waiter (with lock already held)
void _wake() {
- _cond.Signal();
+ _cond.notify_all();
}
/// wake up a waiter (without lock held)
void wake() {
std::lock_guard l(_lock);
- _cond.Signal();
+ _cond.notify_all();
}
void _wait() {
- _cond.Wait(_lock);
+ std::unique_lock l(_lock, std::adopt_lock);
+ _cond.wait(l);
+ l.release(); // _lock must remain locked when _wait() returns, as before
}
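wait() and _wait() are called with _lock already held, so the std::adopt_lock wrapper only lends the lock to the condition variable; the release() keeps the unique_lock destructor from unlocking on return, preserving the old Cond::Wait contract. A standalone sketch of the pattern:

    #include <condition_variable>
    #include <mutex>

    // Caller already holds m; wrap it so the condition variable can
    // unlock/relock it, then release() so m is still locked on return.
    void wait_with_lock_held(std::mutex& m, std::condition_variable& c) {
      std::unique_lock l(m, std::adopt_lock);  // take ownership of the held lock
      c.wait(l);                               // atomically unlocks, waits, relocks
      l.release();                             // leave m locked for the caller
    }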
/// start thread pool thread
class GenContextWQ :
public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
- list<GenContext<ThreadPool::TPHandle&>*> _queue;
+ std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
- GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
+ GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueueVal<
GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
- ContextWQ(const string &name, time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
- m_lock("ContextWQ::m_lock") {
+ ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
this->register_work_queue();
}
ctx->complete(result);
}
private:
- Mutex m_lock;
+ ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
ceph::unordered_map<Context*, int> m_context_results;
};
class ShardedThreadPool {
CephContext *cct;
- string name;
- string thread_name;
- string lockname;
- Mutex shardedpool_lock;
- Cond shardedpool_cond;
- Cond wait_cond;
+ std::string name;
+ std::string thread_name;
+ std::string lockname;
+ ceph::mutex shardedpool_lock;
+ ceph::condition_variable shardedpool_cond;
+ ceph::condition_variable wait_cond;
uint32_t num_threads;
std::atomic<bool> stop_threads = { false };
}
};
- vector<WorkThreadSharded*> threads_shardedpool;
+ std::vector<WorkThreadSharded*> threads_shardedpool;
void start_threads();
void shardedthreadpool_worker(uint32_t thread_index);
void set_wq(BaseShardedWQ* swq) {
public:
- ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);
+ ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
~ShardedThreadPool(){};
#include "cls/lock/cls_lock_client.h"
#include "common/dout.h"
#include "common/errno.h"
+#include "common/Cond.h"
#include "common/WorkQueue.h"
#include "librbd/Utils.h"
#include "include/rbd_types.h"
#include "include/rados/librados.hpp"
#include "common/errno.h"
+#include "common/Cond.h"
#include "librbd/Utils.h"
#include "librbd/watcher/Utils.h"
#include "include/rados/librados.hpp"
#include "common/dout.h"
#include "common/errno.h"
+#include "common/Cond.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/DeepCopyRequest.h"
#include "librbd/ExclusiveLock.h"
#include "include/rados/librados.hpp"
#include "common/dout.h"
#include "common/errno.h"
+#include "common/Cond.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
+#include "common/Cond.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
ceph_assert(peek_item == item);
if (lock_required) {
- this->get_pool_lock().Unlock();
+ this->get_pool_lock().unlock();
m_image_ctx.owner_lock.get_read();
if (m_image_ctx.exclusive_lock != nullptr) {
ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
lock_required = false;
}
m_image_ctx.owner_lock.put_read();
- this->get_pool_lock().Lock();
+ this->get_pool_lock().lock();
if (lock_required) {
return nullptr;
// stall IO until the refresh completes
++m_io_blockers;
- this->get_pool_lock().Unlock();
+ this->get_pool_lock().unlock();
m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
- this->get_pool_lock().Lock();
+ this->get_pool_lock().lock();
return nullptr;
}
#include "osd/osd_types.h"
#include "common/WorkQueue.h"
+#include "common/Cond.h"
class OSDMap;
#include "test/librbd/test_support.h"
#include "include/rbd_types.h"
#include "librbd/MirroringWatcher.h"
+#include "common/Cond.h"
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include <list>
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
+#include "common/Cond.h"
#include "cls/rbd/cls_rbd_client.h"
#include <list>
#ifndef TEST_OBJECTSTORE_STATE_H_
#define TEST_OBJECTSTORE_STATE_H_
-#include "os/ObjectStore.h"
#include <boost/scoped_ptr.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int.hpp>
#include <map>
#include <vector>
+#include "os/ObjectStore.h"
+#include "common/Cond.h"
+
typedef boost::mt11213b rngen_t;
class TestObjectStoreState {
#include "test/rbd_mirror/test_fixture.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
#include "test/librados/test_cxx.h"
#include "gtest/gtest.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Instances.h"
#include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
#include "test/librados/test.h"
#include "gtest/gtest.h"
#include "test/rbd_mirror/test_fixture.h"
#include "tools/rbd_mirror/LeaderWatcher.h"
#include "tools/rbd_mirror/Threads.h"
+#include "common/Cond.h"
#include "test/librados/test_cxx.h"
#include "gtest/gtest.h"
#include "librbd/Utils.h"
#include "InstanceReplayer.h"
#include "ImageSyncThrottler.h"
+#include "common/Cond.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror