src/pybind/mgr: Add nvmeof-top tool  [wip-nvmeof-top-tool-25Feb-centos9-only]
author    Vallari Agrawal <vallari.agrawal@ibm.com>
Mon, 9 Feb 2026 21:28:28 +0000 (02:58 +0530)
committer Vallari Agrawal <vallari.agrawal@ibm.com>
Wed, 25 Feb 2026 12:54:19 +0000 (18:24 +0530)
Add src/pybind/mgr/dashboard/services/nvmeof_top_cli.py
which adds commands:
"ceph nvmeof top io <subsystem>"
"ceph nvmeof top cpu"

This file is adapted from the original work in:
https://github.com/pcuzner/ceph-nvmeof-top

Co-authored-by: Paul Cuzner <pcuzner@ibm.com>
Signed-off-by: Vallari Agrawal <vallari.agrawal@ibm.com>
src/ceph.in
src/pybind/mgr/dashboard/module.py
src/pybind/mgr/dashboard/services/nvmeof_client.py
src/pybind/mgr/dashboard/services/nvmeof_conf.py
src/pybind/mgr/dashboard/services/nvmeof_top_cli.py [new file with mode: 0644]
src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py [new file with mode: 0644]
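
For context, the two commands are intended to be polled from the ceph CLI; a minimal smoke-test sketch of driving them end to end (hypothetical NQN, and assuming a running dashboard module and nvmeof gateway -- not part of this commit):

# Hypothetical smoke test for the new commands via the ceph CLI.
# The subsystem NQN is a placeholder; repeated polling with --period/-p is
# handled by ceph.in itself because the commands are registered with poll=True.
import subprocess

SUBSYSTEM = "nqn.2016-06.io.spdk:cnode1"   # placeholder NQN

def run(*args):
    return subprocess.run(["ceph", *args], capture_output=True,
                          text=True, check=True).stdout

print(run("nvmeof", "top", "cpu"))                                # reactor busy/idle per gateway
print(run("nvmeof", "top", "io", SUBSYSTEM, "--with-timestamp"))  # per-namespace IO stats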

index abfc8fb57d98ab2c3cbd75d4087774e1f342663e..64a8857e98e388b3f5f0cd7bf4fbbbf7f599dd91 100755 (executable)
@@ -29,6 +29,7 @@ import stat
 import sys
 import time
 import platform
+import uuid
 
 from typing import Dict, List, Sequence, Tuple
 
@@ -593,6 +594,7 @@ def do_command(parsed_args, target, cmdargs, sigdict, inbuf, verbose):
     # Set extra options for polling commands only:
     if valid_dict.get('poll', False):
         valid_dict['width'] = Termsize().cols
+        valid_dict['session_id'] = str(uuid.uuid4())
     while True:
         try:
             # Only print the header for polling commands
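
The uuid added here is what ties every poll of a single `ceph ... -p <delay>` invocation to one server-side collector, so the manager can compute deltas between consecutive samples. A self-contained sketch of that pairing (the dict stands in for Module.nvmeof_collectors; the Counter is a trimmed copy of the one added in nvmeof_top_cli.py):

import uuid

class Counter:                        # trimmed copy of nvmeof_top_cli.Counter
    def __init__(self):
        self.current = self.last = 0.0
    def update(self, value):
        self.last, self.current = self.current, value
    def rate(self, interval):
        return (self.current - self.last) / interval if interval else 0.0

collectors = {}                       # what Module.nvmeof_collectors amounts to

def poll_once(session_id, sample, interval):
    c = collectors.setdefault(session_id, Counter())   # same id -> same counter
    c.update(sample)
    return c.rate(interval)

session_id = str(uuid.uuid4())        # generated once per CLI invocation, resent on every poll
poll_once(session_id, 1000, 1.0)      # first poll only seeds the counter
assert poll_once(session_id, 1200, 1.0) == 200.0   # second poll yields a real per-second rate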
index 713deca65e8028ec7b3aa88786c66ccbe3dc909a..5001a1bb73025b862b2231c6a475b21b12cf0df1 100644 (file)
@@ -34,9 +34,10 @@ from .cli import DBCLICommand
 from .controllers import nvmeof  # noqa # pylint: disable=unused-import
 from .controllers import Router, json_error_page
 from .grafana import push_local_dashboards
-from .services import nvmeof_cli  # noqa # pylint: disable=unused-import
+from .services import nvmeof_cli, nvmeof_top_cli  # noqa # pylint: disable=unused-import
 from .services.auth import AuthManager, AuthManagerTool, JwtManager
 from .services.exception import dashboard_exception_handler
+from .services.nvmeof_top_cli import NvmeofTopCollector
 from .services.service import RgwServiceManager
 from .services.sso import SSO_COMMANDS, handle_sso_command
 from .settings import handle_option_command, options_command_list, options_schema_list
@@ -304,6 +305,7 @@ class Module(MgrModule, CherryPyConfig):
         self.ACCESS_CTRL_DB = None
         self.SSO_DB = None
         self.health_checks = {}
+        self.nvmeof_collectors = {}
 
     @classmethod
     def can_run(cls):
@@ -556,6 +558,39 @@ class Module(MgrModule, CherryPyConfig):
 
         return self.__pool_stats
 
+    def get_nvmeof_collector(self, session_id: str = '', ttl: int = 3600):
+        STALE_POLL_THRESHOLD = 5  # expire if 5 poll intervals have passed without activity
+
+        def _expire_old_sessions():
+            now = time.time()
+            expired = []
+
+            for _id in list(self.nvmeof_collectors.keys()):
+                collector = self.nvmeof_collectors[_id]
+                delay = collector.delay
+                if delay < 1:  # for first poll iteration
+                    expire_time = collector.timestamp + ttl
+                else:
+                    expire_time = collector.timestamp + (STALE_POLL_THRESHOLD * delay)
+                if now > expire_time:
+                    expired.append(_id)
+
+            for _id in expired:
+                self.nvmeof_collectors.pop(_id, None)
+
+        if NvmeofTopCollector is None:
+            logger.error("NVMeoFClient is not available")
+            return None
+
+        if not session_id:
+            return None
+
+        _expire_old_sessions()
+
+        if session_id not in self.nvmeof_collectors:
+            self.nvmeof_collectors[session_id] = NvmeofTopCollector()
+        return self.nvmeof_collectors[session_id]
+
     def config_notify(self):
         """
         This method is called whenever one of our config options is changed.
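
Sessions are reaped lazily on each lookup: a collector that has only polled once survives for the full ttl, otherwise it is dropped after STALE_POLL_THRESHOLD missed poll intervals. The rule, pulled out into a standalone helper for clarity (the helper name is made up; constants as in the hunk above):

import time
from typing import Optional

STALE_POLL_THRESHOLD = 5       # poll intervals of silence before a session is dropped
MAX_SESSION_TTL = 60 * 60      # fallback for a session that has polled only once

def is_expired(last_seen: float, delay: float, now: Optional[float] = None) -> bool:
    now = time.time() if now is None else now
    if delay < 1:                                    # first iteration: no real interval yet
        return now > last_seen + MAX_SESSION_TTL
    return now > last_seen + STALE_POLL_THRESHOLD * delay

# e.g. a session polling every 5s is reaped roughly 25s after its last poll:
assert is_expired(last_seen=0.0, delay=5.0, now=26.0)
assert not is_expired(last_seen=0.0, delay=5.0, now=24.0)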
index de9f76ed8581fdb7ee0bb505d11cd322ab0a0487..18b48831a76480596c3ef68fb1317ace0a238c15 100644 (file)
@@ -48,6 +48,7 @@ else:
                     f'Unable to retrieve the gateway info: {e}'
                 )
 
+            self.daemon_name = ''
             # While creating listener need to direct request to the gateway
             # address where listener is supposed to be added.
             if server_address:
@@ -62,6 +63,7 @@ else:
                     None
                 )
                 if matched_gateway:
+                    self.daemon_name = matched_gateway.get('daemon_name')
                     self.gateway_addr = matched_gateway.get('service_url')
                     logger.debug("Gateway address set to: %s", self.gateway_addr)
             enable_auth = is_mtls_enabled(service_name)
@@ -80,6 +82,7 @@ else:
                 logger.info("Insecurely connecting to: %s", self.gateway_addr)
                 self.channel = grpc.insecure_channel(self.gateway_addr)
             self.stub = pb2_grpc.GatewayStub(self.channel)
+            self.service_name = service_name
 
     Model = Dict[str, Any]
     Collection = List[Model]
index 0a80bc26debc68a69319b71548686001dc2c612b..da06f1b232832f4b35096cc35dc4abd4afdba1d8 100644 (file)
@@ -216,3 +216,12 @@ def is_mtls_enabled(service_name: str):
         return orch.services.get(service_name)[0].spec.enable_auth
     except OrchestratorError:
         return False
+
+
+def get_pool_group_name(service_name: str):
+    try:
+        orch = OrchClient.instance()
+        spec = orch.services.get(service_name)[0].spec
+        return (spec.pool, spec.group)
+    except OrchestratorError:
+        return None
diff --git a/src/pybind/mgr/dashboard/services/nvmeof_top_cli.py b/src/pybind/mgr/dashboard/services/nvmeof_top_cli.py
new file mode 100644 (file)
index 0000000..c2522d3
--- /dev/null
@@ -0,0 +1,665 @@
+# -*- coding: utf-8 -*-
+# This file is adapted from the original "nvmeof-top" tool in:
+# https://github.com/pcuzner/ceph-nvmeof-top
+# by Paul Cuzner <pcuzner@ibm.com>
+import errno
+import json
+import logging
+import time
+from typing import Any, Optional
+
+from mgr_module import HandleCommandResult
+
+from .. import mgr
+from ..cli import DBCLICommand
+
+logger = logging.getLogger(__name__)
+
+NvmeofTopCollector = None
+
+try:
+    from .nvmeof_cli import NvmeofGatewaysConfig
+    from .nvmeof_client import NVMeoFClient
+    from .nvmeof_conf import get_pool_group_name
+except ImportError as e:
+    logger.error("Failed to import NVMeoFClient and related components: %s", e)
+else:
+    def get_collector(session_id: Optional[str]):
+        MAX_SESSION_TTL = 60 * 60
+        return mgr.get_nvmeof_collector(session_id, MAX_SESSION_TTL)
+
+    def get_lbg_gws_map(service_name: str):
+        pool_group = get_pool_group_name(service_name)
+        if not pool_group:
+            logger.error("Error getting pool name and group name of the service")
+            return {}
+        pool, group = pool_group
+        try:
+            cmd = {
+                'prefix': 'nvme-gw show',
+                'pool': pool,
+                'group': group,
+                'format': 'json'
+            }
+            ret_status, out, _ = mgr.mon_command(cmd)
+            if ret_status == 0 and out is not None:
+                lbg_gws_map = {}
+                gws_info = json.loads(out)
+                for gw in gws_info["Created Gateways:"]:
+                    gw_id = str(gw["gw-id"]).removeprefix("client.")
+                    gw_lbg = int(gw["anagrp-id"])
+                    lbg_gws_map[gw_lbg] = gw_id
+                return lbg_gws_map
+            return {}
+        except Exception:  # pylint: disable=broad-except
+            logger.exception("Failed to run 'nvme-gw show' mon command")
+            return {}
+
+    class Health:
+        def __init__(self):
+            self.rc = 0
+            self.msg = ''
+
+    class Counter:
+        def __init__(self):
+            self.current = 0.0
+            self.last = 0.0
+
+        def update(self, new_value: float):
+            """Update the stats maintaining current and last"""
+            self.last = self.current
+            self.current = new_value
+
+        def rate(self, interval: float):
+            """Calculate the per second change rate"""
+            if not interval:
+                return 0.0
+            return (self.current - self.last) / interval
+
+    class PerformanceStats:  # pylint: disable=too-many-instance-attributes
+        def __init__(self, bdev: str):
+            self.bdev = bdev
+            self.read_ops = Counter()
+            self.read_bytes = Counter()
+            self.read_secs = Counter()
+            self.write_ops = Counter()
+            self.write_bytes = Counter()
+            self.write_secs = Counter()
+
+            self.read_ops_rate = 0
+            self.write_ops_rate = 0
+            self.read_bytes_rate = 0
+            self.write_bytes_rate = 0
+            self.read_secs_rate = 0.0
+            self.write_secs_rate = 0.0
+            self.total_ops_rate = 0
+            self.rareq_sz = 0.0
+            self.wareq_sz = 0.0
+            self.r_await = 0.0
+            self.w_await = 0.0
+
+        def calculate(self, delay: float):
+            self.read_ops_rate = self.read_ops.rate(delay)
+            self.read_bytes_rate = self.read_bytes.rate(delay)
+            self.read_secs_rate = self.read_secs.rate(delay)
+            self.write_ops_rate = self.write_ops.rate(delay)
+            self.write_bytes_rate = self.write_bytes.rate(delay)
+            self.write_secs_rate = self.write_secs.rate(delay)
+
+            self.total_ops_rate = self.read_ops_rate + self.write_ops_rate
+
+            if self.read_ops_rate:
+                self.rareq_sz = (int(self.read_bytes_rate / self.read_ops_rate) / 1024)
+                self.r_await = ((self.read_secs_rate / self.read_ops_rate) * 1000)  # for ms
+            else:
+                self.rareq_sz = 0.0
+                self.r_await = 0.0
+            if self.write_ops_rate:
+                self.wareq_sz = (int(self.write_bytes_rate / self.write_ops_rate) / 1024)
+                self.w_await = ((self.write_secs_rate / self.write_ops_rate) * 1000)  # for ms
+            else:
+                self.wareq_sz = 0.0
+                self.w_await = 0.0
+
+    class ReactorStats:
+        def __init__(self, thread: str):
+            self.thread = thread
+            self.busy_secs = Counter()
+            self.idle_secs = Counter()
+
+            self.busy_rate = 0.0
+            self.idle_rate = 0.0
+
+        def calculate(self, delay: float):
+            self.busy_rate = self.busy_secs.rate(delay)
+            self.idle_rate = self.idle_secs.rate(delay)
+
+    class NvmeofTopCollector:  # type: ignore[no-redef]  # noqa  # pylint: disable=function-redefined,too-many-instance-attributes
+        def __init__(self):
+            self.tool: Any = None
+            self.subsystem_nqn = ''
+            self.server_addr = ''
+            self.delay: float = 0.0
+            self.namespaces = {}
+            self.lbg_to_gateway: dict = {}
+            self.subsystems: Any = None
+            self.reactor_stats = {}
+            self.iostats = {}
+            self.gw_info: Any = None
+            self.client: Any = None
+            self.timestamp = time.time()
+            self.health = Health()
+
+        @property
+        def nqn_list(self):
+            return [subsys.nqn for subsys in self.subsystems.subsystems]
+
+        @property
+        def ready(self) -> bool:
+            return self.health.rc == 0
+
+        @property
+        def total_namespaces_defined(self) -> int:
+            return len(self.namespaces[self.subsystem_nqn])
+
+        @property
+        def total_subsystems(self) -> int:
+            return len(self.nqn_list)
+
+        @property
+        def total_namespaces_overall(self):
+            total = 0
+            for subsys in self.subsystems.subsystems:
+                total += subsys.namespace_count
+            return total
+
+        @property
+        def max_namespaces(self):
+            for subsys in self.subsystems.subsystems:
+                if subsys.nqn == self.subsystem_nqn:
+                    return subsys.max_namespaces
+            logger.error("Request for max namespaces could not find a "
+                         "match against the NQN! Returning 0")
+            return 0
+
+        @property
+        def load_balancing_group(self):
+            return self.gw_info.load_balancing_group
+
+        def get_sorted_namespaces(self, sort_pos: int, reverse_sort: bool):
+            logger.debug("get_sorted_namespaces")
+            ns_data = []
+            for ns in self.namespaces[self.subsystem_nqn]:
+                bdev_name = ns.bdev_name
+
+                daemon_name = ""
+                if self.tool.args.get('server_addr'):
+                    # only show namespaces owned by this gateway's LBG
+                    if ns.load_balancing_group != self.load_balancing_group:
+                        continue
+                    daemon_name = self.client.daemon_name
+                else:
+                    daemon_name = self.lbg_to_gateway.get(ns.load_balancing_group, '')
+                if not daemon_name:
+                    logger.warning("No gateway found for load balancing group %s, "
+                                   "skipping namespace %s",
+                                   ns.load_balancing_group, ns.nsid)
+                    continue
+                perf_stats = self.iostats.get(daemon_name, {}).get(bdev_name)
+                if perf_stats is None:
+                    logger.warning("No iostats for bdev %s on %s, skipping namespace %s",
+                                   bdev_name, daemon_name, ns.nsid)
+                    continue
+                perf_stats.calculate(self.delay)
+
+                ns_data.append((
+                    ns.nsid,
+                    f"{ns.rbd_pool_name}/{ns.rbd_image_name}",
+                    int(perf_stats.total_ops_rate),
+                    int(perf_stats.read_ops_rate),
+                    f"{self.bytes_to_MB(perf_stats.read_bytes_rate):3.2f}",
+                    f"{perf_stats.r_await:3.2f}",
+                    f"{perf_stats.rareq_sz:4.2f}",
+                    int(perf_stats.write_ops_rate),
+                    f"{self.bytes_to_MB(perf_stats.write_bytes_rate):3.2f}",
+                    f"{perf_stats.w_await:3.2f}",
+                    f"{perf_stats.wareq_sz:4.2f}",
+                    self.lb_group(ns.load_balancing_group),
+                    self.qos_enabled(ns)
+                ))
+
+            ns_data.sort(key=lambda t: t[sort_pos], reverse=reverse_sort)
+            return ns_data
+
+        def get_reactor_data(self, sort_pos: int, reverse_sort: bool):
+            reactor_data = []
+            for gw_addr, threads in self.reactor_stats.items():
+                for _, thread_stats in threads.items():
+                    thread_stats.calculate(self.delay)
+                    reactor_data.append((
+                        gw_addr,
+                        thread_stats.thread,
+                        f"{thread_stats.busy_rate * 100:.2f}",
+                        f"{thread_stats.idle_rate * 100:.2f}",
+                    ))
+            reactor_data.sort(key=lambda t: t[sort_pos], reverse=reverse_sort)
+            return reactor_data
+
+        def get_subsystem_summary_data(self):
+            return [
+                self.subsystem_nqn,
+                f'{self.total_namespaces_defined} / {self.max_namespaces}',
+            ]
+
+        def get_overall_summary_data(self):
+            return [
+                self.server_addr,
+                self.load_balancing_group,
+                self.total_subsystems,
+                self.total_namespaces_overall,
+            ]
+
+        def qos_enabled(self, ns) -> str:
+            if (ns.rw_ios_per_second or ns.rw_mbytes_per_second
+                    or ns.r_mbytes_per_second or ns.w_mbytes_per_second):
+                return 'Yes'
+            return 'No'
+
+        def lb_group(self, grp_id: int):
+            """Provide a meaningful default when load-balancing is not in use"""
+            return "N/A" if grp_id == 0 else f"{grp_id}"
+
+        def bytes_to_MB(self, num_bytes: int, si: int = 1024):
+            """Simple conversion of bytes to MiB or MB"""
+            return (num_bytes / si) / si
+
+        # grpc methods
+        def _call_grpc(self, method_name, request, client=None):
+            logger.debug("calling grpc method %s", method_name)
+            if not client:
+                client = self.client
+            try:
+                method = getattr(client.stub, method_name)
+                response = method(request)
+            except Exception as exc:  # pylint: disable=broad-except
+                self.health.rc = -errno.ECONNREFUSED
+                self.health.msg = f"RPC endpoint unavailable at {client.gateway_addr}"
+                logger.error("grpc call to %s failed: %s (%s)", method_name, self.health.msg, exc)
+                return None
+
+            self.health.msg = f"{method_name} success"
+            logger.debug("call to %s successful", method_name)
+            return response
+
+        def _fetch_namespace_iostats(self, client):
+            daemon_name = client.daemon_name
+            logger.debug("fetching iostats for namespaces from %s", daemon_name)
+            stats = self._call_grpc('list_namespaces_io_stats',
+                                    NVMeoFClient.pb2.list_namespaces_io_stats_req(), client)
+            logger.debug("list_namespaces_io_stats stats=%s", stats)
+            if stats is None:
+                return
+            if daemon_name not in self.iostats:
+                self.iostats[daemon_name] = {}
+
+            for ns in stats.namespaces:
+                bdev_name = ns.bdev_name
+                if bdev_name not in self.iostats[daemon_name]:
+                    self.iostats[daemon_name][bdev_name] = PerformanceStats(bdev_name)
+
+                ns_stats = self.iostats[daemon_name][bdev_name]
+                ns_stats.read_ops.update(ns.num_read_ops)
+                ns_stats.read_bytes.update(ns.bytes_read)
+                ns_stats.read_secs.update((ns.read_latency_ticks / stats.tick_rate))
+                ns_stats.write_ops.update(ns.num_write_ops)
+                ns_stats.write_bytes.update(ns.bytes_written)
+                ns_stats.write_secs.update((ns.write_latency_ticks / stats.tick_rate))
+
+        def _fetch_namespaces(self, subsystem_nqn):
+            return self._call_grpc(
+                'list_namespaces',
+                NVMeoFClient.pb2.list_namespaces_req(subsystem=subsystem_nqn))
+
+        def _fetch_thread_stats(self, client):
+            gateway_addr = client.gateway_addr
+            logger.debug("fetching thread stats for %s", gateway_addr)
+            stats = self._call_grpc('get_thread_stats',
+                                    NVMeoFClient.pb2.get_thread_stats_req(), client)
+            logger.debug("get_thread_stats stats=%s", stats)
+            if stats is None:
+                return
+            if gateway_addr not in self.reactor_stats:
+                self.reactor_stats[gateway_addr] = {}
+            tick_rate = stats.tick_rate
+            for thread in stats.threads:
+                name = thread.name
+                if name not in self.reactor_stats[gateway_addr]:
+                    self.reactor_stats[gateway_addr][name] = ReactorStats(thread.name)
+                thread_stats = self.reactor_stats[gateway_addr][name]
+                thread_stats.busy_secs.update(thread.busy / tick_rate)
+                thread_stats.idle_secs.update(thread.idle / tick_rate)
+
+        def _fetch_gateway_info(self, client):
+            return self._call_grpc(
+                'get_gateway_info',
+                NVMeoFClient.pb2.get_gateway_info_req(), client)
+
+        def _fetch_subsystems(self):
+            return self._call_grpc('list_subsystems', NVMeoFClient.pb2.list_subsystems_req())
+
+        def initialise(self, tool):
+            self.health = Health()
+            self.tool = tool
+            self.client = NVMeoFClient(tool.args.get('group', ''),
+                                       tool.args.get('server_addr', ''))
+            self.server_addr = self.client.gateway_addr
+
+            now = time.time()
+            self.delay = (now - self.timestamp)
+            self.timestamp = now
+
+            self.gw_info = self._fetch_gateway_info(self.client)
+            if not self.ready:
+                logger.error("Call to %s failed, RC=%s, MSG=%s",
+                             self.server_addr, self.health.rc, self.health.msg)
+                self.health.msg = (
+                    f"Unable to connect to {self.server_addr}, "
+                    "pass an available gateway as --server-addr"
+                )
+                return
+
+            logger.debug("Connected to %s", self.server_addr)
+
+        def collect_cpu_data(self):
+            service_name = self.tool.service_name
+            group = self.tool.args.get('group', '')
+            if service_name:
+                gw_conf = NvmeofGatewaysConfig.get_gateways_config()
+                gateways = gw_conf.get("gateways", {})
+                if service_name not in gateways:
+                    self.health.rc = -errno.ENOENT
+                    self.health.msg = f'Service {service_name} not found'
+                    return
+                for gw in gateways[service_name]:
+                    client = NVMeoFClient(group, gw["service_url"])
+                    self._fetch_thread_stats(client)
+                    if not self.ready:
+                        return
+            else:
+                self._fetch_thread_stats(self.client)
+            logger.debug("collect_cpu_data completed")
+
+        def collect_io_data(self):  # pylint: disable=too-many-return-statements
+            self.subsystem_nqn = self.tool.subsystem_nqn
+
+            self.subsystems = self._fetch_subsystems()
+            if self.subsystems is None or self.subsystems.status > 0:
+                logger.error("Failed to retrieve subsystems list")
+                self.health.rc = -errno.ECONNREFUSED
+                self.health.msg = "Unable to retrieve a list of subsystems"
+                return
+
+            if self.total_subsystems == 0:
+                self.health.rc = -errno.ENOENT
+                self.health.msg = 'No subsystems found'
+                return
+
+            if self.subsystem_nqn and self.subsystem_nqn not in self.nqn_list:
+                logger.error("nqn provided is not present on the gateway")
+                self.health.rc = -errno.ENOENT
+                self.health.msg = "Subsystem NQN provided not found"
+                return
+
+            namespace_info = self._fetch_namespaces(self.subsystem_nqn)
+            if namespace_info is None:
+                return
+
+            self.namespaces[self.subsystem_nqn] = namespace_info.namespaces
+            logger.debug("Subsystem '%s' has %s namespaces",
+                         self.subsystem_nqn, self.total_namespaces_defined)
+
+            group = self.tool.args.get('group', '')
+            if not self.tool.args.get('server_addr'):
+                service_name = self.client.service_name
+                gw_conf = NvmeofGatewaysConfig.get_gateways_config()
+                gateways = gw_conf.get("gateways", {})
+                if service_name not in gateways:
+                    self.health.rc = -errno.ENOENT
+                    self.health.msg = f'Service {service_name} not found'
+                    return
+                self.lbg_to_gateway = get_lbg_gws_map(service_name)
+                if not self.lbg_to_gateway:
+                    self.health.rc = -errno.ENOENT
+                    self.health.msg = (
+                        f'Failed to retrieve load balancing group '
+                        f'mapping for service {service_name}'
+                    )
+                    return
+                for gw in gateways[service_name]:
+                    client = NVMeoFClient(group, gw["service_url"])
+                    self._fetch_namespace_iostats(client)
+                    if not self.ready:
+                        return
+            else:
+                self._fetch_namespace_iostats(self.client)
+            logger.debug("collect_io_data completed")
+
+    class NVMeoFTopTool:
+        def __init__(self, args: dict, data_collector):
+            self.args = args
+            self.collector = data_collector
+            self.reverse_sort = args.get('sort_descending', False)
+            self.sort_key = args.get('sort_by')
+
+        def run(self) -> tuple:
+            try:
+                self.collector.initialise(self)
+                if not self.collector.ready:
+                    return (self.collector.health.rc,
+                            f"nvmeof-top has encountered an error: "
+                            f"{self.collector.health.msg}")
+
+                collect_start = time.time()
+                self._collect()
+                logger.info("collector methods took %.2fs", time.time() - collect_start)
+
+                if not self.collector.ready:
+                    return (self.collector.health.rc, self.collector.health.msg)
+
+                output = self.format_output()
+                output += "\n ---- "
+                return (0, output)
+            except Exception as ex:  # pylint: disable=broad-except
+                logger.exception("top tool failed to run: %s", ex)
+                return (-errno.EINVAL, str(ex))
+
+        def _collect(self):
+            raise NotImplementedError
+
+        def format_output(self):
+            raise NotImplementedError
+
+    class NVMeoFTopCPU(NVMeoFTopTool):
+        reactors_headers = ['Gateway', 'Thread Name', 'Busy Rate%', 'Idle Rate%']
+        reactors_template = "{:<30}   {:<30}   {:<20}   {:<20}\n"
+
+        def __init__(self, args: dict, data_collector):
+            super().__init__(args, data_collector)
+            self.service_name = args.get('service')
+
+        def _collect(self):
+            self.collector.collect_cpu_data()
+
+        def format_output(self):
+            if self.sort_key not in NVMeoFTopCPU.reactors_headers:
+                raise ValueError(
+                    f"Invalid sort key '{self.sort_key}'. "
+                    f"Valid options: {NVMeoFTopCPU.reactors_headers}"
+                )
+            sort_pos = NVMeoFTopCPU.reactors_headers.index(self.sort_key)
+            reactor_data = self.collector.get_reactor_data(sort_pos=sort_pos,
+                                                           reverse_sort=self.reverse_sort)
+            rows = []
+            if self.args.get('with_timestamp'):
+                timestamp = time.strftime('%Y-%m-%d %H:%M:%S',
+                                          time.localtime(self.collector.timestamp))
+                rows.append(f"{timestamp} (delay: {self.collector.delay:.2f}s)\n")
+
+            if not self.args.get('no_header'):
+                rows.append(NVMeoFTopCPU.reactors_template.format(*NVMeoFTopCPU.reactors_headers))
+            for reactor in reactor_data:
+                rows.append(NVMeoFTopCPU.reactors_template.format(*reactor))
+            rows.append("\n")
+
+            return ''.join(rows)
+
+    class NVMeoFTopIO(NVMeoFTopTool):
+        subsystem_summary_headers = ['Subsystem', 'Namespaces']
+        summary_headers = ['Gateway', 'Load Balancing Group',
+                           'Total Subsystems', 'Total Namespaces']
+
+        ns_headers = [
+            'NSID', 'RBD Image', 'IOPS', 'r/s', 'rMB/s', 'r_await', 'rareq-sz',
+            'w/s', 'wMB/s', 'w_await', 'wareq-sz', 'LBGrp', 'QoS'
+        ]
+        ns_template = (
+            "{:>4}   {:<40}   {:>7}   {:>6}   {:>6}   {:>7}   {:>8}"
+            "   {:>6}   {:>6}   {:>7}   {:>8}   {:^5}   {:>3}\n"
+        )
+
+        def __init__(self, args: dict, data_collector):
+            super().__init__(args, data_collector)
+            self.subsystem_nqn = args.get('subsystem')
+
+        def _collect(self):
+            self.collector.collect_io_data()
+
+        def format_output(self):
+            if self.sort_key not in NVMeoFTopIO.ns_headers:
+                raise ValueError(
+                    f"Invalid sort key '{self.sort_key}'. "
+                    f"Valid options: {NVMeoFTopIO.ns_headers}"
+                )
+            sort_pos = NVMeoFTopIO.ns_headers.index(self.sort_key)
+            ns_data = self.collector.get_sorted_namespaces(sort_pos=sort_pos,
+                                                           reverse_sort=self.reverse_sort)
+            subsystem_summary_data = self.collector.get_subsystem_summary_data()
+            overall_summary_data = self.collector.get_overall_summary_data()
+
+            rows = []
+            if self.args.get('with_timestamp'):
+                timestamp = time.strftime('%Y-%m-%d %H:%M:%S',
+                                          time.localtime(self.collector.timestamp))
+                rows.append(f"{timestamp} (delay: {self.collector.delay:.2f}s)\n")
+            if self.args.get('summary'):
+                if self.args.get('server_addr'):
+                    summary_row = ""
+                    for index, header in enumerate(NVMeoFTopIO.summary_headers):
+                        summary_row += f"{header}: {overall_summary_data[index]}  "
+                    rows.append(summary_row + "\n")
+                subsys_summary_row = ""
+                for index, header in enumerate(NVMeoFTopIO.subsystem_summary_headers):
+                    subsys_summary_row += f"{header}: {subsystem_summary_data[index]}  "
+                rows.append(subsys_summary_row + "\n\n")
+            if not self.args.get('no_header'):
+                rows.append(NVMeoFTopIO.ns_template.format(*NVMeoFTopIO.ns_headers))
+            if ns_data:
+                for ns in ns_data:
+                    rows.append(NVMeoFTopIO.ns_template.format(*ns))
+            else:
+                rows.append("<no namespaces defined>\n")
+
+            return ''.join(rows)
+
+    @DBCLICommand.Read('nvmeof top cpu', poll=True)
+    def nvmeof_top_cpu(_, service: str = '',
+                       server_addr: str = '', group: str = '',
+                       descending: bool = False, sort_by: str = 'Thread Name',
+                       with_timestamp: bool = False,
+                       no_header: bool = False,
+                       session_id: Optional[str] = None):
+        '''
+        NVMeoF Top CPU Tool
+        --period [-p] <delay> (default 1s, max 3600s)
+        --service '<service_name>'
+        --server-addr <ip>
+        --group '<group_name>'
+        --sort-by '<header>'
+        --descending
+        --with-timestamp
+        --no-header
+        '''
+        args = {
+            'service': service,
+            'with_timestamp': with_timestamp,
+            'no_header': no_header,
+            'sort_descending': descending,
+            'sort_by': sort_by,
+            'server_addr': server_addr,
+            'group': group,
+        }
+        try:
+            data_collector = get_collector(session_id)
+            if data_collector is None:
+                return HandleCommandResult(
+                    stderr="Unable to initialise collector",
+                    retval=-errno.EINVAL
+                )
+            top_tool = NVMeoFTopCPU(args, data_collector)
+            rc, output = top_tool.run()
+            if rc != 0:
+                return HandleCommandResult(stderr=output, retval=rc)
+            return HandleCommandResult(stdout=output, retval=rc)
+        except Exception as exc:  # pylint: disable=broad-except
+            logger.exception("top-cpu command failed: %s", exc)
+            return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL)
+
+    @DBCLICommand.Read('nvmeof top io', poll=True)
+    def nvmeof_top_io(_, subsystem: str = '',
+                      server_addr: str = '', group: str = '',
+                      descending: bool = False, sort_by: str = 'NSID',
+                      with_timestamp: bool = False,
+                      summary: bool = False, no_header: bool = False,
+                      session_id: Optional[str] = None):
+        '''
+        NVMeoF Top IO Tool
+        --period [-p] <delay> (default 1s, max 3600s)
+        --subsystem '<nqn>'
+        --server-addr <ip>
+        --group '<group_name>'
+        --descending
+        --sort-by '<header>'
+        --with-timestamp
+        --summary
+        --no-header
+        '''
+        args = {
+            'subsystem': subsystem,
+            'with_timestamp': with_timestamp,
+            'summary': summary,
+            'no_header': no_header,
+            'sort_descending': descending,
+            'sort_by': sort_by,
+            'server_addr': server_addr,
+            'group': group,
+        }
+        if not subsystem:
+            return HandleCommandResult(
+                stderr="Required argument '--subsystem' missing",
+                retval=-errno.EINVAL
+            )
+        try:
+            data_collector = get_collector(session_id)
+            if data_collector is None:
+                return HandleCommandResult(
+                    stderr="Unable to initialise collector",
+                    retval=-errno.EINVAL
+                )
+            top_tool = NVMeoFTopIO(args, data_collector)
+            rc, output = top_tool.run()
+            if rc != 0:
+                return HandleCommandResult(stderr=output, retval=rc)
+            return HandleCommandResult(stdout=output, retval=rc)
+        except Exception as exc:  # pylint: disable=broad-except
+            logger.exception("top-io command failed: %s", exc)
+            return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL)
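
The per-namespace columns in the IO view fall straight out of the PerformanceStats arithmetic above: every rate is a delta between two consecutive SPDK samples divided by the poll interval, r/w_await turns the latency-seconds delta into milliseconds per I/O, and r/wareq-sz into KiB per I/O. A worked example with made-up numbers:

# Worked example of the PerformanceStats maths (all numbers are made up).
delay = 2.0                                  # seconds between two polls

read_ops_delta = 400                         # num_read_ops grew by 400
read_bytes_delta = 400 * 8192                # 400 reads of 8 KiB each
read_secs_delta = 0.32                       # delta of read_latency_ticks / tick_rate

read_ops_rate = read_ops_delta / delay                      # 200.0      -> "r/s"
read_bytes_rate = read_bytes_delta / delay                   # 1638400.0 B/s
read_secs_rate = read_secs_delta / delay                     # 0.16 s of latency per second

r_await = (read_secs_rate / read_ops_rate) * 1000            # 0.8 ms per read   -> "r_await"
rareq_sz = int(read_bytes_rate / read_ops_rate) / 1024       # 8.0 KiB per read  -> "rareq-sz"
rmb_per_s = (read_bytes_rate / 1024) / 1024                  # ~1.56             -> "rMB/s"

assert rareq_sz == 8.0
assert round(r_await, 4) == 0.8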
diff --git a/src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py b/src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py
new file mode 100644 (file)
index 0000000..8c9f691
--- /dev/null
@@ -0,0 +1,308 @@
+# -*- coding: utf-8 -*-
+import errno
+import time
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from ..services.nvmeof_top_cli import Counter, NvmeofTopCollector, NVMeoFTopCPU, NVMeoFTopIO
+from ..tests import CLICommandTestMixin, CmdException
+
+
+@pytest.fixture(name='cpu_collector')
+def fixture_cpu_collector():
+    collector = MagicMock()
+    collector.get_reactor_data.return_value = []
+    collector.delay = 1.5
+    collector.timestamp = time.time()
+    return collector
+
+
+@pytest.fixture(name='io_collector')
+def fixture_io_collector():
+    collector = MagicMock()
+    collector.get_sorted_namespaces.return_value = []
+    collector.get_subsystem_summary_data.return_value = []
+    collector.get_overall_summary_data.return_value = []
+    collector.delay = 2.0
+    collector.timestamp = time.time()
+    return collector
+
+
+class TestCounter:
+    def test_initial_values(self):
+        c = Counter()
+        assert c.current == 0.0
+        assert c.last == 0.0
+
+    def test_update_tracks_last(self):
+        c = Counter()
+        c.update(10.0)
+        assert c.current == 10.0
+        assert c.last == 0.0
+        c.update(20.0)
+        assert c.current == 20.0
+        assert c.last == 10.0
+
+    def test_rate(self):
+        c = Counter()
+        c.update(100.0)
+        c.update(150.0)
+        assert c.rate(5.0) == 10.0
+
+    def test_rate_zero_interval_returns_zero(self):
+        c = Counter()
+        c.update(100.0)
+        assert c.rate(0) == 0.0
+
+
+class TestNVMeoFTopCPUFormat:
+    default_args = {
+        'sort_by': 'Thread Name',
+        'sort_descending': False,
+        'with_timestamp': False,
+        'no_header': False,
+        'service': '',
+        'server_addr': '',
+        'group': '',
+    }
+
+    def test_headers(self, cpu_collector):
+        tool = NVMeoFTopCPU(self.default_args, cpu_collector)
+        output = tool.format_output()
+        assert 'Gateway' in output
+        assert 'Thread Name' in output
+        assert 'Busy Rate%' in output
+        assert 'Idle Rate%' in output
+
+    def test_no_header(self, cpu_collector):
+        tool = NVMeoFTopCPU({**self.default_args, 'no_header': True}, cpu_collector)
+        output = tool.format_output()
+        assert 'Gateway' not in output
+
+    def test_with_timestamp(self, cpu_collector):
+        tool = NVMeoFTopCPU({**self.default_args, 'with_timestamp': True}, cpu_collector)
+        output = tool.format_output()
+        assert 'delay:' in output
+        assert '1.50s' in output
+
+    def test_reactor_data(self, cpu_collector):
+        cpu_collector.get_reactor_data.return_value = [
+            ('192.168.1.1:5500', 'reactor_0', '72.50', '27.50'),
+            ('192.168.1.1:5500', 'reactor_1', '45.00', '55.00'),
+        ]
+        tool = NVMeoFTopCPU(self.default_args, cpu_collector)
+        output = tool.format_output()
+        assert '192.168.1.1:5500' in output
+        assert 'reactor_0' in output
+        assert 'reactor_1' in output
+
+    def test_invalid_sort_key(self, cpu_collector):
+        tool = NVMeoFTopCPU({**self.default_args, 'sort_by': 'NonExistent'}, cpu_collector)
+        with pytest.raises(ValueError, match="Invalid sort key"):
+            tool.format_output()
+
+
+class TestNVMeoFTopIOFormat:
+    default_args = {
+        'sort_by': 'NSID',
+        'sort_descending': False,
+        'with_timestamp': False,
+        'no_header': False,
+        'summary': False,
+        'subsystem': 'nqn.2024-01.io.spdk:cnode1',
+        'server_addr': '',
+        'group': '',
+    }
+
+    def test_no_namespaces(self, io_collector):
+        tool = NVMeoFTopIO(self.default_args, io_collector)
+        output = tool.format_output()
+        assert '<no namespaces defined>' in output
+
+    def test_headers_present(self, io_collector):
+        tool = NVMeoFTopIO(self.default_args, io_collector)
+        output = tool.format_output()
+        assert 'NSID' in output
+        assert 'RBD Image' in output
+
+    def test_no_header(self, io_collector):
+        tool = NVMeoFTopIO({**self.default_args, 'no_header': True}, io_collector)
+        output = tool.format_output()
+        assert 'NSID' not in output
+
+    def test_namespace_data(self, io_collector):
+        io_collector.get_sorted_namespaces.return_value = [
+            (1, 'pool/image1', 100, 50, '1.00', '0.50', '4.00',
+             50, '1.00', '0.50', '4.00', '1', 'No'),
+            (2, 'pool/image2', 200, 100, '2.00', '1.00', '8.00',
+             100, '2.00', '1.00', '8.00', '2', 'Yes'),
+        ]
+        tool = NVMeoFTopIO(self.default_args, io_collector)
+        output = tool.format_output()
+        assert 'pool/image1' in output
+        assert 'pool/image2' in output
+
+    def test_with_timestamp(self, io_collector):
+        tool = NVMeoFTopIO({**self.default_args, 'with_timestamp': True}, io_collector)
+        output = tool.format_output()
+        assert 'delay:' in output
+
+    def test_invalid_sort_key(self, io_collector):
+        tool = NVMeoFTopIO({**self.default_args, 'sort_by': 'BadKey'}, io_collector)
+        with pytest.raises(ValueError, match="Invalid sort key"):
+            tool.format_output()
+
+
+class TestNvmeofTopCollector:
+    @pytest.fixture
+    def collector(self):
+        c = NvmeofTopCollector()
+        c.client = MagicMock()
+        c.client.service_name = 'myservice'
+        c.tool = MagicMock()
+        c.tool.args = {'group': '', 'server_addr': ''}
+        return c
+
+    def test_grpc_call_failure(self, collector):
+        collector.client.gateway_addr = '192.168.1.1:5500'
+        collector.client.stub.get_gateway_info.side_effect = Exception('connection refused')
+        collector._call_grpc(  # pylint: disable=protected-access
+            'get_gateway_info', MagicMock(), collector.client)
+        assert collector.health.rc == -errno.ECONNREFUSED
+        assert collector.health.msg == 'RPC endpoint unavailable at 192.168.1.1:5500'
+
+    def test_collect_cpu_data_service_not_found(self, collector):
+        collector.tool.service_name = 'myservice'
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value={'gateways': {}}):
+            collector.collect_cpu_data()
+        assert collector.health.rc == -errno.ENOENT
+        assert collector.health.msg == 'Service myservice not found'
+
+    def test_collect_io_data_subsystems_unavailable(self, collector):
+        collector.tool.subsystem_nqn = 'nqn.test'
+        with patch.object(collector, '_fetch_subsystems', return_value=None):
+            collector.collect_io_data()
+        assert collector.health.rc == -errno.ECONNREFUSED
+        assert collector.health.msg == 'Unable to retrieve a list of subsystems'
+
+    def test_collect_io_data_no_subsystems(self, collector):
+        collector.tool.subsystem_nqn = ''
+        mock_subsystems = MagicMock()
+        mock_subsystems.status = 0
+        mock_subsystems.subsystems = []
+        with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems):
+            collector.collect_io_data()
+        assert collector.health.rc == -errno.ENOENT
+        assert collector.health.msg == 'No subsystems found'
+
+    def test_collect_io_data_nqn_not_found(self, collector):
+        collector.tool.subsystem_nqn = 'nqn.test'
+        mock_subsystems = MagicMock()
+        mock_subsystems.status = 0
+        mock_sub = MagicMock()
+        mock_sub.nqn = 'nqn.other'
+        mock_subsystems.subsystems = [mock_sub]
+        with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems):
+            collector.collect_io_data()
+        assert collector.health.rc == -errno.ENOENT
+        assert collector.health.msg == 'Subsystem NQN provided not found'
+
+    def test_collect_io_data_service_not_found(self, collector):
+        collector.tool.subsystem_nqn = 'nqn.test'
+        mock_subsystems = MagicMock()
+        mock_subsystems.status = 0
+        mock_sub = MagicMock()
+        mock_sub.nqn = 'nqn.test'
+        mock_subsystems.subsystems = [mock_sub]
+        mock_namespace_info = MagicMock()
+        mock_namespace_info.namespaces = []
+        with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems), \
+             patch.object(collector, '_fetch_namespaces', return_value=mock_namespace_info), \
+             patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value={'gateways': {}}):
+            collector.collect_io_data()
+        assert collector.health.rc == -errno.ENOENT
+        assert collector.health.msg == 'Service myservice not found'
+
+    def test_collect_io_data_lbg_mapping_failed(self, collector):
+        collector.tool.subsystem_nqn = 'nqn.test'
+        mock_subsystems = MagicMock()
+        mock_subsystems.status = 0
+        mock_sub = MagicMock()
+        mock_sub.nqn = 'nqn.test'
+        mock_subsystems.subsystems = [mock_sub]
+        mock_namespace_info = MagicMock()
+        mock_namespace_info.namespaces = []
+        with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems), \
+             patch.object(collector, '_fetch_namespaces', return_value=mock_namespace_info), \
+             patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value={'gateways': {'myservice': []}}), \
+             patch('dashboard.services.nvmeof_top_cli.get_lbg_gws_map', return_value={}):
+            collector.collect_io_data()
+        assert collector.health.rc == -errno.ENOENT
+        assert collector.health.msg == \
+            'Failed to retrieve load balancing group mapping for service myservice'
+
+
+class TestNvmeofTopCommands(CLICommandTestMixin):
+    @classmethod
+    def exec_nvmeof_cmd(cls, cmd, **kwargs):
+        return cls.exec_cmd('', prefix=cmd, **kwargs)
+
+    def test_top_io_missing_subsystem_returns_einval(self):
+        with pytest.raises(CmdException) as exc_info:
+            self.exec_nvmeof_cmd('nvmeof top io', subsystem='', session_id='sess1')
+        assert exc_info.value.retcode == -errno.EINVAL
+        assert str(exc_info.value) == "Required argument '--subsystem' missing"
+
+    def test_top_cpu_success(self):
+        with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
+            mock_gc.return_value = MagicMock()
+            with patch.object(NVMeoFTopCPU, 'run', return_value=(0, 'cpu output\n')):
+                result = self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+                assert 'cpu output' in result
+                mock_gc.assert_called_once_with('sess1')
+
+    def test_top_cpu_run_fail(self):
+        with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
+            mock_gc.return_value = MagicMock()
+            with patch.object(NVMeoFTopCPU, 'run',
+                              return_value=(-errno.ENOENT, 'error')):
+                with pytest.raises(CmdException) as exc_info:
+                    self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+                assert exc_info.value.retcode == -errno.ENOENT
+                assert str(exc_info.value) == 'error'
+
+    def test_top_io_success(self):
+        with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
+            mock_gc.return_value = MagicMock()
+            with patch.object(NVMeoFTopIO, 'run', return_value=(0, 'io output\n')):
+                result = self.exec_nvmeof_cmd(
+                    'nvmeof top io',
+                    subsystem='nqn.test',
+                    session_id='sess2'
+                )
+                assert 'io output' in result
+                mock_gc.assert_called_once_with('sess2')
+
+    def test_top_cpu_get_collector_fail(self):
+        with patch('dashboard.services.nvmeof_top_cli.get_collector',
+                   side_effect=RuntimeError("boom")):
+            with pytest.raises(CmdException) as exc_info:
+                self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+            assert exc_info.value.retcode == -errno.EINVAL
+            assert str(exc_info.value) == 'boom'
+
+    def test_top_io_get_collector_fail(self):
+        with patch('dashboard.services.nvmeof_top_cli.get_collector',
+                   side_effect=RuntimeError("boom")):
+            with pytest.raises(CmdException) as exc_info:
+                self.exec_nvmeof_cmd(
+                    'nvmeof top io',
+                    subsystem='nqn.test',
+                    session_id='sess2'
+                )
+            assert exc_info.value.retcode == -errno.EINVAL
+            assert str(exc_info.value) == 'boom'
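
A note on exercising the new unit tests: from a dashboard development checkout they should be runnable on their own with pytest (the dashboard's tox targets are the usual entry points; the direct invocation below is a convenience sketch, not something this commit adds):

# Run only the new tests; assumes pytest and the mgr test fixtures are available.
import sys
import pytest

sys.exit(pytest.main(["src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py", "-q"]))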