WorkItem* work_item = nullptr;
std::unique_lock lock{mutex};
cond.wait_for(lock, queue_max_wait, [this, &work_item] {
- bool empty = true;
- if (!pending.empty()) {
- empty = false;
- work_item = pending.front();
- pending.pop_front();
- }
- return !empty || is_stopping();
+ return pending.pop(work_item) || is_stopping();
});
return work_item;
}
cond.notify_all();
}
void push_back(WorkItem* work_item) {
- // XXX: oops, we can stall the reactor!
- // TODO: switch to boost::lockfree.
- std::unique_lock lock{mutex};
- pending.push_back(work_item);
+ [[maybe_unused]] bool pushed = pending.push(work_item);
+ assert(pushed);
cond.notify_one();
}
private:
bool stopping = false;
std::mutex mutex;
std::condition_variable cond;
- std::deque<WorkItem*> pending;
+ static constexpr unsigned QUEUE_SIZE = 128;
+ boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE};
};
/// an engine for scheduling non-seastar tasks from seastar fibers