return *_dout << "-- op tracker -- ";
}
+void OpHistoryServiceThread::break_thread() {
+ queue_spinlock.lock();
+ _external_queue.clear();
+ _break_thread = true;
+ queue_spinlock.unlock();
+}
+
+void* OpHistoryServiceThread::entry() {
+ int sleep_time = 1000;
+ list<pair<utime_t, TrackedOpRef>> internal_queue;
+ while (1) {
+ queue_spinlock.lock();
+ if (_break_thread) {
+ queue_spinlock.unlock();
+ break;
+ }
+ internal_queue.swap(_external_queue);
+ queue_spinlock.unlock();
+ if (internal_queue.empty()) {
+ usleep(sleep_time);
+ if (sleep_time < 128000) {
+ sleep_time <<= 2;
+ }
+ } else {
+ sleep_time = 1000;
+ }
+
+ while (!internal_queue.empty()) {
+ pair<utime_t, TrackedOpRef> op = internal_queue.front();
+ _ophistory->_insert_delayed(op.first, op.second);
+ internal_queue.pop_front();
+ }
+ }
+ return nullptr;
+}
+
+
void OpHistory::on_shutdown()
{
+ opsvc.break_thread();
+ opsvc.join();
Mutex::Locker history_lock(ops_history_lock);
arrived.clear();
duration.clear();
shutdown = true;
}
-void OpHistory::insert(utime_t now, TrackedOpRef op)
+void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
{
Mutex::Locker history_lock(ops_history_lock);
if (shutdown)
#define OPTRACKER_PREALLOC_EVENTS 20
class TrackedOp;
+class OpHistory;
+
typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
+class OpHistoryServiceThread : public Thread
+{
+private:
+ OpHistory* _ophistory;
+ mutable ceph::spinlock queue_spinlock;
+ list<pair<utime_t, TrackedOpRef>> _external_queue;
+ bool _break_thread;
+public:
+ explicit OpHistoryServiceThread(OpHistory* parent)
+ : _ophistory(parent),
+ _break_thread(false) { }
+
+ void break_thread();
+ void insert_op(const utime_t& now, TrackedOpRef op) {
+ queue_spinlock.lock();
+ _external_queue.emplace_back(now, op);
+ queue_spinlock.unlock();
+ }
+
+ void *entry() override;
+};
+
+
class OpHistory {
set<pair<utime_t, TrackedOpRef> > arrived;
set<pair<double, TrackedOpRef> > duration;
set<pair<utime_t, TrackedOpRef> > slow_op;
Mutex ops_history_lock;
+ std::atomic_bool shutdown;
void cleanup(utime_t now);
- bool shutdown;
uint32_t history_size;
uint32_t history_duration;
uint32_t history_slow_op_size;
uint32_t history_slow_op_threshold;
+ OpHistoryServiceThread opsvc;
+ friend class OpHistoryServiceThread;
public:
OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
history_size(0), history_duration(0),
- history_slow_op_size(0), history_slow_op_threshold(0) {}
+ history_slow_op_size(0), history_slow_op_threshold(0),
+ opsvc(this) {
+ opsvc.create("OpHistorySvc");
+ }
~OpHistory() {
assert(arrived.empty());
assert(duration.empty());
assert(slow_op.empty());
}
- void insert(utime_t now, TrackedOpRef op);
+ void insert(const utime_t& now, TrackedOpRef op)
+ {
+ if (shutdown)
+ return;
+
+ opsvc.insert_op(now, op);
+ }
+
+ void _insert_delayed(const utime_t& now, TrackedOpRef op);
void dump_ops(utime_t now, Formatter *f, set<string> filters = {""});
void dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters = {""});
void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});