From: Sage Weil Date: Fri, 14 Sep 2018 22:19:26 +0000 (-0500) Subject: common/Finisher: convert to ceph::mutex etc X-Git-Tag: v14.0.1~203^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F24133%2Fhead;p=ceph.git common/Finisher: convert to ceph::mutex etc Signed-off-by: Sage Weil --- diff --git a/src/common/Finisher.cc b/src/common/Finisher.cc index 720477019dcec..277fe06f19763 100644 --- a/src/common/Finisher.cc +++ b/src/common/Finisher.cc @@ -20,7 +20,7 @@ void Finisher::stop() finisher_stop = true; // we don't have any new work to do, but we want the worker to wake up anyway // to process the stop condition. - finisher_cond.Signal(); + finisher_cond.notify_all(); finisher_lock.unlock(); finisher_thread.join(); // wait until the worker exits completely ldout(cct, 10) << __func__ << " finish" << dendl; @@ -28,20 +28,19 @@ void Finisher::stop() void Finisher::wait_for_empty() { - finisher_lock.lock(); + std::unique_lock ul(finisher_lock); while (!finisher_queue.empty() || finisher_running) { ldout(cct, 10) << "wait_for_empty waiting" << dendl; finisher_empty_wait = true; - finisher_empty_cond.Wait(finisher_lock); + finisher_empty_cond.wait(ul); } ldout(cct, 10) << "wait_for_empty empty" << dendl; finisher_empty_wait = false; - finisher_lock.unlock(); } void *Finisher::finisher_thread_entry() { - finisher_lock.lock(); + std::unique_lock ul(finisher_lock); ldout(cct, 10) << "finisher_thread start" << dendl; utime_t start; @@ -55,7 +54,7 @@ void *Finisher::finisher_thread_entry() vector> ls; ls.swap(finisher_queue); finisher_running = true; - finisher_lock.unlock(); + ul.unlock(); ldout(cct, 10) << "finisher_thread doing " << ls << dendl; if (logger) { @@ -74,25 +73,24 @@ void *Finisher::finisher_thread_entry() logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start); } - finisher_lock.lock(); + ul.lock(); finisher_running = false; } ldout(cct, 10) << "finisher_thread empty" << dendl; if (unlikely(finisher_empty_wait)) - finisher_empty_cond.Signal(); + finisher_empty_cond.notify_all(); if (finisher_stop) break; ldout(cct, 10) << "finisher_thread sleeping" << dendl; - finisher_cond.Wait(finisher_lock); + finisher_cond.wait(ul); } // If we are exiting, we signal the thread waiting in stop(), // otherwise it would never unblock - finisher_empty_cond.Signal(); + finisher_empty_cond.notify_all(); ldout(cct, 10) << "finisher_thread stop" << dendl; finisher_stop = false; - finisher_lock.unlock(); return 0; } diff --git a/src/common/Finisher.h b/src/common/Finisher.h index 15127ef6f9e9b..1bcf82573d789 100644 --- a/src/common/Finisher.h +++ b/src/common/Finisher.h @@ -15,9 +15,11 @@ #ifndef CEPH_FINISHER_H #define CEPH_FINISHER_H -#include "common/Mutex.h" -#include "common/Cond.h" +#include "include/Context.h" +#include "common/Thread.h" +#include "common/ceph_mutex.h" #include "common/perf_counters.h" +#include "common/Cond.h" class CephContext; @@ -36,9 +38,9 @@ enum { */ class Finisher { CephContext *cct; - Mutex finisher_lock; ///< Protects access to queues and finisher_running. - Cond finisher_cond; ///< Signaled when there is something to process. - Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process. + ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running. + ceph::condition_variable finisher_cond; ///< Signaled when there is something to process. + ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process. bool finisher_stop; ///< Set when the finisher should stop. bool finisher_running; ///< True when the finisher is currently executing contexts. bool finisher_empty_wait; ///< True mean someone wait finisher empty. @@ -63,53 +65,55 @@ class Finisher { public: /// Add a context to complete, optionally specifying a parameter for the complete function. void queue(Context *c, int r = 0) { - finisher_lock.lock(); + std::unique_lock ul(finisher_lock); if (finisher_queue.empty()) { - finisher_cond.Signal(); + finisher_cond.notify_all(); } finisher_queue.push_back(make_pair(c, r)); if (logger) logger->inc(l_finisher_queue_len); - finisher_lock.unlock(); } void queue(list& ls) { - finisher_lock.lock(); - if (finisher_queue.empty()) { - finisher_cond.Signal(); - } - for (auto i : ls) { - finisher_queue.push_back(make_pair(i, 0)); + { + std::unique_lock ul(finisher_lock); + if (finisher_queue.empty()) { + finisher_cond.notify_all(); + } + for (auto i : ls) { + finisher_queue.push_back(make_pair(i, 0)); + } + if (logger) + logger->inc(l_finisher_queue_len, ls.size()); } - if (logger) - logger->inc(l_finisher_queue_len, ls.size()); - finisher_lock.unlock(); ls.clear(); } void queue(deque& ls) { - finisher_lock.lock(); - if (finisher_queue.empty()) { - finisher_cond.Signal(); - } - for (auto i : ls) { - finisher_queue.push_back(make_pair(i, 0)); + { + std::unique_lock ul(finisher_lock); + if (finisher_queue.empty()) { + finisher_cond.notify_all(); + } + for (auto i : ls) { + finisher_queue.push_back(make_pair(i, 0)); + } + if (logger) + logger->inc(l_finisher_queue_len, ls.size()); } - if (logger) - logger->inc(l_finisher_queue_len, ls.size()); - finisher_lock.unlock(); ls.clear(); } void queue(vector& ls) { - finisher_lock.lock(); - if (finisher_queue.empty()) { - finisher_cond.Signal(); - } - for (auto i : ls) { - finisher_queue.push_back(make_pair(i, 0)); + { + std::unique_lock ul(finisher_lock); + if (finisher_queue.empty()) { + finisher_cond.notify_all(); + } + for (auto i : ls) { + finisher_queue.push_back(make_pair(i, 0)); + } + if (logger) + logger->inc(l_finisher_queue_len, ls.size()); } - if (logger) - logger->inc(l_finisher_queue_len, ls.size()); - finisher_lock.unlock(); ls.clear(); } @@ -132,14 +136,14 @@ class Finisher { /// Construct an anonymous Finisher. /// Anonymous finishers do not log their queue length. explicit Finisher(CephContext *cct_) : - cct(cct_), finisher_lock("Finisher::finisher_lock"), + cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")), finisher_stop(false), finisher_running(false), finisher_empty_wait(false), thread_name("fn_anonymous"), logger(0), finisher_thread(this) {} /// Construct a named Finisher that logs its queue length. Finisher(CephContext *cct_, string name, string tn) : - cct(cct_), finisher_lock("Finisher::" + name), + cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)), finisher_stop(false), finisher_running(false), finisher_empty_wait(false), thread_name(tn), logger(0), finisher_thread(this) {