From e5b669b11dc58d9821380cbfb74272d9d54dd432 Mon Sep 17 00:00:00 2001 From: Vallari Agrawal Date: Tue, 10 Feb 2026 02:58:28 +0530 Subject: [PATCH] src/pybind/mgr: Add nvmeof-top tool Add src/pybind/mgr/dashboard/services/nvmeof_top_cli.py which adds commands: "ceph nvmeof top io " "ceph nvmeof top cpu" This file is moved from the original work in: https://github.com/pcuzner/ceph-nvmeof-top Co-authored-by: Paul Cuzner Signed-off-by: Vallari Agrawal --- src/ceph.in | 2 + src/pybind/mgr/dashboard/module.py | 37 +- .../mgr/dashboard/services/nvmeof_client.py | 3 + .../mgr/dashboard/services/nvmeof_conf.py | 9 + .../mgr/dashboard/services/nvmeof_top_cli.py | 665 ++++++++++++++++++ .../dashboard/tests/test_nvmeof_top_cli.py | 308 ++++++++ 6 files changed, 1023 insertions(+), 1 deletion(-) create mode 100644 src/pybind/mgr/dashboard/services/nvmeof_top_cli.py create mode 100644 src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py diff --git a/src/ceph.in b/src/ceph.in index abfc8fb57d9..64a8857e98e 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -29,6 +29,7 @@ import stat import sys import time import platform +import uuid from typing import Dict, List, Sequence, Tuple @@ -593,6 +594,7 @@ def do_command(parsed_args, target, cmdargs, sigdict, inbuf, verbose): # Set extra options for polling commands only: if valid_dict.get('poll', False): valid_dict['width'] = Termsize().cols + valid_dict['session_id'] = str(uuid.uuid4()) while True: try: # Only print the header for polling commands diff --git a/src/pybind/mgr/dashboard/module.py b/src/pybind/mgr/dashboard/module.py index 713deca65e8..5001a1bb730 100644 --- a/src/pybind/mgr/dashboard/module.py +++ b/src/pybind/mgr/dashboard/module.py @@ -34,9 +34,10 @@ from .cli import DBCLICommand from .controllers import nvmeof # noqa # pylint: disable=unused-import from .controllers import Router, json_error_page from .grafana import push_local_dashboards -from .services import nvmeof_cli # noqa # pylint: disable=unused-import +from 
.services import nvmeof_cli, nvmeof_top_cli # noqa # pylint: disable=unused-import from .services.auth import AuthManager, AuthManagerTool, JwtManager from .services.exception import dashboard_exception_handler +from .services.nvmeof_top_cli import NvmeofTopCollector from .services.service import RgwServiceManager from .services.sso import SSO_COMMANDS, handle_sso_command from .settings import handle_option_command, options_command_list, options_schema_list @@ -304,6 +305,7 @@ class Module(MgrModule, CherryPyConfig): self.ACCESS_CTRL_DB = None self.SSO_DB = None self.health_checks = {} + self.nvmeof_collectors = {} @classmethod def can_run(cls): @@ -556,6 +558,39 @@ class Module(MgrModule, CherryPyConfig): return self.__pool_stats + def get_nvmeof_collector(self, session_id: str = '', ttl: int = 3600): + STALE_POLL_THRESHOLD = 5 # expire if 5 poll intervals have passed without activity + + def _expire_old_sessions(): + now = time.time() + expired = [] + + for _id in list(self.nvmeof_collectors.keys()): + collector = self.nvmeof_collectors[_id] + delay = collector.delay + if delay < 1: # for first poll iteration + expire_time = collector.timestamp + ttl + else: + expire_time = collector.timestamp + (STALE_POLL_THRESHOLD * delay) + if now > expire_time: + expired.append(_id) + + for _id in expired: + self.nvmeof_collectors.pop(_id, None) + + if NvmeofTopCollector is None: + logger.error("NVMeoFClient is not available") + return None + + if not session_id: + return None + + _expire_old_sessions() + + if session_id not in self.nvmeof_collectors: + self.nvmeof_collectors[session_id] = NvmeofTopCollector() + return self.nvmeof_collectors[session_id] + def config_notify(self): """ This method is called whenever one of our config options is changed. 
diff --git a/src/pybind/mgr/dashboard/services/nvmeof_client.py b/src/pybind/mgr/dashboard/services/nvmeof_client.py index de9f76ed858..18b48831a76 100644 --- a/src/pybind/mgr/dashboard/services/nvmeof_client.py +++ b/src/pybind/mgr/dashboard/services/nvmeof_client.py @@ -48,6 +48,7 @@ else: f'Unable to retrieve the gateway info: {e}' ) + self.daemon_name = '' # While creating listener need to direct request to the gateway # address where listener is supposed to be added. if server_address: @@ -62,6 +63,7 @@ else: None ) if matched_gateway: + self.daemon_name = matched_gateway.get('daemon_name') self.gateway_addr = matched_gateway.get('service_url') logger.debug("Gateway address set to: %s", self.gateway_addr) enable_auth = is_mtls_enabled(service_name) @@ -80,6 +82,7 @@ else: logger.info("Insecurely connecting to: %s", self.gateway_addr) self.channel = grpc.insecure_channel(self.gateway_addr) self.stub = pb2_grpc.GatewayStub(self.channel) + self.service_name = service_name Model = Dict[str, Any] Collection = List[Model] diff --git a/src/pybind/mgr/dashboard/services/nvmeof_conf.py b/src/pybind/mgr/dashboard/services/nvmeof_conf.py index 0a80bc26deb..da06f1b2328 100644 --- a/src/pybind/mgr/dashboard/services/nvmeof_conf.py +++ b/src/pybind/mgr/dashboard/services/nvmeof_conf.py @@ -216,3 +216,12 @@ def is_mtls_enabled(service_name: str): return orch.services.get(service_name)[0].spec.enable_auth except OrchestratorError: return False + + +def get_pool_group_name(service_name: str): + try: + orch = OrchClient.instance() + spec = orch.services.get(service_name)[0].spec + return (spec.pool, spec.group) + except OrchestratorError: + return None diff --git a/src/pybind/mgr/dashboard/services/nvmeof_top_cli.py b/src/pybind/mgr/dashboard/services/nvmeof_top_cli.py new file mode 100644 index 00000000000..c2522d3dafc --- /dev/null +++ b/src/pybind/mgr/dashboard/services/nvmeof_top_cli.py @@ -0,0 +1,665 @@ +# -*- coding: utf-8 -*- +# This file is moved from the original work 
of "nvmeof-top" tool in: +# https://github.com/pcuzner/ceph-nvmeof-top +# by Paul Cuzner +import errno +import json +import logging +import time +from typing import Any, Optional + +from mgr_module import HandleCommandResult + +from .. import mgr +from ..cli import DBCLICommand + +logger = logging.getLogger(__name__) + +NvmeofTopCollector = None + +try: + from .nvmeof_cli import NvmeofGatewaysConfig + from .nvmeof_client import NVMeoFClient + from .nvmeof_conf import get_pool_group_name +except ImportError as e: + logger.error("Failed to import NVMeoFClient and related components: %s", e) +else: + def get_collector(session_id: Optional[str]): + MAX_SESSION_TTL = 60 * 60 + return mgr.get_nvmeof_collector(session_id, MAX_SESSION_TTL) + + def get_lbg_gws_map(service_name: str): + pool_group = get_pool_group_name(service_name) + if not pool_group: + logger.error("Error getting pool name and group name of the service") + return {} + pool, group = pool_group + try: + cmd = { + 'prefix': 'nvme-gw show', + 'pool': pool, + 'group': group, + 'format': 'json' + } + ret_status, out, _ = mgr.mon_command(cmd) + if ret_status == 0 and out is not None: + lbg_gws_map = {} + gws_info = json.loads(out) + for gw in gws_info["Created Gateways:"]: + gw_id = str(gw["gw-id"]).removeprefix("client.") + gw_lbg = int(gw["anagrp-id"]) + lbg_gws_map[gw_lbg] = gw_id + return lbg_gws_map + return {} + except Exception: # pylint: disable=broad-except + logger.exception('Failed to get nvme-gw show command') + return {} + + class Health: + def __init__(self): + self.rc = 0 + self.msg = '' + + class Counter: + def __init__(self): + self.current = 0.0 + self.last = 0.0 + + def update(self, new_value: float): + """Update the stats maintaining current and last""" + self.last = self.current + self.current = new_value + + def rate(self, interval: float): + """Calculate the per second change rate""" + if not interval: + return 0.0 + return (self.current - self.last) / interval + + class PerformanceStats: 
# pylint: disable=too-many-instance-attributes + def __init__(self, bdev: str): + self.bdev = bdev + self.read_ops = Counter() + self.read_bytes = Counter() + self.read_secs = Counter() + self.write_ops = Counter() + self.write_bytes = Counter() + self.write_secs = Counter() + + self.read_ops_rate = 0 + self.write_ops_rate = 0 + self.read_bytes_rate = 0 + self.write_bytes_rate = 0 + self.read_secs_rate = 0.0 + self.write_secs_rate = 0.0 + self.total_ops_rate = 0 + self.rareq_sz = 0.0 + self.wareq_sz = 0.0 + self.r_await = 0.0 + self.w_await = 0.0 + + def calculate(self, delay: float): + self.read_ops_rate = self.read_ops.rate(delay) + self.read_bytes_rate = self.read_bytes.rate(delay) + self.read_secs_rate = self.read_secs.rate(delay) + self.write_ops_rate = self.write_ops.rate(delay) + self.write_bytes_rate = self.write_bytes.rate(delay) + self.write_secs_rate = self.write_secs.rate(delay) + + self.total_ops_rate = self.read_ops_rate + self.write_ops_rate + + if self.read_ops_rate: + self.rareq_sz = (int(self.read_bytes_rate / self.read_ops_rate) / 1024) + self.r_await = ((self.read_secs_rate / self.read_ops_rate) * 1000) # for ms + else: + self.rareq_sz = 0.0 + self.r_await = 0.0 + if self.write_ops_rate: + self.wareq_sz = (int(self.write_bytes_rate / self.write_ops_rate) / 1024) + self.w_await = ((self.write_secs_rate / self.write_ops_rate) * 1000) # for ms + else: + self.wareq_sz = 0.0 + self.w_await = 0.0 + + class ReactorStats: + def __init__(self, thread: str): + self.thread = thread + self.busy_secs = Counter() + self.idle_secs = Counter() + + self.busy_rate = 0.0 + self.idle_rate = 0.0 + + def calculate(self, delay: float): + self.busy_rate = self.busy_secs.rate(delay) + self.idle_rate = self.idle_secs.rate(delay) + + class NvmeofTopCollector: # type: ignore[no-redef] # noqa # pylint: disable=function-redefined,too-many-instance-attributes + def __init__(self): + self.tool: Any = None + self.subsystem_nqn = '' + self.server_addr = '' + self.delay: float = 
0.0 + self.namespaces = {} + self.lbg_to_gateway: dict = {} + self.subsystems: Any = None + self.reactor_stats = {} + self.iostats = {} + self.gw_info: Any = None + self.client: Any = None + self.timestamp = time.time() + self.health = Health() + + @property + def nqn_list(self): + return [subsys.nqn for subsys in self.subsystems.subsystems] + + @property + def ready(self) -> bool: + return self.health.rc == 0 + + @property + def total_namespaces_defined(self) -> int: + return len(self.namespaces[self.subsystem_nqn]) + + @property + def total_subsystems(self) -> int: + return len(self.nqn_list) + + @property + def total_namespaces_overall(self): + total = 0 + for subsys in self.subsystems.subsystems: + total += subsys.namespace_count + return total + + @property + def max_namespaces(self): + for subsys in self.subsystems.subsystems: + if subsys.nqn == self.subsystem_nqn: + return subsys.max_namespaces + logger.error("Request for max namespaces could not find a " + "match against the NQN! 
Returning 0") + return 0 + + @property + def load_balancing_group(self): + return self.gw_info.load_balancing_group + + def get_sorted_namespaces(self, sort_pos: int, reverse_sort: bool): + logger.debug("get_sorted_namespaces") + ns_data = [] + for ns in self.namespaces[self.subsystem_nqn]: + bdev_name = ns.bdev_name + + daemon_name = "" + if self.tool.args.get('server_addr'): + # only show namespaces owned by this gateway's LBG + if ns.load_balancing_group != self.load_balancing_group: + continue + daemon_name = self.client.daemon_name + else: + daemon_name = self.lbg_to_gateway.get(ns.load_balancing_group, '') + if not daemon_name: + logger.warning("No gateway found for load balancing group %s, " + "skipping namespace %s", + ns.load_balancing_group, ns.nsid) + continue + perf_stats = self.iostats.get(daemon_name, {}).get(bdev_name) + if perf_stats is None: + logger.warning("No iostats for bdev %s on %s, skipping namespace %s", + bdev_name, daemon_name, ns.nsid) + continue + perf_stats.calculate(self.delay) + + ns_data.append(( + ns.nsid, + f"{ns.rbd_pool_name}/{ns.rbd_image_name}", + int(perf_stats.total_ops_rate), + int(perf_stats.read_ops_rate), + f"{self.bytes_to_MB(perf_stats.read_bytes_rate):3.2f}", + f"{perf_stats.r_await:3.2f}", + f"{perf_stats.rareq_sz:4.2f}", + int(perf_stats.write_ops_rate), + f"{self.bytes_to_MB(perf_stats.write_bytes_rate):3.2f}", + f"{perf_stats.w_await:3.2f}", + f"{perf_stats.wareq_sz:4.2f}", + self.lb_group(ns.load_balancing_group), + self.qos_enabled(ns) + )) + + ns_data.sort(key=lambda t: t[sort_pos], reverse=reverse_sort) + return ns_data + + def get_reactor_data(self, sort_pos: int, reverse_sort: bool): + reactor_data = [] + for gw_addr, threads in self.reactor_stats.items(): + for _, thread_stats in threads.items(): + thread_stats.calculate(self.delay) + reactor_data.append(( + gw_addr, + thread_stats.thread, + f"{thread_stats.busy_rate * 100:.2f}", + f"{thread_stats.idle_rate * 100:.2f}", + )) + reactor_data.sort(key=lambda 
t: t[sort_pos], reverse=reverse_sort) + return reactor_data + + def get_subsystem_summary_data(self): + return [ + self.subsystem_nqn, + f'{self.total_namespaces_defined} / {self.max_namespaces}', + ] + + def get_overall_summary_data(self): + return [ + self.server_addr, + self.load_balancing_group, + self.total_subsystems, + self.total_namespaces_overall, + ] + + def qos_enabled(self, ns) -> str: + if (ns.rw_ios_per_second or ns.rw_mbytes_per_second + or ns.r_mbytes_per_second or ns.w_mbytes_per_second): + return 'Yes' + return 'No' + + def lb_group(self, grp_id: int): + """Provide a meaningful default when load-balancing is not in use""" + return "N/A" if grp_id == 0 else f"{grp_id}" + + def bytes_to_MB(self, num_bytes: int, si: int = 1024): + """Simple conversion of bytes to MiB or MB""" + return (num_bytes / si) / si + + # grpc methods + def _call_grpc(self, method_name, request, client=None): + logger.debug("calling grpc method %s", method_name) + if not client: + client = self.client + try: + method = getattr(client.stub, method_name) + response = method(request) + except Exception as exc: # pylint: disable=broad-except + self.health.rc = -errno.ECONNREFUSED + self.health.msg = f"RPC endpoint unavailable at {client.gateway_addr}" + logger.error("grpc call to %s failed: %s (%s)", method_name, self.health.msg, exc) + return None + + self.health.msg = f"{method_name} success" + logger.debug("call to %s successful", method_name) + return response + + def _fetch_namespace_iostats(self, client): + daemon_name = client.daemon_name + logger.debug("fetching iostats for namespaces from %s", daemon_name) + stats = self._call_grpc('list_namespaces_io_stats', + NVMeoFClient.pb2.list_namespaces_io_stats_req(), client) + logger.debug("list_namespaces_io_stats stats=%s", stats) + if stats is None: + return + if daemon_name not in self.iostats: + self.iostats[daemon_name] = {} + + for ns in stats.namespaces: + bdev_name = ns.bdev_name + if bdev_name not in 
self.iostats[daemon_name]: + self.iostats[daemon_name][bdev_name] = PerformanceStats(bdev_name) + + ns_stats = self.iostats[daemon_name][bdev_name] + ns_stats.read_ops.update(ns.num_read_ops) + ns_stats.read_bytes.update(ns.bytes_read) + ns_stats.read_secs.update((ns.read_latency_ticks / stats.tick_rate)) + ns_stats.write_ops.update(ns.num_write_ops) + ns_stats.write_bytes.update(ns.bytes_written) + ns_stats.write_secs.update((ns.write_latency_ticks / stats.tick_rate)) + + def _fetch_namespaces(self, subsystem_nqn): + return self._call_grpc( + 'list_namespaces', + NVMeoFClient.pb2.list_namespaces_req(subsystem=subsystem_nqn)) + + def _fetch_thread_stats(self, client): + gateway_addr = client.gateway_addr + logger.debug("fetching thread stats for %s", gateway_addr) + stats = self._call_grpc('get_thread_stats', + NVMeoFClient.pb2.get_thread_stats_req(), client) + logger.debug("get_thread_stats stats=%s", stats) + if stats is None: + return + if gateway_addr not in self.reactor_stats: + self.reactor_stats[gateway_addr] = {} + tick_rate = stats.tick_rate + for thread in stats.threads: + name = thread.name + if name not in self.reactor_stats[gateway_addr]: + self.reactor_stats[gateway_addr][name] = ReactorStats(thread.name) + thread_stats = self.reactor_stats[gateway_addr][name] + thread_stats.busy_secs.update(thread.busy / tick_rate) + thread_stats.idle_secs.update(thread.idle / tick_rate) + + def _fetch_gateway_info(self, client): + return self._call_grpc( + 'get_gateway_info', + NVMeoFClient.pb2.get_gateway_info_req(), client) + + def _fetch_subsystems(self): + return self._call_grpc('list_subsystems', NVMeoFClient.pb2.list_subsystems_req()) + + def initialise(self, tool): + self.health = Health() + self.tool = tool + self.client = NVMeoFClient(tool.args.get('group', ''), + tool.args.get('server_addr', '')) + self.server_addr = self.client.gateway_addr + + now = time.time() + self.delay = (now - self.timestamp) + self.timestamp = now + + self.gw_info = 
self._fetch_gateway_info(self.client) + if not self.ready: + logger.error("Call to %s failed, RC=%s, MSG=%s", + self.server_addr, self.health.rc, self.health.msg) + self.health.msg = ( + f"Unable to connect to {self.server_addr}, " + "pass an available gateway as --server-addr" + ) + return + + logger.debug("Connected to %s", self.server_addr) + + def collect_cpu_data(self): + service_name = self.tool.service_name + group = self.tool.args.get('group', '') + if service_name: + gw_conf = NvmeofGatewaysConfig.get_gateways_config() + gateways = gw_conf.get("gateways", {}) + if service_name not in gateways: + self.health.rc = -errno.ENOENT + self.health.msg = f'Service {service_name} not found' + return + for gw in gateways[service_name]: + client = NVMeoFClient(group, gw["service_url"]) + self._fetch_thread_stats(client) + if not self.ready: + return + else: + self._fetch_thread_stats(self.client) + logger.debug("collect_cpu_data completed") + + def collect_io_data(self): # pylint: disable=too-many-return-statements + self.subsystem_nqn = self.tool.subsystem_nqn + + self.subsystems = self._fetch_subsystems() + if self.subsystems is None or self.subsystems.status > 0: + logger.error("Failed to retrieve subsystems list") + self.health.rc = -errno.ECONNREFUSED + self.health.msg = "Unable to retrieve a list of subsystems" + return + + if self.total_subsystems == 0: + self.health.rc = -errno.ENOENT + self.health.msg = 'No subsystems found' + return + + if self.subsystem_nqn and self.subsystem_nqn not in self.nqn_list: + logger.error("nqn provided is not present on the gateway") + self.health.rc = -errno.ENOENT + self.health.msg = "Subsystem NQN provided not found" + return + + namespace_info = self._fetch_namespaces(self.subsystem_nqn) + if namespace_info is None: + return + + self.namespaces[self.subsystem_nqn] = namespace_info.namespaces + logger.debug("Subsystem '%s' has %s namespaces", + self.subsystem_nqn, self.total_namespaces_defined) + + group = 
self.tool.args.get('group', '') + if not self.tool.args.get('server_addr'): + service_name = self.client.service_name + gw_conf = NvmeofGatewaysConfig.get_gateways_config() + gateways = gw_conf.get("gateways", {}) + if service_name not in gateways: + self.health.rc = -errno.ENOENT + self.health.msg = f'Service {service_name} not found' + return + self.lbg_to_gateway = get_lbg_gws_map(service_name) + if not self.lbg_to_gateway: + self.health.rc = -errno.ENOENT + self.health.msg = ( + f'Failed to retrieve load balancing group ' + f'mapping for service {service_name}' + ) + return + for gw in gateways[service_name]: + client = NVMeoFClient(group, gw["service_url"]) + self._fetch_namespace_iostats(client) + if not self.ready: + return + else: + self._fetch_namespace_iostats(self.client) + logger.debug("collect_io_data completed") + + class NVMeoFTopTool: + def __init__(self, args: dict, data_collector): + self.args = args + self.collector = data_collector + self.reverse_sort = args.get('sort_descending', False) + self.sort_key = args.get('sort_by') + + def run(self) -> tuple: + try: + self.collector.initialise(self) + if not self.collector.ready: + return (self.collector.health.rc, + f"nvmeof-top has encountered an error: " + f"{self.collector.health.msg}") + + collect_start = time.time() + self._collect() + logger.info("collector methods took %.2fs", time.time() - collect_start) + + if not self.collector.ready: + return (self.collector.health.rc, self.collector.health.msg) + + output = self.format_output() + output += "\n ---- " + return (0, output) + except Exception as ex: # pylint: disable=broad-except + logger.exception("top tool failed to run: %s", ex) + return (-errno.EINVAL, str(ex)) + + def _collect(self): + raise NotImplementedError + + def format_output(self): + raise NotImplementedError + + class NVMeoFTopCPU(NVMeoFTopTool): + reactors_headers = ['Gateway', 'Thread Name', 'Busy Rate%', 'Idle Rate%'] + reactors_template = "{:<30} {:<30} {:<20} {:<20}\n" + + 
def __init__(self, args: dict, data_collector): + super().__init__(args, data_collector) + self.service_name = args.get('service') + + def _collect(self): + self.collector.collect_cpu_data() + + def format_output(self): + if self.sort_key not in NVMeoFTopCPU.reactors_headers: + raise ValueError( + f"Invalid sort key '{self.sort_key}'. " + f"Valid options: {NVMeoFTopCPU.reactors_headers}" + ) + sort_pos = NVMeoFTopCPU.reactors_headers.index(self.sort_key) + reactor_data = self.collector.get_reactor_data(sort_pos=sort_pos, + reverse_sort=self.reverse_sort) + rows = [] + if self.args.get('with_timestamp'): + timestamp = time.strftime('%Y-%m-%d %H:%M:%S', + time.localtime(self.collector.timestamp)) + rows.append(f"{timestamp} (delay: {self.collector.delay:.2f}s)\n") + + if not self.args.get('no_header'): + rows.append(NVMeoFTopCPU.reactors_template.format(*NVMeoFTopCPU.reactors_headers)) + for reactor in reactor_data: + rows.append(NVMeoFTopCPU.reactors_template.format(*reactor)) + rows.append("\n") + + return ''.join(rows) + + class NVMeoFTopIO(NVMeoFTopTool): + subsystem_summary_headers = ['Subsystem', 'Namespaces'] + summary_headers = ['Gateway', 'Load Balancing Group', + 'Total Subsystems', 'Total Namespaces'] + + ns_headers = [ + 'NSID', 'RBD Image', 'IOPS', 'r/s', 'rMB/s', 'r_await', 'rareq-sz', + 'w/s', 'wMB/s', 'w_await', 'wareq-sz', 'LBGrp', 'QoS' + ] + ns_template = ( + "{:>4} {:<40} {:>7} {:>6} {:>6} {:>7} {:>8}" + " {:>6} {:>6} {:>7} {:>8} {:^5} {:>3}\n" + ) + + def __init__(self, args: dict, data_collector): + super().__init__(args, data_collector) + self.subsystem_nqn = args.get('subsystem') + + def _collect(self): + self.collector.collect_io_data() + + def format_output(self): + if self.sort_key not in NVMeoFTopIO.ns_headers: + raise ValueError( + f"Invalid sort key '{self.sort_key}'. 
" + f"Valid options: {NVMeoFTopIO.ns_headers}" + ) + sort_pos = NVMeoFTopIO.ns_headers.index(self.sort_key) + ns_data = self.collector.get_sorted_namespaces(sort_pos=sort_pos, + reverse_sort=self.reverse_sort) + subsystem_summary_data = self.collector.get_subsystem_summary_data() + overall_summary_data = self.collector.get_overall_summary_data() + + rows = [] + if self.args.get('with_timestamp'): + timestamp = time.strftime('%Y-%m-%d %H:%M:%S', + time.localtime(self.collector.timestamp)) + rows.append(f"{timestamp} (delay: {self.collector.delay:.2f}s)\n") + if self.args.get('summary'): + if self.args.get('server_addr'): + summary_row = "" + for index, header in enumerate(NVMeoFTopIO.summary_headers): + summary_row += f"{header}: {overall_summary_data[index]} " + rows.append(summary_row + "\n") + subsys_summary_row = "" + for index, header in enumerate(NVMeoFTopIO.subsystem_summary_headers): + subsys_summary_row += f"{header}: {subsystem_summary_data[index]} " + rows.append(subsys_summary_row + "\n\n") + if not self.args.get('no_header'): + rows.append(NVMeoFTopIO.ns_template.format(*NVMeoFTopIO.ns_headers)) + if ns_data: + for ns in ns_data: + rows.append(NVMeoFTopIO.ns_template.format(*ns)) + else: + rows.append("\n") + + return ''.join(rows) + + @DBCLICommand.Read('nvmeof top cpu', poll=True) + def nvmeof_top_cpu(_, service: str = '', + server_addr: str = '', group: str = '', + descending: bool = False, sort_by: str = 'Thread Name', + with_timestamp: bool = False, + no_header: bool = False, + session_id: Optional[str] = None): + ''' + NVMeoF Top CPU Tool + --period [-p] (default 1s, max 3600s) + --service '' + --server-addr + --group '' + --sort-by '
' + --descending + --with-timestamp + --no-header + ''' + args = { + 'service': service, + 'with_timestamp': with_timestamp, + 'no_header': no_header, + 'sort_descending': descending, + 'sort_by': sort_by, + 'server_addr': server_addr, + 'group': group, + } + try: + data_collector = get_collector(session_id) + if data_collector is None: + return HandleCommandResult( + stderr="Unable to initialise collector", + retval=-errno.EINVAL + ) + top_tool = NVMeoFTopCPU(args, data_collector) + rc, output = top_tool.run() + if rc != 0: + return HandleCommandResult(stderr=output, retval=rc) + return HandleCommandResult(stdout=output, retval=rc) + except Exception as exc: # pylint: disable=broad-except + logger.exception("top-cpu command failed: %s", exc) + return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL) + + @DBCLICommand.Read('nvmeof top io', poll=True) + def nvmeof_top_io(_, subsystem: str = '', + server_addr: str = '', group: str = '', + descending: bool = False, sort_by: str = 'NSID', + with_timestamp: bool = False, + summary: bool = False, no_header: bool = False, + session_id: Optional[str] = None): + ''' + NVMeoF Top IO Tool + --period [-p] (default 1s, max 3600s) + --subsystem '' + --server-addr + --group '' + --descending + --sort-by '
' + --with-timestamp + --summary + --no-header + ''' + args = { + 'subsystem': subsystem, + 'with_timestamp': with_timestamp, + 'summary': summary, + 'no_header': no_header, + 'sort_descending': descending, + 'sort_by': sort_by, + 'server_addr': server_addr, + 'group': group, + } + if not subsystem: + return HandleCommandResult( + stderr="Required argument '--subsystem' missing", + retval=-errno.EINVAL + ) + try: + data_collector = get_collector(session_id) + if data_collector is None: + return HandleCommandResult( + stderr="Unable to initialise collector", + retval=-errno.EINVAL + ) + top_tool = NVMeoFTopIO(args, data_collector) + rc, output = top_tool.run() + if rc != 0: + return HandleCommandResult(stderr=output, retval=rc) + return HandleCommandResult(stdout=output, retval=rc) + except Exception as exc: # pylint: disable=broad-except + logger.exception("top-io command failed: %s", exc) + return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL) diff --git a/src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py b/src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py new file mode 100644 index 00000000000..8c9f691310b --- /dev/null +++ b/src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py @@ -0,0 +1,308 @@ +# -*- coding: utf-8 -*- +import errno +import time +from unittest.mock import MagicMock, patch + +import pytest + +from ..services.nvmeof_top_cli import Counter, NvmeofTopCollector, NVMeoFTopCPU, NVMeoFTopIO +from ..tests import CLICommandTestMixin, CmdException + + +@pytest.fixture(name='cpu_collector') +def fixture_cpu_collector(): + collector = MagicMock() + collector.get_reactor_data.return_value = [] + collector.delay = 1.5 + collector.timestamp = time.time() + return collector + + +@pytest.fixture(name='io_collector') +def fixture_io_collector(): + collector = MagicMock() + collector.get_sorted_namespaces.return_value = [] + collector.get_subsystem_summary_data.return_value = [] + collector.get_overall_summary_data.return_value = [] + 
collector.delay = 2.0 + collector.timestamp = time.time() + return collector + + +class TestCounter: + def test_initial_values(self): + c = Counter() + assert c.current == 0.0 + assert c.last == 0.0 + + def test_update_tracks_last(self): + c = Counter() + c.update(10.0) + assert c.current == 10.0 + assert c.last == 0.0 + c.update(20.0) + assert c.current == 20.0 + assert c.last == 10.0 + + def test_rate(self): + c = Counter() + c.update(100.0) + c.update(150.0) + assert c.rate(5.0) == 10.0 + + def test_rate_zero_interval_returns_zero(self): + c = Counter() + c.update(100.0) + assert c.rate(0) == 0.0 + + +class TestNVMeoFTopCPUFormat: + default_args = { + 'sort_by': 'Thread Name', + 'sort_descending': False, + 'with_timestamp': False, + 'no_header': False, + 'service': '', + 'server_addr': '', + 'group': '', + } + + def test_headers(self, cpu_collector): + tool = NVMeoFTopCPU(self.default_args, cpu_collector) + output = tool.format_output() + assert 'Gateway' in output + assert 'Thread Name' in output + assert 'Busy Rate%' in output + assert 'Idle Rate%' in output + + def test_no_header(self, cpu_collector): + tool = NVMeoFTopCPU({**self.default_args, 'no_header': True}, cpu_collector) + output = tool.format_output() + assert 'Gateway' not in output + + def test_with_timestamp(self, cpu_collector): + tool = NVMeoFTopCPU({**self.default_args, 'with_timestamp': True}, cpu_collector) + output = tool.format_output() + assert 'delay:' in output + assert '1.50s' in output + + def test_reactor_data(self, cpu_collector): + cpu_collector.get_reactor_data.return_value = [ + ('192.168.1.1:5500', 'reactor_0', '72.50', '27.50'), + ('192.168.1.1:5500', 'reactor_1', '45.00', '55.00'), + ] + tool = NVMeoFTopCPU(self.default_args, cpu_collector) + output = tool.format_output() + assert '192.168.1.1:5500' in output + assert 'reactor_0' in output + assert 'reactor_1' in output + + def test_invalid_sort_key(self, cpu_collector): + tool = NVMeoFTopCPU({**self.default_args, 'sort_by': 
'NonExistent'}, cpu_collector) + with pytest.raises(ValueError, match="Invalid sort key"): + tool.format_output() + + +class TestNVMeoFTopIOFormat: + default_args = { + 'sort_by': 'NSID', + 'sort_descending': False, + 'with_timestamp': False, + 'no_header': False, + 'summary': False, + 'subsystem': 'nqn.2024-01.io.spdk:cnode1', + 'server_addr': '', + 'group': '', + } + + def test_no_namespaces(self, io_collector): + tool = NVMeoFTopIO(self.default_args, io_collector) + output = tool.format_output() + assert '' in output + + def test_headers_present(self, io_collector): + tool = NVMeoFTopIO(self.default_args, io_collector) + output = tool.format_output() + assert 'NSID' in output + assert 'RBD Image' in output + + def test_no_header(self, io_collector): + tool = NVMeoFTopIO({**self.default_args, 'no_header': True}, io_collector) + output = tool.format_output() + assert 'NSID' not in output + + def test_namespace_data(self, io_collector): + io_collector.get_sorted_namespaces.return_value = [ + (1, 'pool/image1', 100, 50, '1.00', '0.50', '4.00', + 50, '1.00', '0.50', '4.00', '1', 'No'), + (2, 'pool/image2', 200, 100, '2.00', '1.00', '8.00', + 100, '2.00', '1.00', '8.00', '2', 'Yes'), + ] + tool = NVMeoFTopIO(self.default_args, io_collector) + output = tool.format_output() + assert 'pool/image1' in output + assert 'pool/image2' in output + + def test_with_timestamp(self, io_collector): + tool = NVMeoFTopIO({**self.default_args, 'with_timestamp': True}, io_collector) + output = tool.format_output() + assert 'delay:' in output + + def test_invalid_sort_key(self, io_collector): + tool = NVMeoFTopIO({**self.default_args, 'sort_by': 'BadKey'}, io_collector) + with pytest.raises(ValueError, match="Invalid sort key"): + tool.format_output() + + +class TestNvmeofTopCollector: + @pytest.fixture + def collector(self): + c = NvmeofTopCollector() + c.client = MagicMock() + c.client.service_name = 'myservice' + c.tool = MagicMock() + c.tool.args = {'group': '', 'server_addr': ''} + 
class TestNvmeofTopCollector:
    """Error-path tests for NvmeofTopCollector's gRPC and data-collection logic."""

    @pytest.fixture
    def collector(self):
        """Collector wired to mock client/tool state shared by all tests."""
        coll = NvmeofTopCollector()
        coll.client = MagicMock()
        coll.client.service_name = 'myservice'
        coll.tool = MagicMock()
        coll.tool.args = {'group': '', 'server_addr': ''}
        return coll

    def test_grpc_call_failure(self, collector):
        """A raising stub maps to ECONNREFUSED with the gateway address in the message."""
        collector.client.gateway_addr = '192.168.1.1:5500'
        collector.client.stub.get_gateway_info.side_effect = Exception('connection refused')
        collector._call_grpc(  # pylint: disable=protected-access
            'get_gateway_info', MagicMock(), collector.client)
        assert collector.health.rc == -errno.ECONNREFUSED
        assert collector.health.msg == 'RPC endpoint unavailable at 192.168.1.1:5500'

    def test_collect_cpu_data_service_not_found(self, collector):
        """CPU collection reports ENOENT when the service is absent from gateway config."""
        collector.tool.service_name = 'myservice'
        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
                   return_value={'gateways': {}}):
            collector.collect_cpu_data()
        assert collector.health.rc == -errno.ENOENT
        assert collector.health.msg == 'Service myservice not found'

    def test_collect_io_data_subsystems_unavailable(self, collector):
        """A None subsystem listing maps to ECONNREFUSED."""
        collector.tool.subsystem_nqn = 'nqn.test'
        with patch.object(collector, '_fetch_subsystems', return_value=None):
            collector.collect_io_data()
        assert collector.health.rc == -errno.ECONNREFUSED
        assert collector.health.msg == 'Unable to retrieve a list of subsystems'

    def test_collect_io_data_no_subsystems(self, collector):
        """An empty subsystem list maps to ENOENT."""
        collector.tool.subsystem_nqn = ''
        subs = MagicMock()
        subs.status = 0
        subs.subsystems = []
        with patch.object(collector, '_fetch_subsystems', return_value=subs):
            collector.collect_io_data()
        assert collector.health.rc == -errno.ENOENT
        assert collector.health.msg == 'No subsystems found'

    def test_collect_io_data_nqn_not_found(self, collector):
        """Requesting an NQN missing from the listing maps to ENOENT."""
        collector.tool.subsystem_nqn = 'nqn.test'
        subs = MagicMock()
        subs.status = 0
        sub = MagicMock()
        sub.nqn = 'nqn.other'
        subs.subsystems = [sub]
        with patch.object(collector, '_fetch_subsystems', return_value=subs):
            collector.collect_io_data()
        assert collector.health.rc == -errno.ENOENT
        assert collector.health.msg == 'Subsystem NQN provided not found'

    def test_collect_io_data_service_not_found(self, collector):
        """IO collection reports ENOENT when the service is absent from gateway config."""
        collector.tool.subsystem_nqn = 'nqn.test'
        subs = MagicMock()
        subs.status = 0
        sub = MagicMock()
        sub.nqn = 'nqn.test'
        subs.subsystems = [sub]
        ns_info = MagicMock()
        ns_info.namespaces = []
        with patch.object(collector, '_fetch_subsystems', return_value=subs), \
                patch.object(collector, '_fetch_namespaces', return_value=ns_info), \
                patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
                      return_value={'gateways': {}}):
            collector.collect_io_data()
        assert collector.health.rc == -errno.ENOENT
        assert collector.health.msg == 'Service myservice not found'

    def test_collect_io_data_lbg_mapping_failed(self, collector):
        """An empty load-balancing-group map maps to ENOENT with a specific message."""
        collector.tool.subsystem_nqn = 'nqn.test'
        subs = MagicMock()
        subs.status = 0
        sub = MagicMock()
        sub.nqn = 'nqn.test'
        subs.subsystems = [sub]
        ns_info = MagicMock()
        ns_info.namespaces = []
        with patch.object(collector, '_fetch_subsystems', return_value=subs), \
                patch.object(collector, '_fetch_namespaces', return_value=ns_info), \
                patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
                      return_value={'gateways': {'myservice': []}}), \
                patch('dashboard.services.nvmeof_top_cli.get_lbg_gws_map', return_value={}):
            collector.collect_io_data()
        assert collector.health.rc == -errno.ENOENT
        assert collector.health.msg == \
            'Failed to retrieve load balancing group mapping for service myservice'
class TestNvmeofTopCommands(CLICommandTestMixin):
    """End-to-end tests for the 'nvmeof top io' / 'nvmeof top cpu' CLI commands."""

    @classmethod
    def exec_nvmeof_cmd(cls, cmd, **kwargs):
        """Dispatch a CLI command through the mixin's exec_cmd helper."""
        return cls.exec_cmd('', prefix=cmd, **kwargs)

    def test_top_io_missing_subsystem_returns_einval(self):
        """'top io' without --subsystem fails with EINVAL."""
        with pytest.raises(CmdException) as exc_info:
            self.exec_nvmeof_cmd('nvmeof top io', subsystem='', session_id='sess1')
        assert exc_info.value.retcode == -errno.EINVAL
        assert str(exc_info.value) == "Required argument '--subsystem' missing"

    def test_top_cpu_success(self):
        """'top cpu' returns the tool's output and looks up the session collector."""
        with patch('dashboard.services.nvmeof_top_cli.get_collector') as get_coll, \
                patch.object(NVMeoFTopCPU, 'run', return_value=(0, 'cpu output\n')):
            get_coll.return_value = MagicMock()
            result = self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
            assert 'cpu output' in result
        get_coll.assert_called_once_with('sess1')

    def test_top_cpu_run_fail(self):
        """A failing tool run surfaces its retcode and message as a CmdException."""
        with patch('dashboard.services.nvmeof_top_cli.get_collector') as get_coll, \
                patch.object(NVMeoFTopCPU, 'run', return_value=(-errno.ENOENT, 'error')):
            get_coll.return_value = MagicMock()
            with pytest.raises(CmdException) as exc_info:
                self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
        assert exc_info.value.retcode == -errno.ENOENT
        assert str(exc_info.value) == 'error'

    def test_top_io_success(self):
        """'top io' returns the tool's output and looks up the session collector."""
        with patch('dashboard.services.nvmeof_top_cli.get_collector') as get_coll, \
                patch.object(NVMeoFTopIO, 'run', return_value=(0, 'io output\n')):
            get_coll.return_value = MagicMock()
            result = self.exec_nvmeof_cmd(
                'nvmeof top io',
                subsystem='nqn.test',
                session_id='sess2'
            )
            assert 'io output' in result
        get_coll.assert_called_once_with('sess2')

    def test_top_cpu_get_collector_fail(self):
        """A collector-lookup failure is reported as EINVAL for 'top cpu'."""
        with patch('dashboard.services.nvmeof_top_cli.get_collector',
                   side_effect=RuntimeError("boom")):
            with pytest.raises(CmdException) as exc_info:
                self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
        assert exc_info.value.retcode == -errno.EINVAL
        assert str(exc_info.value) == 'boom'

    def test_top_io_get_collector_fail(self):
        """A collector-lookup failure is reported as EINVAL for 'top io'."""
        with patch('dashboard.services.nvmeof_top_cli.get_collector',
                   side_effect=RuntimeError("boom")):
            with pytest.raises(CmdException) as exc_info:
                self.exec_nvmeof_cmd(
                    'nvmeof top io',
                    subsystem='nqn.test',
                    session_id='sess2'
                )
        assert exc_info.value.retcode == -errno.EINVAL
        assert str(exc_info.value) == 'boom'