]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os: use lockfree queue for sharded queue
authorKefu Chai <kchai@redhat.com>
Tue, 22 Jun 2021 13:27:32 +0000 (21:27 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 29 Jun 2021 10:58:09 +0000 (18:58 +0800)
each sharded queue has multiple producers and a single consumer queue,
but the producer side is in seastar world, so ideally, we should not
guard the queue using a POSIX lock.

in this change, the lockfree multiple-writer/multiple-reader queue is
used to replace the std::queue to drop the lock guarding it.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/os/alienstore/thread_pool.h

index 6b742a5cc83c3579e63cfbace6c05012d7ac6136..3a99398527bbd8d6a3740d93a572de69e27f954b 100644 (file)
@@ -78,13 +78,7 @@ public:
     WorkItem* work_item = nullptr;
     std::unique_lock lock{mutex};
     cond.wait_for(lock, queue_max_wait, [this, &work_item] {
-      bool empty = true;
-      if (!pending.empty()) {
-        empty = false;
-        work_item = pending.front();
-        pending.pop_front();
-      }
-      return !empty || is_stopping();
+      return pending.pop(work_item) || is_stopping();
     });
     return work_item;
   }
@@ -96,10 +90,8 @@ public:
     cond.notify_all();
   }
   void push_back(WorkItem* work_item) {
-    // XXX: oops, we can stall the reactor!
-    // TODO: switch to boost::lockfree.
-    std::unique_lock lock{mutex};
-    pending.push_back(work_item);
+    [[maybe_unused]] bool pushed = pending.push(work_item);
+    assert(pushed);
     cond.notify_one();
   }
 private:
@@ -109,7 +101,8 @@ private:
   bool stopping = false;
   std::mutex mutex;
   std::condition_variable cond;
-  std::deque<WorkItem*> pending;
+  static constexpr unsigned QUEUE_SIZE = 128;
+  boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE};
 };
 
 /// an engine for scheduling non-seastar tasks from seastar fibers