# https://github.com/pcuzner/ceph-nvmeof-top
# by Paul Cuzner <pcuzner@ibm.com>
import errno
+import ipaddress
import json
import logging
import time
except ImportError as e:
logger.error("Failed to import NVMeoFClient and related components: %s", e)
else:
+ MAX_SESSION_TTL = 60 * 60
+
def get_collector(session_id: Optional[str]):
- MAX_SESSION_TTL = 60 * 60
return mgr.get_nvmeof_collector(session_id, MAX_SESSION_TTL)
def get_lbg_gws_map(service_name: str):
def __init__(self):
self.tool: Any = None
self.subsystem_nqn = ''
- self.server_addr = ''
+ self.service = ''
+ self.group = ''
self.delay: float = 0.0
self.namespaces = {}
self.lbg_to_gateway: dict = {}
bdev_name = ns.bdev_name
daemon_name = ""
- if self.tool.args.get('server_addr'):
+ if self.tool.args.get('server_address'):
# only show namespaces owned by this gateway's LBG
if ns.load_balancing_group != self.load_balancing_group:
continue
reactor_data.append((
gw_addr,
thread_stats.thread,
- f"{thread_stats.busy_rate * 100:.2f}",
- f"{thread_stats.idle_rate * 100:.2f}",
+ min(thread_stats.busy_rate * 100, 100.0),
+ min(thread_stats.idle_rate * 100, 100.0),
))
reactor_data.sort(key=lambda t: t[sort_pos], reverse=reverse_sort)
return reactor_data
def get_overall_summary_data(self):
return [
- self.server_addr,
+ self.group,
+ self.total_subsystems,
+ self.total_namespaces_overall,
+ ]
+
+ def get_gateway_summary_data(self):
+ return [
+ self.client.gateway_addr,
self.load_balancing_group,
self.total_subsystems,
self.total_namespaces_overall,
def _fetch_subsystems(self):
return self._call_grpc('list_subsystems', NVMeoFClient.pb2.list_subsystems_req())
- def _get_client(self, group, service_url):
- key = (group, service_url)
+ def _get_client(self, group, server_addr):
+ key = (group, server_addr)
if key not in self.clients:
- self.clients[key] = NVMeoFClient(group, service_url)
+ self.clients[key] = NVMeoFClient(group, server_addr)
return self.clients[key]
+ def _set_gateways(self, group_filter: str, addr_filter: str,
+ port_filter: Optional[int] = None):
+ # Resolve which gateway service/group this collector targets and create a
+ # gRPC client per matched gateway. Failures are reported via self.health
+ # (rc / msg) instead of raising. No-op if service and group are already set.
+ if self.service and self.group:
+ return
+
+ services = NvmeofGatewaysConfig.get_gateways_config().get("gateways", {})
+
+ if not services:
+ self.health.rc = -errno.ENOENT
+ self.health.msg = "No NVMeoF gateways configured"
+ return
+
+ # Ambiguous: more than one service configured and no filter to pick one.
+ if not addr_filter and not group_filter and len(services) > 1:
+ self.health.rc = -errno.EINVAL
+ self.health.msg = (
+ f"Multiple gateway groups found: {', '.join(services.keys())}. "
+ "Provide --gw-group <name>"
+ )
+ return
+
+ matched_service_name = ''
+ matched_gws = []
+ for svc_name, svc_gateways in services.items():
+ for gw in svc_gateways:
+ # service_url is 'host:port'; rpartition on the LAST ':' keeps the
+ # colons of an IPv6 host intact, and strip('[]') drops its brackets
+ # so '[::1]:5500' compares as host '::1'.
+ gw_host, _, gw_port = gw['service_url'].rpartition(':')
+ gw_host = gw_host.strip('[]')
+ if (addr_filter and addr_filter != gw_host) or \
+ (port_filter and str(port_filter) != gw_port):
+ continue
+ # Address matched but group differs: with an explicit address this
+ # is a hard user error; without one it is simply not a match.
+ if group_filter and gw.get('group') != group_filter:
+ if addr_filter:
+ self.health.rc = -errno.EINVAL
+ self.health.msg = (
+ f"Address '{addr_filter}' belongs to group "
+ f"'{gw.get('group')}', not '{group_filter}'"
+ )
+ return
+ continue
+ matched_service_name = svc_name
+ matched_gws.append(gw)
+
+ if not matched_gws:
+ if addr_filter:
+ self.health.rc = -errno.ENOENT
+ self.health.msg = f"No gateway found matching address: {addr_filter}"
+ elif group_filter:
+ self.health.rc = -errno.ENOENT
+ self.health.msg = f"Gateway group '{group_filter}' not found"
+ return
+
+ # The first match defines the group; open one client per matched gateway.
+ self.service = matched_service_name
+ self.group = matched_gws[0].get('group', '')
+ for gw in matched_gws:
+ self._get_client(self.group, gw['service_url'])
+
def initialise(self, tool):
self.health = Health()
self.tool = tool
- self.client = self._get_client(tool.args.get('group', ''),
- tool.args.get('server_addr', ''))
- self.server_addr = self.client.gateway_addr
+
+ self._set_gateways(
+ group_filter=tool.args.get('gw_group', ''),
+ addr_filter=tool.args.get('server_address', ''),
+ port_filter=tool.args.get('server_port')
+ )
+ if not self.ready:
+ return
now = time.time()
self.delay = (now - self.timestamp)
self.timestamp = now
- self.gw_info = self._fetch_gateway_info(self.client)
- if not self.ready:
- logger.error("Call to %s failed, RC=%s, MSG=%s",
- self.server_addr, self.health.rc, self.health.msg)
- self.health.msg = (
- f"Unable to connect to {self.server_addr}, "
- "pass an available gateway as --server-addr"
- )
- return
+ self.client = next(iter(self.clients.values()))
- logger.debug("Connected to %s", self.server_addr)
+ if self.gw_info is None:
+ self.gw_info = self._fetch_gateway_info(self.client)
+ if not self.ready:
+ self.health.msg = f"Unable to connect to {self.client.gateway_addr}"
+ return
+ logger.debug("Connected to %s", self.client.gateway_addr)
def collect_cpu_data(self):
- service_name = self.tool.service_name
- group = self.tool.args.get('group', '')
- if service_name:
- gw_conf = NvmeofGatewaysConfig.get_gateways_config()
- gateways = gw_conf.get("gateways", {})
- if service_name not in gateways:
- self.health.rc = -errno.ENOENT
- self.health.msg = f'Service {service_name} not found'
+ for client in self.clients.values():
+ self._fetch_thread_stats(client)
+ if not self.ready:
return
- for gw in gateways[service_name]:
- client = self._get_client(group, gw["service_url"])
- self._fetch_thread_stats(client)
- if not self.ready:
- return
- else:
- self._fetch_thread_stats(self.client)
logger.debug("collect_cpu_data completed")
def _set_subsystem_and_namespaces(self):
if not self.ready:
return
- group = self.tool.args.get('group', '')
- if not self.tool.args.get('server_addr'):
- service_name = self.client.service_name
- gw_conf = NvmeofGatewaysConfig.get_gateways_config()
- gateways = gw_conf.get("gateways", {})
- if service_name not in gateways:
- self.health.rc = -errno.ENOENT
- self.health.msg = f'Service {service_name} not found'
- return
- self.lbg_to_gateway = get_lbg_gws_map(service_name)
+ if not self.tool.args.get('server_address'):
+ self.lbg_to_gateway = get_lbg_gws_map(self.service)
if not self.lbg_to_gateway:
self.health.rc = -errno.ENOENT
self.health.msg = (
f'Failed to retrieve load balancing group '
- f'mapping for service {service_name}'
+ f'mapping for service {self.service}'
)
return
- for gw in gateways[service_name]:
- client = self._get_client(group, gw["service_url"])
- self._fetch_namespace_iostats(client)
- if not self.ready:
- return
- else:
- self._fetch_namespace_iostats(self.client)
+ for client in self.clients.values():
+ self._fetch_namespace_iostats(client)
+ if not self.ready:
+ return
logger.debug("collect_io_data completed")
class NVMeoFTopTool:
- def __init__(self, args: dict, data_collector):
+ def __init__(self, args: dict):
self.args = args
- self.collector = data_collector
+ self.collector: Any = None
self.reverse_sort = args.get('sort_descending', False)
self.sort_key = args.get('sort_by')
+ def _validate_args(self) -> Optional[tuple]:
+ # Validate CLI arguments before any collector work is attempted.
+ # Returns (rc, message) on the first failure, or None when all are valid.
+ period = self.args.get('period', 1)
+ if not 1 <= period <= MAX_SESSION_TTL:
+ return (-errno.EINVAL,
+ f"Invalid period '{period}': must be between 1 and {MAX_SESSION_TTL}")
+ server_address = self.args.get('server_address', '')
+ if server_address:
+ try:
+ # ipaddress.ip_address accepts both IPv4 and IPv6 literals.
+ ipaddress.ip_address(server_address)
+ except Exception: # pylint: disable=broad-except
+ return (-errno.EINVAL,
+ f"Invalid server-address '{server_address}': "
+ "must be a valid IP address")
+ # Port is optional; when given it must be a valid TCP port number.
+ server_port = self.args.get('server_port')
+ if server_port is not None and not 1 <= server_port <= 65535:
+ return (-errno.EINVAL,
+ f"Invalid server-port '{server_port}': "
+ "must be between 1 and 65535")
+ return None
+
def run(self) -> tuple:
try:
+ err = self._validate_args()
+ if err:
+ return err
+ self.collector = get_collector(self.args.get('session_id'))
+ if self.collector is None:
+ return (-errno.EINVAL, "Unable to initialise collector")
self.collector.initialise(self)
if not self.collector.ready:
return (self.collector.health.rc,
reactors_headers = ['Gateway', 'Thread Name', 'Busy Rate%', 'Idle Rate%']
reactors_template = "{:<30} {:<30} {:<20} {:<20}\n"
- def __init__(self, args: dict, data_collector):
- super().__init__(args, data_collector)
- self.service_name = args.get('service')
-
def _collect(self):
self.collector.collect_cpu_data()
if not self.args.get('no_header'):
rows.append(NVMeoFTopCPU.reactors_template.format(*NVMeoFTopCPU.reactors_headers))
- for reactor in reactor_data:
- rows.append(NVMeoFTopCPU.reactors_template.format(*reactor))
+ for gw_addr, thread_name, busy_rate, idle_rate in reactor_data:
+ rows.append(NVMeoFTopCPU.reactors_template.format(
+ gw_addr, thread_name, f"{busy_rate:.2f}", f"{idle_rate:.2f}"
+ ))
rows.append("\n")
return ''.join(rows)
class NVMeoFTopIO(NVMeoFTopTool):
subsystem_summary_headers = ['Subsystem', 'Namespaces']
- summary_headers = ['Gateway', 'Load Balancing Group',
- 'Total Subsystems', 'Total Namespaces']
+ gateway_summary_headers = ['Gateway', 'Load Balancing Group',
+ 'Total Subsystems', 'Total Namespaces']
+ summary_headers = ['Group', 'Total Subsystems', 'Total Namespaces']
ns_headers = [
'NSID', 'RBD Image', 'IOPS', 'r/s', 'rMB/s', 'r_await', 'rareq-sz',
" {:>6} {:>6} {:>7} {:>8} {:^5} {:>3}\n"
)
- def __init__(self, args: dict, data_collector):
- super().__init__(args, data_collector)
- self.subsystem_nqn = args.get('subsystem')
+ def __init__(self, args: dict):
+ super().__init__(args)
+ self.subsystem_nqn = args.get('nqn')
def _collect(self):
self.collector.collect_io_data()
time.localtime(self.collector.timestamp))
rows.append(f"{timestamp} (delay: {self.collector.delay:.2f}s)\n")
if self.args.get('summary'):
- if self.args.get('server_addr'):
- summary_row = ""
+ summary_row = ""
+ if self.args.get('server_address'):
+ gateway_summary_data = self.collector.get_gateway_summary_data()
+ for index, header in enumerate(NVMeoFTopIO.gateway_summary_headers):
+ summary_row += f"{header}: {gateway_summary_data[index]} "
+ else:
for index, header in enumerate(NVMeoFTopIO.summary_headers):
summary_row += f"{header}: {overall_summary_data[index]} "
- rows.append(summary_row + "\n")
+ rows.append(summary_row + "\n")
subsys_summary_row = ""
for index, header in enumerate(NVMeoFTopIO.subsystem_summary_headers):
subsys_summary_row += f"{header}: {subsystem_summary_data[index]} "
return ''.join(rows)
@DBCLICommand.Read('nvmeof top cpu', poll=True)
- def nvmeof_top_cpu(_, service: str = '',
- server_addr: str = '', group: str = '',
+ def nvmeof_top_cpu(_, server_address: str = '', server_port: Optional[int] = None,
+ gw_group: str = '',
descending: bool = False, sort_by: str = 'Thread Name',
with_timestamp: bool = False,
no_header: bool = False,
- session_id: Optional[str] = None):
+ period: float = 1.0, session_id: Optional[str] = None):
'''
NVMeoF Top CPU Tool
- --period [-p] <delay> (default 1s, max 3600s)
- --service '<service_name>'
- --server-addr <ip>
- --group '<group_name>'
- --sort-by '<header>'
- --descending
- --with-timestamp
- --no-header
'''
+ if sort_by not in NVMeoFTopCPU.reactors_headers:
+ return HandleCommandResult(
+ stderr=f"Invalid sort-by '{sort_by}': must match a header title: "
+ f"{NVMeoFTopCPU.reactors_headers}",
+ retval=-errno.EINVAL
+ )
args = {
- 'service': service,
'with_timestamp': with_timestamp,
'no_header': no_header,
'sort_descending': descending,
'sort_by': sort_by,
- 'server_addr': server_addr,
- 'group': group,
+ 'server_address': server_address,
+ 'server_port': server_port,
+ 'gw_group': gw_group,
+ 'period': period,
+ 'session_id': session_id,
}
- try:
- data_collector = get_collector(session_id)
- if data_collector is None:
- return HandleCommandResult(
- stderr="Unable to initialise collector",
- retval=-errno.EINVAL
- )
- top_tool = NVMeoFTopCPU(args, data_collector)
- rc, output = top_tool.run()
- if rc != 0:
- return HandleCommandResult(stderr=output, retval=rc)
- return HandleCommandResult(stdout=output, retval=rc)
- except Exception as exc: # pylint: disable=broad-except
- logger.exception("top-cpu command failed: %s", exc)
- return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL)
+ rc, output = NVMeoFTopCPU(args).run()
+ if rc != 0:
+ return HandleCommandResult(stderr=output, retval=rc)
+ return HandleCommandResult(stdout=output, retval=rc)
@DBCLICommand.Read('nvmeof top io', poll=True)
- def nvmeof_top_io(_, subsystem: str = '',
- server_addr: str = '', group: str = '',
+ def nvmeof_top_io(_, nqn: str = '',
+ server_address: str = '', server_port: Optional[int] = None,
+ gw_group: str = '',
descending: bool = False, sort_by: str = 'NSID',
with_timestamp: bool = False,
summary: bool = False, no_header: bool = False,
- session_id: Optional[str] = None):
+ period: float = 1.0, session_id: Optional[str] = None):
'''
NVMeoF Top IO Tool
- --period [-p] <delay> (default 1s, max 3600s)
- --subsystem '<nqn>'
- --server-addr <ip>
- --group '<group_name>'
- --descending
- --sort-by '<header>'
- --with-timestamp
- --summary
- --no-header
'''
+ if not nqn:
+ return HandleCommandResult(
+ stderr="Required argument '--nqn' missing",
+ retval=-errno.EINVAL
+ )
+ if sort_by not in NVMeoFTopIO.ns_headers:
+ return HandleCommandResult(
+ stderr=f"Invalid sort-by '{sort_by}': must match a header title: "
+ f"{NVMeoFTopIO.ns_headers}",
+ retval=-errno.EINVAL
+ )
args = {
- 'subsystem': subsystem,
+ 'nqn': nqn,
'with_timestamp': with_timestamp,
'summary': summary,
'no_header': no_header,
'sort_descending': descending,
'sort_by': sort_by,
- 'server_addr': server_addr,
- 'group': group,
+ 'server_address': server_address,
+ 'server_port': server_port,
+ 'gw_group': gw_group,
+ 'period': period,
+ 'session_id': session_id,
}
- if not subsystem:
- return HandleCommandResult(
- stderr="Required argument '--subsystem' missing",
- retval=-errno.EINVAL
- )
- try:
- data_collector = get_collector(session_id)
- if data_collector is None:
- return HandleCommandResult(
- stderr="Unable to initialise collector",
- retval=-errno.EINVAL
- )
- top_tool = NVMeoFTopIO(args, data_collector)
- rc, output = top_tool.run()
- if rc != 0:
- return HandleCommandResult(stderr=output, retval=rc)
- return HandleCommandResult(stdout=output, retval=rc)
- except Exception as exc: # pylint: disable=broad-except
- logger.exception("top-io command failed: %s", exc)
- return HandleCommandResult(stderr=str(exc), retval=-errno.EINVAL)
+ rc, output = NVMeoFTopIO(args).run()
+ if rc != 0:
+ return HandleCommandResult(stderr=output, retval=rc)
+ return HandleCommandResult(stdout=output, retval=rc)
import pytest
-from ..services.nvmeof_top_cli import Counter, NvmeofTopCollector, NVMeoFTopCPU, NVMeoFTopIO
+from ..services.nvmeof_top_cli import MAX_SESSION_TTL, Counter, \
+ NvmeofTopCollector, NVMeoFTopCPU, NVMeoFTopIO
from ..tests import CLICommandTestMixin, CmdException
collector.get_sorted_namespaces.return_value = []
collector.get_subsystem_summary_data.return_value = []
collector.get_overall_summary_data.return_value = []
+ collector.get_gateway_summary_data.return_value = []
collector.delay = 2.0
collector.timestamp = time.time()
return collector
'sort_descending': False,
'with_timestamp': False,
'no_header': False,
- 'service': '',
- 'server_addr': '',
- 'group': '',
+ 'server_address': '',
+ 'server_port': None,
+ 'gw_group': '',
+ 'period': 1,
+ 'session_id': None,
}
def test_headers(self, cpu_collector):
- tool = NVMeoFTopCPU(self.default_args, cpu_collector)
+ tool = NVMeoFTopCPU(self.default_args)
+ tool.collector = cpu_collector
output = tool.format_output()
assert 'Gateway' in output
assert 'Thread Name' in output
assert 'Idle Rate%' in output
def test_no_header(self, cpu_collector):
- tool = NVMeoFTopCPU({**self.default_args, 'no_header': True}, cpu_collector)
+ tool = NVMeoFTopCPU({**self.default_args, 'no_header': True})
+ tool.collector = cpu_collector
output = tool.format_output()
assert 'Gateway' not in output
def test_with_timestamp(self, cpu_collector):
- tool = NVMeoFTopCPU({**self.default_args, 'with_timestamp': True}, cpu_collector)
+ tool = NVMeoFTopCPU({**self.default_args, 'with_timestamp': True})
+ tool.collector = cpu_collector
output = tool.format_output()
assert 'delay:' in output
assert '1.50s' in output
def test_reactor_data(self, cpu_collector):
cpu_collector.get_reactor_data.return_value = [
- ('192.168.1.1:5500', 'reactor_0', '72.50', '27.50'),
- ('192.168.1.1:5500', 'reactor_1', '45.00', '55.00'),
+ ('192.168.1.1:5500', 'reactor_0', 72.50, 27.50),
+ ('192.168.1.1:5500', 'reactor_1', 45.00, 55.00),
]
- tool = NVMeoFTopCPU(self.default_args, cpu_collector)
+ tool = NVMeoFTopCPU(self.default_args)
+ tool.collector = cpu_collector
output = tool.format_output()
assert '192.168.1.1:5500' in output
assert 'reactor_0' in output
assert 'reactor_1' in output
def test_invalid_sort_key(self, cpu_collector):
- tool = NVMeoFTopCPU({**self.default_args, 'sort_by': 'NonExistent'}, cpu_collector)
+ tool = NVMeoFTopCPU({**self.default_args, 'sort_by': 'NonExistent'})
+ tool.collector = cpu_collector
with pytest.raises(ValueError, match="Invalid sort key"):
tool.format_output()
'with_timestamp': False,
'no_header': False,
'summary': False,
- 'subsystem': 'nqn.2024-01.io.spdk:cnode1',
- 'server_addr': '',
- 'group': '',
+ 'nqn': 'nqn.2024-01.io.spdk:cnode1',
+ 'server_address': '',
+ 'server_port': None,
+ 'gw_group': '',
+ 'period': 1,
+ 'session_id': None,
}
def test_no_namespaces(self, io_collector):
- tool = NVMeoFTopIO(self.default_args, io_collector)
+ tool = NVMeoFTopIO(self.default_args)
+ tool.collector = io_collector
output = tool.format_output()
assert '<no namespaces defined>' in output
def test_headers_present(self, io_collector):
- tool = NVMeoFTopIO(self.default_args, io_collector)
+ tool = NVMeoFTopIO(self.default_args)
+ tool.collector = io_collector
output = tool.format_output()
assert 'NSID' in output
assert 'RBD Image' in output
def test_no_header(self, io_collector):
- tool = NVMeoFTopIO({**self.default_args, 'no_header': True}, io_collector)
+ tool = NVMeoFTopIO({**self.default_args, 'no_header': True})
+ tool.collector = io_collector
output = tool.format_output()
assert 'NSID' not in output
(2, 'pool/image2', 200, 100, '2.00', '1.00', '8.00',
100, '2.00', '1.00', '8.00', '2', 'Yes'),
]
- tool = NVMeoFTopIO(self.default_args, io_collector)
+ tool = NVMeoFTopIO(self.default_args)
+ tool.collector = io_collector
output = tool.format_output()
assert 'pool/image1' in output
assert 'pool/image2' in output
def test_with_timestamp(self, io_collector):
- tool = NVMeoFTopIO({**self.default_args, 'with_timestamp': True}, io_collector)
+ tool = NVMeoFTopIO({**self.default_args, 'with_timestamp': True})
+ tool.collector = io_collector
output = tool.format_output()
assert 'delay:' in output
def test_invalid_sort_key(self, io_collector):
- tool = NVMeoFTopIO({**self.default_args, 'sort_by': 'BadKey'}, io_collector)
+ tool = NVMeoFTopIO({**self.default_args, 'sort_by': 'BadKey'})
+ tool.collector = io_collector
with pytest.raises(ValueError, match="Invalid sort key"):
tool.format_output()
def collector(self):
c = NvmeofTopCollector()
c.client = MagicMock()
- c.client.service_name = 'myservice'
+ c.service = 'myservice'
+ c.group = 'mygroup'
c.tool = MagicMock()
- c.tool.args = {'group': '', 'server_addr': ''}
+ c.tool.args = {'gw_group': '', 'server_address': ''}
return c
def test_grpc_call_failure(self, collector):
assert collector.health.rc == -errno.ECONNREFUSED
assert collector.health.msg == 'RPC endpoint unavailable at 192.168.1.1:5500'
- def test_collect_cpu_data_service_not_found(self, collector):
- collector.tool.service_name = 'myservice'
+ def test_set_gateways_no_services(self, collector):
+ collector.service = ''
+ collector.group = ''
with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
return_value={'gateways': {}}):
- collector.collect_cpu_data()
+ collector._set_gateways('', '') # pylint: disable=protected-access
+ assert collector.health.rc == -errno.ENOENT
+ assert collector.health.msg == 'No NVMeoF gateways configured'
+
+ def test_set_gateways_multiple_groups_no_filter(self, collector):
+ # Several configured services with no filters is ambiguous -> EINVAL.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ 'nvmeof.pool.group2': [{'service_url': '2.2.2.2:5500', 'group': 'group2'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config):
+ collector._set_gateways('', '') # pylint: disable=protected-access
+ assert collector.health.rc == -errno.EINVAL
+ assert 'Multiple gateway groups found' in collector.health.msg
+
+ def test_set_gateways_address_not_found(self, collector):
+ # An address matching no configured gateway -> ENOENT.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config):
+ collector._set_gateways('', '9.9.9.9') # pylint: disable=protected-access
+ assert collector.health.rc == -errno.ENOENT
+ assert 'No gateway found matching address' in collector.health.msg
+
+ def test_set_gateways_group_not_found(self, collector):
+ # A group filter that matches no service -> ENOENT.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config):
+ collector._set_gateways('nonexistent', '') # pylint: disable=protected-access
+ assert collector.health.rc == -errno.ENOENT
+ assert "Gateway group 'nonexistent' not found" in collector.health.msg
+
+ def test_set_gateways_address_group_mismatch(self, collector):
+ # Address exists but belongs to a different group than requested -> EINVAL.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config):
+ collector._set_gateways('group2', '1.1.1.1') # pylint: disable=protected-access
+ assert collector.health.rc == -errno.EINVAL
+ assert "Address '1.1.1.1' belongs to group 'group1', not 'group2'" in collector.health.msg
+
+ def test_set_gateways_single_service_auto_detect(self, collector):
+ # Exactly one configured service needs no filters: it is auto-selected.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config), \
+ patch.object(collector, '_get_client'):
+ collector._set_gateways('', '') # pylint: disable=protected-access
+ assert collector.service == 'nvmeof.pool.group1'
+ assert collector.group == 'group1'
+
+ def test_set_gateways_by_group(self, collector):
+ # A group filter selects the matching service among several.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ 'nvmeof.pool.group2': [{'service_url': '2.2.2.2:5500', 'group': 'group2'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config), \
+ patch.object(collector, '_get_client'):
+ collector._set_gateways('group1', '') # pylint: disable=protected-access
+ assert collector.service == 'nvmeof.pool.group1'
+ assert collector.group == 'group1'
+
+ def test_set_gateways_by_address(self, collector):
+ # An address filter alone resolves the owning service and group.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config), \
+ patch.object(collector, '_get_client'):
+ collector._set_gateways('', '1.1.1.1') # pylint: disable=protected-access
+ assert collector.service == 'nvmeof.pool.group1'
+ assert collector.group == 'group1'
+
+ def test_set_gateways_by_address_no_substring_match(self, collector):
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.11:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config):
+ collector._set_gateways('', '1.1.1.1') # pylint: disable=protected-access
assert collector.health.rc == -errno.ENOENT
- assert collector.health.msg == 'Service myservice not found'
+
+ def test_set_gateways_by_address_and_port(self, collector):
+ # Address plus a matching port selects the gateway.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config), \
+ patch.object(collector, '_get_client'):
+ collector._set_gateways('', '1.1.1.1', 5500) # pylint: disable=protected-access
+ assert collector.service == 'nvmeof.pool.group1'
+
+ def test_set_gateways_port_mismatch(self, collector):
+ # Correct address but wrong port must not match -> ENOENT.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '1.1.1.1:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config):
+ collector._set_gateways('', '1.1.1.1', 9999) # pylint: disable=protected-access
+ assert collector.health.rc == -errno.ENOENT
+
+ def test_set_gateways_ipv6_address(self, collector):
+ # A bracketed IPv6 service_url ('[::1]:5500') matches the bare '::1' filter.
+ collector.service = ''
+ collector.group = ''
+ config = {'gateways': {
+ 'nvmeof.pool.group1': [{'service_url': '[::1]:5500', 'group': 'group1'}],
+ }}
+ with patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
+ return_value=config), \
+ patch.object(collector, '_get_client'):
+ collector._set_gateways('', '::1') # pylint: disable=protected-access
+ assert collector.service == 'nvmeof.pool.group1'
def test_collect_io_data_subsystems_unavailable(self, collector):
collector.tool.subsystem_nqn = 'nqn.test'
assert collector.health.rc == -errno.ENOENT
assert collector.health.msg == 'Subsystem NQN provided not found'
- def test_collect_io_data_service_not_found(self, collector):
- collector.tool.subsystem_nqn = 'nqn.test'
- mock_subsystems = MagicMock()
- mock_subsystems.status = 0
- mock_sub = MagicMock()
- mock_sub.nqn = 'nqn.test'
- mock_subsystems.subsystems = [mock_sub]
- mock_namespace_info = MagicMock()
- mock_namespace_info.namespaces = []
- with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems), \
- patch.object(collector, '_fetch_namespaces', return_value=mock_namespace_info), \
- patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
- return_value={'gateways': {}}):
- collector.collect_io_data()
- assert collector.health.rc == -errno.ENOENT
- assert collector.health.msg == 'Service myservice not found'
-
def test_collect_io_data_lbg_mapping_failed(self, collector):
collector.tool.subsystem_nqn = 'nqn.test'
mock_subsystems = MagicMock()
mock_namespace_info.namespaces = []
with patch.object(collector, '_fetch_subsystems', return_value=mock_subsystems), \
patch.object(collector, '_fetch_namespaces', return_value=mock_namespace_info), \
- patch('dashboard.services.nvmeof_top_cli.NvmeofGatewaysConfig.get_gateways_config',
- return_value={'gateways': {'myservice': []}}), \
patch('dashboard.services.nvmeof_top_cli.get_lbg_gws_map', return_value={}):
collector.collect_io_data()
assert collector.health.rc == -errno.ENOENT
'Failed to retrieve load balancing group mapping for service myservice'
+class TestNvmeofTopValidateArgs:
+ """Tests for NVMeoFTopTool._validate_args() argument checking."""
+ base_args = {
+ 'sort_by': 'Thread Name',
+ 'sort_descending': False,
+ 'with_timestamp': False,
+ 'no_header': False,
+ 'server_address': '',
+ 'server_port': None,
+ 'gw_group': '',
+ 'period': 1,
+ 'session_id': None,
+ }
+
+ def _validate(self, **overrides):
+ # Helper: merge overrides into base_args and run validation.
+ args = {**self.base_args, **overrides}
+ return NVMeoFTopCPU(args)._validate_args() # pylint: disable=protected-access
+
+ def test_valid_period(self):
+ assert self._validate(period=5) is None
+
+ def test_invalid_period_too_low(self):
+ rc, msg = self._validate(period=0)
+ assert rc == -errno.EINVAL
+ assert msg == f"Invalid period '0': must be between 1 and {MAX_SESSION_TTL}"
+
+ def test_invalid_period_too_high(self):
+ rc, msg = self._validate(period=999999)
+ assert rc == -errno.EINVAL
+ assert msg == f"Invalid period '999999': must be between 1 and {MAX_SESSION_TTL}"
+
+ def test_valid_server_address(self):
+ assert self._validate(server_address='1.2.3.4') is None
+
+ def test_invalid_server_address(self):
+ rc, msg = self._validate(server_address='not-an-ip')
+ assert rc == -errno.EINVAL
+ assert msg == "Invalid server-address 'not-an-ip': must be a valid IP address"
+
+ def test_valid_server_address_ipv6(self):
+ assert self._validate(server_address='::1') is None
+
+ def test_valid_server_port(self):
+ assert self._validate(server_address='1.2.3.4', server_port=5500) is None
+
+ def test_invalid_server_port_zero(self):
+ rc, msg = self._validate(server_address='1.2.3.4', server_port=0)
+ assert rc == -errno.EINVAL
+ assert msg == "Invalid server-port '0': must be between 1 and 65535"
+
+ def test_invalid_server_port_too_high(self):
+ rc, msg = self._validate(server_address='1.2.3.4', server_port=65536)
+ assert rc == -errno.EINVAL
+ assert msg == "Invalid server-port '65536': must be between 1 and 65535"
+
+ # A port without an address is accepted; only the port range is checked.
+ def test_port_without_address_is_valid(self):
+ assert self._validate(server_port=5500) is None
+
+
class TestNvmeofTopCommands(CLICommandTestMixin):
@classmethod
def exec_nvmeof_cmd(cls, cmd, **kwargs):
return cls.exec_cmd('', prefix=cmd, **kwargs)
- def test_top_io_missing_subsystem_returns_einval(self):
+ def test_top_io_missing_nqn_returns_einval(self):
with pytest.raises(CmdException) as exc_info:
- self.exec_nvmeof_cmd('nvmeof top io', subsystem='', session_id='sess1')
+ self.exec_nvmeof_cmd('nvmeof top io', nqn='', session_id='sess1')
assert exc_info.value.retcode == -errno.EINVAL
- assert str(exc_info.value) == "Required argument '--subsystem' missing"
+ assert str(exc_info.value) == "Required argument '--nqn' missing"
def test_top_cpu_success(self):
- with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
- mock_gc.return_value = MagicMock()
- with patch.object(NVMeoFTopCPU, 'run', return_value=(0, 'cpu output\n')):
- result = self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
- assert 'cpu output' in result
- mock_gc.assert_called_once_with('sess1')
+ with patch.object(NVMeoFTopCPU, 'run', return_value=(0, 'cpu output\n')):
+ result = self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+ assert 'cpu output' in result
def test_top_cpu_run_fail(self):
- with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
- mock_gc.return_value = MagicMock()
- with patch.object(NVMeoFTopCPU, 'run',
- return_value=(-errno.ENOENT, 'error')):
- with pytest.raises(CmdException) as exc_info:
- self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
- assert exc_info.value.retcode == -errno.ENOENT
- assert str(exc_info.value) == 'error'
+ with patch.object(NVMeoFTopCPU, 'run',
+ return_value=(-errno.ENOENT, 'error')):
+ with pytest.raises(CmdException) as exc_info:
+ self.exec_nvmeof_cmd('nvmeof top cpu', session_id='sess1')
+ assert exc_info.value.retcode == -errno.ENOENT
+ assert str(exc_info.value) == 'error'
def test_top_io_success(self):
- with patch('dashboard.services.nvmeof_top_cli.get_collector') as mock_gc:
- mock_gc.return_value = MagicMock()
- with patch.object(NVMeoFTopIO, 'run', return_value=(0, 'io output\n')):
- result = self.exec_nvmeof_cmd(
- 'nvmeof top io',
- subsystem='nqn.test',
- session_id='sess2'
- )
- assert 'io output' in result
- mock_gc.assert_called_once_with('sess2')
+ with patch.object(NVMeoFTopIO, 'run', return_value=(0, 'io output\n')):
+ result = self.exec_nvmeof_cmd(
+ 'nvmeof top io',
+ nqn='nqn.test',
+ session_id='sess2'
+ )
+ assert 'io output' in result
def test_top_cpu_get_collector_fail(self):
with patch('dashboard.services.nvmeof_top_cli.get_collector',
with pytest.raises(CmdException) as exc_info:
self.exec_nvmeof_cmd(
'nvmeof top io',
- subsystem='nqn.test',
+ nqn='nqn.test',
session_id='sess2'
)
assert exc_info.value.retcode == -errno.EINVAL