From: Jason Dillaman Date: Thu, 3 Sep 2015 03:30:05 +0000 (-0400) Subject: Throttle: added new OrderedThrottle class X-Git-Tag: v9.1.0~174^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eceadee257ea6134ab9573e70cee7cce2d56b086;p=ceph.git Throttle: added new OrderedThrottle class It is similar to the SimpleThrottle in usage but intercepts Context callbacks to ensure they are completed in-order. Signed-off-by: Jason Dillaman --- diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 307c0ec9e4b8..d117794606be 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -282,3 +282,89 @@ int SimpleThrottle::wait_for_ret() m_cond.Wait(m_lock); return m_ret; } + +void C_OrderedThrottle::finish(int r) { + m_ordered_throttle->finish_op(m_tid, r); +} + +OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent) + : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0), + m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) { +} + +C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) { + assert(on_finish != NULL); + + Mutex::Locker locker(m_lock); + uint64_t tid = m_next_tid++; + m_tid_result[tid] = Result(on_finish); + C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid); + + complete_pending_ops(); + while (m_max == m_current) { + m_cond.Wait(m_lock); + complete_pending_ops(); + } + ++m_current; + + return ctx; +} + +void OrderedThrottle::end_op(int r) { + Mutex::Locker locker(m_lock); + assert(m_current > 0); + + if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) { + m_ret_val = r; + } + --m_current; + m_cond.Signal(); +} + +void OrderedThrottle::finish_op(uint64_t tid, int r) { + Mutex::Locker locker(m_lock); + + TidResult::iterator it = m_tid_result.find(tid); + assert(it != m_tid_result.end()); + + it->second.finished = true; + it->second.ret_val = r; + m_cond.Signal(); +} + +bool OrderedThrottle::pending_error() const { + Mutex::Locker locker(m_lock); + return (m_ret_val < 0); +} + +int OrderedThrottle::wait_for_ret() { + Mutex::Locker locker(m_lock); + complete_pending_ops(); + + while (m_current > 0) { + m_cond.Wait(m_lock); + complete_pending_ops(); + } + return m_ret_val; +} + +void OrderedThrottle::complete_pending_ops() { + assert(m_lock.is_locked()); + + while (true) { + TidResult::iterator it = m_tid_result.begin(); + if (it == m_tid_result.end() || it->first != m_complete_tid || + !it->second.finished) { + break; + } + + Result result = it->second; + m_tid_result.erase(it); + + m_lock.Unlock(); + result.on_finish->complete(result.ret_val); + m_lock.Lock(); + + ++m_complete_tid; + } +} diff --git a/src/common/Throttle.h b/src/common/Throttle.h index 2faea594d96e..c04a9319e560 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -7,7 +7,9 @@ #include "Mutex.h" #include "Cond.h" #include +#include #include "include/atomic.h" +#include "include/Context.h" class CephContext; class PerfCounters; @@ -150,4 +152,70 @@ private: SimpleThrottle *m_throttle; }; +class OrderedThrottle; + +class C_OrderedThrottle : public Context { +public: + C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid) + : m_ordered_throttle(ordered_throttle), m_tid(tid) { + } + +protected: + virtual void finish(int r); + +private: + OrderedThrottle *m_ordered_throttle; + uint64_t m_tid; +}; + +/** + * @class OrderedThrottle + * Throttles the maximum number of active requests and completes them in order + * + * Operations can complete out-of-order but their associated Context callback + * will completed in-order during invokation of start_op() and wait_for_ret() + */ +class OrderedThrottle { +public: + OrderedThrottle(uint64_t max, bool ignore_enoent); + + C_OrderedThrottle *start_op(Context *on_finish); + void end_op(int r); + + bool pending_error() const; + int wait_for_ret(); + +protected: + friend class C_OrderedThrottle; + + void finish_op(uint64_t tid, int r); + +private: + struct Result { + bool finished; + int ret_val; + Context *on_finish; + + Result(Context *_on_finish = NULL) + : finished(false), ret_val(0), on_finish(_on_finish) { + } + }; + + typedef std::map TidResult; + + mutable Mutex m_lock; + Cond m_cond; + uint64_t m_max; + uint64_t m_current; + int m_ret_val; + bool m_ignore_enoent; + + uint64_t m_next_tid; + uint64_t m_complete_tid; + + TidResult m_tid_result; + + void complete_pending_ops(); +}; + #endif