git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/OpHistory: move insert/cleanup into separate thread
author Piotr Dałek <piotr.dalek@corp.ovh.com>
Thu, 22 Feb 2018 13:45:52 +0000 (14:45 +0100)
committer Piotr Dałek <piotr.dalek@corp.ovh.com>
Mon, 26 Feb 2018 14:27:42 +0000 (15:27 +0100)
A cluster that is flooded with incoming ops (and has optracker enabled)
is bottlenecked by OpHistory::insert. Reduce that bottleneck by:
- pushing incoming ops into a separate queue that is processed by
  a separate thread.
- using std::atomic_bool for the shutdown flag so ops_history_lock doesn't
  need to be taken as often.

Signed-off-by: Piotr Dałek <piotr.dalek@corp.ovh.com>
src/common/TrackedOp.cc
src/common/TrackedOp.h

index baf081d0dd4dc929cf14725198058fb4867fad87..4b7ce31ee7b2ed3f2e6071be92656601a3912665 100644 (file)
@@ -22,8 +22,47 @@ static ostream& _prefix(std::ostream* _dout)
   return *_dout << "-- op tracker -- ";
 }
 
+void OpHistoryServiceThread::break_thread() {
+  queue_spinlock.lock();
+  _external_queue.clear();
+  _break_thread = true;
+  queue_spinlock.unlock();
+}
+
+void* OpHistoryServiceThread::entry() {
+  int sleep_time = 1000;
+  list<pair<utime_t, TrackedOpRef>> internal_queue;
+  while (1) {
+    queue_spinlock.lock();
+    if (_break_thread) {
+      queue_spinlock.unlock();
+      break;
+    }
+    internal_queue.swap(_external_queue);
+    queue_spinlock.unlock();
+    if (internal_queue.empty()) {
+      usleep(sleep_time);
+      if (sleep_time < 128000) {
+        sleep_time <<= 2;
+      }
+    } else {
+      sleep_time = 1000;
+    }
+
+    while (!internal_queue.empty()) {
+      pair<utime_t, TrackedOpRef> op = internal_queue.front();
+      _ophistory->_insert_delayed(op.first, op.second);
+      internal_queue.pop_front();
+    }
+  }
+  return nullptr;
+}
+
+
 void OpHistory::on_shutdown()
 {
+  opsvc.break_thread();
+  opsvc.join();
   Mutex::Locker history_lock(ops_history_lock);
   arrived.clear();
   duration.clear();
@@ -31,7 +70,7 @@ void OpHistory::on_shutdown()
   shutdown = true;
 }
 
-void OpHistory::insert(utime_t now, TrackedOpRef op)
+void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
 {
   Mutex::Locker history_lock(ops_history_lock);
   if (shutdown)
index d14b2e03162e0486eaaa04e808ca17a78d903634..5c157a01cea9c29255be8025794200bfd21da74d 100644 (file)
 #define OPTRACKER_PREALLOC_EVENTS 20
 
 class TrackedOp;
+class OpHistory;
+
 typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
 
+class OpHistoryServiceThread : public Thread
+{
+private:
+  OpHistory* _ophistory;
+  mutable ceph::spinlock queue_spinlock;
+  list<pair<utime_t, TrackedOpRef>> _external_queue;
+  bool _break_thread;
+public:
+  explicit OpHistoryServiceThread(OpHistory* parent)
+    : _ophistory(parent),
+      _break_thread(false) { }
+
+  void break_thread();
+  void insert_op(const utime_t& now, TrackedOpRef op) {
+    queue_spinlock.lock();
+    _external_queue.emplace_back(now, op);
+    queue_spinlock.unlock();
+  }
+
+  void *entry() override;
+};
+
+
 class OpHistory {
   set<pair<utime_t, TrackedOpRef> > arrived;
   set<pair<double, TrackedOpRef> > duration;
   set<pair<utime_t, TrackedOpRef> > slow_op;
   Mutex ops_history_lock;
+  std::atomic_bool shutdown;
   void cleanup(utime_t now);
-  bool shutdown;
   uint32_t history_size;
   uint32_t history_duration;
   uint32_t history_slow_op_size;
   uint32_t history_slow_op_threshold;
+  OpHistoryServiceThread opsvc;
+  friend class OpHistoryServiceThread;
 
 public:
   OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
     history_size(0), history_duration(0),
-    history_slow_op_size(0), history_slow_op_threshold(0) {}
+    history_slow_op_size(0), history_slow_op_threshold(0),
+    opsvc(this) {
+    opsvc.create("OpHistorySvc");
+  }
   ~OpHistory() {
     assert(arrived.empty());
     assert(duration.empty());
     assert(slow_op.empty());
   }
-  void insert(utime_t now, TrackedOpRef op);
+  void insert(const utime_t& now, TrackedOpRef op)
+  {
+    if (shutdown)
+      return;
+
+    opsvc.insert_op(now, op);
+  }
+
+  void _insert_delayed(const utime_t& now, TrackedOpRef op);
   void dump_ops(utime_t now, Formatter *f, set<string> filters = {""});
   void dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters = {""});
   void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});