]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: validate args in nvmeof top cmds 68275/head
authorVallari Agrawal <vallari.agrawal@ibm.com>
Thu, 9 Apr 2026 07:49:18 +0000 (13:19 +0530)
committerVallari Agrawal <vallari.agrawal@ibm.com>
Sun, 12 Apr 2026 14:09:00 +0000 (19:39 +0530)
This commit makes these changes to the nvmeof top tool:
1. Improve/cleanup help text
2. Rename args (--group, --server-addr, --subsystem) to
   (--gw-group, --server-address, --nqn) to match other nvmeof cmds
3. Validate args --period, --gw-group, --server-address, --sort-by
4. Remove --service arg (since group and service have 1-1 mapping, this is redundant)
5. Show all cpu stats if no args are passed to "ceph nvmeof top cpu"
6. Cap displayed busy/idle rates at 100%

Fixes: https://tracker.ceph.com/issues/75927
Signed-off-by: Vallari Agrawal <vallari.agrawal@ibm.com>
src/ceph.in
src/pybind/mgr/dashboard/services/nvmeof_top_cli.py
src/pybind/mgr/dashboard/tests/test_nvmeof_top_cli.py

index 64a8857e98e388b3f5f0cd7bf4fbbbf7f599dd91..4694da29db5fea32f5f6e7986920b60037d710d1 100755 (executable)
@@ -595,6 +595,7 @@ def do_command(parsed_args, target, cmdargs, sigdict, inbuf, verbose):
     if valid_dict.get('poll', False):
         valid_dict['width'] = Termsize().cols
         valid_dict['session_id'] = str(uuid.uuid4())
+        valid_dict['period'] = parsed_args.period
     while True:
         try:
             # Only print the header for polling commands
index 44aa5ebec97e710dde4c6df4d16e92a2b1d7b2b8..b5a1a510e5766afd5958b5d58e6ce06b055dfa65 100644 (file)
@@ -3,6 +3,7 @@
 # https://github.com/pcuzner/ceph-nvmeof-top
 # by Paul Cuzner <pcuzner@ibm.com>
 import errno
+import ipaddress
 import json
 import logging
 import time
@@ -24,8 +25,9 @@ try:
 except ImportError as e:
     logger.error("Failed to import NVMeoFClient and related components: %s", e)
 else:
+    MAX_SESSION_TTL = 60 * 60
+
     def get_collector(session_id: Optional[str]):
-        MAX_SESSION_TTL = 60 * 60
         return mgr.get_nvmeof_collector(session_id, MAX_SESSION_TTL)
 
     def get_lbg_gws_map(service_name: str):
@@ -138,7 +140,8 @@ else:
         def __init__(self):
             self.tool: Any = None
             self.subsystem_nqn = ''
-            self.server_addr = ''
+            self.service = ''
+            self.group = ''
             self.delay: float = 0.0
             self.namespaces = {}
             self.lbg_to_gateway: dict = {}
@@ -194,7 +197,7 @@ else:
                 bdev_name = ns.bdev_name
 
                 daemon_name = ""
-                if self.tool.args.get('server_addr'):
+                if self.tool.args.get('server_address'):
                     # only show namespaces owned by this gateway's LBG
                     if ns.load_balancing_group != self.load_balancing_group:
                         continue
@@ -240,8 +243,8 @@ else:
                     reactor_data.append((
                         gw_addr,
                         thread_stats.thread,
-                        f"{thread_stats.busy_rate * 100:.2f}",
-                        f"{thread_stats.idle_rate * 100:.2f}",
+                        min(thread_stats.busy_rate * 100, 100.0),
+                        min(thread_stats.idle_rate * 100, 100.0),
                     ))
             reactor_data.sort(key=lambda t: t[sort_pos], reverse=reverse_sort)
             return reactor_data
@@ -254,7 +257,14 @@ else:
 
         def get_overall_summary_data(self):
             return [
-                self.server_addr,
+                self.group,
+                self.total_subsystems,
+                self.total_namespaces_overall,
+            ]
+
+        def get_gateway_summary_data(self):
+            return [
+                self.client.gateway_addr,
                 self.load_balancing_group,
                 self.total_subsystems,
                 self.total_namespaces_overall,
@@ -348,52 +358,97 @@ else:
         def _fetch_subsystems(self):
             return self._call_grpc('list_subsystems', NVMeoFClient.pb2.list_subsystems_req())
 
-        def _get_client(self, group, service_url):
-            key = (group, service_url)
+        def _get_client(self, group, server_addr):
+            key = (group, server_addr)
             if key not in self.clients:
-                self.clients[key] = NVMeoFClient(group, service_url)
+                self.clients[key] = NVMeoFClient(group, server_addr)
             return self.clients[key]
 
+        def _set_gateways(self, group_filter: str, addr_filter: str,
+                          port_filter: Optional[int] = None):
+            if self.service and self.group:
+                return
+
+            services = NvmeofGatewaysConfig.get_gateways_config().get("gateways", {})
+
+            if not services:
+                self.health.rc = -errno.ENOENT
+                self.health.msg = "No NVMeoF gateways configured"
+                return
+
+            if not addr_filter and not group_filter and len(services) > 1:
+                self.health.rc = -errno.EINVAL
+                self.health.msg = (
+                    f"Multiple gateway groups found: {', '.join(services.keys())}. "
+                    "Provide --gw-group <name>"
+                )
+                return
+
+            matched_service_name = ''
+            matched_gws = []
+            for svc_name, svc_gateways in services.items():
+                for gw in svc_gateways:
+                    gw_host, _, gw_port = gw['service_url'].rpartition(':')
+                    gw_host = gw_host.strip('[]')
+                    if (addr_filter and addr_filter != gw_host) or \
+                            (port_filter and str(port_filter) != gw_port):
+                        continue
+                    if group_filter and gw.get('group') != group_filter:
+                        if addr_filter:
+                            self.health.rc = -errno.EINVAL
+                            self.health.msg = (
+                                f"Address '{addr_filter}' belongs to group "
+                                f"'{gw.get('group')}', not '{group_filter}'"
+                            )
+                            return
+                        continue
+                    matched_service_name = svc_name
+                    matched_gws.append(gw)
+
+            if not matched_gws:
+                if addr_filter:
+                    self.health.rc = -errno.ENOENT
+                    self.health.msg = f"No gateway found matching address: {addr_filter}"
+                elif group_filter:
+                    self.health.rc = -errno.ENOENT
+                    self.health.msg = f"Gateway group '{group_filter}' not found"
+                return
+
+            self.service = matched_service_name
+            self.group = matched_gws[0].get('group', '')
+            for gw in matched_gws:
+                self._get_client(self.group, gw['service_url'])
+
         def initialise(self, tool):
             self.health = Health()
             self.tool = tool
-            self.client = self._get_client(tool.args.get('group', ''),
-                                           tool.args.get('server_addr', ''))
-            self.server_addr = self.client.gateway_addr
+
+            self._set_gateways(
+                group_filter=tool.args.get('gw_group', ''),
+                addr_filter=tool.args.get('server_address', ''),
+                port_filter=tool.args.get('server_port')
+            )
+            if not self.ready:
+                return
 
             now = time.time()
             self.delay = (now - self.timestamp)
             self.timestamp = now
 
-            self.gw_info = self._fetch_gateway_info(self.client)
-            if not self.ready:
-                logger.error("Call to %s failed, RC=%s, MSG=%s",
-                             self.server_addr, self.health.rc, self.health.msg)
-                self.health.msg = (
-                    f"Unable to connect to {self.server_addr}, "
-                    "pass an available gateway as --server-addr"
-                )
-                return
+            self.client = next(iter(self.clients.values()))
 
-            logger.debug("Connected to %s", self.server_addr)
+            if self.gw_info is None:
+                self.gw_info = self._fetch_gateway_info(self.client)
+                if not self.ready:
+                    self.health.msg = f"Unable to connect to {self.client.gateway_addr}"
+                    return
+                logger.debug("Connected to %s", self.client.gateway_addr)
 
         def collect_cpu_data(self):
-            service_name = self.tool.service_name
-            group = self.tool.args.get('group', '')
-            if service_name:
-                gw_conf = NvmeofGatewaysConfig.get_gateways_config()
-                gateways = gw_conf.get("gateways", {})
-                if service_name not in gateways:
-                    self.health.rc = -errno.ENOENT
-                    self.health.msg = f'Service {service_name} not found'
+            for client in self.clients.values():
+                self._fetch_thread_stats(client)
+                if not self.ready:
                     return
-                for gw in gateways[service_name]:
-                    client = self._get_client(group, gw["service_url"])
-                    self._fetch_thread_stats(client)
-                    if not self.ready:
-                        return
-            else:
-                self._fetch_thread_stats(self.client)
             logger.debug("collect_cpu_data completed")
 
         def _set_subsystem_and_namespaces(self):
@@ -433,41 +488,56 @@ else:
             if not self.ready:
                 return
 
-            group = self.tool.args.get('group', '')
-            if not self.tool.args.get('server_addr'):
-                service_name = self.client.service_name
-                gw_conf = NvmeofGatewaysConfig.get_gateways_config()
-                gateways = gw_conf.get("gateways", {})
-                if service_name not in gateways:
-                    self.health.rc = -errno.ENOENT
-                    self.health.msg = f'Service {service_name} not found'
-                    return
-                self.lbg_to_gateway = get_lbg_gws_map(service_name)
+            if not self.tool.args.get('server_address'):
+                self.lbg_to_gateway = get_lbg_gws_map(self.service)
                 if not self.lbg_to_gateway:
                     self.health.rc = -errno.ENOENT
                     self.health.msg = (
                         f'Failed to retrieve load balancing group '
-                        f'mapping for service {service_name}'
+                        f'mapping for service {self.service}'
                     )
                     return
-                for gw in gateways[service_name]:
-                    client = self._get_client(group, gw["service_url"])
-                    self._fetch_namespace_iostats(client)
-                    if not self.ready:
-                        return
-            else:
-                self._fetch_namespace_iostats(self.client)
+            for client in self.clients.values():
+                self._fetch_namespace_iostats(client)
+                if not self.ready:
+                    return
             logger.debug("collect_io_data completed")
 
     class NVMeoFTopTool:
-        def __init__(self, args: dict, data_collector):
+        def __init__(self, args: dict):
             self.args = args
-            self.collector = data_collector
+            self.collector: Any = None
             self.reverse_sort = args.get('sort_descending', False)
             self.sort_key = args.get('sort_by')
 
+        def _validate_args(self) -> Optional[tuple]:
+            period = self.args.get('period', 1)
+            if not 1 <= period <= MAX_SESSION_TTL:
+                return (-errno.EINVAL,
+                        f"Invalid period '{period}': must be between 1 and {MAX_SESSION_TTL}")
+            server_address = self.args.get('server_address', '')
+            if server_address:
+                try:
+                    ipaddress.ip_address(server_address)
+                except Exception:  # pylint: disable=broad-except
+                    return (-errno.EINVAL,
+                            f"Invalid server-address '{server_address}': "
+                            "must be a valid IP address")
+            server_port = self.args.get('server_port')
+            if server_port is not None and not 1 <= server_port <= 65535:
+                return (-errno.EINVAL,
+                        f"Invalid server-port '{server_port}': "
+                        "must be between 1 and 65535")
+            return None
+
         def run(self) -> tuple:
             try:
+                err = self._validate_args()
+                if err:
+                    return err
+                self.collector = get_collector(self.args.get('session_id'))
+                if self.collector is None:
+                    return (-errno.EINVAL, "Unable to initialise collector")
                 self.collector.initialise(self)
                 if not self.collector.ready:
                     return (self.collector.health.rc,
@@ -498,10 +568,6 @@ else:
         reactors_headers = ['Gateway', 'Thread Name', 'Busy Rate%', 'Idle Rate%']
         reactors_template = "{:<30}   {:<30}   {:<20}   {:<20}\n"
 
-        def __init__(self, args: dict, data_collector):
-            super().__init__(args, data_collector)
-            self.service_name = args.get('service')
-
         def _collect(self):
             self.collector.collect_cpu_data()
 
@@ -522,16 +588,19 @@ else:
 
             if not self.args.get('no_header'):
                 rows.append(NVMeoFTopCPU.reactors_template.format(*NVMeoFTopCPU.reactors_headers))
-            for reactor in reactor_data:
-                rows.append(NVMeoFTopCPU.reactors_template.format(*reactor))
+            for gw_addr, thread_name, busy_rate, idle_rate in reactor_data:
+                rows.append(NVMeoFTopCPU.reactors_template.format(
+                    gw_addr, thread_name, f"{busy_rate:.2f}", f"{idle_rate:.2f}"
+                ))
             rows.append("\n")
 
             return ''.join(rows)
 
     class NVMeoFTopIO(NVMeoFTopTool):
         subsystem_summary_headers = ['Subsystem', 'Namespaces']
-        summary_headers = ['Gateway', 'Load Balancing Group',
-                           'Total Subsystems', 'Total Namespaces']
+        gateway_summary_headers = ['Gateway', 'Load Balancing Group',
+                                   'Total Subsystems', 'Total Namespaces']
+        summary_headers = ['Group', 'Total Subsystems', 'Total Namespaces']
 
         ns_headers = [
             'NSID', 'RBD Image', 'IOPS', 'r/s', 'rMB/s', 'r_await', 'rareq-sz',
@@ -542,9 +611,9 @@ else:
             "   {:>6}   {:>6}   {:>7}   {:>8}   {:^5}   {:>3}\n"
         )
 
-        def __init__(self, args: dict, data_collector):
-            super().__init__(args, data_collector)
-            self.subsystem_nqn = args.get('subsystem')
+        def __init__(self, args: dict):
+            super().__init__(args)
+            self.subsystem_nqn = args.get('nqn')
 
         def _collect(self):
             self.collector.collect_io_data()
@@ -567,11 +636,15 @@ else:
                                           time.localtime(self.collector.timestamp))
                 rows.append(f"{timestamp} (delay: {self.collector.delay:.2f}s)\n")
             if self.args.get('summary'):
-                if self.args.get('server_addr'):
-                    summary_row = ""
+                summary_row = ""
+                if self.args.get('server_address'):
+                    gateway_summary_data = self.collector.get_gateway_summary_data()
+                    for index, header in enumerate(NVMeoFTopIO.gateway_summary_headers):
+                        summary_row += f"{header}: {gateway_summary_data[index]}  "
+                else:
                     for index, header in enumerate(NVMeoFTopIO.summary_headers):
                         summary_row += f"{header}: {overall_summary_data[index]}  "
-                    rows.append(summary_row + "\n")
+                rows.append(summary_row + "\n")
                 subsys_summary_row = ""
                 for index, header in enumerate(NVMeoFTopIO.subsystem_summary_headers):
                     subsys_summary_row += f"{header}: {subsystem_summary_data[index]}  "
@@ -587,94 +660,73 @@ else:
             return ''.join(rows)
 
     @DBCLICommand.Read('nvmeof top cpu', poll=True)
-    def nvmeof_top_cpu(_, service: str = '',
-                       server_addr: str = '', group: str = '',
+    def nvmeof_top_cpu(_, server_address: str = '', server_port: Optional[int] = None,
+                       gw_group: str = '',
                        descending: bool = False, sort_by: str = 'Thread Name',
                        with_timestamp: bool = False,
                        no_header: bool = False,
-                       session_id: Optional[str] = None):
+                       period: float = 1.0, session_id: Optional[str] = None):
         '''
         NVMeoF Top CPU Tool
-        --period [-p] <delay> (default 1s, max 3600s)
-        --service '<service_name>'
-        --server-addr <ip>
-        --group '<group_name>'
-        --sort-by '<header>'
-        --descending
-        --with-timestamp
-        --no-header
         '''
+        if sort_by not in NVMeoFTopCPU.reactors_headers:
+            return HandleCommandResult(
+                stderr=f"Invalid sort-by '{sort_by}': must match a header title: "
+                       f"{NVMeoFTopCPU.reactors_headers}",
+                retval=-errno.EINVAL
+            )
         args = {
-            'service': service,
             'with_timestamp': with_timestamp,
             'no_header': no_header,
             'sort_descending': descending,
             'sort_by': sort_by,
-            'server_addr': server_addr,
-            'group': group,
+            'server_address': server_address,
+            'server_port': server_port,
+            'gw_group': gw_group,
+            'period': period,
+            'session_id': session_id,
         }
-        try:
-            data_collector = get_collector(session_id)
-            if data_collector is None:
-                return HandleCommandResult(
-                    stderr="Unable to initialise collector",
-                    retval=-errno.EINVAL
-                )
-            top_tool = NVMeoFTopCPU(args, data_collector)
-            rc, output = top_tool.run()
-            if rc != 0:
-                return HandleCommandResult(stderr=output, retval=rc)
-            return HandleCommandResult(stdout=output, retval=rc)
-        except Exception as exc:  # pylint: disable=broad-except
-            logger.exception("top-cpu command failed: %s", exc)
-            return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL)
+        rc, output = NVMeoFTopCPU(args).run()
+        if rc != 0:
+            return HandleCommandResult(stderr=output, retval=rc)
+        return HandleCommandResult(stdout=output, retval=rc)
 
     @DBCLICommand.Read('nvmeof top io', poll=True)
-    def nvmeof_top_io(_, subsystem: str = '',
-                      server_addr: str = '', group: str = '',
+    def nvmeof_top_io(_, nqn: str = '',
+                      server_address: str = '', server_port: Optional[int] = None,
+                      gw_group: str = '',
                       descending: bool = False, sort_by: str = 'NSID',
                       with_timestamp: bool = False,
                       summary: bool = False, no_header: bool = False,
-                      session_id: Optional[str] = None):
+                      period: float = 1.0, session_id: Optional[str] = None):
         '''
         NVMeoF Top IO Tool
-        --period [-p] <delay> (default 1s, max 3600s)
-        --subsystem '<nqn>'
-        --server-addr <ip>
-        --group '<group_name>'
-        --descending
-        --sort-by '<header>'
-        --with-timestamp
-        --summary
-        --no-header
         '''
+        if not nqn:
+            return HandleCommandResult(
+                stderr="Required argument '--nqn' missing",
+                retval=-errno.EINVAL
+            )
+        if sort_by not in NVMeoFTopIO.ns_headers:
+            return HandleCommandResult(
+                stderr=f"Invalid sort-by '{sort_by}': must match a header title: "
+                       f"{NVMeoFTopIO.ns_headers}",
+                retval=-errno.EINVAL
+            )
         args = {
-            'subsystem': subsystem,
+            'nqn': nqn,
             'with_timestamp': with_timestamp,
             'summary': summary,
             'no_header': no_header,
             'sort_descending': descending,
             'sort_by': sort_by,
-            'server_addr': server_addr,
-            'group': group,
+            'server_address': server_address,
+            'server_port': server_port,
+            'gw_group': gw_group,
+            'period': period,
+            'session_id': session_id,
         }
-        if not subsystem:
-            return HandleCommandResult(
-                stderr="Required argument '--subsystem' missing",
-                retval=-errno.EINVAL
-            )
-        try:
-            data_collector = get_collector(session_id)
-            if data_collector is None:
-                return HandleCommandResult(
-                    stderr="Unable to initialise collector",
-                    retval=-errno.EINVAL
-                )
-            top_tool = NVMeoFTopIO(args, data_collector)
-            rc, output = top_tool.run()
-            if rc != 0:
-                return HandleCommandResult(stderr=output, retval=rc)
-            return HandleCommandResult(stdout=output, retval=rc)
-        except Exception as exc:  # pylint: disable=broad-except
-            logger.exception("top-io command failed: %s", exc)
-            return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL)
+        rc, output = NVMeoFTopIO(args).run()
+        if rc != 0:
+            return HandleCommandResult(stderr=output, retval=rc)
+        return HandleCommandResult(stdout=output, retval=rc)
index 8c9f691310b1b93c79151e91fdf2e1a27dac7c1b..0b24a6ffab8303d298eb13b966218920f7491d4c 100644 (file)
@@ -5,7 +5,8 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from ..services.nvmeof_top_cli import Counter, NvmeofTopCollector, NVMeoFTopCPU, NVMeoFTopIO
+from ..services.nvmeof_top_cli import MAX_SESSION_TTL, Counter, \
+    NvmeofTopCollector, NVMeoFTopCPU, NVMeoFTopIO
 from ..tests import CLICommandTestMixin, CmdException
 
 
@@ -24,6 +25,7 @@ def fixture_io_collector():
     collector.get_sorted_namespaces.return_value = []
     collector.get_subsystem_summary_data.return_value = []
     collector.get_overall_summary_data.return_value = []
+    collector.get_gateway_summary_data.return_value = []
     collector.delay = 2.0
     collector.timestamp = time.time()
     return collector
@@ -62,13 +64,16 @@ class TestNVMeoFTopCPUFormat:
         'sort_descending': False,
         'with_timestamp': False,
         'no_header': False,
-        'service': '',
-        'server_addr': '',
-        'group': '',
+        'server_address': '',
+        'server_port': None,
+        'gw_group': '',
+        'period': 1,
+        'session_id': None,
     }
 
     def test_headers(self, cpu_collector):
-        tool = NVMeoFTopCPU(self.default_args, cpu_collector)
+        tool = NVMeoFTopCPU(self.default_args)
+        tool.collector = cpu_collector
         output = tool.format_output()
         assert 'Gateway' in output
         assert 'Thread Name' in output
@@ -76,29 +81,33 @@ class TestNVMeoFTopCPUFormat:
         assert 'Idle Rate%' in output
 
     def test_no_header(self, cpu_collector):
-        tool = NVMeoFTopCPU({**self.default_args, 'no_header': True}, cpu_collector)
+        tool = NVMeoFTopCPU({**self.default_args, 'no_header': True})
+        tool.collector = cpu_collector
         output = tool.format_output()
         assert 'Gateway' not in output
 
     def test_with_timestamp(self, cpu_collector):
-        tool = NVMeoFTopCPU({**self.default_args, 'with_timestamp': True}, cpu_collector)
+        tool = NVMeoFTopCPU({**self.default_args, 'with_timestamp': True})
+        tool.collector = cpu_collector
         output = tool.format_output()
         assert 'delay:' in output
         assert '1.50s' in output
 
     def test_reactor_data(self, cpu_collector):
         cpu_collector.get_reactor_data.return_value = [
-            ('192.168.1.1:5500', 'reactor_0', '72.50', '27.50'),
-            ('192.168.1.1:5500', 'reactor_1', '45.00', '55.00'),
+            ('192.168.1.1:5500', 'reactor_0', 72.50, 27.50),
+            ('192.168.1.1:5500', 'reactor_1', 45.00, 55.00),
         ]
-        tool = NVMeoFTopCPU(self.default_args, cpu_collector)
+        tool = NVMeoFTopCPU(self.default_args)
+        tool.collector = cpu_collector
         output = tool.format_output()
         assert '192.168.1.1:5500' in output
         assert 'reactor_0' in output
         assert 'reactor_1' in output
 
     def test_invalid_sort_key(self, cpu_collector):
-        tool = NVMeoFTopCPU({**self.default_args, 'sort_by': 'NonExistent'}, cpu_collector)
+        tool = NVMeoFTopCPU({**self.default_args, 'sort_by': 'NonExistent'})
+        tool.collector = cpu_collector
         with pytest.raises(ValueError, match="Invalid sort key"):
             tool.format_output()
 
@@ -110,24 +119,30 @@ class TestNVMeoFTopIOFormat:
         'with_timestamp': False,
         'no_header': False,
         'summary': False,
-        'subsystem': 'nqn.2024-01.io.spdk:cnode1',
-        'server_addr': '',
-        'group': '',
+        'nqn': 'nqn.2024-01.io.spdk:cnode1',
+        'server_address': '',
+        'server_port': None,
+        'gw_group': '',
+        'period': 1,
+        'session_id': None,
     }
 
     def test_no_namespaces(self, io_collector):
-        tool = NVMeoFTopIO(self.default_args, io_collector)
+        tool = NVMeoFTopIO(self.default_args)
+        tool.collector = io_collector
         output = tool.format_output()
         assert '<no namespaces defined>' in output
 
     def test_headers_present(self, io_collector):
-        tool = NVMeoFTopIO(self.default_args, io_collector)
+        tool = NVMeoFTopIO(self.default_args)
+        tool.collector = io_collector
         output = tool.format_output()
         assert 'NSID' in output
         assert 'RBD Image' in output
 
     def test_no_header(self, io_collector):
-        tool = NVMeoFTopIO({**self.default_args, 'no_header': True}, io_collector)
+        tool = NVMeoFTopIO({**self.default_args, 'no_header': True})
+        tool.collector = io_collector
         output = tool.format_output()
         assert 'NSID' not in output
 
@@ -138,18 +153,21 @@ class TestNVMeoFTopIOFormat:
             (2, 'pool/image2', 200, 100, '2.00', '1.00', '8.00',
              100, '2.00', '1.00', '8.00', '2', 'Yes'),
         ]
-        tool = NVMeoFTopIO(self.default_args, io_collector)
+        tool = NVMeoFTopIO(self.default_args)
+        tool.collector = io_collector
         output = tool.format_output()
         assert 'pool/image1' in output
         assert 'pool/image2' in output
 
     def test_with_timestamp(self, io_collector):
-        tool = NVMeoFTopIO({**self.default_args, 'with_timestamp': True}, io_collector)
+        tool = NVMeoFTopIO({**self.default_args, 'with_timestamp': True})
+        tool.collector = io_collector
         output = tool.format_output()
         assert 'delay:' in output
 
     def test_invalid_sort_key(self, io_collector):
-        tool = NVMeoFTopIO({**self.default_args, 'sort_by': 'BadKey'}, io_collector)
+        tool = NVMeoFTopIO({**self.default_args, 'sort_by': 'BadKey'})
+        tool.collector = io_collector
         with pytest.raises(ValueError, match="Invalid sort key"):
             tool.format_output()
 
@@ -159,9 +177,10 @@ class TestNvmeofTopCollector:
     def collector(self):
         c = NvmeofTopCollector()
         c.client = MagicMock()
-        c.client.service_name = 'myservice'
+        c.service = 'myservice'
+        c.group = 'mygroup'
         c.tool = MagicMock()
-        c.tool.args = {'group': '', 'server_addr': ''}
+        c.tool.args = {'gw_group': '', 'server_address': ''}
         return c
 
     def test_grpc_call_failure(self, collector):
@@ -172,13 +191,149 @@ class TestNvmeofTopCollector:
         assert collector.health.rc == -errno.ECONNREFUSED
         assert collector.health.msg == 'RPC endpoint unavailable at 192.168.1.1:5500'
 
-    def test_collect_cpu_data_service_not_found(self, collector):
-        collector.tool.service_name = 'myservice'
+    def test_set_gateways_no_services(self, collector):
+        collector.service = ''
+        collector.group = ''
         with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
                    return_value={'gateways': {}}):
-            collector.collect_cpu_data()
+            collector._set_gateways('', '')  # pylint: disable=protected-access
+        assert collector.health.rc == -errno.ENOENT
+        assert collector.health.msg == 'No NVMeoF gateways configured'
+
+    def test_set_gateways_multiple_groups_no_filter(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+            'nvmeof.pool.group2': [{'service_url': '2.2.2.2:5500', 'group': 'group2'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config):
+            collector._set_gateways('', '')  # pylint: disable=protected-access
+        assert collector.health.rc == -errno.EINVAL
+        assert 'Multiple gateway groups found' in collector.health.msg
+
+    def test_set_gateways_address_not_found(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config):
+            collector._set_gateways('', '9.9.9.9')  # pylint: disable=protected-access
+        assert collector.health.rc == -errno.ENOENT
+        assert 'No gateway found matching address' in collector.health.msg
+
+    def test_set_gateways_group_not_found(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config):
+            collector._set_gateways('nonexistent', '')  # pylint: disable=protected-access
+        assert collector.health.rc == -errno.ENOENT
+        assert "Gateway group 'nonexistent' not found" in collector.health.msg
+
+    def test_set_gateways_address_group_mismatch(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config):
+            collector._set_gateways('group2', '1.1.1.1')  # pylint: disable=protected-access
+        assert collector.health.rc == -errno.EINVAL
+        assert "Address '1.1.1.1' belongs to group 'group1', not 'group2'" in collector.health.msg
+
+    def test_set_gateways_single_service_auto_detect(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config), \
+             patch.object(collector, '_get_client'):
+            collector._set_gateways('', '')  # pylint: disable=protected-access
+        assert collector.service == 'nvmeof.pool.group1'
+        assert collector.group == 'group1'
+
+    def test_set_gateways_by_group(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+            'nvmeof.pool.group2': [{'service_url': '2.2.2.2:5500', 'group': 'group2'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config), \
+             patch.object(collector, '_get_client'):
+            collector._set_gateways('group1', '')  # pylint: disable=protected-access
+        assert collector.service == 'nvmeof.pool.group1'
+        assert collector.group == 'group1'
+
+    def test_set_gateways_by_address(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config), \
+             patch.object(collector, '_get_client'):
+            collector._set_gateways('', '1.1.1.1')  # pylint: disable=protected-access
+        assert collector.service == 'nvmeof.pool.group1'
+        assert collector.group == 'group1'
+
+    def test_set_gateways_by_address_no_substring_match(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.11:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config):
+            collector._set_gateways('', '1.1.1.1')  # pylint: disable=protected-access
         assert collector.health.rc == -errno.ENOENT
-        assert collector.health.msg == 'Service myservice not found'
+
+    def test_set_gateways_by_address_and_port(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config), \
+             patch.object(collector, '_get_client'):
+            collector._set_gateways('', '1.1.1.1', 5500)  # pylint: disable=protected-access
+        assert collector.service == 'nvmeof.pool.group1'
+
+    def test_set_gateways_port_mismatch(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config):
+            collector._set_gateways('', '1.1.1.1', 9999)  # pylint: disable=protected-access
+        assert collector.health.rc == -errno.ENOENT
+
+    def test_set_gateways_ipv6_address(self, collector):
+        collector.service = ''
+        collector.group = ''
+        config = {'gateways': {
+            'nvmeof.pool.group1': [{'service_url': '[::1]:5500', 'group': 'group1'}],
+        }}
+        with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+                   return_value=config), \
+             patch.object(collector, '_get_client'):
+            collector._set_gateways('', '::1')  # pylint: disable=protected-access
+        assert collector.service == 'nvmeof.pool.group1'
 
     def test_collect_io_data_subsystems_unavailable(self, collector):
         collector.tool.subsystem_nqn = 'nqn.test'
@@ -209,23 +364,6 @@ class TestNvmeofTopCollector:
         assert collector.health.rc == -errno.ENOENT
         assert collector.health.msg == 'Subsystem NQN provided not found'
 
-    def test_collect_io_data_service_not_found(self, collector):
-        collector.tool.subsystem_nqn = 'nqn.test'
-        mock_subsystems = MagicMock()
-        mock_subsystems.status = 0
-        mock_sub = MagicMock()
-        mock_sub.nqn = 'nqn.test'
-        mock_subsystems.subsystems = [mock_sub]
-        mock_namespace_info = MagicMock()
-        mock_namespace_info.namespaces = []
-        with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems), \
-             patch.object(collector, '_fetch_namespaces', return_value=mock_namespace_info), \
-             patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
-                   return_value={'gateways': {}}):
-            collector.collect_io_data()
-        assert collector.health.rc == -errno.ENOENT
-        assert collector.health.msg == 'Service myservice not found'
-
     def test_collect_io_data_lbg_mapping_failed(self, collector):
         collector.tool.subsystem_nqn = 'nqn.test'
         mock_subsystems = MagicMock()
@@ -237,8 +375,6 @@ class TestNvmeofTopCollector:
         mock_namespace_info.namespaces = []
         with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems), \
              patch.object(collector, '_fetch_namespaces', return_value=mock_namespace_info), \
-             patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
-                   return_value={'gateways': {'myservice': []}}), \
              patch('dashboard.services.nvmeof_top_cli.get_lbg_gws_map', return_value={}):
             collector.collect_io_data()
         assert collector.health.rc == -errno.ENOENT
@@ -246,46 +382,96 @@ class TestNvmeofTopCollector:
             'Failed to retrieve load balancing group mapping for service myservice'
 
 
+class TestNvmeofTopValidateArgs:
+    base_args = {
+        'sort_by': 'Thread Name',
+        'sort_descending': False,
+        'with_timestamp': False,
+        'no_header': False,
+        'server_address': '',
+        'server_port': None,
+        'gw_group': '',
+        'period': 1,
+        'session_id': None,
+    }
+
+    def _validate(self, **overrides):
+        args = {**self.base_args, **overrides}
+        return NVMeoFTopCPU(args)._validate_args()  # pylint: disable=protected-access
+
+    def test_valid_period(self):
+        assert self._validate(period=5) is None
+
+    def test_invalid_period_too_low(self):
+        rc, msg = self._validate(period=0)
+        assert rc == -errno.EINVAL
+        assert msg == f"Invalid period '0': must be between 1 and {MAX_SESSION_TTL}"
+
+    def test_invalid_period_too_high(self):
+        rc, msg = self._validate(period=999999)
+        assert rc == -errno.EINVAL
+        assert msg == f"Invalid period '999999': must be between 1 and {MAX_SESSION_TTL}"
+
+    def test_valid_server_address(self):
+        assert self._validate(server_address='1.2.3.4') is None
+
+    def test_invalid_server_address(self):
+        rc, msg = self._validate(server_address='not-an-ip')
+        assert rc == -errno.EINVAL
+        assert msg == "Invalid server-address 'not-an-ip': must be a valid IP address"
+
+    def test_valid_server_address_ipv6(self):
+        assert self._validate(server_address='::1') is None
+
+    def test_valid_server_port(self):
+        assert self._validate(server_address='1.2.3.4', server_port=5500) is None
+
+    def test_invalid_server_port_zero(self):
+        rc, msg = self._validate(server_address='1.2.3.4', server_port=0)
+        assert rc == -errno.EINVAL
+        assert msg == "Invalid server-port '0': must be between 1 and 65535"
+
+    def test_invalid_server_port_too_high(self):
+        rc, msg = self._validate(server_address='1.2.3.4', server_port=65536)
+        assert rc == -errno.EINVAL
+        assert msg == "Invalid server-port '65536': must be between 1 and 65535"
+
+    def test_port_without_address_is_valid(self):
+        assert self._validate(server_port=5500) is None
+
+
 class TestNvmeofTopCommands(CLICommandTestMixin):
     @classmethod
     def exec_nvmeof_cmd(cls, cmd, **kwargs):
         return cls.exec_cmd('', prefix=cmd, **kwargs)
 
-    def test_top_io_missing_subsystem_returns_einval(self):
+    def test_top_io_missing_nqn_returns_einval(self):
         with pytest.raises(CmdException) as exc_info:
-            self.exec_nvmeof_cmd('nvmeof top io', subsystem='', session_id='sess1')
+            self.exec_nvmeof_cmd('nvmeof top io', nqn='', session_id='sess1')
         assert exc_info.value.retcode == -errno.EINVAL
-        assert str(exc_info.value) == "Required argument '--subsystem' missing"
+        assert str(exc_info.value) == "Required argument '--nqn' missing"
 
     def test_top_cpu_success(self):
-        with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
-            mock_gc.return_value = MagicMock()
-            with patch.object(NVMeoFTopCPU, 'run', return_value=(0, 'cpu output\n')):
-                result = self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
-                assert 'cpu output' in result
-                mock_gc.assert_called_once_with('sess1')
+        with patch.object(NVMeoFTopCPU, 'run', return_value=(0, 'cpu output\n')):
+            result = self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+            assert 'cpu output' in result
 
     def test_top_cpu_run_fail(self):
-        with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
-            mock_gc.return_value = MagicMock()
-            with patch.object(NVMeoFTopCPU, 'run',
-                              return_value=(-errno.ENOENT, 'error')):
-                with pytest.raises(CmdException) as exc_info:
-                    self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
-                assert exc_info.value.retcode == -errno.ENOENT
-                assert str(exc_info.value) == 'error'
+        with patch.object(NVMeoFTopCPU, 'run',
+                          return_value=(-errno.ENOENT, 'error')):
+            with pytest.raises(CmdException) as exc_info:
+                self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+            assert exc_info.value.retcode == -errno.ENOENT
+            assert str(exc_info.value) == 'error'
 
     def test_top_io_success(self):
-        with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
-            mock_gc.return_value = MagicMock()
-            with patch.object(NVMeoFTopIO, 'run', return_value=(0, 'io output\n')):
-                result = self.exec_nvmeof_cmd(
-                    'nvmeof top io',
-                    subsystem='nqn.test',
-                    session_id='sess2'
-                )
-                assert 'io output' in result
-                mock_gc.assert_called_once_with('sess2')
+        with patch.object(NVMeoFTopIO, 'run', return_value=(0, 'io output\n')):
+            result = self.exec_nvmeof_cmd(
+                'nvmeof top io',
+                nqn='nqn.test',
+                session_id='sess2'
+            )
+            assert 'io output' in result
 
     def test_top_cpu_get_collector_fail(self):
         with patch('dashboard.services.nvmeof_top_cli.get_collector',
@@ -301,7 +487,7 @@ class TestNvmeofTopCommands(CLICommandTestMixin):
             with pytest.raises(CmdException) as exc_info:
                 self.exec_nvmeof_cmd(
                     'nvmeof top io',
-                    subsystem='nqn.test',
+                    nqn='nqn.test',
                     session_id='sess2'
                 )
             assert exc_info.value.retcode == -errno.EINVAL