#include "Mutex.h"
#include "Cond.h"
#include "Thread.h"
+#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
};
+ template<typename T>
+ class PointerWQ : public WorkQueue_ {
+ public:
+ PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
+ : WorkQueue_(n, ti, sti), m_pool(p) {
+ m_pool->add_work_queue(this);
+ }
+ ~PointerWQ() {
+ m_pool->remove_work_queue(this);
+ }
+ void drain() {
+ m_pool->drain(this);
+ }
+ void queue(T *item) {
+ Mutex::Locker l(m_pool->_lock);
+ m_items.push_back(item);
+ m_pool->_cond.SignalOne();
+ }
+ protected:
+ virtual void _clear() {
+ assert(m_pool->_lock.is_locked());
+ m_items.clear();
+ }
+ virtual bool _empty() {
+ assert(m_pool->_lock.is_locked());
+ return m_items.empty();
+ }
+ virtual void *_void_dequeue() {
+ assert(m_pool->_lock.is_locked());
+ if (m_items.empty()) {
+ return NULL;
+ }
+
+ T *item = m_items.front();
+ m_items.pop_front();
+ return item;
+ }
+ virtual void _void_process(void *item, ThreadPool::TPHandle &handle) {
+ process(reinterpret_cast<T *>(item));
+ }
+ virtual void _void_process_finish(void *item) {
+ }
+
+ virtual void process(T *item) = 0;
+
+ T *front() {
+ assert(m_pool->_lock.is_locked());
+ if (m_items.empty()) {
+ return NULL;
+ }
+ return m_items.front();
+ }
+ void signal() {
+ Mutex::Locker pool_locker(m_pool->_lock);
+ m_pool->_cond.SignalOne();
+ }
+ private:
+ ThreadPool *m_pool;
+ std::list<T *> m_items;
+ };
private:
vector<WorkQueue_*> work_queues;
int last_work_queue;
/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
-class ContextWQ : public ThreadPool::WorkQueueVal<std::pair<Context *, int> > {
+class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
ContextWQ(const string &name, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueueVal<std::pair<Context *, int> >(name, ti, 0, tp) {}
+ : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
+ m_lock("ContextWQ::m_lock") {
+ }
void queue(Context *ctx, int result = 0) {
- ThreadPool::WorkQueueVal<std::pair<Context *, int> >::queue(
- std::make_pair(ctx, result));
+ if (result != 0) {
+ Mutex::Locker locker(m_lock);
+ m_context_results[ctx] = result;
+ }
+ ThreadPool::PointerWQ<Context>::queue(ctx);
}
-
protected:
- virtual void _enqueue(std::pair<Context *, int> item) {
- _queue.push_back(item);
- }
- virtual void _enqueue_front(std::pair<Context *, int> item) {
- _queue.push_front(item);
- }
- virtual bool _empty() {
- return _queue.empty();
- }
- virtual std::pair<Context *, int> _dequeue() {
- std::pair<Context *, int> item = _queue.front();
- _queue.pop_front();
- return item;
+ virtual void _clear() {
+ ThreadPool::PointerWQ<Context>::_clear();
+
+ Mutex::Locker locker(m_lock);
+ m_context_results.clear();
}
- virtual void _process(std::pair<Context *, int> item) {
- item.first->complete(item.second);
+
+ virtual void process(Context *ctx) {
+ int result = 0;
+ {
+ Mutex::Locker locker(m_lock);
+ ceph::unordered_map<Context *, int>::iterator it =
+ m_context_results.find(ctx);
+ if (it != m_context_results.end()) {
+ result = it->second;
+ m_context_results.erase(it);
+ }
+ }
+ ctx->complete(result);
}
- using ThreadPool::WorkQueueVal<std::pair<Context *, int> >::_process;
private:
- list<std::pair<Context *, int> > _queue;
+ Mutex m_lock;
+ ceph::unordered_map<Context*, int> m_context_results;
};
class ShardedThreadPool {