m_cond.Wait(m_lock);
return m_ret;
}
+
+void C_OrderedThrottle::finish(int r) {
+ m_ordered_throttle->finish_op(m_tid, r);
+}
+
+OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
+ : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
+ m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
+}
+
+C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
+ assert(on_finish != NULL);
+
+ Mutex::Locker locker(m_lock);
+ uint64_t tid = m_next_tid++;
+ m_tid_result[tid] = Result(on_finish);
+ C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
+
+ complete_pending_ops();
+ while (m_max == m_current) {
+ m_cond.Wait(m_lock);
+ complete_pending_ops();
+ }
+ ++m_current;
+
+ return ctx;
+}
+
+void OrderedThrottle::end_op(int r) {
+ Mutex::Locker locker(m_lock);
+ assert(m_current > 0);
+
+ if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
+ m_ret_val = r;
+ }
+ --m_current;
+ m_cond.Signal();
+}
+
+void OrderedThrottle::finish_op(uint64_t tid, int r) {
+ Mutex::Locker locker(m_lock);
+
+ TidResult::iterator it = m_tid_result.find(tid);
+ assert(it != m_tid_result.end());
+
+ it->second.finished = true;
+ it->second.ret_val = r;
+ m_cond.Signal();
+}
+
+bool OrderedThrottle::pending_error() const {
+ Mutex::Locker locker(m_lock);
+ return (m_ret_val < 0);
+}
+
+int OrderedThrottle::wait_for_ret() {
+ Mutex::Locker locker(m_lock);
+ complete_pending_ops();
+
+ while (m_current > 0) {
+ m_cond.Wait(m_lock);
+ complete_pending_ops();
+ }
+ return m_ret_val;
+}
+
+void OrderedThrottle::complete_pending_ops() {
+ assert(m_lock.is_locked());
+
+ while (true) {
+ TidResult::iterator it = m_tid_result.begin();
+ if (it == m_tid_result.end() || it->first != m_complete_tid ||
+ !it->second.finished) {
+ break;
+ }
+
+ Result result = it->second;
+ m_tid_result.erase(it);
+
+ m_lock.Unlock();
+ result.on_finish->complete(result.ret_val);
+ m_lock.Lock();
+
+ ++m_complete_tid;
+ }
+}
#include "Mutex.h"
#include "Cond.h"
#include <list>
+#include <map>
#include "include/atomic.h"
+#include "include/Context.h"
class CephContext;
class PerfCounters;
SimpleThrottle *m_throttle;
};
+class OrderedThrottle;
+
+class C_OrderedThrottle : public Context {
+public:
+ C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
+ : m_ordered_throttle(ordered_throttle), m_tid(tid) {
+ }
+
+protected:
+ virtual void finish(int r);
+
+private:
+ OrderedThrottle *m_ordered_throttle;
+ uint64_t m_tid;
+};
+
+/**
+ * @class OrderedThrottle
+ * Throttles the maximum number of active requests and completes them in order
+ *
+ * Operations can complete out-of-order but their associated Context callback
+ * will completed in-order during invokation of start_op() and wait_for_ret()
+ */
+class OrderedThrottle {
+public:
+ OrderedThrottle(uint64_t max, bool ignore_enoent);
+
+ C_OrderedThrottle *start_op(Context *on_finish);
+ void end_op(int r);
+
+ bool pending_error() const;
+ int wait_for_ret();
+
+protected:
+ friend class C_OrderedThrottle;
+
+ void finish_op(uint64_t tid, int r);
+
+private:
+ struct Result {
+ bool finished;
+ int ret_val;
+ Context *on_finish;
+
+ Result(Context *_on_finish = NULL)
+ : finished(false), ret_val(0), on_finish(_on_finish) {
+ }
+ };
+
+ typedef std::map<uint64_t, Result> TidResult;
+
+ mutable Mutex m_lock;
+ Cond m_cond;
+ uint64_t m_max;
+ uint64_t m_current;
+ int m_ret_val;
+ bool m_ignore_enoent;
+
+ uint64_t m_next_tid;
+ uint64_t m_complete_tid;
+
+ TidResult m_tid_result;
+
+ void complete_pending_ops();
+};
+
#endif