From 95a90f7da8a32dd2a5a1e3b381aaeacea378437b Mon Sep 17 00:00:00 2001 From: Nitzan Mordechai Date: Sun, 8 Dec 2024 18:08:39 +0000 Subject: [PATCH] mgr: Add per-module performance counters to mgr This commit introduces performance counters for individual Ceph mgr modules. These counters allow monitoring module behavior, debugging latency issues, and identifying performance bottlenecks, all without modifying the modules themselves. The following counters are now exposed under: > ceph daemon mgr.${MGRNAME} perf dump Example structure: "mgr_module_<module_name>": { "notify_avg_usec": { <- Average time spent handling notify events "avgcount": 0, "sum": 0 }, "cmd_avg_usec": { <- Average time spent processing CLI/admin commands "avgcount": 0, "sum": 0 }, "alive": 1, <- Module is alive (1 = running, 0 = exited) "cpu_usage": 0, <- CPU usage in percent "serve_cpu_usage": 0, <- Serve thread CPU usage in percent "mem_rss_change": 0, <- Memory RSS change in bytes "mem_rss_current": 490737664 <- Memory RSS current in bytes } Signed-off-by: Nitzan Mordechai Conflicts: src/mgr/ActivePyModules.cc - finisher.queue changed by 63859, adding py_module to the parameter list src/mgr/PyModuleRegistry.cc - check_all_modules_started added by 63859 --- doc/mgr/administrator.rst | 24 +++- src/common/Finisher.cc | 4 + src/common/Finisher.h | 6 + src/common/options/mgr.yaml.in | 8 ++ src/mgr/ActivePyModule.cc | 21 ++- src/mgr/ActivePyModule.h | 5 +- src/mgr/ActivePyModules.cc | 32 +++-- src/mgr/ActivePyModules.h | 3 +- src/mgr/CMakeLists.txt | 1 + src/mgr/PyModule.cc | 19 +++ src/mgr/PyModule.h | 22 ++- src/mgr/PyModuleRegistry.cc | 4 +- src/mgr/PyModuleRegistry.h | 10 +- src/mgr/PyModuleRunner.cc | 13 +- src/mgr/PyModuleRunner.h | 14 +- src/mgr/ThreadMonitor.cc | 250 +++++++++++++++++++++++++++++++++ src/mgr/ThreadMonitor.h | 101 +++++++++++++ 17 files changed, 510 insertions(+), 27 deletions(-) create mode 100644 src/mgr/ThreadMonitor.cc create mode 100644 
src/mgr/ThreadMonitor.h diff --git a/doc/mgr/administrator.rst b/doc/mgr/administrator.rst index 1477b74a9490..a6a7047577c5 100644 --- a/doc/mgr/administrator.rst +++ b/doc/mgr/administrator.rst @@ -108,7 +108,7 @@ daemon as failed using ``ceph mgr fail ``. Performance and Scalability --------------------------- -All the mgr modules share a cache that can be enabled with +All the Manager modules share a cache that can be enabled with ``ceph config set mgr mgr_ttl_cache_expire_seconds ``, where seconds is the time to live of the cached python objects. @@ -122,6 +122,28 @@ Furthermore, you can run ``ceph daemon mgr.${MGRNAME} perf dump`` to retrieve perf counters of a mgr module. In ``mgr.cache_hit`` and ``mgr.cache_miss`` you'll find the hit/miss ratio of the mgr cache. +The Manager includes a ThreadMonitor that tracks CPU usage and memory consumption +for each enabled module. This monitoring can be configured with +``ceph config set mgr mgr_module_monitor_interval <seconds>``, where ``seconds`` +is the monitoring interval. Setting this to 0 disables module monitoring. + +The ThreadMonitor provides per-module performance counters accessible via +``ceph daemon mgr.${MGRNAME} perf dump``, including: + +- ``notify_avg_usec``: Average time spent in notify calls (microseconds) +- ``cmd_avg_usec``: Average time spent in command calls (microseconds) +- ``alive``: Module health status (0=dead, 1=alive) +- ``cpu_usage``: CPU percentage for the main module thread +- ``serve_cpu_usage``: CPU percentage for the module's serve thread (if present) +- ``mem_rss_current``: Current process memory usage (RSS) in bytes +- ``mem_rss_change``: Memory usage change since last measurement in bytes + +These counters help identify resource-intensive modules and can be useful for +debugging performance issues or memory leaks. The ``notify_avg_usec`` and +``cmd_avg_usec`` counters track the performance of module operations, while +the CPU and memory counters monitor resource consumption. 
The default monitoring +interval is 5 seconds. + Automatic Stats Period Tuning ------------------------------ diff --git a/src/common/Finisher.cc b/src/common/Finisher.cc index b0c2b775fbfd..04ef97ad6f86 100644 --- a/src/common/Finisher.cc +++ b/src/common/Finisher.cc @@ -47,6 +47,8 @@ Finisher::~Finisher() { void Finisher::start() { ldout(cct, 10) << __func__ << dendl; + tid_promise = std::promise{}; + started_future = tid_promise.get_future(); finisher_thread.create(thread_name.c_str()); } @@ -86,6 +88,8 @@ void *Finisher::finisher_thread_entry() std::unique_lock ul(finisher_lock); ldout(cct, 10) << "finisher_thread start" << dendl; + finisher_tid.store(ceph_gettid()); + tid_promise.set_value(); utime_t start; uint64_t count = 0; while (!finisher_stop) { diff --git a/src/common/Finisher.h b/src/common/Finisher.h index d737f015cce7..41173cefa761 100644 --- a/src/common/Finisher.h +++ b/src/common/Finisher.h @@ -17,6 +17,7 @@ #define CEPH_FINISHER_H #include +#include #include #include #include @@ -49,6 +50,9 @@ class Finisher { bool finisher_stop = false; ///< Set when the finisher should stop. bool finisher_running = false; ///< True when the finisher is currently executing contexts. bool finisher_empty_wait = false; ///< True mean someone wait finisher empty. + std::atomic finisher_tid{0}; ///< TID of the finisher worker thread, set on startup + std::promise tid_promise; ///< Fulfilled by the worker thread once it has started. + std::future started_future; ///< Becomes ready when the finisher thread is running; use on_started() to observe. /// Queue for contexts for which complete(0) will be called. std::vector> finisher_queue; @@ -104,6 +108,7 @@ class Finisher { /// Start the worker thread. void start(); + std::future& on_started() { return started_future; } /** @brief Stop the worker thread. 
* @@ -119,6 +124,7 @@ class Finisher { void wait_for_empty(); bool is_empty(); + pid_t get_tid() const { return finisher_tid; } std::string_view get_thread_name() const noexcept { return thread_name; diff --git a/src/common/options/mgr.yaml.in b/src/common/options/mgr.yaml.in index 3f88de48d3b9..d1c587390d4d 100644 --- a/src/common/options/mgr.yaml.in +++ b/src/common/options/mgr.yaml.in @@ -438,6 +438,14 @@ options: default: 5 min: 0 max: 11 +- name: mgr_module_monitor_interval + type: int + level: advanced + desc: Period in seconds for collecting Manager modules cpu and memory performance counters. + long_desc: Period in seconds for Manager Monitor to collect the cpu and memory for + each enabled module. If set to 0, collection of these stats will be disabled. + default: 5 + min: 0 - name: mgr_tick_period type: secs level: advanced diff --git a/src/mgr/ActivePyModule.cc b/src/mgr/ActivePyModule.cc index 2c0df7a30116..a01233ac5dc7 100644 --- a/src/mgr/ActivePyModule.cc +++ b/src/mgr/ActivePyModule.cc @@ -66,6 +66,7 @@ void ActivePyModule::notify(const std::string ¬ify_type, const std::string &n Gil gil(py_module->pMyThreadState, true); + auto _start = ceph::mono_clock::now(); // Execute auto pValue = PyObject_CallMethod(pClassInstance, const_cast("notify"), const_cast("(ss)"), @@ -73,6 +74,11 @@ void ActivePyModule::notify(const std::string ¬ify_type, const std::string &n if (pValue != NULL) { Py_DECREF(pValue); + if (py_module->perfcounter) { + auto duration = ceph::mono_clock::now() - _start; + auto usec = std::chrono::duration_cast(duration).count(); + py_module->perfcounter->inc(py_module->l_pym_notify_avg_usec, usec); + } } else { derr << get_name() << ".notify:" << dendl; derr << handle_pyerror(true, get_name(), "ActivePyModule::notify") << dendl; @@ -98,7 +104,8 @@ void ActivePyModule::notify_clog(const LogEntry &log_entry) PyFormatter f; log_entry.dump(&f); auto py_log_entry = f.get(); - + + auto _start = ceph::mono_clock::now(); // Execute auto pValue = 
PyObject_CallMethod(pClassInstance, const_cast("notify"), const_cast("(sN)"), @@ -106,6 +113,11 @@ void ActivePyModule::notify_clog(const LogEntry &log_entry) if (pValue != NULL) { Py_DECREF(pValue); + if (py_module->perfcounter) { + auto duration = ceph::mono_clock::now() - _start; + auto usec = std::chrono::duration_cast(duration).count(); + py_module->perfcounter->inc(py_module->l_pym_notify_avg_usec, usec); + } } else { derr << get_name() << ".notify_clog:" << dendl; derr << handle_pyerror(true, get_name(), "ActivePyModule::notify_clog") << dendl; @@ -261,7 +273,7 @@ int ActivePyModule::handle_command( ceph_assert(m_session == nullptr); m_command_perms = module_command.perm; m_session = &session; - + auto _start = ceph::mono_clock::now(); auto pResult = PyObject_CallMethod(pClassInstance, const_cast("_handle_command"), const_cast("s#O"), instr.c_str(), instr.length(), py_cmd); @@ -272,6 +284,11 @@ int ActivePyModule::handle_command( int r = 0; if (pResult != NULL) { + if (py_module->perfcounter) { + auto duration = ceph::mono_clock::now() - _start; + auto usec = std::chrono::duration_cast(duration).count(); + py_module->perfcounter->inc(py_module->l_pym_cmd_avg_usec, usec); + } if (PyTuple_Size(pResult) != 3) { derr << "module '" << py_module->get_name() << "' command handler " "returned wrong type!" 
<< dendl; diff --git a/src/mgr/ActivePyModule.h b/src/mgr/ActivePyModule.h index b58953387970..35fab34a1622 100644 --- a/src/mgr/ActivePyModule.h +++ b/src/mgr/ActivePyModule.h @@ -54,8 +54,9 @@ public: public: ActivePyModule(const PyModuleRef &py_module_, - LogChannelRef clog_) - : PyModuleRunner(py_module_, clog_), + LogChannelRef clog_, + ThreadMonitor* monitor_ = nullptr) + : PyModuleRunner(py_module_, clog_, monitor_), finisher(g_ceph_context, thread_name, fmt::format("m-fin-{}", py_module->get_name()).substr(0,15)) { diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index 0f5d07638dbe..93ad567def4f 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -60,10 +60,11 @@ ActivePyModules::ActivePyModules( MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Finisher &f, DaemonServer &server, - PyModuleRegistry &pmr) + PyModuleRegistry &pmr, ThreadMonitor *monitor_) : module_config(module_config_), daemon_state(ds), cluster_state(cs), monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_), finisher(f), + m_thread_monitor(monitor_), cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"), server(server), py_module_registry(pmr) { @@ -75,7 +76,18 @@ ActivePyModules::ActivePyModules( cmd_finisher.start(); } -ActivePyModules::~ActivePyModules() = default; +ActivePyModules::~ActivePyModules() +{ + dout(10) << "ActivePyModules destructor called" << dendl; + + // Stop the thread monitor if it was started + if (m_thread_monitor) { + m_thread_monitor->stop_monitoring(); + } + + // Stop the finisher thread + cmd_finisher.stop(); +} void ActivePyModules::dump_server(const std::string &hostname, const DaemonStateCollection &dmc, @@ -535,12 +547,12 @@ void ActivePyModules::start_one(PyModuleRef py_module) std::lock_guard l(lock); const auto name = py_module->get_name(); - auto active_module = std::make_shared(py_module, clog); + auto active_module = std::make_shared(py_module, clog, 
m_thread_monitor); pending_modules.insert(name); // Send all python calls down a Finisher to avoid blocking // C++ code, and avoid any potential lock cycles. - finisher.queue(new LambdaContext([this, active_module, name](int) { + finisher.queue(new LambdaContext([this, active_module, name, py_module](int) { // Delay loading in testing scenarios auto delay = g_conf().get_val("mgr_module_load_delay"); std::string delayed_module = g_conf().get_val("mgr_module_load_delay_name"); @@ -558,12 +570,16 @@ void ActivePyModules::start_one(PyModuleRef py_module) } else { auto em = modules.emplace(name, active_module); ceph_assert(em.second); // actually inserted - - dout(4) << "Starting thread for " << name << dendl; active_module->thread.create(active_module->get_thread_name()); - dout(4) << "Starting active module " << name <<" finisher thread " - << active_module->get_fin_thread_name() << dendl; + py_module->perf_counter_build(g_ceph_context); active_module->finisher.start(); + active_module->finisher.on_started().wait(); + active_module->set_native_tid(active_module->finisher.get_tid()); + if (m_thread_monitor) { + m_thread_monitor->register_thread(active_module->get_native_tid(), + active_module->thread.get_tid(), + name, py_module); + } } // Signal when we're finally done starting up modules diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index e62bb625c9d8..151be14926c4 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -67,6 +67,7 @@ class ActivePyModules Objecter &objecter; Finisher &finisher; TTLCache ttl_cache; + ThreadMonitor* m_thread_monitor = nullptr; public: Finisher cmd_finisher; private: @@ -84,7 +85,7 @@ public: bool mon_provides_kv_sub, DaemonStateIndex &ds, ClusterState &cs, MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, - Finisher &f, DaemonServer &server, PyModuleRegistry &pmr); + Finisher &f, DaemonServer &server, PyModuleRegistry &pmr, ThreadMonitor *monitor = nullptr); 
~ActivePyModules(); diff --git a/src/mgr/CMakeLists.txt b/src/mgr/CMakeLists.txt index abf42bd4efd3..61d02cb884c0 100644 --- a/src/mgr/CMakeLists.txt +++ b/src/mgr/CMakeLists.txt @@ -34,6 +34,7 @@ if(WITH_MGR) PyModuleRunner.cc PyOSDMap.cc StandbyPyModules.cc + ThreadMonitor.cc mgr_commands.cc MgrOpRequest.cc ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc diff --git a/src/mgr/PyModule.cc b/src/mgr/PyModule.cc index b974a60b265f..46dc97073b89 100644 --- a/src/mgr/PyModule.cc +++ b/src/mgr/PyModule.cc @@ -287,6 +287,10 @@ std::pair PyModuleConfig::set_config( } } + PyModule::PyModule(const std::string &module_name_) + : module_name(module_name_) + { } + PyObject* PyModule::init_ceph_logger() { auto py_logger = PyModule_Create(&ceph_logger_module); @@ -792,3 +796,18 @@ PyModule::~PyModule() } } +int PyModule::perf_counter_build(CephContext *cct) { + ceph_assert(perfcounter == nullptr); + PerfCountersBuilder pcb(cct, "mgr_module_" + get_name(), l_pym_first, l_pym_last); + pcb.add_u64_avg(l_pym_notify_avg_usec, "notify_avg_usec", "Average time spent in notify calls", "nsec", 0); + pcb.add_u64_avg(l_pym_cmd_avg_usec, "cmd_avg_usec", "Average time spent in command calls", "csec", 0); + pcb.add_u64(l_pym_alive, "alive", "Is the module alive?", "aliv", 0, uint64_t(1)); + pcb.add_u64(l_pym_cpu_usage, "cpu_usage", "CPU usage in percent", "cpu", 0, uint64_t(100)); + pcb.add_u64(l_pym_mem_rss_change, "mem_rss_change", "Memory RSS change in bytes", "", 0); + pcb.add_u64(l_pym_mem_rss_current, "mem_rss_current", "Memory RSS current in bytes", "", 0); + pcb.add_u64(l_pym_serve_cpu_usage, "serve_cpu_usage", "Serve thread CPU usage in percent", "cpu", 0, uint64_t(100)); + perfcounter = std::unique_ptr(pcb.create_perf_counters()); + cct->get_perfcounters_collection()->add(perfcounter.get()); + + return 0; +} \ No newline at end of file diff --git a/src/mgr/PyModule.h b/src/mgr/PyModule.h index 01cca1d8e27e..77212dd94b7d 100644 --- a/src/mgr/PyModule.h +++ b/src/mgr/PyModule.h @@ -19,7 
+19,9 @@ #include #include #include +#include "common/ceph_context.h" #include "common/ceph_mutex.h" +#include "common/perf_counters.h" #include "Python.h" #include "Gil.h" #include "mon/MgrMap.h" @@ -98,6 +100,7 @@ private: std::set notify_types; public: + std::unique_ptr perfcounter; static std::string mgr_store_prefix; SafeThreadState pMyThreadState; @@ -108,10 +111,7 @@ public: // true unless module in mgr_subinterpreter_modules bool use_main_interpreter = true; - explicit PyModule(const std::string &module_name_) - : module_name(module_name_) - { - } + explicit PyModule(const std::string &module_name_); ~PyModule(); @@ -181,6 +181,20 @@ public: bool get_can_run() const { std::lock_guard l(lock) ; return can_run; } + + enum PerfModuleCounters { + l_pym_first = 10000, + l_pym_notify_avg_usec, + l_pym_cmd_avg_usec, + l_pym_alive, + l_pym_cpu_usage, + l_pym_mem_rss_change, + l_pym_mem_rss_current, + l_pym_serve_cpu_usage, + l_pym_last + }; + + int perf_counter_build(CephContext *cct); }; typedef std::shared_ptr PyModuleRef; diff --git a/src/mgr/PyModuleRegistry.cc b/src/mgr/PyModuleRegistry.cc index 6a8a59dcad12..67b14757746f 100644 --- a/src/mgr/PyModuleRegistry.cc +++ b/src/mgr/PyModuleRegistry.cc @@ -99,6 +99,7 @@ void PyModuleRegistry::init() ceph_assert(pMainThreadState != nullptr); std::list failed_modules; + thread_monitor->start_monitoring(); const std::string module_path = g_conf().get_val("mgr_module_path"); auto module_names = probe_modules(module_path); @@ -118,7 +119,6 @@ void PyModuleRegistry::init() failed_modules.push_back(module_name); // Don't drop out here, load the other modules } - // Record the module even if the load failed, so that we can // report its loading error modules[module_name] = std::move(mod); @@ -236,7 +236,7 @@ void PyModuleRegistry::active_start( kv_store, mon_provides_kv_sub, ds, cs, mc, clog_, audit_clog_, objecter_, f, server, - *this)); + *this, thread_monitor.get())); for (const auto &i : modules) { // Anything we're 
skipping because of !can_run will be flagged diff --git a/src/mgr/PyModuleRegistry.h b/src/mgr/PyModuleRegistry.h index 069af5154b91..d79183035a84 100644 --- a/src/mgr/PyModuleRegistry.h +++ b/src/mgr/PyModuleRegistry.h @@ -50,6 +50,7 @@ private: std::unique_ptr active_modules; std::unique_ptr standby_modules; + std::unique_ptr thread_monitor; PyThreadState *pMainThreadState; @@ -64,6 +65,7 @@ private: std::vector probe_modules(const std::string &path) const; PyModuleConfig module_config; + PyObject* process_obj = nullptr; public: void handle_config(const std::string &k, const std::string &v); @@ -93,9 +95,13 @@ public: } explicit PyModuleRegistry(LogChannelRef clog_) - : clog(clog_) - {} + : clog(clog_), + thread_monitor(std::make_unique(g_ceph_context)) + { } + ~PyModuleRegistry() { + thread_monitor->stop_monitoring(); + } /** * @return true if the mgrmap has changed such that the service needs restart */ diff --git a/src/mgr/PyModuleRunner.cc b/src/mgr/PyModuleRunner.cc index 36f8bc980443..e7469995110a 100644 --- a/src/mgr/PyModuleRunner.cc +++ b/src/mgr/PyModuleRunner.cc @@ -44,7 +44,9 @@ int PyModuleRunner::serve() // This method is called from a separate OS thread (i.e. a thread not // created by Python), so tell Gil to wrap this in a new thread state. 
Gil gil(py_module->pMyThreadState, true); - + if (py_module->perfcounter) { + py_module->perfcounter->set(py_module->l_pym_alive, 1); + } auto pValue = PyObject_CallMethod(pClassInstance, const_cast("serve"), nullptr); @@ -71,6 +73,10 @@ int PyModuleRunner::serve() return -EINVAL; } + if (py_module->perfcounter) { + py_module->perfcounter->set(py_module->l_pym_alive, 0); + } + return r; } @@ -89,7 +95,9 @@ void PyModuleRunner::shutdown() derr << "Failed to invoke shutdown() on " << get_name() << dendl; derr << handle_pyerror(true, get_name(), "PyModuleRunner::shutdown") << dendl; } - + if (py_module->perfcounter) { + py_module->perfcounter->set(py_module->l_pym_alive, 0); + } dead = true; } @@ -106,6 +114,7 @@ void* PyModuleRunner::PyModuleRunnerThread::entry() { // No need to acquire the GIL here; the module does it. dout(4) << "Entering thread for " << mod->get_name() << dendl; + runner_tid.store(ceph_gettid(), std::memory_order_release); mod->serve(); return nullptr; } diff --git a/src/mgr/PyModuleRunner.h b/src/mgr/PyModuleRunner.h index 187002745e13..3b9596ea9e31 100644 --- a/src/mgr/PyModuleRunner.h +++ b/src/mgr/PyModuleRunner.h @@ -20,6 +20,8 @@ #include "mgr/Gil.h" #include "PyModule.h" +#include "mgr/ThreadMonitor.h" +#include /** * Implement the pattern of calling serve() on a module in a thread, @@ -36,16 +38,18 @@ protected: PyObject *pClassInstance = nullptr; LogChannelRef clog; - + pid_t m_native_tid = 0; + ThreadMonitor* m_thread_monitor = nullptr; class PyModuleRunnerThread : public Thread { PyModuleRunner *mod; - + std::atomic runner_tid{0}; public: explicit PyModuleRunnerThread(PyModuleRunner *mod_) : mod(mod_) {} void *entry() override; + pid_t get_tid() const { return runner_tid.load(); } }; bool is_dead() const { return dead; } @@ -64,10 +68,12 @@ public: PyModuleRunner( const PyModuleRef &py_module_, - LogChannelRef clog_) + LogChannelRef clog_, + ThreadMonitor* monitor_ = nullptr) : py_module(py_module_), clog(clog_), + 
m_thread_monitor(monitor_), thread(this) { // Shortened name for use as thread name, because thread names @@ -82,6 +88,8 @@ public: PyModuleRunnerThread thread; std::string const &get_name() const { return py_module->get_name(); } + void set_native_tid(pid_t tid) { m_native_tid = tid; } + pid_t get_native_tid() const { return m_native_tid; } private: bool dead = false; diff --git a/src/mgr/ThreadMonitor.cc b/src/mgr/ThreadMonitor.cc new file mode 100644 index 000000000000..d26a747ed6ec --- /dev/null +++ b/src/mgr/ThreadMonitor.cc @@ -0,0 +1,250 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License version 2.1, as published by + * the Free Software Foundation. See file COPYING. + */ + +#include "ThreadMonitor.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr[ThreadMonitor] " << __func__ << " " + +void ThreadMonitor::start_monitoring() { + if (running.exchange(true)) { + return; + } + + dout(20) << "Starting monitoring thread." << dendl; + monitor_thread = std::make_unique(&ThreadMonitor::monitoring_loop, this); +} + +void ThreadMonitor::stop_monitoring() { + if (!running.exchange(false)) { + return; + } + + if (monitor_thread && monitor_thread->joinable()) { + monitor_thread->join(); + dout(20) << "Monitoring thread stopped." << dendl; + } +} + +void ThreadMonitor::register_thread(const pid_t thread_id, + const pid_t serve_thread_id, + const std::string& thread_name, + const PyModuleRef py_module_) { + std::lock_guard lock(monitored_threads_mutex); + + if (monitored_threads.count(thread_id)) { + dout(20) << "Attempted to register already known thread (TID: " << thread_id << ")." 
<< dendl; + return; + } + + MonitoredThreadInfo info; + info.name = thread_name; + info.py_module = py_module_; + info.serve_thread_id = serve_thread_id; + info.last_snapshot.timestamp = ceph::mono_clock::now(); + info.last_serve_snapshot.timestamp = ceph::mono_clock::now(); + monitored_threads[thread_id] = info; + dout(0) << "Registered thread: '" << thread_name << "' (TID: " << thread_id << ") module: " + << (py_module_ ? py_module_->get_name() : "unknown") << dendl; +} + +void ThreadMonitor::handle_conf_change( + const ConfigProxy& conf, + const std::set &changed) { + if (changed.count("mgr_module_monitor_interval")) { + int interval = m_cct->_conf.get_val("mgr_module_monitor_interval"); + monitoring_interval = std::chrono::seconds(interval); + dout(20) << "Updated monitoring interval to " << interval << " seconds." << dendl; + if (interval == 0) { + stop_monitoring(); + } else if (!running) { + start_monitoring(); + } + } +} + +void ThreadMonitor::monitoring_loop() { + if (m_clock_ticks_per_sec <= 0 || m_page_size <= 0) { + derr << "Failed to retrieve system configuration " + << "(clock ticks per second or page size). Monitoring will not start." << dendl; + running = false; + return; + } + + while (running) { + // Phase 1: under lock — update RSS counters and copy all state needed for CPU computation. + long long process_rss_pages = 0; + if (!read_process_statm(process_rss_pages)) { + derr << "Failed to read process memory info from /proc/self/statm." 
<< dendl; + continue; + } + + std::vector entries; + { + std::lock_guard lock(monitored_threads_mutex); + for (auto& [tid, info] : monitored_threads) { + if (info.py_module && info.py_module->perfcounter) { + long long rss_bytes = process_rss_pages * m_page_size; + long long rss_change = rss_bytes - info.last_snapshot.rss_pages * m_page_size; + info.py_module->perfcounter->set(info.py_module->l_pym_mem_rss_current, rss_bytes); + info.py_module->perfcounter->set(info.py_module->l_pym_mem_rss_change, rss_change); + info.last_snapshot.rss_pages = process_rss_pages; + dout(20) << "Module '" << info.name << "' (TID: " << tid << "): " + << "Memory RSS: " << rss_bytes << " bytes" + << ", Change: " << rss_change << " bytes" << dendl; + } + entries.push_back({tid, info.serve_thread_id, info.name, + info.py_module, info.last_snapshot, info.last_serve_snapshot}); + } + } + + // Phase 2: no lock — do slow /proc reads and CPU calculations using copied snapshots. + std::vector results; + results.reserve(entries.size()); + for (const auto& e : entries) { + results.push_back(process_thread_stats(e)); + } + + // Phase 3: under lock — write results back; remove dead threads. 
+ { + std::lock_guard lock(monitored_threads_mutex); + for (const auto& r : results) { + auto it = monitored_threads.find(r.tid); + if (it == monitored_threads.end()) { + continue; // deregistered between phase 1 and 3 + } + MonitoredThreadInfo& info = it->second; + if (!r.main_ok) { + dout(0) << "Removing dead thread '" << info.name << "' (TID: " << r.tid << ")" << dendl; + monitored_threads.erase(it); + continue; + } + info.last_snapshot.utime = r.new_utime; + info.last_snapshot.stime = r.new_stime; + info.last_snapshot.timestamp = r.new_ts; + if (info.py_module && info.py_module->perfcounter) { + info.py_module->perfcounter->set(info.py_module->l_pym_cpu_usage, + static_cast(r.cpu_pct)); + } + if (info.serve_thread_id) { + if (!r.serve_ok) { + info.serve_thread_id = 0; + } else { + info.last_serve_snapshot.utime = r.new_serve_utime; + info.last_serve_snapshot.stime = r.new_serve_stime; + info.last_serve_snapshot.timestamp = r.new_serve_ts; + if (info.py_module && info.py_module->perfcounter) { + info.py_module->perfcounter->set(info.py_module->l_pym_serve_cpu_usage, + static_cast(r.serve_cpu_pct)); + } + } + } + } + } + + std::this_thread::sleep_for(monitoring_interval); + } +} + +ThreadMonitor::ThreadResult ThreadMonitor::process_thread_stats(const ThreadEntry& e) { + ThreadResult r; + r.tid = e.tid; + long long utime, stime; + r.new_ts = ceph::mono_clock::now(); + r.main_ok = read_thread_stat(e.tid, utime, stime); + if (!r.main_ok) { + dout(20) << "Thread '" << e.name << "' (TID: " << e.tid << ") may have exited." 
<< dendl; + return r; + } + double elapsed = std::chrono::duration_cast>( + r.new_ts - e.last_snapshot.timestamp).count(); + r.cpu_pct = calculate_cpu_percentage(utime - e.last_snapshot.utime, + stime - e.last_snapshot.stime, elapsed); + r.new_utime = utime; + r.new_stime = stime; + dout(20) << "Module '" << e.name << "' (TID: " << e.tid << "): " + << "CPU: " << std::fixed << std::setprecision(2) << r.cpu_pct << "%" << dendl; + + if (!e.serve_tid) { + return r; + } + r.new_serve_ts = ceph::mono_clock::now(); + r.serve_ok = read_thread_stat(e.serve_tid, utime, stime); + if (!r.serve_ok) { + dout(20) << "Serve thread (TID: " << e.serve_tid << ") may have exited." << dendl; + return r; + } + double serve_elapsed = std::chrono::duration_cast>( + r.new_serve_ts - e.last_serve_snapshot.timestamp).count(); + r.serve_cpu_pct = calculate_cpu_percentage(utime - e.last_serve_snapshot.utime, + stime - e.last_serve_snapshot.stime, serve_elapsed); + r.new_serve_utime = utime; + r.new_serve_stime = stime; + dout(20) << "Serve thread (TID: " << e.serve_tid << "): " + << "CPU: " << std::fixed << std::setprecision(2) << r.serve_cpu_pct << "%" << dendl; + return r; +} + +bool ThreadMonitor::read_thread_stat(pid_t tid, long long& utime, long long& stime) { + std::string stat_path = "/proc/self/task/" + std::to_string(tid) + "/stat"; + std::ifstream stat_file(stat_path); + if (!stat_file.is_open()) { + dout(20) << __func__ << "Could not open " << stat_path << dendl; + return false; + } + std::string line; + std::getline(stat_file, line); + size_t start = line.find('('); + size_t end = line.rfind(')'); + if (start == std::string::npos || end == std::string::npos) { + dout(20) << __func__ << "Malformed stat file for TID " << tid << dendl; + return false; + } + + std::string remainder = line.substr(end + 2); // +2 to skip ") " + std::stringstream ss(remainder); + std::string val; + + // skip 11 fields before utime/stime: + // state ppid pgrp session tty_nr tpgid flags minflt cminflt 
majflt cmajflt + for (int i = 0; i < 11; ++i) { + ss >> val; + } + ss >> utime >> stime; + return true; +} + +bool ThreadMonitor::read_process_statm(long long& rss_pages) { + std::string statm_path = "/proc/self/statm"; + std::ifstream statm_file(statm_path); + + if (!statm_file.is_open()) { + dout(20) << __func__ << "Could not open " << statm_path << dendl; + return false; + } + + long long vsize_pages; + statm_file >> vsize_pages >> rss_pages; + return true; +} + +double ThreadMonitor::calculate_cpu_percentage(long long utime_diff, long long stime_diff, + double elapsed_seconds) { + if (elapsed_seconds <= 0) { + return 0.0; + } + long long total_jiffies = utime_diff + stime_diff; + return (static_cast(total_jiffies) / (m_clock_ticks_per_sec * elapsed_seconds)) * 100.0; +} \ No newline at end of file diff --git a/src/mgr/ThreadMonitor.h b/src/mgr/ThreadMonitor.h new file mode 100644 index 000000000000..60a839ae9655 --- /dev/null +++ b/src/mgr/ThreadMonitor.h @@ -0,0 +1,101 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License version 2.1, as published by + * the Free Software Foundation. See file COPYING. 
+ */ + +#pragma once +#include +#include "PyModule.h" +#include "common/debug.h" + +class ThreadMonitor : public md_config_obs_t { +public: + ThreadMonitor(CephContext *cct) + : m_cct(cct), + running(false), + monitoring_interval(m_cct->_conf.get_val("mgr_module_monitor_interval")) + { + m_cct->_conf.add_observer(this); + m_clock_ticks_per_sec = sysconf(_SC_CLK_TCK); + m_page_size = sysconf(_SC_PAGESIZE); + } + + ~ThreadMonitor() { + m_cct->_conf.remove_observer(this); + stop_monitoring(); + } + + void start_monitoring(); + void stop_monitoring(); + void register_thread(const pid_t thread_id, const pid_t serve_thread_id, const std::string& name, const PyModuleRef py_module); + +protected: + std::vector get_tracked_keys() const noexcept override { + return std::vector{"mgr_module_monitor_interval"}; + } + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override; +private: + CephContext *m_cct; + struct ThreadSnapshot { + long long utime = 0; + long long stime = 0; + long long rss_pages = 0; + ceph::mono_clock::time_point timestamp; + }; + + // Information about a thread that is currently being monitored + struct MonitoredThreadInfo { + std::string name; + PyModuleRef py_module; + ThreadSnapshot last_snapshot; + pid_t serve_thread_id = 0; // TID of the thread runner + ThreadSnapshot last_serve_snapshot; // Last snapshot of the thread runner + }; + + struct ThreadEntry { + pid_t tid; + pid_t serve_tid; + std::string name; + PyModuleRef py_module; + ThreadSnapshot last_snapshot; + ThreadSnapshot last_serve_snapshot; + }; + + struct ThreadResult { + pid_t tid; + bool main_ok = false; + bool serve_ok = false; + long long new_utime = 0, new_stime = 0; + ceph::mono_clock::time_point new_ts; + double cpu_pct = 0; + long long new_serve_utime = 0, new_serve_stime = 0; + ceph::mono_clock::time_point new_serve_ts; + double serve_cpu_pct = 0; + }; + + std::map monitored_threads; + + std::mutex monitored_threads_mutex; + std::atomic running; + 
std::unique_ptr monitor_thread; + ceph::mono_clock::duration monitoring_interval = std::chrono::seconds(2); + long m_clock_ticks_per_sec = 0; + long m_page_size = 0; + + void monitoring_loop(); + bool read_thread_stat(pid_t tid, long long& utime, long long& stime); + bool read_process_statm(long long& rss_pages); + long get_clock_ticks_per_sec() const; + long get_page_size() const; + double calculate_cpu_percentage(long long utime_diff, long long stime_diff, double elapsed_seconds); + ThreadResult process_thread_stats(const ThreadEntry& e); +}; \ No newline at end of file -- 2.47.3