class CephContext;
+/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
CephContext *cct;
string name;
};
private:
+ /// Basic interface to a work queue used by the worker threads.
struct WorkQueue_ {
string name;
time_t timeout_interval, suicide_interval;
: name(n), timeout_interval(ti), suicide_interval(sti)
{ }
virtual ~WorkQueue_() {}
+ /// Remove all work items from the queue.
virtual void _clear() = 0;
+ /// Check whether there is anything to do.
virtual bool _empty() = 0;
+ /// Get the next work item to process.
virtual void *_void_dequeue() = 0;
+ /** @brief Process the work item.
+ * This function will be called several times in parallel
+ * and must therefore be thread-safe. */
virtual void _void_process(void *item, TPHandle &handle) = 0;
+ /** @brief Synchronously finish processing a work item.
+ * This function is called after _void_process with the global thread pool lock held,
+ * so at most one copy will execute simultaneously for a given thread pool.
+ * It can be used for non-thread-safe finalization. */
virtual void _void_process_finish(void *) = 0;
};
const std::set <std::string> &changed);
public:
+ /** @brief Work queue that processes several submitted items at once.
+ * The queue will automatically add itself to the thread pool on construction
+ * and remove itself on destruction. */
template<class T>
class BatchWorkQueue : public WorkQueue_ {
ThreadPool *pool;
}
virtual void _process_finish(const list<T*> &) {}
+ // virtual methods from WorkQueue_ below
void *_void_dequeue() {
list<T*> *out(new list<T*>);
_dequeue(out);
}
};
+
+ /** @brief Templated by-value work queue.
+ * Skeleton implementation of a queue that processes items submitted by value.
+ * This is useful if the items are single primitive values or very small objects
+ * (a few bytes). The queue will automatically add itself to the thread pool on
+ * construction and remove itself on destruction. */
template<typename T, typename U = T>
class WorkQueueVal : public WorkQueue_ {
Mutex _lock;
pool->unlock();
}
};
+
+ /** @brief Template by-pointer work queue.
+ * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
+ * This is useful when the work item are large or include dynamically allocated memory. The queue
+ * will automatically add itself to the thread pool on construction and remove itself on
+ * destruction. */
template<class T>
class WorkQueue : public WorkQueue_ {
ThreadPool *pool;
+ /// Add a work item to the queue.
virtual bool _enqueue(T *) = 0;
+ /// Dequeue a previously submitted work item.
virtual void _dequeue(T *) = 0;
+ /// Dequeue a work item and return the original submitted pointer.
virtual T *_dequeue() = 0;
+ /// Process a work item. Called from the worker threads.
virtual void _process(T *t) { assert(0); }
virtual void _process(T *t, TPHandle &) {
_process(t);
}
virtual void _process_finish(T *) {}
-
+
+ // implementation of virtual methods from WorkQueue_
void *_void_dequeue() {
return (void *)_dequeue();
}
void pause_new();
/// resume work in thread pool. must match each pause() call 1:1 to resume.
void unpause();
- /// wait for all work to complete
+ /** @brief Wait until work completes.
+ * If the parameter is NULL, blocks until all threads are idle.
+ * If it is not NULL, blocks until the given work queue does not have
+ * any items left to process. */
void drain(WorkQueue_* wq = 0);
/// set io priority
}
};
+/// Work queue that asynchronously completes contexts (executes callbacks).
+/// @see Finisher
class ContextWQ : public ThreadPool::WorkQueueVal<std::pair<Context *, int> > {
public:
ContextWQ(const string &name, time_t ti, ThreadPool *tp)