]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/Finisher: convert to ceph::mutex etc 24133/head
authorSage Weil <sage@redhat.com>
Fri, 14 Sep 2018 22:19:26 +0000 (17:19 -0500)
committerSage Weil <sage@redhat.com>
Fri, 21 Sep 2018 16:55:58 +0000 (11:55 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/common/Finisher.cc
src/common/Finisher.h

index 720477019dcec7b0f40d369340b6696eae9bf1ec..277fe06f19763ed214c3b1eab7c4a597880149a9 100644 (file)
@@ -20,7 +20,7 @@ void Finisher::stop()
   finisher_stop = true;
   // we don't have any new work to do, but we want the worker to wake up anyway
   // to process the stop condition.
-  finisher_cond.Signal();
+  finisher_cond.notify_all();
   finisher_lock.unlock();
   finisher_thread.join(); // wait until the worker exits completely
   ldout(cct, 10) << __func__ << " finish" << dendl;
@@ -28,20 +28,19 @@ void Finisher::stop()
 
 void Finisher::wait_for_empty()
 {
-  finisher_lock.lock();
+  std::unique_lock ul(finisher_lock);
   while (!finisher_queue.empty() || finisher_running) {
     ldout(cct, 10) << "wait_for_empty waiting" << dendl;
     finisher_empty_wait = true;
-    finisher_empty_cond.Wait(finisher_lock);
+    finisher_empty_cond.wait(ul);
   }
   ldout(cct, 10) << "wait_for_empty empty" << dendl;
   finisher_empty_wait = false;
-  finisher_lock.unlock();
 }
 
 void *Finisher::finisher_thread_entry()
 {
-  finisher_lock.lock();
+  std::unique_lock ul(finisher_lock);
   ldout(cct, 10) << "finisher_thread start" << dendl;
 
   utime_t start;
@@ -55,7 +54,7 @@ void *Finisher::finisher_thread_entry()
       vector<pair<Context*,int>> ls;
       ls.swap(finisher_queue);
       finisher_running = true;
-      finisher_lock.unlock();
+      ul.unlock();
       ldout(cct, 10) << "finisher_thread doing " << ls << dendl;
 
       if (logger) {
@@ -74,25 +73,24 @@ void *Finisher::finisher_thread_entry()
        logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
       }
 
-      finisher_lock.lock();
+      ul.lock();
       finisher_running = false;
     }
     ldout(cct, 10) << "finisher_thread empty" << dendl;
     if (unlikely(finisher_empty_wait))
-      finisher_empty_cond.Signal();
+      finisher_empty_cond.notify_all();
     if (finisher_stop)
       break;
     
     ldout(cct, 10) << "finisher_thread sleeping" << dendl;
-    finisher_cond.Wait(finisher_lock);
+    finisher_cond.wait(ul);
   }
   // If we are exiting, we signal the thread waiting in stop(),
   // otherwise it would never unblock
-  finisher_empty_cond.Signal();
+  finisher_empty_cond.notify_all();
 
   ldout(cct, 10) << "finisher_thread stop" << dendl;
   finisher_stop = false;
-  finisher_lock.unlock();
   return 0;
 }
 
index 15127ef6f9e9b6d2228b248e304588e48cd2602d..1bcf82573d789fdd1a286d38df3b13a4b832f416 100644 (file)
 #ifndef CEPH_FINISHER_H
 #define CEPH_FINISHER_H
 
-#include "common/Mutex.h"
-#include "common/Cond.h"
+#include "include/Context.h"
+#include "common/Thread.h"
+#include "common/ceph_mutex.h"
 #include "common/perf_counters.h"
+#include "common/Cond.h"
 
 class CephContext;
 
@@ -36,9 +38,9 @@ enum {
  */
 class Finisher {
   CephContext *cct;
-  Mutex        finisher_lock; ///< Protects access to queues and finisher_running.
-  Cond         finisher_cond; ///< Signaled when there is something to process.
-  Cond         finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
+  ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running.
+  ceph::condition_variable finisher_cond; ///< Signaled when there is something to process.
+  ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
   bool         finisher_stop; ///< Set when the finisher should stop.
   bool         finisher_running; ///< True when the finisher is currently executing contexts.
   bool        finisher_empty_wait; ///< True mean someone wait finisher empty.
@@ -63,53 +65,55 @@ class Finisher {
  public:
   /// Add a context to complete, optionally specifying a parameter for the complete function.
   void queue(Context *c, int r = 0) {
-    finisher_lock.lock();
+    std::unique_lock ul(finisher_lock);
     if (finisher_queue.empty()) {
-      finisher_cond.Signal();
+      finisher_cond.notify_all();
     }
     finisher_queue.push_back(make_pair(c, r));
     if (logger)
       logger->inc(l_finisher_queue_len);
-    finisher_lock.unlock();
   }
 
   void queue(list<Context*>& ls) {
-    finisher_lock.lock();
-    if (finisher_queue.empty()) {
-      finisher_cond.Signal();
-    }
-    for (auto i : ls) {
-      finisher_queue.push_back(make_pair(i, 0));
+    {
+      std::unique_lock ul(finisher_lock);
+      if (finisher_queue.empty()) {
+       finisher_cond.notify_all();
+      }
+      for (auto i : ls) {
+       finisher_queue.push_back(make_pair(i, 0));
+      }
+      if (logger)
+       logger->inc(l_finisher_queue_len, ls.size());
     }
-    if (logger)
-      logger->inc(l_finisher_queue_len, ls.size());
-    finisher_lock.unlock();
     ls.clear();
   }
   void queue(deque<Context*>& ls) {
-    finisher_lock.lock();
-    if (finisher_queue.empty()) {
-      finisher_cond.Signal();
-    }
-    for (auto i : ls) {
-      finisher_queue.push_back(make_pair(i, 0));
+    {
+      std::unique_lock ul(finisher_lock);
+      if (finisher_queue.empty()) {
+       finisher_cond.notify_all();
+      }
+      for (auto i : ls) {
+       finisher_queue.push_back(make_pair(i, 0));
+      }
+      if (logger)
+       logger->inc(l_finisher_queue_len, ls.size());
     }
-    if (logger)
-      logger->inc(l_finisher_queue_len, ls.size());
-    finisher_lock.unlock();
     ls.clear();
   }
   void queue(vector<Context*>& ls) {
-    finisher_lock.lock();
-    if (finisher_queue.empty()) {
-      finisher_cond.Signal();
-    }
-    for (auto i : ls) {
-      finisher_queue.push_back(make_pair(i, 0));
+    {
+      std::unique_lock ul(finisher_lock);
+      if (finisher_queue.empty()) {
+       finisher_cond.notify_all();
+      }
+      for (auto i : ls) {
+       finisher_queue.push_back(make_pair(i, 0));
+      }
+      if (logger)
+       logger->inc(l_finisher_queue_len, ls.size());
     }
-    if (logger)
-      logger->inc(l_finisher_queue_len, ls.size());
-    finisher_lock.unlock();
     ls.clear();
   }
 
@@ -132,14 +136,14 @@ class Finisher {
   /// Construct an anonymous Finisher.
   /// Anonymous finishers do not log their queue length.
   explicit Finisher(CephContext *cct_) :
-    cct(cct_), finisher_lock("Finisher::finisher_lock"),
+    cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")),
     finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
     thread_name("fn_anonymous"), logger(0),
     finisher_thread(this) {}
 
   /// Construct a named Finisher that logs its queue length.
   Finisher(CephContext *cct_, string name, string tn) :
-    cct(cct_), finisher_lock("Finisher::" + name),
+    cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
     finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
     thread_name(tn), logger(0),
     finisher_thread(this) {