From 635a60d76fec3344528aa7ce1ca231cd6e75694f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Piotr=20Da=C5=82ek?= Date: Thu, 22 Feb 2018 14:45:52 +0100 Subject: [PATCH] common/OpHistory: move insert/cleanup into separate thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Cluster that's flooded with incoming ops (and enabled optracker) is bottlenecked by OpHistory::insert. Reduce that by: - pushing incoming ops into separate queue that'll be processed by separate thread. - using std::atomic_bool for shutdown flag so ops_history_lock doesn't need to be taken as often Signed-off-by: Piotr Dałek --- src/common/TrackedOp.cc | 41 +++++++++++++++++++++++++++++++++++++- src/common/TrackedOp.h | 44 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc index baf081d0dd4dc..4b7ce31ee7b2e 100644 --- a/src/common/TrackedOp.cc +++ b/src/common/TrackedOp.cc @@ -22,8 +22,47 @@ static ostream& _prefix(std::ostream* _dout) return *_dout << "-- op tracker -- "; } +void OpHistoryServiceThread::break_thread() { + queue_spinlock.lock(); + _external_queue.clear(); + _break_thread = true; + queue_spinlock.unlock(); +} + +void* OpHistoryServiceThread::entry() { + int sleep_time = 1000; + list> internal_queue; + while (1) { + queue_spinlock.lock(); + if (_break_thread) { + queue_spinlock.unlock(); + break; + } + internal_queue.swap(_external_queue); + queue_spinlock.unlock(); + if (internal_queue.empty()) { + usleep(sleep_time); + if (sleep_time < 128000) { + sleep_time <<= 2; + } + } else { + sleep_time = 1000; + } + + while (!internal_queue.empty()) { + pair op = internal_queue.front(); + _ophistory->_insert_delayed(op.first, op.second); + internal_queue.pop_front(); + } + } + return nullptr; +} + + void OpHistory::on_shutdown() { + opsvc.break_thread(); + opsvc.join(); Mutex::Locker history_lock(ops_history_lock); arrived.clear(); duration.clear(); @@ -31,7 +70,7 @@ void OpHistory::on_shutdown() shutdown = true; } -void OpHistory::insert(utime_t now, TrackedOpRef op) +void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op) { Mutex::Locker history_lock(ops_history_lock); if (shutdown) diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index d14b2e03162e0..5c157a01cea9c 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -22,30 +22,68 @@ #define OPTRACKER_PREALLOC_EVENTS 20 class TrackedOp; +class OpHistory; + typedef boost::intrusive_ptr TrackedOpRef; +class OpHistoryServiceThread : public Thread +{ +private: + OpHistory* _ophistory; + mutable ceph::spinlock queue_spinlock; + list> _external_queue; + bool _break_thread; +public: + explicit OpHistoryServiceThread(OpHistory* parent) + : _ophistory(parent), + _break_thread(false) { } + + void break_thread(); + void insert_op(const utime_t& now, TrackedOpRef op) { + queue_spinlock.lock(); + _external_queue.emplace_back(now, op); + queue_spinlock.unlock(); + } + + void *entry() override; +}; + + class OpHistory { set > arrived; set > duration; set > slow_op; Mutex ops_history_lock; + std::atomic_bool shutdown; void cleanup(utime_t now); - bool shutdown; uint32_t history_size; uint32_t history_duration; uint32_t history_slow_op_size; uint32_t history_slow_op_threshold; + OpHistoryServiceThread opsvc; + friend class OpHistoryServiceThread; public: OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false), history_size(0), history_duration(0), - history_slow_op_size(0), history_slow_op_threshold(0) {} + history_slow_op_size(0), history_slow_op_threshold(0), + opsvc(this) { + opsvc.create("OpHistorySvc"); + } ~OpHistory() { assert(arrived.empty()); assert(duration.empty()); assert(slow_op.empty()); } - void insert(utime_t now, TrackedOpRef op); + void insert(const utime_t& now, TrackedOpRef op) + { + if (shutdown) + return; + + opsvc.insert_op(now, op); + } + + void _insert_delayed(const utime_t& now, TrackedOpRef op); void dump_ops(utime_t now, Formatter *f, set filters = {""}); void dump_ops_by_duration(utime_t now, Formatter *f, set filters = {""}); void dump_slow_ops(utime_t now, Formatter *f, set filters = {""}); -- 2.39.5