]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/alienstore: use semaphore to manage tasks in thread pool
authorKefu Chai <kchai@redhat.com>
Tue, 29 Jun 2021 12:06:49 +0000 (20:06 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 29 Jun 2021 16:01:12 +0000 (00:01 +0800)
* implement std::counting_semaphore in C++17
* use the homebrew counting_semaphore as the synchronization primitive
  to access the pending tasks, for better performance than the
  implementation using mutex and condition_variable.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/os/alienstore/semaphore.h [new file with mode: 0644]
src/crimson/os/alienstore/thread_pool.h

diff --git a/src/crimson/os/alienstore/semaphore.h b/src/crimson/os/alienstore/semaphore.h
new file mode 100644 (file)
index 0000000..8cba02a
--- /dev/null
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+#pragma once
+
+#include <semaphore.h>
+#include <ctime>
+#include <cerrno>
+#include <exception>
+#include <chrono>
+
+namespace crimson {
+
+// an implementation of std::counting_semaphore<> in C++17 using the POSIX
+// semaphore.
+//
+// LeastMaxValue is ignored, as we don't have different backends optimized
+// for different LeastMaxValues
+template<unsigned LeastMaxValue = 64>
+class counting_semaphore {
+  using clock_t = std::chrono::system_clock;
+public:
+  explicit counting_semaphore(unsigned count) noexcept {
+    sem_init(&sem, 0, count);
+  }
+
+  counting_semaphore(const counting_semaphore&) = delete;
+  counting_semaphore& operator=(const counting_semaphore&) = delete;
+
+  ~counting_semaphore() {
+    sem_destroy(&sem);
+  }
+
+  void acquire() noexcept {
+    for (;;) {
+      int err = sem_wait(&sem);
+      if (err != 0) {
+        if (errno == EINTR) {
+          continue;
+        } else {
+          std::terminate();
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  void release(unsigned update = 1) {
+    for (; update != 0; --update) {
+      int err = sem_post(&sem);
+      if (err != 0) {
+        std::terminate();
+      }
+    }
+  }
+
+  template<typename Clock, typename Duration>
+  bool try_acquire_until(const std::chrono::time_point<Clock, Duration>& abs_time) noexcept {
+    auto s = std::chrono::time_point_cast<std::chrono::seconds>(abs_time);
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(abs_time - s);
+    struct timespec ts = {
+      static_cast<std::time_t>(s.time_since_epoch().count()),
+      static_cast<long>(ns.count())
+    };
+    for (;;) {
+      if (int err = sem_timedwait(&sem, &ts); err) {
+        if (errno == EINTR) {
+          continue;
+        } else if (errno == ETIMEDOUT || errno == EINVAL) {
+          return false;
+        } else {
+          std::terminate();
+        }
+      } else {
+        break;
+      }
+    }
+    return true;
+  }
+
+  template<typename Rep, typename Period>
+  bool try_acquire_for(const std::chrono::duration<Rep, Period>& rel_time) {
+    return try_acquire_until(clock_t::now() + rel_time);
+  }
+
+private:
+  sem_t sem;
+};
+
+}
index 3a99398527bbd8d6a3740d93a572de69e27f954b..8f3069af3a5a595291866beb76aaf3397da992b6 100644 (file)
 #include <seastar/core/semaphore.hh>
 #include <seastar/core/sharded.hh>
 
+#if __cplusplus > 201703L
+#include <semaphore>
+namespace crimson {
+  using std::counting_semaphore;
+}
+#else
+#include "semaphore.h"
+#endif
+
 namespace crimson::os {
 
 struct WorkItem {
@@ -75,33 +84,32 @@ struct SubmitQueue {
 struct ShardedWorkQueue {
 public:
   WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) {
-    WorkItem* work_item = nullptr;
-    std::unique_lock lock{mutex};
-    cond.wait_for(lock, queue_max_wait, [this, &work_item] {
-      return pending.pop(work_item) || is_stopping();
-    });
-    return work_item;
+    if (sem.try_acquire_for(queue_max_wait)) {
+      if (!is_stopping()) {
+        WorkItem* work_item = nullptr;
+        [[maybe_unused]] bool popped = pending.pop(work_item);
+        assert(popped);
+        return work_item;
+      }
+    }
+    return nullptr;
   }
   void stop() {
-    {
-      std::unique_lock lock{mutex};
-      stopping = true;
-    }
-    cond.notify_all();
+    stopping = true;
+    sem.release();
   }
   void push_back(WorkItem* work_item) {
     [[maybe_unused]] bool pushed = pending.push(work_item);
     assert(pushed);
-    cond.notify_one();
+    sem.release();
   }
 private:
   bool is_stopping() const {
     return stopping;
   }
-  bool stopping = false;
-  std::mutex mutex;
-  std::condition_variable cond;
+  std::atomic<bool> stopping = false;
   static constexpr unsigned QUEUE_SIZE = 128;
+  crimson::counting_semaphore<QUEUE_SIZE> sem{0};
   boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE};
 };