ldout(cct, 10) << __func__ << dendl;
finisher_lock.Lock();
finisher_stop = true;
+ // we don't have any new work to do, but we want the worker to wake up anyway
+ // to process the stop condition.
finisher_cond.Signal();
finisher_lock.Unlock();
- finisher_thread.join();
+ finisher_thread.join(); // wait until the worker exits completely
ldout(cct, 10) << __func__ << " finish" << dendl;
}
ldout(cct, 10) << "finisher_thread start" << dendl;
while (!finisher_stop) {
+ /// Every time we are woken up, we process the queue until it is empty.
while (!finisher_queue.empty()) {
+ // To reduce lock contention, we swap out the queue to process.
+ // This way other threads can submit new contexts to complete while we are working.
vector<Context*> ls;
list<pair<Context*,int> > ls_rval;
ls.swap(finisher_queue);
finisher_lock.Unlock();
ldout(cct, 10) << "finisher_thread doing " << ls << dendl;
+ // Now actually process the contexts.
for (vector<Context*>::iterator p = ls.begin();
p != ls.end();
++p) {
if (*p) {
(*p)->complete(0);
} else {
+ // When an item is NULL in the finisher_queue, it means
+ // we should instead process an item from finisher_queue_rval,
+ // which has a parameter for complete() other than zero.
+ // This preserves the order while saving some storage.
assert(!ls_rval.empty());
Context *c = ls_rval.front().first;
c->complete(ls_rval.front().second);
ldout(cct, 10) << "finisher_thread sleeping" << dendl;
finisher_cond.Wait(finisher_lock);
}
+ // If we are exiting, we signal the thread waiting in stop(),
+ // otherwise it would never unblock
finisher_empty_cond.Signal();
ldout(cct, 10) << "finisher_thread stop" << dendl;
class CephContext;
+/// Finisher queue length performance counter ID.
enum {
l_finisher_first = 997082,
l_finisher_queue_len,
l_finisher_last
};
+/** @brief Asynchronous cleanup class.
+ * Finisher asynchronously completes Contexts, which are simple classes
+ * representing callbacks, in a dedicated worker thread. Enqueuing
+ * contexts to complete is thread-safe.
+ */
class Finisher {
CephContext *cct;
- Mutex finisher_lock;
- Cond finisher_cond, finisher_empty_cond;
- bool finisher_stop, finisher_running;
+ Mutex finisher_lock; ///< Protects access to queues and finisher_running.
+ Cond finisher_cond; ///< Signaled when there is something to process.
+ Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
+ bool finisher_stop; ///< Set when the finisher should stop.
+ bool finisher_running; ///< True when the finisher is currently executing contexts.
+ /// Queue for contexts for which complete(0) will be called.
+ /// NULLs in this queue indicate that an item from finisher_queue_rval
+ /// should be completed in that place instead.
vector<Context*> finisher_queue;
+
+ /// Queue for contexts for which the complete function will be called
+ /// with a parameter other than 0.
list<pair<Context*,int> > finisher_queue_rval;
+
+ /// Performance counter for the finisher's queue length.
+ /// Only active for named finishers.
PerfCounters *logger;
void *finisher_thread_entry();
} finisher_thread;
public:
+ /// Add a context to complete, optionally specifying a parameter for the complete function.
void queue(Context *c, int r = 0) {
finisher_lock.Lock();
if (finisher_queue.empty()) {
finisher_lock.Unlock();
ls.clear();
}
-
+
+ /// Start the worker thread.
void start();
+
+ /** @brief Stop the worker thread.
+ *
+ * Does not wait until all outstanding contexts are completed.
+ * To ensure that everything finishes, you should first shut down
+ * all sources that can add contexts to this finisher and call
+ * wait_for_empty() before calling stop(). */
void stop();
+ /** @brief Blocks until the finisher has nothing left to process.
+ * This function will also return when a concurrent call to stop()
+ * finishes, but this class should never be used in this way. */
void wait_for_empty();
+ /// Construct an anonymous Finisher.
+ /// Anonymous finishers do not log their queue length.
Finisher(CephContext *cct_) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
logger(0),
finisher_thread(this) {}
+
+ /// Construct a named Finisher that logs its queue length.
Finisher(CephContext *cct_, string name) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
}
};
+/// Context that is completed asynchronously on the supplied finisher.
class C_OnFinisher : public Context {
Context *con;
Finisher *fin;