Performance and Scalability
---------------------------
-All the mgr modules share a cache that can be enabled with
+All the Manager modules share a cache that can be enabled with
``ceph config set mgr mgr_ttl_cache_expire_seconds <seconds>``, where seconds
is the time to live of the cached python objects.
perf counters of a mgr module. In ``mgr.cache_hit`` and ``mgr.cache_miss``
you'll find the hit/miss ratio of the mgr cache.
+The Manager includes a ThreadMonitor that tracks CPU usage and memory consumption
+for each enabled module. This monitoring can be configured with
+``ceph config set mgr mgr_module_monitor_interval <seconds>``, where ``seconds``
+is the monitoring interval. Setting this to 0 disables module monitoring.
+
+The ThreadMonitor provides per-module performance counters accessible via
+``ceph daemon mgr.${MGRNAME} perf dump``, including:
+
+- ``notify_avg_usec``: Average time spent in notify calls (microseconds)
+- ``cmd_avg_usec``: Average time spent in command calls (microseconds)
+- ``alive``: Module health status (0=dead, 1=alive)
+- ``cpu_usage``: CPU percentage for the main module thread
+- ``serve_cpu_usage``: CPU percentage for the module's serve thread (if present)
+- ``mem_rss_current``: Current process memory usage (RSS) in bytes
+- ``mem_rss_change``: Memory usage change since last measurement in bytes
+
+These counters help identify resource-intensive modules and can be useful for
+debugging performance issues or memory leaks. The ``notify_avg_usec`` and
+``cmd_avg_usec`` counters track the performance of module operations, while
+the CPU and memory counters monitor resource consumption. The default monitoring
+interval is 5 seconds.
+
Automatic Stats Period Tuning
------------------------------
void Finisher::start()
{
ldout(cct, 10) << __func__ << dendl;
+ tid_promise = std::promise<void>{}; // fresh, unfulfilled promise for this start() call
+ started_future = tid_promise.get_future(); // future handed out by on_started(); set up before the worker thread is created
finisher_thread.create(thread_name.c_str());
}
std::unique_lock ul(finisher_lock);
ldout(cct, 10) << "finisher_thread start" << dendl;
+ finisher_tid.store(ceph_gettid());
+ tid_promise.set_value();
utime_t start;
uint64_t count = 0;
while (!finisher_stop) {
#define CEPH_FINISHER_H
#include <atomic>
+#include <future>
#include <list>
#include <mutex>
#include <string>
bool finisher_stop = false; ///< Set when the finisher should stop.
bool finisher_running = false; ///< True when the finisher is currently executing contexts.
bool finisher_empty_wait = false; ///< True mean someone wait finisher empty.
+ std::atomic<pid_t> finisher_tid{0}; ///< TID of the finisher worker thread, set on startup
+ std::promise<void> tid_promise; ///< Fulfilled by the worker thread once it has started.
+ std::future<void> started_future; ///< Becomes ready when the finisher thread is running; use on_started() to observe.
/// Queue for contexts for which complete(0) will be called.
std::vector<std::pair<Context*,int>> finisher_queue;
/// Start the worker thread.
void start();
+ std::future<void>& on_started() { return started_future; }
/** @brief Stop the worker thread.
*
void wait_for_empty();
bool is_empty();
+ pid_t get_tid() const { return finisher_tid; }
std::string_view get_thread_name() const noexcept {
return thread_name;
default: 5
min: 0
max: 11
+- name: mgr_module_monitor_interval
+ type: int
+ level: advanced
+ desc: Period in seconds for collecting Manager modules cpu and memory performance counters.
+ long_desc: Period in seconds for Manager Monitor to collect the cpu and memory for
+ each enabled module. If set to 0, collection of these stats will be disabled.
+ default: 5
+ min: 0
- name: mgr_tick_period
type: secs
level: advanced
Gil gil(py_module->pMyThreadState, true);
+ auto _start = ceph::mono_clock::now();
// Execute
auto pValue = PyObject_CallMethod(pClassInstance,
const_cast<char*>("notify"), const_cast<char*>("(ss)"),
if (pValue != NULL) {
Py_DECREF(pValue);
+ if (py_module->perfcounter) {
+ auto duration = ceph::mono_clock::now() - _start;
+ auto usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+ py_module->perfcounter->inc(py_module->l_pym_notify_avg_usec, usec);
+ }
} else {
derr << get_name() << ".notify:" << dendl;
derr << handle_pyerror(true, get_name(), "ActivePyModule::notify") << dendl;
PyFormatter f;
log_entry.dump(&f);
auto py_log_entry = f.get();
-
+
+ auto _start = ceph::mono_clock::now();
// Execute
auto pValue = PyObject_CallMethod(pClassInstance,
const_cast<char*>("notify"), const_cast<char*>("(sN)"),
if (pValue != NULL) {
Py_DECREF(pValue);
+ if (py_module->perfcounter) {
+ auto duration = ceph::mono_clock::now() - _start;
+ auto usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+ py_module->perfcounter->inc(py_module->l_pym_notify_avg_usec, usec);
+ }
} else {
derr << get_name() << ".notify_clog:" << dendl;
derr << handle_pyerror(true, get_name(), "ActivePyModule::notify_clog") << dendl;
ceph_assert(m_session == nullptr);
m_command_perms = module_command.perm;
m_session = &session;
-
+ auto _start = ceph::mono_clock::now();
auto pResult = PyObject_CallMethod(pClassInstance,
const_cast<char*>("_handle_command"), const_cast<char*>("s#O"),
instr.c_str(), instr.length(), py_cmd);
int r = 0;
if (pResult != NULL) {
+ if (py_module->perfcounter) {
+ auto duration = ceph::mono_clock::now() - _start;
+ auto usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+ py_module->perfcounter->inc(py_module->l_pym_cmd_avg_usec, usec);
+ }
if (PyTuple_Size(pResult) != 3) {
derr << "module '" << py_module->get_name() << "' command handler "
"returned wrong type!" << dendl;
public:
ActivePyModule(const PyModuleRef &py_module_,
- LogChannelRef clog_)
- : PyModuleRunner(py_module_, clog_),
+ LogChannelRef clog_,
+ ThreadMonitor* monitor_ = nullptr)
+ : PyModuleRunner(py_module_, clog_, monitor_),
finisher(g_ceph_context, thread_name, fmt::format("m-fin-{}", py_module->get_name()).substr(0,15))
{
MonClient &mc, LogChannelRef clog_,
LogChannelRef audit_clog_, Objecter &objecter_,
Finisher &f, DaemonServer &server,
- PyModuleRegistry &pmr)
+ PyModuleRegistry &pmr, ThreadMonitor *monitor_)
: module_config(module_config_), daemon_state(ds), cluster_state(cs),
monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
finisher(f),
+ m_thread_monitor(monitor_),
cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
server(server), py_module_registry(pmr)
{
cmd_finisher.start();
}
-ActivePyModules::~ActivePyModules() = default;
+ActivePyModules::~ActivePyModules()
+{
+  dout(10) << "ActivePyModules destructor called" << dendl;
+
+  // The monitor pointer is optional (defaults to nullptr); when one was
+  // supplied, halt its sampling before tearing anything else down.
+  if (m_thread_monitor != nullptr) {
+    m_thread_monitor->stop_monitoring();
+  }
+
+  // Finally shut down the command finisher that the constructor started.
+  cmd_finisher.stop();
+}
void ActivePyModules::dump_server(const std::string &hostname,
const DaemonStateCollection &dmc,
std::lock_guard l(lock);
const auto name = py_module->get_name();
- auto active_module = std::make_shared<ActivePyModule>(py_module, clog);
+ auto active_module = std::make_shared<ActivePyModule>(py_module, clog, m_thread_monitor);
pending_modules.insert(name);
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
- finisher.queue(new LambdaContext([this, active_module, name](int) {
+ finisher.queue(new LambdaContext([this, active_module, name, py_module](int) {
// Delay loading in testing scenarios
auto delay = g_conf().get_val<std::chrono::milliseconds>("mgr_module_load_delay");
std::string delayed_module = g_conf().get_val<std::string>("mgr_module_load_delay_name");
} else {
auto em = modules.emplace(name, active_module);
ceph_assert(em.second); // actually inserted
-
- dout(4) << "Starting thread for " << name << dendl;
active_module->thread.create(active_module->get_thread_name());
- dout(4) << "Starting active module " << name <<" finisher thread "
- << active_module->get_fin_thread_name() << dendl;
+ py_module->perf_counter_build(g_ceph_context);
active_module->finisher.start();
+ active_module->finisher.on_started().wait();
+ active_module->set_native_tid(active_module->finisher.get_tid());
+ if (m_thread_monitor) {
+ m_thread_monitor->register_thread(active_module->get_native_tid(),
+ active_module->thread.get_tid(),
+ name, py_module);
+ }
}
// Signal when we're finally done starting up modules
Objecter &objecter;
Finisher &finisher;
TTLCache<std::string, PyObject*> ttl_cache;
+ ThreadMonitor* m_thread_monitor = nullptr;
public:
Finisher cmd_finisher;
private:
bool mon_provides_kv_sub,
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_,
- Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
+ Finisher &f, DaemonServer &server, PyModuleRegistry &pmr, ThreadMonitor *monitor = nullptr);
~ActivePyModules();
PyModuleRunner.cc
PyOSDMap.cc
StandbyPyModules.cc
+ ThreadMonitor.cc
mgr_commands.cc
MgrOpRequest.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
}
}
+ PyModule::PyModule(const std::string &module_name_) // out-of-line constructor: only records the module name
+ : module_name(module_name_)
+ { }
+
PyObject* PyModule::init_ceph_logger()
{
auto py_logger = PyModule_Create(&ceph_logger_module);
}
}
+/**
+ * Create this module's perf counter set, named "mgr_module_<module name>",
+ * and register it with the context's perf counter collection.
+ *
+ * Must be called at most once per module: it asserts that no counter set
+ * has been built yet.
+ *
+ * @param cct context whose perf counter collection receives the counters
+ * @return always 0
+ */
+int PyModule::perf_counter_build(CephContext *cct) {
+  ceph_assert(perfcounter == nullptr);
+  PerfCountersBuilder pcb(cct, "mgr_module_" + get_name(), l_pym_first, l_pym_last);
+
+  // Latency averages; callers feed these with microsecond samples.
+  pcb.add_u64_avg(l_pym_notify_avg_usec, "notify_avg_usec", "Average time spent in notify calls", "nsec", 0);
+  pcb.add_u64_avg(l_pym_cmd_avg_usec, "cmd_avg_usec", "Average time spent in command calls", "csec", 0);
+
+  // The last argument of add_u64() is the unit of the value, not an upper
+  // bound: flags and percentages are unit-less, RSS figures are bytes.
+  pcb.add_u64(l_pym_alive, "alive", "Is the module alive?", "aliv", 0, unit_t(UNIT_NONE));
+  pcb.add_u64(l_pym_cpu_usage, "cpu_usage", "CPU usage in percent", "cpu", 0, unit_t(UNIT_NONE));
+  pcb.add_u64(l_pym_mem_rss_change, "mem_rss_change", "Memory RSS change in bytes", "", 0, unit_t(UNIT_BYTES));
+  pcb.add_u64(l_pym_mem_rss_current, "mem_rss_current", "Memory RSS current in bytes", "", 0, unit_t(UNIT_BYTES));
+  pcb.add_u64(l_pym_serve_cpu_usage, "serve_cpu_usage", "Serve thread CPU usage in percent", "cpu", 0, unit_t(UNIT_NONE));
+
+  perfcounter = std::unique_ptr<PerfCounters>(pcb.create_perf_counters());
+  cct->get_perfcounters_collection()->add(perfcounter.get());
+
+  return 0;
+}
\ No newline at end of file
#include <string>
#include <vector>
#include <boost/optional.hpp>
+#include "common/ceph_context.h"
#include "common/ceph_mutex.h"
+#include "common/perf_counters.h"
#include "Python.h"
#include "Gil.h"
#include "mon/MgrMap.h"
std::set<std::string> notify_types;
public:
+ std::unique_ptr<PerfCounters> perfcounter;
static std::string mgr_store_prefix;
SafeThreadState pMyThreadState;
// true unless module in mgr_subinterpreter_modules
bool use_main_interpreter = true;
- explicit PyModule(const std::string &module_name_)
- : module_name(module_name_)
- {
- }
+ explicit PyModule(const std::string &module_name_);
~PyModule();
bool get_can_run() const {
std::lock_guard l(lock) ; return can_run;
}
+
+ enum PerfModuleCounters { // indices of the per-module perf counters built by perf_counter_build()
+ l_pym_first = 10000, // lower bound passed to PerfCountersBuilder; not a counter itself
+ l_pym_notify_avg_usec, // average duration of successful notify calls, fed in microseconds
+ l_pym_cmd_avg_usec, // average duration of command handler calls, fed in microseconds
+ l_pym_alive, // set to 1 or 0 by the module runner
+ l_pym_cpu_usage, // CPU percentage of the monitored module thread
+ l_pym_mem_rss_change, // change in RSS bytes since the previous sample
+ l_pym_mem_rss_current, // RSS bytes at the latest sample
+ l_pym_serve_cpu_usage, // CPU percentage of the module's serve thread
+ l_pym_last // upper bound passed to PerfCountersBuilder; not a counter itself
+ };
+
+ int perf_counter_build(CephContext *cct);
};
typedef std::shared_ptr<PyModule> PyModuleRef;
ceph_assert(pMainThreadState != nullptr);
std::list<std::string> failed_modules;
+ thread_monitor->start_monitoring();
const std::string module_path = g_conf().get_val<std::string>("mgr_module_path");
auto module_names = probe_modules(module_path);
failed_modules.push_back(module_name);
// Don't drop out here, load the other modules
}
-
// Record the module even if the load failed, so that we can
// report its loading error
modules[module_name] = std::move(mod);
kv_store, mon_provides_kv_sub,
ds, cs, mc,
clog_, audit_clog_, objecter_, f, server,
- *this));
+ *this, thread_monitor.get()));
for (const auto &i : modules) {
// Anything we're skipping because of !can_run will be flagged
std::unique_ptr<ActivePyModules> active_modules;
std::unique_ptr<StandbyPyModules> standby_modules;
+ std::unique_ptr<ThreadMonitor> thread_monitor;
PyThreadState *pMainThreadState;
std::vector<std::string> probe_modules(const std::string &path) const;
PyModuleConfig module_config;
+ PyObject* process_obj = nullptr;
public:
void handle_config(const std::string &k, const std::string &v);
}
explicit PyModuleRegistry(LogChannelRef clog_)
- : clog(clog_)
- {}
+ : clog(clog_),
+ thread_monitor(std::make_unique<ThreadMonitor>(g_ceph_context))
+ { }
+ ~PyModuleRegistry() {
+ thread_monitor->stop_monitoring();
+ }
/**
* @return true if the mgrmap has changed such that the service needs restart
*/
// This method is called from a separate OS thread (i.e. a thread not
// created by Python), so tell Gil to wrap this in a new thread state.
Gil gil(py_module->pMyThreadState, true);
-
+ if (py_module->perfcounter) {
+ py_module->perfcounter->set(py_module->l_pym_alive, 1);
+ }
auto pValue = PyObject_CallMethod(pClassInstance,
const_cast<char*>("serve"), nullptr);
return -EINVAL;
}
+ if (py_module->perfcounter) {
+ py_module->perfcounter->set(py_module->l_pym_alive, 0);
+ }
+
return r;
}
derr << "Failed to invoke shutdown() on " << get_name() << dendl;
derr << handle_pyerror(true, get_name(), "PyModuleRunner::shutdown") << dendl;
}
-
+ if (py_module->perfcounter) {
+ py_module->perfcounter->set(py_module->l_pym_alive, 0);
+ }
dead = true;
}
{
// No need to acquire the GIL here; the module does it.
dout(4) << "Entering thread for " << mod->get_name() << dendl;
+ runner_tid.store(ceph_gettid(), std::memory_order_release);
mod->serve();
return nullptr;
}
#include "mgr/Gil.h"
#include "PyModule.h"
+#include "mgr/ThreadMonitor.h"
+#include <future>
/**
* Implement the pattern of calling serve() on a module in a thread,
PyObject *pClassInstance = nullptr;
LogChannelRef clog;
-
+ pid_t m_native_tid = 0;
+ ThreadMonitor* m_thread_monitor = nullptr;
class PyModuleRunnerThread : public Thread
{
PyModuleRunner *mod;
-
+ std::atomic<pid_t> runner_tid{0}; // initialised to 0; presumably filled in by entry() once the thread runs — confirm
public:
explicit PyModuleRunnerThread(PyModuleRunner *mod_)
: mod(mod_) {}
void *entry() override;
+ pid_t get_tid() const { return runner_tid.load(); } // returns the stored TID; still 0 if nothing has stored one yet
};
bool is_dead() const { return dead; }
PyModuleRunner(
const PyModuleRef &py_module_,
- LogChannelRef clog_)
+ LogChannelRef clog_,
+ ThreadMonitor* monitor_ = nullptr)
:
py_module(py_module_),
clog(clog_),
+ m_thread_monitor(monitor_),
thread(this)
{
// Shortened name for use as thread name, because thread names
PyModuleRunnerThread thread;
std::string const &get_name() const { return py_module->get_name(); }
+ void set_native_tid(pid_t tid) { m_native_tid = tid; }
+ pid_t get_native_tid() const { return m_native_tid; }
private:
bool dead = false;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License version 2.1, as published by
+ * the Free Software Foundation. See file COPYING.
+ */
+
+#include "ThreadMonitor.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[ThreadMonitor] " << __func__ << " "
+
+/**
+ * Launch the background sampling thread.
+ *
+ * Does nothing when monitoring is already flagged as running, or when the
+ * configured interval is zero: a zero interval means "monitoring disabled",
+ * and a loop started with it would spin without ever sleeping.
+ */
+void ThreadMonitor::start_monitoring() {
+  if (monitoring_interval == decltype(monitoring_interval)::zero()) {
+    dout(20) << "Monitoring interval is 0; module monitoring is disabled." << dendl;
+    return;
+  }
+
+  if (running.exchange(true)) {
+    return;
+  }
+
+  // A previous loop may have exited on its own without being joined.
+  // Assigning over a joinable std::thread calls std::terminate(), so reap
+  // it before creating the replacement.
+  if (monitor_thread && monitor_thread->joinable()) {
+    monitor_thread->join();
+  }
+
+  dout(20) << "Starting monitoring thread." << dendl;
+  monitor_thread = std::make_unique<std::thread>(&ThreadMonitor::monitoring_loop, this);
+}
+
+// Ask the sampling loop to exit and wait for its thread to finish.
+// A call made while monitoring is not flagged as running is a no-op.
+void ThreadMonitor::stop_monitoring() {
+  const bool was_running = running.exchange(false);
+  if (!was_running) {
+    return;
+  }
+
+  if (!monitor_thread || !monitor_thread->joinable()) {
+    return;
+  }
+
+  monitor_thread->join();
+  dout(20) << "Monitoring thread stopped." << dendl;
+}
+
+// Start tracking a module thread, keyed by `thread_id`.
+//
+// thread_id        TID used as the key of the monitored-thread table
+// serve_thread_id  TID of the module's serve thread (0 when there is none)
+// thread_name      name used in log output
+// py_module_       module whose perf counters receive the samples
+//
+// Registering a TID that is already tracked is logged and otherwise ignored.
+void ThreadMonitor::register_thread(const pid_t thread_id,
+                                    const pid_t serve_thread_id,
+                                    const std::string& thread_name,
+                                    const PyModuleRef py_module_) {
+  std::lock_guard<std::mutex> guard(monitored_threads_mutex);
+
+  const bool already_known =
+    monitored_threads.find(thread_id) != monitored_threads.end();
+  if (already_known) {
+    dout(20) << "Attempted to register already known thread (TID: " << thread_id << ")." << dendl;
+    return;
+  }
+
+  // Both baselines start at "now" so the first CPU delta covers only the
+  // time since registration.
+  MonitoredThreadInfo fresh;
+  fresh.name = thread_name;
+  fresh.py_module = py_module_;
+  fresh.serve_thread_id = serve_thread_id;
+  fresh.last_snapshot.timestamp = ceph::mono_clock::now();
+  fresh.last_serve_snapshot.timestamp = ceph::mono_clock::now();
+  monitored_threads.emplace(thread_id, fresh);
+
+  dout(0) << "Registered thread: '" << thread_name << "' (TID: " << thread_id << ") module: "
+          << (py_module_ ? py_module_->get_name() : "unknown") << dendl;
+}
+
+/**
+ * Config observer callback: react to changes of mgr_module_monitor_interval.
+ *
+ * An interval of 0 stops monitoring; any other value updates the sampling
+ * period and starts monitoring if it is not already running.
+ *
+ * @param conf    configuration proxy to read the new value from
+ * @param changed names of the options that changed
+ */
+void ThreadMonitor::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set<std::string> &changed) {
+  if (!changed.count("mgr_module_monitor_interval")) {
+    return;
+  }
+
+  // Keep the option's full 64-bit width instead of narrowing to int.
+  const int64_t interval = conf.get_val<int64_t>("mgr_module_monitor_interval");
+
+  if (interval == 0) {
+    // Stop first, and only then publish the zero interval, so the loop can
+    // never observe a zero sleep period and spin while it is shutting down.
+    stop_monitoring();
+    monitoring_interval = std::chrono::seconds(0);
+    dout(20) << "Updated monitoring interval to " << interval << " seconds." << dendl;
+    return;
+  }
+
+  // NOTE(review): monitoring_interval is also read by the sampling thread
+  // without synchronisation; confirm whether it needs to be atomic or guarded.
+  monitoring_interval = std::chrono::seconds(interval);
+  dout(20) << "Updated monitoring interval to " << interval << " seconds." << dendl;
+  if (!running) {
+    start_monitoring();
+  }
+}
+
+/**
+ * Body of the background sampling thread.
+ *
+ * Each iteration reads the process RSS, samples CPU time for every
+ * registered thread, publishes the results to the owning module's perf
+ * counters, and then sleeps for the configured interval. The loop ends once
+ * `running` is cleared.
+ */
+void ThreadMonitor::monitoring_loop() {
+  if (m_clock_ticks_per_sec <= 0 || m_page_size <= 0) {
+    derr << "Failed to retrieve system configuration "
+         << "(clock ticks per second or page size). Monitoring will not start." << dendl;
+    // `running` is deliberately left set: stop_monitoring() only joins this
+    // thread when it finds `running` true, and an un-joined std::thread
+    // calls std::terminate() when it is destroyed or replaced.
+    return;
+  }
+
+  while (running) {
+    // Phase 1: under lock — update RSS counters and copy all state needed for CPU computation.
+    long long process_rss_pages = 0;
+    if (!read_process_statm(process_rss_pages)) {
+      derr << "Failed to read process memory info from /proc/self/statm." << dendl;
+      // Back off before retrying; retrying immediately would turn a
+      // persistent failure into a busy loop that floods the log.
+      std::this_thread::sleep_for(monitoring_interval);
+      continue;
+    }
+
+    std::vector<ThreadEntry> entries;
+    {
+      std::lock_guard<std::mutex> lock(monitored_threads_mutex);
+      entries.reserve(monitored_threads.size());
+      for (auto& [tid, info] : monitored_threads) {
+        if (info.py_module && info.py_module->perfcounter) {
+          // RSS is read from /proc/self/statm, i.e. it is the whole
+          // process's RSS and therefore identical for every module.
+          long long rss_bytes = process_rss_pages * m_page_size;
+          // NOTE(review): this is negative when RSS shrinks, and equals the
+          // full RSS on the first sample (baseline 0); confirm how the
+          // counter is expected to represent that.
+          long long rss_change = rss_bytes - info.last_snapshot.rss_pages * m_page_size;
+          info.py_module->perfcounter->set(info.py_module->l_pym_mem_rss_current, rss_bytes);
+          info.py_module->perfcounter->set(info.py_module->l_pym_mem_rss_change, rss_change);
+          info.last_snapshot.rss_pages = process_rss_pages;
+          dout(20) << "Module '" << info.name << "' (TID: " << tid << "): "
+                   << "Memory RSS: " << rss_bytes << " bytes"
+                   << ", Change: " << rss_change << " bytes" << dendl;
+        }
+        entries.push_back({tid, info.serve_thread_id, info.name,
+                           info.py_module, info.last_snapshot, info.last_serve_snapshot});
+      }
+    }
+
+    // Phase 2: no lock — do slow /proc reads and CPU calculations using copied snapshots.
+    std::vector<ThreadResult> results;
+    results.reserve(entries.size());
+    for (const auto& e : entries) {
+      results.push_back(process_thread_stats(e));
+    }
+
+    // Phase 3: under lock — write results back; remove dead threads.
+    {
+      std::lock_guard<std::mutex> lock(monitored_threads_mutex);
+      for (const auto& r : results) {
+        auto it = monitored_threads.find(r.tid);
+        if (it == monitored_threads.end()) {
+          continue; // deregistered between phase 1 and 3
+        }
+        MonitoredThreadInfo& info = it->second;
+        if (!r.main_ok) {
+          dout(0) << "Removing dead thread '" << info.name << "' (TID: " << r.tid << ")" << dendl;
+          monitored_threads.erase(it);
+          continue;
+        }
+        info.last_snapshot.utime = r.new_utime;
+        info.last_snapshot.stime = r.new_stime;
+        info.last_snapshot.timestamp = r.new_ts;
+        if (info.py_module && info.py_module->perfcounter) {
+          info.py_module->perfcounter->set(info.py_module->l_pym_cpu_usage,
+                                           static_cast<uint64_t>(r.cpu_pct));
+        }
+        if (info.serve_thread_id) {
+          if (!r.serve_ok) {
+            // The serve thread could not be sampled: stop tracking it but
+            // keep monitoring the main thread.
+            info.serve_thread_id = 0;
+          } else {
+            info.last_serve_snapshot.utime = r.new_serve_utime;
+            info.last_serve_snapshot.stime = r.new_serve_stime;
+            info.last_serve_snapshot.timestamp = r.new_serve_ts;
+            if (info.py_module && info.py_module->perfcounter) {
+              info.py_module->perfcounter->set(info.py_module->l_pym_serve_cpu_usage,
+                                               static_cast<uint64_t>(r.serve_cpu_pct));
+            }
+          }
+        }
+      }
+    }
+
+    std::this_thread::sleep_for(monitoring_interval);
+  }
+}
+
+/**
+ * Sample CPU time for one registered thread, and for its serve thread when
+ * it has one, and turn the deltas against the previous snapshots into CPU
+ * percentages.
+ *
+ * Works only on the copied snapshots in `e`, so it needs no lock.
+ *
+ * @param e copy of the thread's identity and last snapshots
+ * @return sample result; `main_ok` / `serve_ok` tell which readings are valid
+ */
+ThreadMonitor::ThreadResult ThreadMonitor::process_thread_stats(const ThreadEntry& e) {
+  ThreadResult r;
+  r.tid = e.tid;
+
+  // Zero-initialised so a reader that reports success without filling the
+  // values can never expose indeterminate data.
+  long long utime = 0;
+  long long stime = 0;
+  r.new_ts = ceph::mono_clock::now();
+  r.main_ok = read_thread_stat(e.tid, utime, stime);
+  if (!r.main_ok) {
+    dout(20) << "Thread '" << e.name << "' (TID: " << e.tid << ") may have exited." << dendl;
+    return r;
+  }
+  const double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(
+    r.new_ts - e.last_snapshot.timestamp).count();
+  r.cpu_pct = calculate_cpu_percentage(utime - e.last_snapshot.utime,
+                                       stime - e.last_snapshot.stime, elapsed);
+  r.new_utime = utime;
+  r.new_stime = stime;
+  dout(20) << "Module '" << e.name << "' (TID: " << e.tid << "): "
+           << "CPU: " << std::fixed << std::setprecision(2) << r.cpu_pct << "%" << dendl;
+
+  if (!e.serve_tid) {
+    return r;
+  }
+
+  long long serve_utime = 0;
+  long long serve_stime = 0;
+  r.new_serve_ts = ceph::mono_clock::now();
+  r.serve_ok = read_thread_stat(e.serve_tid, serve_utime, serve_stime);
+  if (!r.serve_ok) {
+    dout(20) << "Serve thread (TID: " << e.serve_tid << ") may have exited." << dendl;
+    return r;
+  }
+  const double serve_elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(
+    r.new_serve_ts - e.last_serve_snapshot.timestamp).count();
+  r.serve_cpu_pct = calculate_cpu_percentage(serve_utime - e.last_serve_snapshot.utime,
+                                             serve_stime - e.last_serve_snapshot.stime,
+                                             serve_elapsed);
+  r.new_serve_utime = serve_utime;
+  r.new_serve_stime = serve_stime;
+  dout(20) << "Serve thread (TID: " << e.serve_tid << "): "
+           << "CPU: " << std::fixed << std::setprecision(2) << r.serve_cpu_pct << "%" << dendl;
+  return r;
+}
+
+/**
+ * Read the user and system CPU time of one thread of this process from
+ * /proc/self/task/<tid>/stat.
+ *
+ * @param tid   thread to sample
+ * @param utime receives the utime field; written only on success
+ * @param stime receives the stime field; written only on success
+ * @return true when both fields were parsed, false when the file could not
+ *         be opened, read, or parsed
+ */
+bool ThreadMonitor::read_thread_stat(pid_t tid, long long& utime, long long& stime) {
+  const std::string stat_path = "/proc/self/task/" + std::to_string(tid) + "/stat";
+  std::ifstream stat_file(stat_path);
+  if (!stat_file.is_open()) {
+    dout(20) << "Could not open " << stat_path << dendl;
+    return false;
+  }
+
+  std::string line;
+  if (!std::getline(stat_file, line)) {
+    dout(20) << "Could not read " << stat_path << dendl;
+    return false;
+  }
+
+  // The second field is "(comm)" and may itself contain spaces or
+  // parentheses, so everything is parsed relative to the LAST ')'.
+  const size_t start = line.find('(');
+  const size_t end = line.rfind(')');
+  if (start == std::string::npos || end == std::string::npos ||
+      end + 2 > line.size()) {
+    dout(20) << "Malformed stat file for TID " << tid << dendl;
+    return false;
+  }
+
+  std::stringstream ss(line.substr(end + 2)); // +2 to skip ") "
+  std::string val;
+
+  // skip 11 fields before utime/stime:
+  // state ppid pgrp session tty_nr tpgid flags minflt cminflt majflt cmajflt
+  for (int i = 0; i < 11; ++i) {
+    ss >> val;
+  }
+
+  // Parse into locals so the out-parameters stay untouched on failure.
+  long long parsed_utime = 0;
+  long long parsed_stime = 0;
+  if (!(ss >> parsed_utime >> parsed_stime)) {
+    dout(20) << "Malformed stat file for TID " << tid << dendl;
+    return false;
+  }
+  utime = parsed_utime;
+  stime = parsed_stime;
+  return true;
+}
+
+/**
+ * Read this process's resident set size, in pages, from /proc/self/statm.
+ *
+ * @param rss_pages receives the second statm field; written only on success
+ * @return true when the value was parsed, false when the file could not be
+ *         opened or parsed
+ */
+bool ThreadMonitor::read_process_statm(long long& rss_pages) {
+  const std::string statm_path = "/proc/self/statm";
+  std::ifstream statm_file(statm_path);
+
+  if (!statm_file.is_open()) {
+    dout(20) << "Could not open " << statm_path << dendl;
+    return false;
+  }
+
+  // The first field (total program size) is read only to step past it.
+  long long vsize_pages = 0;
+  long long parsed_rss_pages = 0;
+  if (!(statm_file >> vsize_pages >> parsed_rss_pages)) {
+    dout(20) << "Malformed content in " << statm_path << dendl;
+    return false;
+  }
+  rss_pages = parsed_rss_pages;
+  return true;
+}
+
+/**
+ * Convert CPU time deltas, in clock ticks, into a percentage of one CPU
+ * over the elapsed wall-clock time.
+ *
+ * @param utime_diff      user-mode ticks consumed since the last sample
+ * @param stime_diff      kernel-mode ticks consumed since the last sample
+ * @param elapsed_seconds wall-clock seconds since the last sample
+ * @return percentage, never negative; 0.0 when no time has elapsed or the
+ *         tick delta is not positive
+ */
+double ThreadMonitor::calculate_cpu_percentage(long long utime_diff, long long stime_diff,
+                                               double elapsed_seconds) {
+  if (elapsed_seconds <= 0) {
+    return 0.0;
+  }
+  const long long total_jiffies = utime_diff + stime_diff;
+  // Callers cast the result to an unsigned counter value, so a negative
+  // delta (e.g. a stale baseline) must not leak out as a negative number.
+  if (total_jiffies <= 0) {
+    return 0.0;
+  }
+  return (static_cast<double>(total_jiffies) / (m_clock_ticks_per_sec * elapsed_seconds)) * 100.0;
+}
\ No newline at end of file
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License version 2.1, as published by
+ * the Free Software Foundation. See file COPYING.
+ */
+
+#pragma once
+#include <unistd.h>
+#include "PyModule.h"
+#include "common/debug.h"
+
+/**
+ * Samples CPU time and process RSS for registered Manager module threads
+ * from a background thread and publishes the values to each module's perf
+ * counters.
+ *
+ * The sampling period follows the mgr_module_monitor_interval option
+ * (seconds), which this class observes for runtime changes.
+ */
+class ThreadMonitor : public md_config_obs_t {
+public:
+  explicit ThreadMonitor(CephContext *cct)
+    : m_cct(cct),
+      running(false),
+      // The option is expressed in seconds. Wrap it in std::chrono::seconds
+      // so the value is not taken as a raw tick count of the clock's
+      // duration type, matching what handle_conf_change() does.
+      monitoring_interval(std::chrono::seconds(
+        m_cct->_conf.get_val<int64_t>("mgr_module_monitor_interval")))
+  {
+    m_cct->_conf.add_observer(this);
+    m_clock_ticks_per_sec = sysconf(_SC_CLK_TCK);
+    m_page_size = sysconf(_SC_PAGESIZE);
+  }
+
+  ~ThreadMonitor() {
+    // Unregister first so no config callback arrives during teardown.
+    m_cct->_conf.remove_observer(this);
+    stop_monitoring();
+  }
+
+  /// Start the background sampling thread.
+  void start_monitoring();
+  /// Stop the background sampling thread and wait for it to finish.
+  void stop_monitoring();
+  /// Begin tracking `thread_id` (and, if non-zero, `serve_thread_id`) for `py_module`.
+  void register_thread(const pid_t thread_id, const pid_t serve_thread_id, const std::string& name, const PyModuleRef py_module);
+
+protected:
+  std::vector<std::string> get_tracked_keys() const noexcept override {
+    return std::vector<std::string>{"mgr_module_monitor_interval"};
+  }
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set<std::string> &changed) override;
+private:
+  CephContext *m_cct;
+
+  // One reading of a thread's CPU time plus the process RSS at that moment.
+  struct ThreadSnapshot {
+    long long utime = 0;
+    long long stime = 0;
+    long long rss_pages = 0;
+    ceph::mono_clock::time_point timestamp;
+  };
+
+  // Information about a thread that is currently being monitored
+  struct MonitoredThreadInfo {
+    std::string name;
+    PyModuleRef py_module;
+    ThreadSnapshot last_snapshot;
+    pid_t serve_thread_id = 0; // TID of the module's serve thread, 0 if none
+    ThreadSnapshot last_serve_snapshot; // Last snapshot of the serve thread
+  };
+
+  // Copy of a MonitoredThreadInfo taken under the lock, so the /proc reads
+  // can run without holding it.
+  struct ThreadEntry {
+    pid_t tid;
+    pid_t serve_tid;
+    std::string name;
+    PyModuleRef py_module;
+    ThreadSnapshot last_snapshot;
+    ThreadSnapshot last_serve_snapshot;
+  };
+
+  // Outcome of sampling one ThreadEntry; written back under the lock.
+  struct ThreadResult {
+    pid_t tid;
+    bool main_ok = false;
+    bool serve_ok = false;
+    long long new_utime = 0, new_stime = 0;
+    ceph::mono_clock::time_point new_ts;
+    double cpu_pct = 0;
+    long long new_serve_utime = 0, new_serve_stime = 0;
+    ceph::mono_clock::time_point new_serve_ts;
+    double serve_cpu_pct = 0;
+  };
+
+  std::map<pid_t, MonitoredThreadInfo> monitored_threads;
+
+  std::mutex monitored_threads_mutex; // guards monitored_threads
+  std::atomic<bool> running;
+  std::unique_ptr<std::thread> monitor_thread;
+  ceph::mono_clock::duration monitoring_interval = std::chrono::seconds(2);
+  long m_clock_ticks_per_sec = 0;
+  long m_page_size = 0;
+
+  void monitoring_loop();
+  bool read_thread_stat(pid_t tid, long long& utime, long long& stime);
+  bool read_process_statm(long long& rss_pages);
+  // NOTE(review): the next two are declared but no definition or caller is
+  // visible alongside this header; confirm whether they are still needed.
+  long get_clock_ticks_per_sec() const;
+  long get_page_size() const;
+  double calculate_cpu_percentage(long long utime_diff, long long stime_diff, double elapsed_seconds);
+  ThreadResult process_thread_stats(const ThreadEntry& e);
+};
\ No newline at end of file