From: Tomer Haskalovitch Date: Tue, 23 Dec 2025 21:21:15 +0000 (+0200) Subject: mgr/dashboard: introduce NvmeofCLICommand's success_message_template parameter to... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=387f8cbb732bff6bc1a95e69d1c6ae02158be834;p=ceph-ci.git mgr/dashboard: introduce NvmeofCLICommand's success_message_template parameter to allow meaningful success messages Signed-off-by: Tomer Haskalovitch try dummy dummy (cherry picked from commit 3c11ca249de77e3a0356602e7dfe8a0c88b5d322) dummy 2 (cherry picked from commit bf14642c48d4a9e150c95d524e3e7679b8dbd9b8) --- diff --git a/src/pybind/mgr/dashboard/controllers/nvmeof.py b/src/pybind/mgr/dashboard/controllers/nvmeof.py index 66d915bb29e..ad190a637e7 100644 --- a/src/pybind/mgr/dashboard/controllers/nvmeof.py +++ b/src/pybind/mgr/dashboard/controllers/nvmeof.py @@ -30,6 +30,93 @@ try: except ImportError as e: logger.error("Failed to import NVMeoFClient and related components: %s", e) else: + def escape_address_if_ipv6(addr: str) -> str: + ret_addr = addr + if ":" in addr and not addr.strip().startswith("["): + ret_addr = f"[{addr}]" + return ret_addr + + def build_listener_del_success_message(args: Dict[str, Any], _) -> str: + traddr = args.get('traddr') + trsvcid = args.get('trsvcid') + subsystem = args.get('nqn') + host_name = args.get('host_name') + + escaped_traddr = escape_address_if_ipv6(traddr) if traddr is not None else '' + host_msg = "for all hosts" if host_name == "*" else f"for host {host_name}" + return ( + f"Deleting listener {escaped_traddr}:{trsvcid} from {subsystem} " + f"{host_msg}: Successful" + ) + + def build_ns_change_visibility_success_message(args: Dict[str, Any], _) -> str: + nsid = args.get('nsid') + subsystem = args.get('nqn') + auto_visible_val = args.get('auto_visible') + + if isinstance(auto_visible_val, str): + auto_visible = auto_visible_val.lower() == "yes" + else: + auto_visible = bool(auto_visible_val) + vis_text = "\"visible to all hosts\"" if auto_visible else "\"visible to selected hosts\"" + return ( + f"Changing visibility of namespace {nsid} in {subsystem} " + f"to {vis_text}: Successful" + ) + + def build_ns_set_auto_resize_success_message(args: Dict[str, Any], _) -> str: + nsid = args.get('nsid') + subsystem = args.get('nqn') + auto_resize_enabled = args.get('auto_resize_enabled') + + auto_resize_text = 'auto resize namespace"' + if not auto_resize_enabled: + auto_resize_text = "do not " + auto_resize_text + auto_resize_text = '"' + auto_resize_text + return ( + f"Setting auto resize flag for namespace {nsid} in " + f"{subsystem} to {auto_resize_text}: Successful" + ) + + def build_ns_set_rbd_trash_image_success_message(args: Dict[str, Any], _) -> str: + nsid = args.get('nsid') + subsystem = args.get('nqn') + rbd_trash_image_on_delete = args.get('rbd_trash_image_on_delete') + + trash_image = str_to_bool(rbd_trash_image_on_delete) + trash_text = 'trash on namespace deletion"' + if not trash_image: + trash_text = "do not " + trash_text + trash_text = '"' + trash_text + return ( + f"Setting RBD trash image flag for namespace {nsid} in " + f"{subsystem} to {trash_text}: Successful" + ) + + def build_host_add_success_message(args: Dict[str, Any], _) -> str: + subsystem = args.get('nqn') + host_nqn_list: List[str] = args.get('host_nqn') or [] + + messages: List[str] = [] + for one_host_nqn in host_nqn_list: + if one_host_nqn == "*": + messages.append(f"Allowing open host access to {subsystem}: Successful") + else: + messages.append(f"Adding host {one_host_nqn} to {subsystem}: Successful") + return "\n".join(messages) + + def build_host_del_success_message(args: Dict[str, Any], _) -> str: + subsystem = args.get('nqn') + host_nqn_list: List[str] = args.get('host_nqn') or [] + + messages: List[str] = [] + for one_host_nqn in host_nqn_list: + if one_host_nqn == "*": + messages.append(f"Disabling open host access to {subsystem}: Successful") + else: + messages.append(f"Removing host {one_host_nqn} access from {subsystem}: Successful") + return "\n".join(messages) + @APIRouter("/nvmeof/gateway", Scope.NVME_OF) @APIDoc("NVMe-oF Gateway Management API", "NVMe-oF Gateway") class NVMeoFGateway(RESTController): @@ -101,7 +188,8 @@ else: @ReadPermission @Endpoint('PUT', '/log_level') @NvmeofCLICommand( - "nvmeof gateway set_log_level", model.RequestStatus, alias="nvmeof gw set_log_level") + "nvmeof gateway set_log_level", model.RequestStatus, alias="nvmeof gw set_log_level", + success_message_template="Set gateway log level to {log_level}: Successful") @EndpointDoc("Set NVMeoF gateway log levels") @convert_to_model(model.RequestStatus) @handle_nvmeof_error @@ -172,7 +260,11 @@ else: @ReadPermission @Endpoint('PUT', '/log_level') - @NvmeofCLICommand("nvmeof spdk_log_level set", model.RequestStatus) + @NvmeofCLICommand( + "nvmeof spdk_log_level set", + model.RequestStatus, + success_message_template="Set SPDK log levels and nvmf log flags: Successful" + ) @EndpointDoc("Set NVMeoF gateway spdk log levels") @convert_to_model(model.RequestStatus) @handle_nvmeof_error @@ -195,7 +287,8 @@ else: @ReadPermission @Endpoint('PUT', '/log_level/disable') - @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus) + @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus, + success_message_template="Disable SPDK log flags: Successful") @EndpointDoc("Disable NVMeoF gateway spdk log") @convert_to_model(model.RequestStatus) @handle_nvmeof_error @@ -250,7 +343,8 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof subsystem add", model.RequestStatus) + @NvmeofCLICommand("nvmeof subsystem add", model.SubsystemStatus, + success_message_template="Adding subsystem {nqn}: Successful") @EndpointDoc( "Create a new NVMeoF subsystem", parameters={ @@ -283,7 +377,8 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus) + @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus, + success_message_template="Deleting subsystem {nqn}: Successful") @EndpointDoc( "Delete an existing NVMeoF subsystem", parameters={ @@ -316,7 +411,8 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof subsystem change_key", model.RequestStatus) + @NvmeofCLICommand("nvmeof subsystem change_key", model.RequestStatus, + success_message_template="Changing key for subsystem {nqn}: Successful") @convert_to_model(model.RequestStatus) @handle_nvmeof_error def change_key(self, nqn: str, dhchap_key: str, gw_group: Optional[str] = None, @@ -339,7 +435,8 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof subsystem del_key", model.RequestStatus) + @NvmeofCLICommand("nvmeof subsystem del_key", model.RequestStatus, + success_message_template="Deleting key for subsystem {nqn}: Successful") @convert_to_model(model.RequestStatus) @handle_nvmeof_error def del_key(self, nqn: str, gw_group: Optional[str] = None, @@ -393,7 +490,11 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof listener add", model.RequestStatus) + @NvmeofCLICommand( + "nvmeof listener add", + model.RequestStatus, + success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful" + ) @EndpointDoc( "Create a new NVMeoF listener", parameters={ @@ -433,7 +534,8 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof listener del", model.RequestStatus) + @NvmeofCLICommand("nvmeof listener del", model.RequestStatus, + success_message_fn=build_listener_del_success_message) @EndpointDoc( "Delete an existing NVMeoF listener", parameters={ @@ -622,7 +724,10 @@ else: ) @NvmeofCLICommand( - "nvmeof namespace add", model.NamespaceCreation, alias="nvmeof ns add" + "nvmeof namespace add", + model.NamespaceCreation, + alias="nvmeof ns add", + success_message_template="Adding namespace {nsid} to {nqn}: Successful" ) @EndpointDoc( "Create a new NVMeoF namespace.", @@ -708,7 +813,8 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/set_qos') @NvmeofCLICommand( - "nvmeof namespace set_qos", model=model.RequestStatus, alias="nvmeof ns set_qos") + "nvmeof namespace set_qos", model=model.RequestStatus, alias="nvmeof ns set_qos", + success_message_template="Setting QOS limits of namespace {nsid} in {nqn}: Successful") @EndpointDoc( "set QOS for specified NVMeoF namespace", parameters={ @@ -758,8 +864,11 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/change_load_balancing_group') @NvmeofCLICommand( - "nvmeof namespace change_load_balancing_group", model=model.RequestStatus, - alias="nvmeof ns change_load_balancing_group" + "nvmeof namespace change_load_balancing_group", + model=model.RequestStatus, + alias="nvmeof ns change_load_balancing_group", + success_message_template=("Changing load balancing group of namespace {nsid} " + "in {nqn} to {load_balancing_group}: Successful") ) @EndpointDoc( "set the load balancing group for specified NVMeoF namespace", @@ -824,8 +933,13 @@ else: ) ) - @NvmeofCLICommand("nvmeof namespace resize", model=model.RequestStatus, - alias="nvmeof ns resize") + @NvmeofCLICommand( + "nvmeof namespace resize", + model=model.RequestStatus, + alias="nvmeof ns resize", + success_message_template=("Resizing namespace {nsid} in {nqn} " + "to {rbd_image_size}: Successful") + ) @EndpointDoc( "resize the specified NVMeoF namespace", parameters={ @@ -863,7 +977,11 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/add_host') @NvmeofCLICommand( - "nvmeof namespace add_host", model=model.RequestStatus, alias="nvmeof ns add_host" + "nvmeof namespace add_host", + model=model.RequestStatus, + alias="nvmeof ns add_host", + success_message_template=("Adding host {host_nqn} to " + "namespace {nsid} on {nqn}: Successful") ) @EndpointDoc( "Adds a host to the specified NVMeoF namespace", @@ -904,7 +1022,11 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/del_host') @NvmeofCLICommand( - "nvmeof namespace del_host", model=model.RequestStatus, alias="nvmeof ns del_host" + "nvmeof namespace del_host", + model=model.RequestStatus, + alias="nvmeof ns del_host", + success_message_template=("Deleting host {host_nqn} from " + "namespace {nsid} on {nqn}: Successful") ) @EndpointDoc( "Removes a host from the specified NVMeoF namespace", @@ -941,7 +1063,8 @@ else: @Endpoint('PUT', '{nsid}/change_visibility') @NvmeofCLICommand( "nvmeof namespace change_visibility", model=model.RequestStatus, - alias="nvmeof ns change_visibility" + alias="nvmeof ns change_visibility", + success_message_fn=build_ns_change_visibility_success_message ) @EndpointDoc( "changes the visibility of the specified NVMeoF namespace to all or selected hosts", @@ -981,7 +1104,8 @@ else: @Endpoint('PUT', '{nsid}/set_auto_resize') @NvmeofCLICommand( "nvmeof namespace set_auto_resize", model=model.RequestStatus, - alias="nvmeof ns set_auto_resize" + alias="nvmeof ns set_auto_resize", + success_message_fn=build_ns_set_auto_resize_success_message ) @EndpointDoc( "Enable or disable namespace auto resize when RBD image is resized", @@ -1022,7 +1146,8 @@ else: @Endpoint('PUT', '{nsid}/set_rbd_trash_image') @NvmeofCLICommand( "nvmeof namespace set_rbd_trash_image", model=model.RequestStatus, - alias="nvmeof ns set_rbd_trash_image" + alias="nvmeof ns set_rbd_trash_image", + success_message_fn=build_ns_set_rbd_trash_image_success_message ) @EndpointDoc( "changes the trash image on delete of the specified NVMeoF \ @@ -1060,7 +1185,8 @@ else: @Endpoint('PUT', '{nsid}/refresh_size') @NvmeofCLICommand( "nvmeof namespace refresh_size", model=model.RequestStatus, - alias="nvmeof ns refresh_size" + alias="nvmeof ns refresh_size", + success_message_template="Refreshing size for namespace {nsid} in {nqn}: Successful" ) @EndpointDoc( "refresh the specified NVMeoF namespace to current RBD image size", @@ -1191,7 +1317,11 @@ else: return response @empty_response - @NvmeofCLICommand("nvmeof namespace del", model.RequestStatus, alias="nvmeof ns del") + @NvmeofCLICommand( + "nvmeof namespace del", + model.RequestStatus, + alias="nvmeof ns del", + success_message_template="Deleting namespace {nsid} from {nqn}: Successful") @EndpointDoc( "Delete an existing NVMeoF namespace", parameters={ @@ -1258,7 +1388,8 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof host add", model.RequestStatus) + @NvmeofCLICommand("nvmeof host add", model.RequestStatus, + success_message_fn=build_host_add_success_message) @EndpointDoc( "Allow hosts to access an NVMeoF subsystem", parameters={ @@ -1306,7 +1437,12 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof host change_key", model.RequestStatus) + @NvmeofCLICommand( + "nvmeof host change_key", + model.RequestStatus, + success_message_template=("Changing key for host {host_nqn} " + "on subsystem {nqn}: Successful") + ) @EndpointDoc( "Change host DH-HMAC-CHAP key", parameters={ @@ -1333,7 +1469,12 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof host del_key", model.RequestStatus) + @NvmeofCLICommand( + "nvmeof host del_key", + model.RequestStatus, + success_message_template=("Deleting key for host {host_nqn} " + "on subsystem {nqn}: Successful") + ) @EndpointDoc( "Delete host DH-HMAC-CHAP key", parameters={ diff --git a/src/pybind/mgr/dashboard/services/nvmeof_cli.py b/src/pybind/mgr/dashboard/services/nvmeof_cli.py index 011eb58359d..dce57b104ef 100644 --- a/src/pybind/mgr/dashboard/services/nvmeof_cli.py +++ b/src/pybind/mgr/dashboard/services/nvmeof_cli.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- -import errno -import json from abc import ABC, abstractmethod from enum import Enum -from typing import Annotated, Any, Dict, List, NamedTuple, Optional, Type, \ +import errno +import inspect +import json +import logging +from typing import Annotated, Any, Callable, Dict, List, NamedTuple, Optional, Type, \ Union, get_args, get_origin, get_type_hints - import yaml + from mgr_module import CLICheckNonemptyFileInput, CLICommand, CLIReadCommand, \ CLIWriteCommand, HandleCommandResult, HandlerFuncType from prettytable import PrettyTable @@ -16,6 +18,8 @@ from ..rest_client import RequestException from .nvmeof_conf import ManagedByOrchestratorException, \ NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig +logger = logging.getLogger(__name__) + @CLIReadCommand('dashboard nvmeof-gateway-list') def list_nvmeof_gateways(_): @@ -248,38 +252,131 @@ class AnnotatedDataTextOutputFormatter(OutputFormatter): class NvmeofCLICommand(CLICommand): desc: str - def __init__(self, prefix, model: Type[NamedTuple], alias=None, perm='rw', poll=False): + def __init__(self, + prefix, + model: Type[NamedTuple], + alias: Optional[str] = None, + perm: str = 'rw', + poll: bool = False, + success_message_template: Optional[str] = None, + success_message_fn: Optional[Callable[[Dict[str, Any]], str]] = None + ): super().__init__(prefix, perm, poll) self._output_formatter = AnnotatedDataTextOutputFormatter() self._model = model self._alias = alias self._alias_cmd: Optional[NvmeofCLICommand] = None + self._success_message_template = success_message_template + self._success_message_fn = success_message_fn + self._func_defaults: Dict[str, Any] = {} + def _use_api_endpoint_desc_if_available(self, func): if not self.desc and hasattr(func, 'doc_info'): self.desc = func.doc_info.get('summary', '') def __call__(self, func) -> HandlerFuncType: # type: ignore + resp = super().__call__(func) + self._func_defaults = self._compute_func_defaults() if self._alias: - self._alias_cmd = NvmeofCLICommand(self._alias, model=self._model) + self._alias_cmd = NvmeofCLICommand( + self._alias, + model=self._model, + success_message_template=self._success_message_template, + success_message_fn=self._success_message_fn + ) assert self._alias_cmd is not None self._alias_cmd(func) + self._alias_cmd._func_defaults = self._alias_cmd._compute_func_defaults() - resp = super().__call__(func) self._use_api_endpoint_desc_if_available(func) return resp + + def _compute_func_defaults(self) -> Dict[str, Any]: + defaults: Dict[str, Any] = {} + sig = inspect.signature(self.func) + + for name, param in sig.parameters.items(): + if name in CLICommand.KNOWN_ARGS: + continue + if name not in self.arg_spec: + continue + if param.default is not inspect.Parameter.empty: + defaults[name] = param.default + + return defaults + + def _args_map_from_argspec(self, + cmd_dict: Dict[str, Any], + inbuf: Optional[str] = None) -> Dict[str, Any]: + kwargs, specials = self._collect_args_by_argspec(cmd_dict) + if inbuf and 'inbuf' in specials: + kwargs['inbuf'] = inbuf + + + return {**self._func_defaults, **kwargs} + + + def _stringify(self, value: Any) -> str: + if isinstance(value, (bytes, bytearray)): + try: + return value.decode('utf-8', errors='replace') + except Exception: + return str(value) + + if isinstance(value, (list, tuple)): + try: + return ','.join(self._stringify(v) for v in value) + except Exception: + return str(value) + + return str(value) + + def _format_success_message_from_args(self, + args_map: Dict[str, Any], + response: Dict[str, Any]) -> Optional[str]: + if not self._success_message_template and not self._success_message_fn: + return None + + if self._success_message_fn: + try: + msg = self._success_message_fn(args_map, response) + if msg: + return msg + except Exception: + logger.warning("Success message function failed for %s", self.prefix, exc_info=True) + + if self._success_message_template: + try: + fields_dict = {**args_map, **response} + str_map = {k: self._stringify(v) for k, v in fields_dict.items()} + return self._success_message_template.format(**str_map) + except Exception: + logger.warning("Success message template failed for %s", self.prefix, exc_info=True) + + return None + def call(self, mgr: Any, cmd_dict: Dict[str, Any], inbuf: Optional[str] = None) -> HandleCommandResult: try: - ret = super().call(mgr, cmd_dict, inbuf) out_format = cmd_dict.get('format') - if ret is None: - out = '' + args_map = self._args_map_from_argspec(cmd_dict, inbuf) + ret = super().call(mgr, cmd_dict, inbuf) if out_format == 'plain' or not out_format: - out = self._output_formatter.format_output(ret, self._model) + message: Optional[str] = None + try: + message = self._format_success_message_from_args(args_map, ret) + except Exception: + logger.warning("Formatting of success message failed for %s", + self.prefix, exc_info=True) + if message: + out = message + else: + out = self._output_formatter.format_output(ret, self._model) + elif out_format == 'json': out = json.dumps(ret, indent=4) elif out_format == 'yaml': @@ -287,6 +384,8 @@ class NvmeofCLICommand(CLICommand): else: return HandleCommandResult(-errno.EINVAL, '', f"format '{out_format}' is not implemented") + return HandleCommandResult(0, out, '') + except Exception as e: # pylint: disable=broad-except return HandleCommandResult(-errno.EINVAL, '', str(e)) diff --git a/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py b/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py index 0a673d8da15..10621f912e8 100644 --- a/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py +++ b/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py @@ -1,7 +1,7 @@ import errno import json import unittest -from typing import Annotated, List, NamedTuple +from typing import Annotated, List, NamedTuple, Optional from unittest.mock import MagicMock import pytest @@ -209,6 +209,311 @@ class TestNvmeofCLICommand: assert test_alias not in NvmeofCLICommand.COMMANDS +class TestNvmeofCLICommandSuccessMessage: + + def test_plain_output_uses_success_message_template(self): + test_cmd = "nvmeof set_log_level" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="set log level to {log_level}" + ) + def set_log_level(self, log_level: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): # noqa + return {"status": 0} + + result_default = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"log_level": "info"} + ) + assert isinstance(result_default, HandleCommandResult) + assert result_default.retval == 0 + assert result_default.stdout == "set log level to info" + assert result_default.stderr == '' + + result_plain = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "log_level": "info"} + ) + assert isinstance(result_plain, HandleCommandResult) + assert result_plain.retval == 0 + assert result_plain.stdout == "set log level to info" + assert result_plain.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_plain_output_falls_back_when_template_unresolvable(self): + test_cmd = "nvmeof gateway set_log_level_fallback" + + class Model(NamedTuple): + a: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="set log level to {log_level}" + ) + def set_log_level(self, a: str): # noqa + return {"a": "b"} + + result_plain = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain"} + ) + assert isinstance(result_plain, HandleCommandResult) + assert result_plain.retval == 0 + assert result_plain.stdout == ( + "+-+\n" + "|A|\n" + "+-+\n" + "|b|\n" + "+-+" + ) + assert result_plain.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_default_output_falls_back_when_template_unresolvable(self): + test_cmd = "nvmeof gateway set_log_level_fallback_default" + + class Model(NamedTuple): + a: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="set log level to {log_level}" + ) + def set_log_level(self, a: str): # noqa + return {"a": "b"} + + result_default = NvmeofCLICommand.COMMANDS[test_cmd].call(MagicMock(), {}) + assert isinstance(result_default, HandleCommandResult) + assert result_default.retval == 0 + assert result_default.stdout == ( + "+-+\n" + "|A|\n" + "+-+\n" + "|b|\n" + "+-+" + ) + assert result_default.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_alias_inherits_success_message_template(self): + test_cmd = "nvmeof gateway set_log_level_main" + test_alias = "nvmeof gw set_log_level_alias" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + alias=test_alias, + success_message_template="set log level to {log_level}" + ) + def set_log_level(self, log_level: str): # noqa + return {"status": 0} + + result_main = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "log_level": "debug"} + ) + assert result_main.retval == 0 + assert result_main.stdout == "set log level to debug" + assert result_main.stderr == '' + + result_alias = NvmeofCLICommand.COMMANDS[test_alias].call( + MagicMock(), + {"format": "plain", "log_level": "warn"} + ) + assert result_alias.retval == 0 + assert result_alias.stdout == "set log level to warn" + assert result_alias.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + del NvmeofCLICommand.COMMANDS[test_alias] + assert test_cmd not in NvmeofCLICommand.COMMANDS + assert test_alias not in NvmeofCLICommand.COMMANDS + + def test_plain_uses_success_message_fn(self): + test_cmd = "nvmeof gw set_log_level fn" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_fn=lambda args, response: ( + f"set log level to {args.get('log_level', '')}" + + (" for all hosts" if args.get('all_hosts') else "") + ) + ) + def fn(self, log_level: str, all_hosts: bool = False): # noqa + return {"status": 0} + + res = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "log_level": "info", "all_hosts": True} + ) + assert res.retval == 0 + assert res.stdout == "set log level to info for all hosts" + assert res.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_template_formats_int_and_list_without_failure(self): + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + "nvmeof mixed params", + Model, + success_message_template="ns {nsid} hosts {host_nqn}" + ) + def fn(self, nsid: int, host_nqn: list[str]): # noqa + return {"status": 1} + + res = NvmeofCLICommand.COMMANDS["nvmeof mixed params"].call( + MagicMock(), + {"format": "plain", "nsid": 42, "host_nqn": ["a", "b"]} + ) + assert res.retval == 0 + assert res.stdout == "ns 42 hosts a,b" + + del NvmeofCLICommand.COMMANDS["nvmeof mixed params"] + assert "nvmeof mixed params" not in NvmeofCLICommand.COMMANDS + + def test_success_message_uses_default_when_cli_omits_param(self): + class Model(NamedTuple): + status: str + + def create(mgr, nqn: str, host_name: str, traddr: str, + trsvcid: int = 4420, adrfam: int = 0, gw_group: Optional[str] = None): + return dict(status=1) + + cmd = NvmeofCLICommand( + "nvmeof listener add", + model=Model, + success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful" + ) + cmd(create) + + cmd_dict = { + "nqn": "nqn.2014-08.org.nvmexpress:uuid:1234", + "host_name": "nvme-host-1", + "traddr": "10.0.0.5", + # 'trsvcid' omitted + # 'adrfam' omitted + } + + result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None) + assert result.retval == 0 + assert result.stderr == "" + assert result.stdout == ( + "Adding nqn.2014-08.org.nvmexpress:uuid:1234 listener at 10.0.0.5:4420: Successful" + ) + + def test_success_message_cli_value_overrides_default(self): + class Model(NamedTuple): + status: str + + def create(mgr, nqn: str, host_name: str, traddr: str, + trsvcid: int = 4420, adrfam: int = 0, gw_group: Optional[str] = None): + return dict(status=1) + + cmd = NvmeofCLICommand( + "nvmeof listener add", + model=Model, + success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful" + ) + cmd(create) + + cmd_dict = { + "nqn": "nqn.2014-08.org.nvmexpress:uuid:abcd", + "host_name": "nvme-host-2", + "traddr": "192.168.1.10", + "trsvcid": 8009, # override default 4420 + } + + result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None) + assert result.retval == 0 + assert result.stderr == "" + assert result.stdout == ( + "Adding nqn.2014-08.org.nvmexpress:uuid:abcd listener at 192.168.1.10:8009: Successful" + ) + + def test_defaults_allow_none_and_template_does_not_crash(self): + class Model(NamedTuple): + status: str + + def create_with_none( + mgr, + nqn: str, + traddr: str, + trsvcid: int = 4420, + gw_group: Optional[str] = None, # None default intentionally used + ): + return dict(status=1) + + cmd = NvmeofCLICommand( + "nvmeof listener add", + model=Model, + success_message_template="Adding {nqn} listener at {traddr}:{trsvcid} gw={gw_group}: Successful", + ) + cmd(create_with_none) + + cmd_dict = { + "nqn": "nqn.none.test", + "traddr": "127.0.0.1", + # 'gw_group' omitted; None default should be injected + } + + result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None) + assert result.retval == 0 + assert result.stderr == "" + assert result.stdout == ( + "Adding nqn.none.test listener at 127.0.0.1:4420 gw=None: Successful" + ) + + def test_template_can_use_response_fields(self): + test_cmd = "nvmeof show op status" + + class Model(NamedTuple): + status: str + message: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="operation {op} finished with status {message}" + ) + def op(self, op: str): + return {"status": 1, "message": "done"} + + res = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "op": "rebuild"} + ) + assert res.retval == 0 + assert res.stdout == "operation rebuild finished with status done" + assert res.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + + class TestNVMeoFConfCLI(unittest.TestCase, CLICommandTestMixin): def setUp(self): self.mock_kv_store()