]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
QueueRing: a reduced contention queue
authorYehuda Sadeh <yehuda@inktank.com>
Sun, 2 Mar 2014 03:14:12 +0000 (19:14 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 29 Jan 2015 18:09:12 +0000 (10:09 -0800)
A queue that provides multi-readers, multi-writers concurrency with a
reduced lock contention.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/Makefile.am
src/common/QueueRing.h [new file with mode: 0644]

index 28881945357c18cd3f87f385ee6cef564d5f91f5..4e033c1dc5c7f3e3bbb19db83641f9b23b52e410 100644 (file)
@@ -180,6 +180,7 @@ noinst_HEADERS += \
        common/map_cacher.hpp \
        common/MemoryModel.h \
        common/Mutex.h \
+       common/QueueRing.h \
        common/PrebufferedStreambuf.h \
        common/RWLock.h \
        common/Semaphore.h \
diff --git a/src/common/QueueRing.h b/src/common/QueueRing.h
new file mode 100644 (file)
index 0000000..830f80f
--- /dev/null
@@ -0,0 +1,61 @@
+#ifndef QUEUE_RING_H
+#define QUEUE_RING_H
+
+#include <list>
+#include <vector>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+
+
+template <class T>
+class QueueRing {
+  struct QueueBucket {
+    Mutex lock;
+    Cond cond;
+    typename std::list<T> entries;
+
+    QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
+    QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
+      entries = rhs.entries;
+    }
+
+    void enqueue(const T& entry) {
+      lock.Lock();
+      if (entries.empty()) {
+        cond.Signal();
+      }
+      entries.push_back(entry);
+      lock.Unlock();
+    }
+
+    void dequeue(T *entry) {
+      lock.Lock();
+      if (entries.empty()) {
+        cond.Wait(lock);
+      };
+      assert(!entries.empty());
+      *entry = entries.front();
+      entries.pop_front();
+      lock.Unlock();
+    };
+  };
+
+  std::vector<QueueBucket> buckets;
+  int num_buckets;
+  atomic_t cur_read_bucket;
+  atomic_t cur_write_bucket;
+public:
+  QueueRing(int n) : buckets(n), num_buckets(n) {
+  }
+
+  void enqueue(const T& entry) {
+    buckets[cur_write_bucket.inc() % num_buckets].enqueue(entry);
+  };
+
+  void dequeue(T *entry) {
+    buckets[cur_read_bucket.inc() % num_buckets].dequeue(entry);
+  }
+};
+
+#endif