]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Throttle: added new OrderedThrottle class
authorJason Dillaman <dillaman@redhat.com>
Thu, 3 Sep 2015 03:30:05 +0000 (23:30 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 4 Sep 2015 02:05:33 +0000 (22:05 -0400)
It is similar to the SimpleThrottle in usage but intercepts Context
callbacks to ensure they are completed in-order.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/common/Throttle.cc
src/common/Throttle.h

index 307c0ec9e4b857baa0a6229f746a815d963894cb..d117794606be84485cc15eae6d7760ef152f48c1 100644 (file)
@@ -282,3 +282,89 @@ int SimpleThrottle::wait_for_ret()
     m_cond.Wait(m_lock);
   return m_ret;
 }
+
+void C_OrderedThrottle::finish(int r) {
+  m_ordered_throttle->finish_op(m_tid, r);
+}
+
+OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
+  : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
+    m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
+}
+
+C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
+  assert(on_finish != NULL);
+
+  Mutex::Locker locker(m_lock);
+  uint64_t tid = m_next_tid++;
+  m_tid_result[tid] = Result(on_finish);
+  C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
+
+  complete_pending_ops();
+  while (m_max == m_current) {
+    m_cond.Wait(m_lock);
+    complete_pending_ops();
+  }
+  ++m_current;
+
+  return ctx;
+}
+
+void OrderedThrottle::end_op(int r) {
+  Mutex::Locker locker(m_lock);
+  assert(m_current > 0);
+
+  if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
+    m_ret_val = r;
+  }
+  --m_current;
+  m_cond.Signal();
+}
+
+void OrderedThrottle::finish_op(uint64_t tid, int r) {
+  Mutex::Locker locker(m_lock);
+
+  TidResult::iterator it = m_tid_result.find(tid);
+  assert(it != m_tid_result.end());
+
+  it->second.finished = true;
+  it->second.ret_val = r;
+  m_cond.Signal();
+}
+
+bool OrderedThrottle::pending_error() const {
+  Mutex::Locker locker(m_lock);
+  return (m_ret_val < 0);
+}
+
+int OrderedThrottle::wait_for_ret() {
+  Mutex::Locker locker(m_lock);
+  complete_pending_ops();
+
+  while (m_current > 0) {
+    m_cond.Wait(m_lock);
+    complete_pending_ops();
+  }
+  return m_ret_val;
+}
+
+void OrderedThrottle::complete_pending_ops() {
+  assert(m_lock.is_locked());
+
+  while (true) {
+    TidResult::iterator it = m_tid_result.begin();
+    if (it == m_tid_result.end() || it->first != m_complete_tid ||
+        !it->second.finished) {
+      break;
+    }
+
+    Result result = it->second;
+    m_tid_result.erase(it);
+
+    m_lock.Unlock();
+    result.on_finish->complete(result.ret_val);
+    m_lock.Lock();
+
+    ++m_complete_tid;
+  }
+}
index 2faea594d96ed951a734c2b3a3f0f538d3533dbe..c04a9319e5602d52aeb774ed248c2b6213c734da 100644 (file)
@@ -7,7 +7,9 @@
 #include "Mutex.h"
 #include "Cond.h"
 #include <list>
+#include <map>
 #include "include/atomic.h"
+#include "include/Context.h"
 
 class CephContext;
 class PerfCounters;
@@ -150,4 +152,70 @@ private:
   SimpleThrottle *m_throttle;
 };
 
+class OrderedThrottle;
+
+class C_OrderedThrottle : public Context {
+public:
+  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
+    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
+  }
+
+protected:
+  virtual void finish(int r);
+
+private:
+  OrderedThrottle *m_ordered_throttle;
+  uint64_t m_tid;
+};
+
+/**
+ * @class OrderedThrottle
+ * Throttles the maximum number of active requests and completes them in order
+ *
+ * Operations can complete out-of-order but their associated Context callback
+ * will completed in-order during invokation of start_op() and wait_for_ret()
+ */
+class OrderedThrottle {
+public:
+  OrderedThrottle(uint64_t max, bool ignore_enoent);
+
+  C_OrderedThrottle *start_op(Context *on_finish);
+  void end_op(int r);
+
+  bool pending_error() const;
+  int wait_for_ret();
+
+protected:
+  friend class C_OrderedThrottle;
+
+  void finish_op(uint64_t tid, int r);
+
+private:
+  struct Result {
+    bool finished;
+    int ret_val;
+    Context *on_finish;
+
+    Result(Context *_on_finish = NULL)
+      : finished(false), ret_val(0), on_finish(_on_finish) {
+    }
+  };
+
+  typedef std::map<uint64_t, Result> TidResult;
+
+  mutable Mutex m_lock;
+  Cond m_cond;
+  uint64_t m_max;
+  uint64_t m_current;
+  int m_ret_val;
+  bool m_ignore_enoent;
+
+  uint64_t m_next_tid;
+  uint64_t m_complete_tid;
+
+  TidResult m_tid_result;
+
+  void complete_pending_ops();
+};
+
 #endif