From: Yehuda Sadeh Date: Sun, 2 Mar 2014 03:14:12 +0000 (-0800) Subject: QueueRing: a reduced contention queue X-Git-Tag: v0.93~157^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=32b9bb7a2b031b3357600a2a405d1f18ac594546;p=ceph.git QueueRing: a reduced contention queue A queue that provides multi-readers, multi-writers concurrency with a reduced lock contention. Signed-off-by: Yehuda Sadeh --- diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 28881945357..4e033c1dc5c 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -180,6 +180,7 @@ noinst_HEADERS += \ common/map_cacher.hpp \ common/MemoryModel.h \ common/Mutex.h \ + common/QueueRing.h \ common/PrebufferedStreambuf.h \ common/RWLock.h \ common/Semaphore.h \ diff --git a/src/common/QueueRing.h b/src/common/QueueRing.h new file mode 100644 index 00000000000..830f80f8442 --- /dev/null +++ b/src/common/QueueRing.h @@ -0,0 +1,61 @@ +#ifndef QUEUE_RING_H +#define QUEUE_RING_H + +#include +#include +#include "common/Mutex.h" +#include "common/Cond.h" + + + +template +class QueueRing { + struct QueueBucket { + Mutex lock; + Cond cond; + typename std::list entries; + + QueueBucket() : lock("QueueRing::QueueBucket::lock") {} + QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") { + entries = rhs.entries; + } + + void enqueue(const T& entry) { + lock.Lock(); + if (entries.empty()) { + cond.Signal(); + } + entries.push_back(entry); + lock.Unlock(); + } + + void dequeue(T *entry) { + lock.Lock(); + if (entries.empty()) { + cond.Wait(lock); + }; + assert(!entries.empty()); + *entry = entries.front(); + entries.pop_front(); + lock.Unlock(); + }; + }; + + std::vector buckets; + int num_buckets; + atomic_t cur_read_bucket; + atomic_t cur_write_bucket; +public: + QueueRing(int n) : buckets(n), num_buckets(n) { + } + + void enqueue(const T& entry) { + buckets[cur_write_bucket.inc() % num_buckets].enqueue(entry); + }; + + void dequeue(T *entry) { + buckets[cur_read_bucket.inc() % num_buckets].dequeue(entry); + } +}; + +#endif