const std::set <std::string> &changed) override;
public:
- /** @brief Work queue that processes several submitted items at once.
- * The queue will automatically add itself to the thread pool on construction
- * and remove itself on destruction. */
- template<class T>
- class BatchWorkQueue : public WorkQueue_ {
- ThreadPool *pool;
-
- virtual bool _enqueue(T *) = 0;
- virtual void _dequeue(T *) = 0;
- virtual void _dequeue(std::list<T*> *) = 0;
- virtual void _process_finish(const std::list<T*> &) {}
-
- // virtual methods from WorkQueue_ below
- void *_void_dequeue() override {
- std::list<T*> *out(new std::list<T*>);
- _dequeue(out);
- if (!out->empty()) {
- return (void *)out;
- } else {
- delete out;
- return 0;
- }
- }
- void _void_process(void *p, TPHandle &handle) override {
- _process(*((std::list<T*>*)p), handle);
- }
- void _void_process_finish(void *p) override {
- _process_finish(*(std::list<T*>*)p);
- delete (std::list<T*> *)p;
- }
-
- protected:
- virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
-
- public:
- BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
- : WorkQueue_(std::move(n), ti, sti), pool(p) {
- pool->add_work_queue(this);
- }
- ~BatchWorkQueue() override {
- pool->remove_work_queue(this);
- }
-
- bool queue(T *item) {
- pool->_lock.lock();
- bool r = _enqueue(item);
- pool->_cond.notify_one();
- pool->_lock.unlock();
- return r;
- }
- void dequeue(T *item) {
- pool->_lock.lock();
- _dequeue(item);
- pool->_lock.unlock();
- }
- void clear() {
- pool->_lock.lock();
- _clear();
- pool->_lock.unlock();
- }
-
- void lock() {
- pool->lock();
- }
- void unlock() {
- pool->unlock();
- }
- void wake() {
- pool->wake();
- }
- void _wake() {
- pool->_wake();
- }
- void drain() {
- pool->drain(this);
- }
-
- };
-
/** @brief Templated by-value work queue.
* Skeleton implementation of a queue that processes items submitted by value.
* This is useful if the items are single primitive values or very small objects