finisher_stop = true;
// we don't have any new work to do, but we want the worker to wake up anyway
// to process the stop condition.
- finisher_cond.Signal();
+ finisher_cond.notify_all();
finisher_lock.unlock();
finisher_thread.join(); // wait until the worker exits completely
ldout(cct, 10) << __func__ << " finish" << dendl;
void Finisher::wait_for_empty()
{
- finisher_lock.lock();
+ std::unique_lock ul(finisher_lock);
while (!finisher_queue.empty() || finisher_running) {
ldout(cct, 10) << "wait_for_empty waiting" << dendl;
finisher_empty_wait = true;
- finisher_empty_cond.Wait(finisher_lock);
+ finisher_empty_cond.wait(ul);
}
ldout(cct, 10) << "wait_for_empty empty" << dendl;
finisher_empty_wait = false;
- finisher_lock.unlock();
}
void *Finisher::finisher_thread_entry()
{
- finisher_lock.lock();
+ std::unique_lock ul(finisher_lock);
ldout(cct, 10) << "finisher_thread start" << dendl;
utime_t start;
vector<pair<Context*,int>> ls;
ls.swap(finisher_queue);
finisher_running = true;
- finisher_lock.unlock();
+ ul.unlock();
ldout(cct, 10) << "finisher_thread doing " << ls << dendl;
if (logger) {
logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
}
- finisher_lock.lock();
+ ul.lock();
finisher_running = false;
}
ldout(cct, 10) << "finisher_thread empty" << dendl;
if (unlikely(finisher_empty_wait))
- finisher_empty_cond.Signal();
+ finisher_empty_cond.notify_all();
if (finisher_stop)
break;
ldout(cct, 10) << "finisher_thread sleeping" << dendl;
- finisher_cond.Wait(finisher_lock);
+ finisher_cond.wait(ul);
}
// If we are exiting, we signal the thread waiting in stop(),
// otherwise it would never unblock
- finisher_empty_cond.Signal();
+ finisher_empty_cond.notify_all();
ldout(cct, 10) << "finisher_thread stop" << dendl;
finisher_stop = false;
- finisher_lock.unlock();
return 0;
}
#ifndef CEPH_FINISHER_H
#define CEPH_FINISHER_H
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "include/Context.h"
+#include "common/Thread.h"
+#include "common/ceph_mutex.h"
#include "common/perf_counters.h"
+#include "common/Cond.h"
class CephContext;
*/
class Finisher {
CephContext *cct;
- Mutex finisher_lock; ///< Protects access to queues and finisher_running.
- Cond finisher_cond; ///< Signaled when there is something to process.
- Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
+ ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running.
+ ceph::condition_variable finisher_cond; ///< Signaled when there is something to process.
+ ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
bool finisher_stop; ///< Set when the finisher should stop.
bool finisher_running; ///< True when the finisher is currently executing contexts.
bool finisher_empty_wait; ///< True mean someone wait finisher empty.
public:
/// Add a context to complete, optionally specifying a parameter for the complete function.
void queue(Context *c, int r = 0) {
- finisher_lock.lock();
+ std::unique_lock ul(finisher_lock);
if (finisher_queue.empty()) {
- finisher_cond.Signal();
+ finisher_cond.notify_all();
}
finisher_queue.push_back(make_pair(c, r));
if (logger)
logger->inc(l_finisher_queue_len);
- finisher_lock.unlock();
}
void queue(list<Context*>& ls) {
- finisher_lock.lock();
- if (finisher_queue.empty()) {
- finisher_cond.Signal();
- }
- for (auto i : ls) {
- finisher_queue.push_back(make_pair(i, 0));
+ {
+ std::unique_lock ul(finisher_lock);
+ if (finisher_queue.empty()) {
+ finisher_cond.notify_all();
+ }
+ for (auto i : ls) {
+ finisher_queue.push_back(make_pair(i, 0));
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len, ls.size());
}
- if (logger)
- logger->inc(l_finisher_queue_len, ls.size());
- finisher_lock.unlock();
ls.clear();
}
void queue(deque<Context*>& ls) {
- finisher_lock.lock();
- if (finisher_queue.empty()) {
- finisher_cond.Signal();
- }
- for (auto i : ls) {
- finisher_queue.push_back(make_pair(i, 0));
+ {
+ std::unique_lock ul(finisher_lock);
+ if (finisher_queue.empty()) {
+ finisher_cond.notify_all();
+ }
+ for (auto i : ls) {
+ finisher_queue.push_back(make_pair(i, 0));
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len, ls.size());
}
- if (logger)
- logger->inc(l_finisher_queue_len, ls.size());
- finisher_lock.unlock();
ls.clear();
}
void queue(vector<Context*>& ls) {
- finisher_lock.lock();
- if (finisher_queue.empty()) {
- finisher_cond.Signal();
- }
- for (auto i : ls) {
- finisher_queue.push_back(make_pair(i, 0));
+ {
+ std::unique_lock ul(finisher_lock);
+ if (finisher_queue.empty()) {
+ finisher_cond.notify_all();
+ }
+ for (auto i : ls) {
+ finisher_queue.push_back(make_pair(i, 0));
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len, ls.size());
}
- if (logger)
- logger->inc(l_finisher_queue_len, ls.size());
- finisher_lock.unlock();
ls.clear();
}
/// Construct an anonymous Finisher.
/// Anonymous finishers do not log their queue length.
explicit Finisher(CephContext *cct_) :
- cct(cct_), finisher_lock("Finisher::finisher_lock"),
+ cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")),
finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
thread_name("fn_anonymous"), logger(0),
finisher_thread(this) {}
/// Construct a named Finisher that logs its queue length.
Finisher(CephContext *cct_, string name, string tn) :
- cct(cct_), finisher_lock("Finisher::" + name),
+ cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
thread_name(tn), logger(0),
finisher_thread(this) {