]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: Add per-module performance counters to mgr
authorNitzan Mordechai <nmordech@redhat.com>
Sun, 8 Dec 2024 18:08:39 +0000 (18:08 +0000)
committerNitzan Mordechai <nmordech@ibm.com>
Mon, 25 May 2026 07:48:06 +0000 (07:48 +0000)
This commit introduces performance counters for individual Ceph mgr modules.
These counters allow monitoring module behavior, debugging latency issues,
and identifying performance bottlenecks, all without modifying the modules themselves.

The following counters are now exposed under:
  > ceph daemon mgr.<id> perf dump

Example structure:
"mgr_module_<module_name>": {
    "notify_avg_usec": {     <- Average time spent handling notify events
        "avgcount": 0,
        "sum": 0
    },
    "cmd_avg_usec": {        <- Average time spent processing CLI/admin commands
        "avgcount": 0,
        "sum": 0
    },
    "serve_avg_usec": {      <- Average time spent in module serve loop (if applicable)
        "avgcount": 0,
        "sum": 0
    },
    "alive": 1               <- Module is alive (1 = running, 0 = exited)
    "cpu_usage": 0,          <- CPU usage in percent
    "mem_rss_change": 0,     <- Memory RSS change in bytes
    "mem_rss_current": 490737664 <- Memory RSS current in bytes

}

Signed-off-by: Nitzan Mordechai <nmordech@ibm.com>
Conflicts:
  src/mgr/ActivePyModules.cc - finisher.queue changed by 63859, adding py_module to the parameter list
  src/mgr/PyModuleRegistry.cc - check_all_modules_started added by 63859

17 files changed:
doc/mgr/administrator.rst
src/common/Finisher.cc
src/common/Finisher.h
src/common/options/mgr.yaml.in
src/mgr/ActivePyModule.cc
src/mgr/ActivePyModule.h
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/CMakeLists.txt
src/mgr/PyModule.cc
src/mgr/PyModule.h
src/mgr/PyModuleRegistry.cc
src/mgr/PyModuleRegistry.h
src/mgr/PyModuleRunner.cc
src/mgr/PyModuleRunner.h
src/mgr/ThreadMonitor.cc [new file with mode: 0644]
src/mgr/ThreadMonitor.h [new file with mode: 0644]

index 1477b74a94906ac692921d497371cf03cb420edd..a6a7047577c5d88e836dbaedee1c6332848ddb01 100644 (file)
@@ -108,7 +108,7 @@ daemon as failed using ``ceph mgr fail <mgr name>``.
 Performance and Scalability
 ---------------------------
 
-All the mgr modules share a cache that can be enabled with
+All the Manager modules share a cache that can be enabled with
 ``ceph config set mgr mgr_ttl_cache_expire_seconds <seconds>``, where seconds
 is the time to live of the cached python objects.
 
@@ -122,6 +122,28 @@ Furthermore, you can run ``ceph daemon mgr.${MGRNAME} perf dump`` to retrieve
 perf counters of a mgr module. In ``mgr.cache_hit`` and ``mgr.cache_miss``
 you'll find the hit/miss ratio of the mgr cache.
 
+The Manager includes a ThreadMonitor that tracks CPU usage and memory consumption
+for each enabled module. This monitoring can be configured with
+``ceph config set mgr mgr_module_monitor_interval <seconds>``, where ``seconds``
+is the monitoring interval. Setting this to 0 disables module monitoring.
+
+The ThreadMonitor provides per-module performance counters accessible via
+``ceph daemon mgr.${MGRNAME} perf dump``, including:
+
+- ``notify_avg_usec``: Average time spent in notify calls (microseconds)
+- ``cmd_avg_usec``: Average time spent in command calls (microseconds)  
+- ``alive``: Module health status (0=dead, 1=alive)
+- ``cpu_usage``: CPU percentage for the main module thread
+- ``serve_cpu_usage``: CPU percentage for the module's serve thread (if present)
+- ``mem_rss_current``: Current process memory usage (RSS) in bytes
+- ``mem_rss_change``: Memory usage change since last measurement in bytes
+
+These counters help identify resource-intensive modules and can be useful for
+debugging performance issues or memory leaks. The ``notify_avg_usec`` and 
+``cmd_avg_usec`` counters track the performance of module operations, while
+the CPU and memory counters monitor resource consumption. The default monitoring
+interval is 2 seconds.
+
 
 Automatic Stats Period Tuning
 ------------------------------
index b0c2b775fbfd2511b331b9a3bdab60f3b42e3473..04ef97ad6f86b86c89a63f441e02ef5907371fa9 100644 (file)
@@ -47,6 +47,8 @@ Finisher::~Finisher() {
 void Finisher::start()
 {
   ldout(cct, 10) << __func__ << dendl;
+  tid_promise = std::promise<void>{};
+  started_future = tid_promise.get_future();
   finisher_thread.create(thread_name.c_str());
 }
 
@@ -86,6 +88,8 @@ void *Finisher::finisher_thread_entry()
   std::unique_lock ul(finisher_lock);
   ldout(cct, 10) << "finisher_thread start" << dendl;
 
+  finisher_tid.store(ceph_gettid());
+  tid_promise.set_value();
   utime_t start;
   uint64_t count = 0;
   while (!finisher_stop) {
index d737f015cce78a8c715b19b560c127af92fe7a0c..41173cefa761749e134ebfc56270b6dcae95db9e 100644 (file)
@@ -17,6 +17,7 @@
 #define CEPH_FINISHER_H
 
 #include <atomic>
+#include <future>
 #include <list>
 #include <mutex>
 #include <string>
@@ -49,6 +50,9 @@ class Finisher {
   bool         finisher_stop = false; ///< Set when the finisher should stop.
   bool         finisher_running = false; ///< True when the finisher is currently executing contexts.
   bool        finisher_empty_wait = false; ///< True mean someone wait finisher empty.
+  std::atomic<pid_t> finisher_tid{0}; ///< TID of the finisher worker thread, set on startup
+  std::promise<void> tid_promise; ///< Fulfilled by the worker thread once it has started.
+  std::future<void>  started_future; ///< Becomes ready when the finisher thread is running; use on_started() to observe.
 
   /// Queue for contexts for which complete(0) will be called.
   std::vector<std::pair<Context*,int>> finisher_queue;
@@ -104,6 +108,7 @@ class Finisher {
 
   /// Start the worker thread.
   void start();
+  std::future<void>& on_started() { return started_future; }
 
   /** @brief Stop the worker thread.
    *
@@ -119,6 +124,7 @@ class Finisher {
   void wait_for_empty();
 
   bool is_empty();
+  pid_t get_tid() const { return finisher_tid; }
 
   std::string_view get_thread_name() const noexcept {
     return thread_name;
index 3f88de48d3b94c99b28de4438bd2ee18062971f2..d1c587390d4d64c72cb9ec973a75d610d1fb5e18 100644 (file)
@@ -438,6 +438,14 @@ options:
   default: 5
   min: 0
   max: 11
+- name: mgr_module_monitor_interval
+  type: int
+  level: advanced
+  desc: Period in seconds for collecting Manager modules cpu and memory performance counters.
+  long_desc: Period in seconds for Manager Monitor to collect the cpu and memory for
+    each enabled module. If set to 0, collection of these stats will be disabled.
+  default: 5
+  min: 0
 - name: mgr_tick_period
   type: secs
   level: advanced
index 2c0df7a30116f92c624b3b766cc01bccce0eccfc..a01233ac5dc7c0264f68d6c11a28c51b1a681062 100644 (file)
@@ -66,6 +66,7 @@ void ActivePyModule::notify(const std::string &notify_type, const std::string &n
 
   Gil gil(py_module->pMyThreadState, true);
 
+  auto _start = ceph::mono_clock::now();
   // Execute
   auto pValue = PyObject_CallMethod(pClassInstance,
        const_cast<char*>("notify"), const_cast<char*>("(ss)"),
@@ -73,6 +74,11 @@ void ActivePyModule::notify(const std::string &notify_type, const std::string &n
 
   if (pValue != NULL) {
     Py_DECREF(pValue);
+    if (py_module->perfcounter) {
+      auto duration = ceph::mono_clock::now() - _start;
+      auto usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+      py_module->perfcounter->inc(py_module->l_pym_notify_avg_usec, usec);
+    }
   } else {
     derr << get_name() << ".notify:" << dendl;
     derr << handle_pyerror(true, get_name(), "ActivePyModule::notify") << dendl;
@@ -98,7 +104,8 @@ void ActivePyModule::notify_clog(const LogEntry &log_entry)
   PyFormatter f;
   log_entry.dump(&f);
   auto py_log_entry = f.get();
-
+  
+  auto  _start = ceph::mono_clock::now();
   // Execute
   auto pValue = PyObject_CallMethod(pClassInstance,
        const_cast<char*>("notify"), const_cast<char*>("(sN)"),
@@ -106,6 +113,11 @@ void ActivePyModule::notify_clog(const LogEntry &log_entry)
 
   if (pValue != NULL) {
     Py_DECREF(pValue);
+    if (py_module->perfcounter) {
+      auto duration = ceph::mono_clock::now() - _start;
+      auto usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+      py_module->perfcounter->inc(py_module->l_pym_notify_avg_usec, usec);
+    }
   } else {
     derr << get_name() << ".notify_clog:" << dendl;
     derr << handle_pyerror(true, get_name(), "ActivePyModule::notify_clog") << dendl;
@@ -261,7 +273,7 @@ int ActivePyModule::handle_command(
   ceph_assert(m_session == nullptr);
   m_command_perms = module_command.perm;
   m_session = &session;
-
+  auto _start = ceph::mono_clock::now();
   auto pResult = PyObject_CallMethod(pClassInstance,
       const_cast<char*>("_handle_command"), const_cast<char*>("s#O"),
       instr.c_str(), instr.length(), py_cmd);
@@ -272,6 +284,11 @@ int ActivePyModule::handle_command(
 
   int r = 0;
   if (pResult != NULL) {
+    if (py_module->perfcounter) {
+      auto duration = ceph::mono_clock::now() - _start;
+      auto usec = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+      py_module->perfcounter->inc(py_module->l_pym_cmd_avg_usec, usec);
+    }
     if (PyTuple_Size(pResult) != 3) {
       derr << "module '" << py_module->get_name() << "' command handler "
               "returned wrong type!" << dendl;
index b58953387970043685ee52a8da3a206536a75371..35fab34a1622a636fc4b672bd7874736a6ccc726 100644 (file)
@@ -54,8 +54,9 @@ public:
 
 public:
   ActivePyModule(const PyModuleRef &py_module_,
-      LogChannelRef clog_)
-    : PyModuleRunner(py_module_, clog_),
+      LogChannelRef clog_,
+      ThreadMonitor* monitor_ = nullptr)
+    : PyModuleRunner(py_module_, clog_, monitor_),
       finisher(g_ceph_context, thread_name, fmt::format("m-fin-{}", py_module->get_name()).substr(0,15))
 
   {
index 0f5d07638dbe187fe797d5df0fd73b06148265b9..93ad567def4fc725be02237a7630b4fa1ccb6ae4 100644 (file)
@@ -60,10 +60,11 @@ ActivePyModules::ActivePyModules(
   MonClient &mc, LogChannelRef clog_,
   LogChannelRef audit_clog_, Objecter &objecter_,
   Finisher &f, DaemonServer &server,
-  PyModuleRegistry &pmr)
+  PyModuleRegistry &pmr, ThreadMonitor *monitor_)
 : module_config(module_config_), daemon_state(ds), cluster_state(cs),
   monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
   finisher(f),
+  m_thread_monitor(monitor_),
   cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
   server(server), py_module_registry(pmr)
 {
@@ -75,7 +76,18 @@ ActivePyModules::ActivePyModules(
   cmd_finisher.start();
 }
 
-ActivePyModules::~ActivePyModules() = default;
+ActivePyModules::~ActivePyModules()
+{
+  dout(10) << "ActivePyModules destructor called" << dendl;
+
+  // Stop the thread monitor if it was started
+  if (m_thread_monitor) {
+    m_thread_monitor->stop_monitoring();
+  }
+
+  // Stop the finisher thread
+  cmd_finisher.stop();
+}
 
 void ActivePyModules::dump_server(const std::string &hostname,
                       const DaemonStateCollection &dmc,
@@ -535,12 +547,12 @@ void ActivePyModules::start_one(PyModuleRef py_module)
   std::lock_guard l(lock);
 
   const auto name = py_module->get_name();
-  auto active_module = std::make_shared<ActivePyModule>(py_module, clog);
+  auto active_module = std::make_shared<ActivePyModule>(py_module, clog, m_thread_monitor);
 
   pending_modules.insert(name);
   // Send all python calls down a Finisher to avoid blocking
   // C++ code, and avoid any potential lock cycles.
-  finisher.queue(new LambdaContext([this, active_module, name](int) {
+  finisher.queue(new LambdaContext([this, active_module, name, py_module](int) {
     // Delay loading in testing scenarios
     auto delay = g_conf().get_val<std::chrono::milliseconds>("mgr_module_load_delay");
     std::string delayed_module = g_conf().get_val<std::string>("mgr_module_load_delay_name");
@@ -558,12 +570,16 @@ void ActivePyModules::start_one(PyModuleRef py_module)
     } else {
       auto em = modules.emplace(name, active_module);
       ceph_assert(em.second); // actually inserted
-
-      dout(4) << "Starting thread for " << name << dendl;
       active_module->thread.create(active_module->get_thread_name());
-      dout(4) << "Starting active module " << name <<" finisher thread "
-        << active_module->get_fin_thread_name() << dendl;
+      py_module->perf_counter_build(g_ceph_context);
       active_module->finisher.start();
+      active_module->finisher.on_started().wait();
+      active_module->set_native_tid(active_module->finisher.get_tid());
+      if (m_thread_monitor) {
+        m_thread_monitor->register_thread(active_module->get_native_tid(), 
+          active_module->thread.get_tid(),
+          name, py_module);
+      }
     }
 
     // Signal when we're finally done starting up modules
index e62bb625c9d846f4509048e8627ce46870628d38..151be14926c4bc29719d2d7cd5d036d3bc7551da 100644 (file)
@@ -67,6 +67,7 @@ class ActivePyModules
   Objecter &objecter;
   Finisher &finisher;
   TTLCache<std::string, PyObject*> ttl_cache;
+  ThreadMonitor* m_thread_monitor = nullptr;
 public:
   Finisher cmd_finisher;
 private:
@@ -84,7 +85,7 @@ public:
     bool mon_provides_kv_sub,
     DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
     LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_,
-    Finisher &f, DaemonServer &server, PyModuleRegistry &pmr);
+    Finisher &f, DaemonServer &server, PyModuleRegistry &pmr, ThreadMonitor *monitor = nullptr);
 
   ~ActivePyModules();
 
index abf42bd4efd365172d0aedf469d49b2e9deeb469..61d02cb884c0ce11de356ef63ad00a1cf14c27bc 100644 (file)
@@ -34,6 +34,7 @@ if(WITH_MGR)
     PyModuleRunner.cc
     PyOSDMap.cc
     StandbyPyModules.cc
+    ThreadMonitor.cc
     mgr_commands.cc
     MgrOpRequest.cc
     ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
index b974a60b265fe3d8b19341e95b6498db4b3d7ade..46dc97073b89ff90f5afaea7580c1a8fbbbb51d2 100644 (file)
@@ -287,6 +287,10 @@ std::pair<int, std::string> PyModuleConfig::set_config(
   }
 }
 
+  PyModule::PyModule(const std::string &module_name_)
+    : module_name(module_name_)
+  { }
+
 PyObject* PyModule::init_ceph_logger()
 {
   auto py_logger = PyModule_Create(&ceph_logger_module);
@@ -792,3 +796,18 @@ PyModule::~PyModule()
   }
 }
 
+int PyModule::perf_counter_build(CephContext *cct) {
+  ceph_assert(perfcounter == nullptr);
+  PerfCountersBuilder pcb(cct, "mgr_module_" + get_name(), l_pym_first, l_pym_last);
+  pcb.add_u64_avg(l_pym_notify_avg_usec, "notify_avg_usec", "Average time spent in notify calls", "nsec", 0);
+  pcb.add_u64_avg(l_pym_cmd_avg_usec, "cmd_avg_usec", "Average time spent in command calls", "csec", 0);
+  pcb.add_u64(l_pym_alive, "alive", "Is the module alive?", "aliv", 0, uint64_t(1));
+  pcb.add_u64(l_pym_cpu_usage, "cpu_usage", "CPU usage in percent", "cpu", 0, uint64_t(100));
+  pcb.add_u64(l_pym_mem_rss_change, "mem_rss_change", "Memory RSS change in bytes", "", 0);
+  pcb.add_u64(l_pym_mem_rss_current, "mem_rss_current", "Memory RSS current in bytes", "", 0);
+  pcb.add_u64(l_pym_serve_cpu_usage, "serve_cpu_usage", "Serve thread CPU usage in percent", "cpu", 0, uint64_t(100));
+  perfcounter = std::unique_ptr<PerfCounters>(pcb.create_perf_counters());
+  cct->get_perfcounters_collection()->add(perfcounter.get());
+
+  return 0;
+}
\ No newline at end of file
index 01cca1d8e27ed274ec0699ce1b05a7c3a0fd9189..77212dd94b7d08885be4b917d66c82caae013aa5 100644 (file)
@@ -19,7 +19,9 @@
 #include <string>
 #include <vector>
 #include <boost/optional.hpp>
+#include "common/ceph_context.h"
 #include "common/ceph_mutex.h"
+#include "common/perf_counters.h"
 #include "Python.h"
 #include "Gil.h"
 #include "mon/MgrMap.h"
@@ -98,6 +100,7 @@ private:
   std::set<std::string> notify_types;
 
 public:
+  std::unique_ptr<PerfCounters> perfcounter;
   static std::string mgr_store_prefix;
 
   SafeThreadState pMyThreadState;
@@ -108,10 +111,7 @@ public:
   // true unless module in mgr_subinterpreter_modules
   bool use_main_interpreter = true;
 
-  explicit PyModule(const std::string &module_name_)
-    : module_name(module_name_)
-  {
-  }
+  explicit PyModule(const std::string &module_name_);
 
   ~PyModule();
 
@@ -181,6 +181,20 @@ public:
   bool get_can_run() const {
     std::lock_guard l(lock) ; return can_run;
   }
+
+  enum PerfModuleCounters {
+    l_pym_first = 10000,
+    l_pym_notify_avg_usec,
+    l_pym_cmd_avg_usec,
+    l_pym_alive,
+    l_pym_cpu_usage,
+    l_pym_mem_rss_change,
+    l_pym_mem_rss_current,
+    l_pym_serve_cpu_usage,
+    l_pym_last
+  };
+
+  int perf_counter_build(CephContext *cct);
 };
 
 typedef std::shared_ptr<PyModule> PyModuleRef;
index 6a8a59dcad12013003af8705ce49490ecbac7b40..67b14757746fb2fd3a50a20a04c321d4102767f9 100644 (file)
@@ -99,6 +99,7 @@ void PyModuleRegistry::init()
   ceph_assert(pMainThreadState != nullptr);
 
   std::list<std::string> failed_modules;
+  thread_monitor->start_monitoring();
 
   const std::string module_path = g_conf().get_val<std::string>("mgr_module_path");
   auto module_names = probe_modules(module_path);
@@ -118,7 +119,6 @@ void PyModuleRegistry::init()
       failed_modules.push_back(module_name);
       // Don't drop out here, load the other modules
     }
-
     // Record the module even if the load failed, so that we can
     // report its loading error
     modules[module_name] = std::move(mod);
@@ -236,7 +236,7 @@ void PyModuleRegistry::active_start(
       kv_store, mon_provides_kv_sub,
       ds, cs, mc,
       clog_, audit_clog_, objecter_, f, server,
-      *this));
+      *this, thread_monitor.get()));
 
   for (const auto &i : modules) {
     // Anything we're skipping because of !can_run will be flagged
index 069af5154b9122442e81d2f5db0942ff66d23d8f..d79183035a84ef3c133086b8690a90410e4a7bb9 100644 (file)
@@ -50,6 +50,7 @@ private:
 
   std::unique_ptr<ActivePyModules> active_modules;
   std::unique_ptr<StandbyPyModules> standby_modules;
+  std::unique_ptr<ThreadMonitor> thread_monitor;
 
   PyThreadState *pMainThreadState;
 
@@ -64,6 +65,7 @@ private:
   std::vector<std::string> probe_modules(const std::string &path) const;
 
   PyModuleConfig module_config;
+  PyObject* process_obj = nullptr;
 
 public:
   void handle_config(const std::string &k, const std::string &v);
@@ -93,9 +95,13 @@ public:
   }
 
   explicit PyModuleRegistry(LogChannelRef clog_)
-    : clog(clog_)
-  {}
+    : clog(clog_),
+      thread_monitor(std::make_unique<ThreadMonitor>(g_ceph_context))
+  { }
 
+  ~PyModuleRegistry() {
+    thread_monitor->stop_monitoring();
+  }
   /**
    * @return true if the mgrmap has changed such that the service needs restart
    */
index 36f8bc980443f88e9db42012f249dd4975b6d511..e7469995110a44e1514bbe11fc00d21617437198 100644 (file)
@@ -44,7 +44,9 @@ int PyModuleRunner::serve()
   // This method is called from a separate OS thread (i.e. a thread not
   // created by Python), so tell Gil to wrap this in a new thread state.
   Gil gil(py_module->pMyThreadState, true);
-
+  if (py_module->perfcounter) {
+    py_module->perfcounter->set(py_module->l_pym_alive, 1);
+  }
   auto pValue = PyObject_CallMethod(pClassInstance,
       const_cast<char*>("serve"), nullptr);
 
@@ -71,6 +73,10 @@ int PyModuleRunner::serve()
     return -EINVAL;
   }
 
+  if (py_module->perfcounter) {
+    py_module->perfcounter->set(py_module->l_pym_alive, 0);
+  }
+
   return r;
 }
 
@@ -89,7 +95,9 @@ void PyModuleRunner::shutdown()
     derr << "Failed to invoke shutdown() on " << get_name() << dendl;
     derr << handle_pyerror(true, get_name(), "PyModuleRunner::shutdown") << dendl;
   }
-
+  if (py_module->perfcounter) {
+    py_module->perfcounter->set(py_module->l_pym_alive, 0);
+  }
   dead = true;
 }
 
@@ -106,6 +114,7 @@ void* PyModuleRunner::PyModuleRunnerThread::entry()
 {
   // No need to acquire the GIL here; the module does it.
   dout(4) << "Entering thread for " << mod->get_name() << dendl;
+  runner_tid.store(ceph_gettid(), std::memory_order_release); 
   mod->serve();
   return nullptr;
 }
index 187002745e13709aee3d6904a32c21c7a255584e..3b9596ea9e312cb695bdd9155e8156a87a3653fd 100644 (file)
@@ -20,6 +20,8 @@
 #include "mgr/Gil.h"
 
 #include "PyModule.h"
+#include "mgr/ThreadMonitor.h"
+#include <future>
 
 /**
  * Implement the pattern of calling serve() on a module in a thread,
@@ -36,16 +38,18 @@ protected:
   PyObject *pClassInstance = nullptr;
 
   LogChannelRef clog;
-
+  pid_t m_native_tid = 0;
+  ThreadMonitor* m_thread_monitor = nullptr;
   class PyModuleRunnerThread : public Thread
   {
     PyModuleRunner *mod;
-
+    std::atomic<pid_t> runner_tid{0};
   public:
     explicit PyModuleRunnerThread(PyModuleRunner *mod_)
       : mod(mod_) {}
 
     void *entry() override;
+    pid_t get_tid() const { return runner_tid.load(); }
   };
 
   bool is_dead() const { return dead; }
@@ -64,10 +68,12 @@ public:
 
   PyModuleRunner(
       const PyModuleRef &py_module_,
-      LogChannelRef clog_)
+      LogChannelRef clog_,
+      ThreadMonitor* monitor_ = nullptr)
     : 
       py_module(py_module_),
       clog(clog_),
+      m_thread_monitor(monitor_),
       thread(this)
   {
     // Shortened name for use as thread name, because thread names
@@ -82,6 +88,8 @@ public:
   PyModuleRunnerThread thread;
 
   std::string const &get_name() const { return py_module->get_name(); }
+  void set_native_tid(pid_t tid) { m_native_tid = tid; }
+  pid_t get_native_tid() const { return m_native_tid; }
 
 private:
   bool dead = false;
diff --git a/src/mgr/ThreadMonitor.cc b/src/mgr/ThreadMonitor.cc
new file mode 100644 (file)
index 0000000..d26a747
--- /dev/null
@@ -0,0 +1,250 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License version 2.1, as published by
+ * the Free Software Foundation.  See file COPYING.
+ */
+
+#include "ThreadMonitor.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr[ThreadMonitor] " << __func__ << " "
+
+void ThreadMonitor::start_monitoring() {
+  if (running.exchange(true)) {
+    return;
+  }
+
+  dout(20) << "Starting monitoring thread." << dendl;
+  monitor_thread = std::make_unique<std::thread>(&ThreadMonitor::monitoring_loop, this);
+}
+
+void ThreadMonitor::stop_monitoring() {
+  if (!running.exchange(false)) {
+    return;
+  }
+
+  if (monitor_thread && monitor_thread->joinable()) {
+    monitor_thread->join();
+    dout(20) << "Monitoring thread stopped." << dendl;
+  }
+}
+
+void ThreadMonitor::register_thread(const pid_t thread_id,
+  const pid_t serve_thread_id,
+  const std::string& thread_name,
+  const PyModuleRef py_module_) {
+  std::lock_guard<std::mutex> lock(monitored_threads_mutex);
+
+  if (monitored_threads.count(thread_id)) {
+    dout(20) << "Attempted to register already known thread (TID: " << thread_id << ")." << dendl;
+    return;
+  }
+
+  MonitoredThreadInfo info;
+  info.name = thread_name;
+  info.py_module = py_module_;
+  info.serve_thread_id = serve_thread_id;
+  info.last_snapshot.timestamp = ceph::mono_clock::now();
+  info.last_serve_snapshot.timestamp = ceph::mono_clock::now();
+  monitored_threads[thread_id] = info;
+  dout(0) << "Registered thread: '" << thread_name << "' (TID: " << thread_id << ") module: "
+    << (py_module_ ? py_module_->get_name() : "unknown") << dendl;
+}
+
+void ThreadMonitor::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set<std::string> &changed) {
+  if (changed.count("mgr_module_monitor_interval")) {
+    int interval = m_cct->_conf.get_val<int64_t>("mgr_module_monitor_interval");
+    monitoring_interval = std::chrono::seconds(interval);
+    dout(20) << "Updated monitoring interval to " << interval << " seconds." << dendl;
+    if (interval == 0) {
+      stop_monitoring();
+    } else if (!running) {
+      start_monitoring();
+    }
+  }
+}
+
+void ThreadMonitor::monitoring_loop() {
+  if (m_clock_ticks_per_sec <= 0 || m_page_size <= 0) {
+    derr << "Failed to retrieve system configuration "
+         << "(clock ticks per second or page size). Monitoring will not start." << dendl;
+    running = false;
+    return;
+  }
+
+  while (running) {
+    // Phase 1: under lock — update RSS counters and copy all state needed for CPU computation.
+    long long process_rss_pages = 0;
+    if (!read_process_statm(process_rss_pages)) {
+      derr << "Failed to read process memory info from /proc/self/statm." << dendl;
+      continue;
+    }
+
+    std::vector<ThreadEntry> entries;
+    {
+      std::lock_guard<std::mutex> lock(monitored_threads_mutex);
+      for (auto& [tid, info] : monitored_threads) {
+        if (info.py_module && info.py_module->perfcounter) {
+          long long rss_bytes = process_rss_pages * m_page_size;
+          long long rss_change = rss_bytes - info.last_snapshot.rss_pages * m_page_size;
+          info.py_module->perfcounter->set(info.py_module->l_pym_mem_rss_current, rss_bytes);
+          info.py_module->perfcounter->set(info.py_module->l_pym_mem_rss_change, rss_change);
+          info.last_snapshot.rss_pages = process_rss_pages;
+          dout(20) << "Module '" << info.name << "' (TID: " << tid << "): "
+                   << "Memory RSS: " << rss_bytes << " bytes"
+                   << ", Change: " << rss_change << " bytes" << dendl;
+        }
+        entries.push_back({tid, info.serve_thread_id, info.name,
+                           info.py_module, info.last_snapshot, info.last_serve_snapshot});
+      }
+    }
+
+    // Phase 2: no lock — do slow /proc reads and CPU calculations using copied snapshots.
+    std::vector<ThreadResult> results;
+    results.reserve(entries.size());
+    for (const auto& e : entries) {
+      results.push_back(process_thread_stats(e));
+    }
+
+    // Phase 3: under lock — write results back; remove dead threads.
+    {
+      std::lock_guard<std::mutex> lock(monitored_threads_mutex);
+      for (const auto& r : results) {
+        auto it = monitored_threads.find(r.tid);
+        if (it == monitored_threads.end()) {
+          continue; // deregistered between phase 1 and 3
+        }
+        MonitoredThreadInfo& info = it->second;
+        if (!r.main_ok) {
+          dout(0) << "Removing dead thread '" << info.name << "' (TID: " << r.tid << ")" << dendl;
+          monitored_threads.erase(it);
+          continue;
+        }
+        info.last_snapshot.utime = r.new_utime;
+        info.last_snapshot.stime = r.new_stime;
+        info.last_snapshot.timestamp = r.new_ts;
+        if (info.py_module && info.py_module->perfcounter) {
+          info.py_module->perfcounter->set(info.py_module->l_pym_cpu_usage,
+                                            static_cast<uint64_t>(r.cpu_pct));
+        }
+        if (info.serve_thread_id) {
+          if (!r.serve_ok) {
+            info.serve_thread_id = 0;
+          } else {
+            info.last_serve_snapshot.utime = r.new_serve_utime;
+            info.last_serve_snapshot.stime = r.new_serve_stime;
+            info.last_serve_snapshot.timestamp = r.new_serve_ts;
+            if (info.py_module && info.py_module->perfcounter) {
+              info.py_module->perfcounter->set(info.py_module->l_pym_serve_cpu_usage,
+                                               static_cast<uint64_t>(r.serve_cpu_pct));
+            }
+          }
+        }
+      }
+    }
+
+    std::this_thread::sleep_for(monitoring_interval);
+  }
+}
+
+ThreadMonitor::ThreadResult ThreadMonitor::process_thread_stats(const ThreadEntry& e) {
+  ThreadResult r;
+  r.tid = e.tid;
+  long long utime, stime;
+  r.new_ts = ceph::mono_clock::now();
+  r.main_ok = read_thread_stat(e.tid, utime, stime);
+  if (!r.main_ok) {
+    dout(20) << "Thread '" << e.name << "' (TID: " << e.tid << ") may have exited." << dendl;
+    return r;
+  }
+  double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(
+                     r.new_ts - e.last_snapshot.timestamp).count();
+  r.cpu_pct = calculate_cpu_percentage(utime - e.last_snapshot.utime,
+                                        stime - e.last_snapshot.stime, elapsed);
+  r.new_utime = utime;
+  r.new_stime = stime;
+  dout(20) << "Module '" << e.name << "' (TID: " << e.tid << "): "
+           << "CPU: " << std::fixed << std::setprecision(2) << r.cpu_pct << "%" << dendl;
+
+  if (!e.serve_tid) {
+    return r;
+  }
+  r.new_serve_ts = ceph::mono_clock::now();
+  r.serve_ok = read_thread_stat(e.serve_tid, utime, stime);
+  if (!r.serve_ok) {
+    dout(20) << "Serve thread (TID: " << e.serve_tid << ") may have exited." << dendl;
+    return r;
+  }
+  double serve_elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(
+                           r.new_serve_ts - e.last_serve_snapshot.timestamp).count();
+  r.serve_cpu_pct = calculate_cpu_percentage(utime - e.last_serve_snapshot.utime,
+                                              stime - e.last_serve_snapshot.stime, serve_elapsed);
+  r.new_serve_utime = utime;
+  r.new_serve_stime = stime;
+  dout(20) << "Serve thread (TID: " << e.serve_tid << "): "
+           << "CPU: " << std::fixed << std::setprecision(2) << r.serve_cpu_pct << "%" << dendl;
+  return r;
+}
+
+bool ThreadMonitor::read_thread_stat(pid_t tid, long long& utime, long long& stime) {
+  std::string stat_path = "/proc/self/task/" + std::to_string(tid) + "/stat";
+  std::ifstream stat_file(stat_path);
+  if (!stat_file.is_open()) {
+    dout(20) << __func__ << "Could not open " << stat_path << dendl;
+    return false;
+  }
+  std::string line;
+  std::getline(stat_file, line);
+  size_t start = line.find('(');
+  size_t end = line.rfind(')');
+  if (start == std::string::npos || end == std::string::npos) {
+    dout(20) << __func__ << "Malformed stat file for TID " << tid << dendl;
+    return false;
+  }
+
+  std::string remainder = line.substr(end + 2); // +2 to skip ") "
+  std::stringstream ss(remainder);
+  std::string val;
+
+  // skip 11 fields before utime/stime: 
+  // state ppid pgrp session tty_nr tpgid flags minflt cminflt majflt cmajflt
+  for (int i = 0; i < 11; ++i) {
+    ss >> val;
+  }
+  ss >> utime >> stime;
+  return true;
+}
+
+bool ThreadMonitor::read_process_statm(long long& rss_pages) {
+  std::string statm_path = "/proc/self/statm";
+  std::ifstream statm_file(statm_path);
+
+  if (!statm_file.is_open()) {
+    dout(20) << __func__ << "Could not open " << statm_path << dendl;
+    return false;
+  }
+
+  long long vsize_pages;
+  statm_file >> vsize_pages >> rss_pages;
+  return true;
+}
+
+double ThreadMonitor::calculate_cpu_percentage(long long utime_diff, long long stime_diff, 
+                                              double elapsed_seconds) {
+  if (elapsed_seconds <= 0) {
+    return 0.0;
+  }
+  long long total_jiffies = utime_diff + stime_diff;
+  return (static_cast<double>(total_jiffies) / (m_clock_ticks_per_sec * elapsed_seconds)) * 100.0;
+}
\ No newline at end of file
diff --git a/src/mgr/ThreadMonitor.h b/src/mgr/ThreadMonitor.h
new file mode 100644 (file)
index 0000000..60a839a
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License version 2.1, as published by
+ * the Free Software Foundation.  See file COPYING.
+ */
+
+#pragma once
+#include <unistd.h>
+#include "PyModule.h"
+#include "common/debug.h"
+
+class ThreadMonitor : public md_config_obs_t {
+public:
+  ThreadMonitor(CephContext *cct)
+    : m_cct(cct),
+      running(false),
+      monitoring_interval(m_cct->_conf.get_val<int64_t>("mgr_module_monitor_interval"))
+  {
+    m_cct->_conf.add_observer(this);
+    m_clock_ticks_per_sec = sysconf(_SC_CLK_TCK);
+    m_page_size = sysconf(_SC_PAGESIZE);
+  }
+
+  ~ThreadMonitor() {
+    m_cct->_conf.remove_observer(this);
+    stop_monitoring();
+  }
+
+  void start_monitoring();
+  void stop_monitoring();
+  void register_thread(const pid_t thread_id, const pid_t serve_thread_id, const std::string& name, const PyModuleRef py_module);
+
+protected:
+  std::vector<std::string> get_tracked_keys() const noexcept override {
+    return std::vector<std::string>{"mgr_module_monitor_interval"};
+  }
+  void handle_conf_change(const ConfigProxy& conf,
+    const std::set<std::string> &changed) override;
+private:
+  CephContext *m_cct;
+  struct ThreadSnapshot {
+    long long utime = 0;
+    long long stime = 0;
+    long long rss_pages = 0;
+    ceph::mono_clock::time_point timestamp;
+  };
+
+  // Information about a thread that is currently being monitored
+  struct MonitoredThreadInfo {
+    std::string name;
+    PyModuleRef py_module;
+    ThreadSnapshot last_snapshot;
+    pid_t serve_thread_id = 0; // TID of the thread runner
+    ThreadSnapshot last_serve_snapshot; // Last snapshot of the thread runner
+  };
+
+  struct ThreadEntry {
+    pid_t tid;
+    pid_t serve_tid;
+    std::string name;
+    PyModuleRef py_module;
+    ThreadSnapshot last_snapshot;
+    ThreadSnapshot last_serve_snapshot;
+  };
+
+  struct ThreadResult {
+    pid_t tid;
+    bool main_ok = false;
+    bool serve_ok = false;
+    long long new_utime = 0, new_stime = 0;
+    ceph::mono_clock::time_point new_ts;
+    double cpu_pct = 0;
+    long long new_serve_utime = 0, new_serve_stime = 0;
+    ceph::mono_clock::time_point new_serve_ts;
+    double serve_cpu_pct = 0;
+  };
+
+  std::map<pid_t, MonitoredThreadInfo> monitored_threads;
+
+  std::mutex monitored_threads_mutex;
+  std::atomic<bool> running;
+  std::unique_ptr<std::thread> monitor_thread;
+  ceph::mono_clock::duration monitoring_interval = std::chrono::seconds(2);
+  long m_clock_ticks_per_sec = 0;
+  long m_page_size = 0; 
+
+  void monitoring_loop();
+  bool read_thread_stat(pid_t tid, long long& utime, long long& stime);
+  bool read_process_statm(long long& rss_pages);
+  long get_clock_ticks_per_sec() const;
+  long get_page_size() const;
+  double calculate_cpu_percentage(long long utime_diff, long long stime_diff, double elapsed_seconds);
+  ThreadResult process_thread_stats(const ThreadEntry& e);
+};
\ No newline at end of file