--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+#pragma once
+
+#include <semaphore.h>
+#include <ctime>
+#include <cerrno>
+#include <exception>
+#include <chrono>
+
+namespace crimson {
+
+// an implementation of std::counting_semaphore<> in C++17 using the POSIX
+// semaphore.
+//
+// LeastMaxValue is ignored, as we don't have different backends optimized
+// for different LeastMaxValues
+template<unsigned LeastMaxValue = 64>
+class counting_semaphore {
+ using clock_t = std::chrono::system_clock;
+public:
+ explicit counting_semaphore(unsigned count) noexcept {
+ sem_init(&sem, 0, count);
+ }
+
+ counting_semaphore(const counting_semaphore&) = delete;
+ counting_semaphore& operator=(const counting_semaphore&) = delete;
+
+ ~counting_semaphore() {
+ sem_destroy(&sem);
+ }
+
+ void acquire() noexcept {
+ for (;;) {
+ int err = sem_wait(&sem);
+ if (err != 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ std::terminate();
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ void release(unsigned update = 1) {
+ for (; update != 0; --update) {
+ int err = sem_post(&sem);
+ if (err != 0) {
+ std::terminate();
+ }
+ }
+ }
+
+ template<typename Clock, typename Duration>
+ bool try_acquire_until(const std::chrono::time_point<Clock, Duration>& abs_time) noexcept {
+ auto s = std::chrono::time_point_cast<std::chrono::seconds>(abs_time);
+ auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(abs_time - s);
+ struct timespec ts = {
+ static_cast<std::time_t>(s.time_since_epoch().count()),
+ static_cast<long>(ns.count())
+ };
+ for (;;) {
+ if (int err = sem_timedwait(&sem, &ts); err) {
+ if (errno == EINTR) {
+ continue;
+ } else if (errno == ETIMEDOUT || errno == EINVAL) {
+ return false;
+ } else {
+ std::terminate();
+ }
+ } else {
+ break;
+ }
+ }
+ return true;
+ }
+
+ template<typename Rep, typename Period>
+ bool try_acquire_for(const std::chrono::duration<Rep, Period>& rel_time) {
+ return try_acquire_until(clock_t::now() + rel_time);
+ }
+
+private:
+ sem_t sem;
+};
+
+}
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
+#if __cplusplus > 201703L
+#include <semaphore>
+namespace crimson {
+ using std::counting_semaphore;
+}
+#else
+#include "semaphore.h"
+#endif
+
namespace crimson::os {
struct WorkItem {
struct ShardedWorkQueue {
public:
WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) {
- WorkItem* work_item = nullptr;
- std::unique_lock lock{mutex};
- cond.wait_for(lock, queue_max_wait, [this, &work_item] {
- return pending.pop(work_item) || is_stopping();
- });
- return work_item;
+ if (sem.try_acquire_for(queue_max_wait)) {
+ if (!is_stopping()) {
+ WorkItem* work_item = nullptr;
+ [[maybe_unused]] bool popped = pending.pop(work_item);
+ assert(popped);
+ return work_item;
+ }
+ }
+ return nullptr;
}
void stop() {
- {
- std::unique_lock lock{mutex};
- stopping = true;
- }
- cond.notify_all();
+ stopping = true;
+ sem.release();
}
void push_back(WorkItem* work_item) {
[[maybe_unused]] bool pushed = pending.push(work_item);
assert(pushed);
- cond.notify_one();
+ sem.release();
}
private:
bool is_stopping() const {
return stopping;
}
- bool stopping = false;
- std::mutex mutex;
- std::condition_variable cond;
+ std::atomic<bool> stopping = false;
static constexpr unsigned QUEUE_SIZE = 128;
+ crimson::counting_semaphore<QUEUE_SIZE> sem{0};
boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE};
};