# -*- coding: utf-8 -*-
# pylint: disable=too-many-lines
import logging
+from functools import partial
from typing import Any, Dict, List, Optional
import cherrypy
from ..exceptions import DashboardException
from ..model import nvmeof as model
from ..security import Scope
-from ..services.nvmeof_cli import NvmeofCLICommand, convert_to_bytes
+from ..services.nvmeof_cli import NvmeofCLICommand, convert_to_bytes, \
+ escape_address_if_ipv6, format_host_updates
from ..services.orchestrator import OrchClient
from ..tools import str_to_bool
from . import APIDoc, APIRouter, BaseController, CreatePermission, \
logger = logging.getLogger(__name__)
-
NVME_SCHEMA = {
"available": (bool, "Is NVMe/TCP available?"),
"message": (str, "Descriptions")
@ReadPermission
@Endpoint('PUT', '/log_level')
@NvmeofCLICommand(
- "nvmeof gateway set_log_level", model.RequestStatus, alias="nvmeof gw set_log_level")
+ "nvmeof gateway set_log_level", model.RequestStatus, alias="nvmeof gw set_log_level",
+ success_message_template="Set gateway log level to {log_level}: Successful")
@EndpointDoc("Set NVMeoF gateway log levels")
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
@ReadPermission
@Endpoint('PUT', '/log_level')
- @NvmeofCLICommand("nvmeof spdk_log_level set", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof spdk_log_level set",
+ model.RequestStatus,
+ success_message_template="Set SPDK log levels and nvmf log flags: Successful"
+ )
@EndpointDoc("Set NVMeoF gateway spdk log levels")
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
@ReadPermission
@Endpoint('PUT', '/log_level/disable')
- @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus)
+ @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus,
+ success_message_template="Disable SPDK log flags: Successful")
@EndpointDoc("Disable NVMeoF gateway spdk log")
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
)
@empty_response
- @NvmeofCLICommand("nvmeof subsystem add", model.RequestStatus)
+ @NvmeofCLICommand("nvmeof subsystem add", model.SubsystemStatus,
+ success_message_template="Adding subsystem {nqn}: Successful")
@EndpointDoc(
"Create a new NVMeoF subsystem",
parameters={
)
@empty_response
- @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus)
+ @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus,
+ success_message_template="Deleting subsystem {nqn}: Successful")
@EndpointDoc(
"Delete an existing NVMeoF subsystem",
parameters={
},
)
@empty_response
- @NvmeofCLICommand("nvmeof subsystem change_key", model.RequestStatus)
+ @NvmeofCLICommand("nvmeof subsystem change_key", model.RequestStatus,
+ success_message_template="Changing key for subsystem {nqn}: Successful")
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def change_key(self, nqn: str, dhchap_key: str, gw_group: Optional[str] = None,
},
)
@empty_response
- @NvmeofCLICommand("nvmeof subsystem del_key", model.RequestStatus)
+ @NvmeofCLICommand("nvmeof subsystem del_key", model.RequestStatus,
+ success_message_template="Deleting key for subsystem {nqn}: Successful")
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def del_key(self, nqn: str, gw_group: Optional[str] = None,
)
@empty_response
- @NvmeofCLICommand("nvmeof listener add", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof listener add",
+ model.RequestStatus,
+ success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful"
+ )
@EndpointDoc(
"Create a new NVMeoF listener",
parameters={
)
@empty_response
- @NvmeofCLICommand("nvmeof listener del", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof listener del",
+ model.RequestStatus,
+ success_message_template=(
+ "Deleting listener {traddr}:{trsvcid} from {nqn} {host_msg}: Successful"
+ ),
+ success_message_map={
+ "traddr": lambda v, _f: escape_address_if_ipv6(v) if v is not None else "",
+ "host_msg": lambda _v, f: (
+ "for all hosts" if f.get("host_name") == "*"
+ else f"for host {f.get('host_name')}"
+ ),
+ }
+ )
@EndpointDoc(
"Delete an existing NVMeoF listener",
parameters={
)
@NvmeofCLICommand(
- "nvmeof namespace add", model.NamespaceCreation, alias="nvmeof ns add"
+ "nvmeof namespace add",
+ model.NamespaceCreation,
+ alias="nvmeof ns add",
+ success_message_template="Adding namespace {nsid} to {nqn}: Successful"
)
@EndpointDoc(
"Create a new NVMeoF namespace.",
@ReadPermission
@Endpoint('PUT', '{nsid}/set_qos')
@NvmeofCLICommand(
- "nvmeof namespace set_qos", model=model.RequestStatus, alias="nvmeof ns set_qos")
+ "nvmeof namespace set_qos", model=model.RequestStatus, alias="nvmeof ns set_qos",
+ success_message_template="Setting QOS limits of namespace {nsid} in {nqn}: Successful")
@EndpointDoc(
"set QOS for specified NVMeoF namespace",
parameters={
@ReadPermission
@Endpoint('PUT', '{nsid}/change_load_balancing_group')
@NvmeofCLICommand(
- "nvmeof namespace change_load_balancing_group", model=model.RequestStatus,
- alias="nvmeof ns change_load_balancing_group"
+ "nvmeof namespace change_load_balancing_group",
+ model=model.RequestStatus,
+ alias="nvmeof ns change_load_balancing_group",
+ success_message_template=("Changing load balancing group of namespace {nsid} "
+ "in {nqn} to {load_balancing_group}: Successful")
)
@EndpointDoc(
"set the load balancing group for specified NVMeoF namespace",
)
)
- @NvmeofCLICommand("nvmeof namespace resize", model=model.RequestStatus,
- alias="nvmeof ns resize")
+ @NvmeofCLICommand(
+ "nvmeof namespace resize",
+ model=model.RequestStatus,
+ alias="nvmeof ns resize",
+ success_message_template=("Resizing namespace {nsid} in {nqn} "
+ "to {rbd_image_size}: Successful")
+ )
@EndpointDoc(
"resize the specified NVMeoF namespace",
parameters={
@ReadPermission
@Endpoint('PUT', '{nsid}/add_host')
@NvmeofCLICommand(
- "nvmeof namespace add_host", model=model.RequestStatus, alias="nvmeof ns add_host"
+ "nvmeof namespace add_host",
+ model=model.RequestStatus,
+ alias="nvmeof ns add_host",
+ success_message_template=("Adding host {host_nqn} to "
+ "namespace {nsid} on {nqn}: Successful")
)
@EndpointDoc(
"Adds a host to the specified NVMeoF namespace",
@ReadPermission
@Endpoint('PUT', '{nsid}/del_host')
@NvmeofCLICommand(
- "nvmeof namespace del_host", model=model.RequestStatus, alias="nvmeof ns del_host"
+ "nvmeof namespace del_host",
+ model=model.RequestStatus,
+ alias="nvmeof ns del_host",
+ success_message_template=("Deleting host {host_nqn} from "
+ "namespace {nsid} on {nqn}: Successful")
)
@EndpointDoc(
"Removes a host from the specified NVMeoF namespace",
@ReadPermission
@Endpoint('PUT', '{nsid}/change_visibility')
@NvmeofCLICommand(
- "nvmeof namespace change_visibility", model=model.RequestStatus,
- alias="nvmeof ns change_visibility"
+ "nvmeof namespace change_visibility",
+ model=model.RequestStatus,
+ alias="nvmeof ns change_visibility",
+ success_message_template=(
+ 'Changing visibility of namespace {nsid} in {nqn} to "{auto_visible}": Successful'
+ ),
+ success_message_map={
+ "auto_visible": lambda v, _f: (
+ "visible to all hosts" if str_to_bool(v)
+ else "visible to selected hosts"
+ )
+ }
)
@EndpointDoc(
"changes the visibility of the specified NVMeoF namespace to all or selected hosts",
@ReadPermission
@Endpoint('PUT', '{nsid}/set_auto_resize')
@NvmeofCLICommand(
- "nvmeof namespace set_auto_resize", model=model.RequestStatus,
- alias="nvmeof ns set_auto_resize"
+ "nvmeof namespace set_auto_resize",
+ model=model.RequestStatus,
+ alias="nvmeof ns set_auto_resize",
+ success_message_template=(
+ 'Setting auto resize flag for namespace {nsid} '
+ 'in {nqn} to "{auto_resize_text}": Successful'
+ ),
+ success_message_map={
+ "auto_resize_text": lambda _v, f: (
+ "auto resize namespace" if str_to_bool(f.get("auto_resize_enabled"))
+ else "do not auto resize namespace"
+ )
+ }
)
@EndpointDoc(
"Enable or disable namespace auto resize when RBD image is resized",
@ReadPermission
@Endpoint('PUT', '{nsid}/set_rbd_trash_image')
@NvmeofCLICommand(
- "nvmeof namespace set_rbd_trash_image", model=model.RequestStatus,
- alias="nvmeof ns set_rbd_trash_image"
+ "nvmeof namespace set_rbd_trash_image",
+ model=model.RequestStatus,
+ alias="nvmeof ns set_rbd_trash_image",
+ success_message_template=(
+ 'Setting RBD trash image flag for namespace {nsid} '
+ 'in {nqn} to "{trash_text}": Successful'
+ ),
+ success_message_map={
+ "trash_text": lambda _v, f: (
+ "trash on namespace deletion" if str_to_bool(f.get("rbd_trash_image_on_delete"))
+ else "do not trash on namespace deletion"
+ )
+ }
)
@EndpointDoc(
"changes the trash image on delete of the specified NVMeoF \
@Endpoint('PUT', '{nsid}/refresh_size')
@NvmeofCLICommand(
"nvmeof namespace refresh_size", model=model.RequestStatus,
- alias="nvmeof ns refresh_size"
+ alias="nvmeof ns refresh_size",
+ success_message_template="Refreshing size for namespace {nsid} in {nqn}: Successful"
)
@EndpointDoc(
"refresh the specified NVMeoF namespace to current RBD image size",
return response
@empty_response
- @NvmeofCLICommand("nvmeof namespace del", model.RequestStatus, alias="nvmeof ns del")
+ @NvmeofCLICommand(
+ "nvmeof namespace del",
+ model.RequestStatus,
+ alias="nvmeof ns del",
+ success_message_template="Deleting namespace {nsid} from {nqn}: Successful")
@EndpointDoc(
"Delete an existing NVMeoF namespace",
parameters={
)
@empty_response
- @NvmeofCLICommand("nvmeof host add", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof host add",
+ model.RequestStatus,
+ success_message_fn=partial(
+ format_host_updates,
+ template_wildcard="Allowing open host access to {nqn}: Successful",
+ template_item="Adding host {host_nqn} to {nqn}: Successful",
+ ),
+ )
@EndpointDoc(
"Allow hosts to access an NVMeoF subsystem",
parameters={
)
@empty_response
- @NvmeofCLICommand("nvmeof host del", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof host del",
+ model.RequestStatus,
+ success_message_fn=partial(
+ format_host_updates,
+ template_wildcard="Disabling open host access to {nqn}: Successful",
+ template_item="Removing host {host_nqn} access from {nqn}: Successful",
+ ),
+ )
@EndpointDoc(
"Disallow hosts from accessing an NVMeoF subsystem",
parameters={
@Endpoint('PUT', '{host_nqn}/change_key')
@UpdatePermission
@empty_response
- @NvmeofCLICommand("nvmeof host change_key", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof host change_key",
+ model.RequestStatus,
+ success_message_template=("Changing key for host {host_nqn} "
+ "on subsystem {nqn}: Successful")
+ )
@EndpointDoc(
"Change host DH-HMAC-CHAP key",
parameters={
)
@empty_response
- @NvmeofCLICommand("nvmeof host del_key", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof host del_key",
+ model.RequestStatus,
+ success_message_template=("Deleting key for host {host_nqn} "
+ "on subsystem {nqn}: Successful")
+ )
@EndpointDoc(
"Delete host DH-HMAC-CHAP key",
parameters={
# -*- coding: utf-8 -*-
import errno
+import inspect
import json
+import logging
from abc import ABC, abstractmethod
from enum import Enum
-from typing import Annotated, Any, Dict, List, NamedTuple, Optional, Type, \
- Union, get_args, get_origin, get_type_hints
+from typing import Annotated, Any, Callable, Dict, List, Mapping, NamedTuple, \
+ Optional, Type, Union, get_args, get_origin, get_type_hints
import yaml
-from mgr_module import CLICheckNonemptyFileInput, HandleCommandResult, HandlerFuncType
+from mgr_module import CLICheckNonemptyFileInput, HandleCommandResult
from prettytable import PrettyTable
from ..cli import DBCLICommand
from .nvmeof_conf import ManagedByOrchestratorException, \
NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig
+logger = logging.getLogger(__name__)
+
@DBCLICommand.Read('dashboard nvmeof-gateway-list')
def list_nvmeof_gateways(_):
}
+def escape_address_if_ipv6(addr: str) -> str:
+ ret_addr = addr
+ if ":" in addr and not addr.strip().startswith("["):
+ ret_addr = f"[{addr}]"
+ return ret_addr
+
+
+def _normalize_to_list(value: Any) -> List[Any]:
+ if value is None:
+ return []
+ if isinstance(value, list):
+ return value
+ return [value]
+
+
+def format_host_updates(args: Dict[str, Any],
+ template_wildcard: str,
+ template_item: str,
+ host_arg: str = 'host_nqn',
+ nqn_arg: str = 'nqn',
+ wildcard_value: str = '*') -> str:
+ nqn = args.get(nqn_arg)
+ hosts = _normalize_to_list(args.get(host_arg))
+
+ messages: List[str] = []
+ for host in hosts:
+ if host == wildcard_value:
+ messages.append(template_wildcard.format(nqn=nqn))
+ else:
+ messages.append(template_item.format(nqn=nqn, host_nqn=host))
+ return "\n".join(messages)
+
+
def convert_to_bytes(size: Union[int, str], default_unit=None):
if isinstance(size, int):
number = size
class NvmeofCLICommand(DBCLICommand):
desc: str
- def __init__(self, prefix, model: Type[NamedTuple], alias=None, perm='rw', poll=False):
+ def __init__(self,
+ prefix,
+ model: Type[NamedTuple],
+ alias: Optional[str] = None,
+ perm: str = 'rw',
+ poll: bool = False,
+ success_message_template: Optional[str] = None,
+ success_message_map: Optional[Dict[str, Any]] = None,
+ success_message_fn: Optional[Callable[[Dict[str, Any]], str]] = None):
super().__init__(prefix, perm, poll)
self._output_formatter = AnnotatedDataTextOutputFormatter()
self._model = model
self._alias = alias
self._alias_cmd: Optional[NvmeofCLICommand] = None
- def _use_api_endpoint_desc_if_available(self, func):
- if not self.desc and hasattr(func, 'doc_info'):
- self.desc = func.doc_info.get('summary', '')
+ self._success_message_template = success_message_template
+ self._success_message_map = success_message_map or {}
+ self._success_message_fn = success_message_fn
+ self._func_defaults: Dict[str, Any] = {}
+
+ def __call__(self, func):
+ resp = super().__call__(func)
+ self._func_defaults = self._compute_func_defaults()
- def __call__(self, func) -> HandlerFuncType: # type: ignore
if self._alias:
- self._alias_cmd = NvmeofCLICommand(self._alias, model=self._model)
- assert self._alias_cmd is not None
+ self._alias_cmd = NvmeofCLICommand(
+ self._alias,
+ model=self._model,
+ success_message_template=self._success_message_template,
+ success_message_map=self._success_message_map,
+ success_message_fn=self._success_message_fn,
+ )
self._alias_cmd(func)
+ self._alias_cmd._func_defaults = self._alias_cmd._compute_func_defaults()
- resp = super().__call__(func)
self._use_api_endpoint_desc_if_available(func)
return resp
+ def _use_api_endpoint_desc_if_available(self, func):
+ if not self.desc and hasattr(func, 'doc_info'):
+ self.desc = func.doc_info.get('summary', '')
+
+ def _compute_func_defaults(self) -> Dict[str, Any]:
+ defaults: Dict[str, Any] = {}
+ sig = inspect.signature(self.func)
+
+ for name, param in sig.parameters.items():
+ if name in DBCLICommand.KNOWN_ARGS:
+ continue
+ if name not in self.arg_spec:
+ continue
+ if param.default is not inspect.Parameter.empty:
+ defaults[name] = param.default
+
+ return defaults
+
+ def _stringify(self, value: Any) -> str:
+ if isinstance(value, (bytes, bytearray)):
+ try:
+ return value.decode('utf-8', errors='replace')
+ except Exception:
+ return str(value)
+
+ if isinstance(value, (list, tuple)):
+ try:
+ return ','.join(self._stringify(v) for v in value)
+ except Exception:
+ return str(value)
+
+ return str(value)
+
+ def _args_map_from_argspec(self,
+ cmd_dict: Dict[str, Any],
+ inbuf: Optional[str] = None) -> Dict[str, Any]:
+ kwargs, specials = self._collect_args_by_argspec(cmd_dict)
+ kwargs = kwargs or {}
+ specials = specials or {}
+
+ if inbuf and 'inbuf' in specials:
+ kwargs['inbuf'] = inbuf
+
+ return {**self._func_defaults, **kwargs}
+
+ def _apply_single_map_spec(
+ self,
+ spec: Union[str, dict, Callable],
+ raw: Any,
+ fields: Dict[str, Any],
+ ) -> Any:
+ """
+ spec can be:
+ - dict mapping exact raw values to literal/callable
+ - callable(value, fields)
+ - literal value (e.g. string)
+ """
+ if callable(spec):
+ return spec(raw, fields)
+
+ if isinstance(spec, dict):
+ if raw in spec:
+ val = spec[raw]
+ return val(raw, fields) if callable(val) else val
+ return raw
+ return spec
+
+ def _apply_success_message_map(self, fields: Dict[str, Any]) -> Dict[str, Any]:
+ out = dict(fields)
+ for field, spec in self._success_message_map.items():
+ raw = out.get(field)
+ try:
+ out[field] = self._apply_single_map_spec(spec, raw, out)
+ except Exception:
+ logger.warning("Failed applying success_message_map for field %s on %s",
+ field, self.prefix, exc_info=True)
+ return out
+
+ def get_success_msg(self, args_map: Dict[str, Any], response: Any) -> Optional[str]:
+ merged_fields: Dict[str, Any]
+
+ if isinstance(response, Mapping):
+ merged_fields = {**args_map, **dict(response)}
+ else:
+ merged_fields = dict(args_map)
+
+ if self._success_message_fn:
+ try:
+ return self._success_message_fn(merged_fields)
+ except Exception:
+ logger.warning("success_message_fn failed for %s", self.prefix, exc_info=True)
+ return None
+
+ return self._format_success_message_from_args(args_map, response)
+
+ def _format_success_message_from_args(self,
+ args_map: Dict[str, Any],
+ response: Any) -> Optional[str]:
+ if not self._success_message_template:
+ return None
+
+ resp_map: Dict[str, Any]
+ if isinstance(response, Mapping):
+ resp_map = dict(response)
+ else:
+ resp_map = {}
+
+ try:
+ fields_dict = {**args_map, **resp_map}
+ fields_dict = self._apply_success_message_map(fields_dict)
+ str_map = {k: self._stringify(v) for k, v in fields_dict.items()}
+ return self._success_message_template.format(**str_map)
+ except Exception:
+ logger.warning("Success message template failed for %s", self.prefix, exc_info=True)
+ return None
+
def call(self,
mgr: Any,
cmd_dict: Dict[str, Any],
inbuf: Optional[str] = None) -> HandleCommandResult:
try:
- ret = super().call(mgr, cmd_dict, inbuf)
out_format = cmd_dict.get('format')
+ args_map = self._args_map_from_argspec(cmd_dict, inbuf)
+ ret = super().call(mgr, cmd_dict, inbuf)
if ret is None:
- out = ''
+ ret = {}
+
if out_format == 'plain' or not out_format:
- out = self._output_formatter.format_output(ret, self._model)
+ message: Optional[str] = None
+ try:
+ message = self.get_success_msg(args_map, ret)
+ except Exception:
+ logger.warning("Formatting of success message failed for %s",
+ self.prefix, exc_info=True)
+
+ if message:
+ out = message
+ else:
+ out = self._output_formatter.format_output(ret, self._model)
+
elif out_format == 'json':
out = json.dumps(ret, indent=4)
elif out_format == 'yaml':
else:
return HandleCommandResult(-errno.EINVAL, '',
f"format '{out_format}' is not implemented")
+
return HandleCommandResult(0, out, '')
+
except Exception as e: # pylint: disable=broad-except
return HandleCommandResult(-errno.EINVAL, '', str(e))
import errno
import json
-import logging
import unittest
-from typing import Annotated, List, NamedTuple
+from typing import Annotated, List, NamedTuple, Optional
from unittest.mock import MagicMock
import pytest
-from mgr_module import CLICommandBase, HandleCommandResult
+from mgr_module import HandleCommandResult
+from ..cli import DBCLICommand
from ..controllers import EndpointDoc
from ..model.nvmeof import CliFieldTransformer, CliFlags, CliHeader
from ..services.nvmeof_cli import AnnotatedDataTextOutputFormatter, \
- NvmeofCLICommand, convert_from_bytes, convert_to_bytes
+ NvmeofCLICommand, convert_from_bytes, convert_to_bytes, \
+ format_host_updates
from ..tests import CLICommandTestMixin
mock_result = {'a': 'b'}
super_mock = MagicMock()
super_mock.return_value = mock_result
- monkeypatch.setattr(CLICommandBase, 'call', super_mock)
+ monkeypatch.setattr(DBCLICommand, 'call', super_mock)
return super_mock
mock_result = None
super_mock = MagicMock()
super_mock.return_value = mock_result
- monkeypatch.setattr(CLICommandBase, 'call', super_mock)
+ monkeypatch.setattr(DBCLICommand, 'call', super_mock)
return super_mock
def test_command_return_cmd_result_default_format(self, base_call_mock, sample_command):
result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
- logging.getLogger().error(result)
assert isinstance(result, HandleCommandResult)
assert result.retval == 0
assert result.stdout == (
assert test_alias not in NvmeofCLICommand.COMMANDS
+class TestNvmeofCLICommandSuccessMessage:
+
+ def test_plain_output_uses_success_message_template(self):
+ test_cmd = "nvmeof set_log_level"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="set log level to {log_level}"
+ )
+ def set_log_level(self, log_level: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): # noqa
+ return {"status": 0}
+
+ result_default = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"log_level": "info"}
+ )
+ assert isinstance(result_default, HandleCommandResult)
+ assert result_default.retval == 0
+ assert result_default.stdout == "set log level to info"
+ assert result_default.stderr == ''
+
+ result_plain = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "info"}
+ )
+ assert isinstance(result_plain, HandleCommandResult)
+ assert result_plain.retval == 0
+ assert result_plain.stdout == "set log level to info"
+ assert result_plain.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_plain_output_falls_back_when_template_unresolvable(self):
+ test_cmd = "nvmeof gateway set_log_level_fallback"
+
+ class Model(NamedTuple):
+ a: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="set log level to {log_level}"
+ )
+ def set_log_level(self, a: str): # noqa
+ return {"a": "b"}
+
+ result_plain = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain"}
+ )
+ assert isinstance(result_plain, HandleCommandResult)
+ assert result_plain.retval == 0
+ assert result_plain.stdout == (
+ "+-+\n"
+ "|A|\n"
+ "+-+\n"
+ "|b|\n"
+ "+-+"
+ )
+ assert result_plain.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_default_output_falls_back_when_template_unresolvable(self):
+ test_cmd = "nvmeof gateway set_log_level_fallback_default"
+
+ class Model(NamedTuple):
+ a: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="set log level to {log_level}"
+ )
+ def set_log_level(self, a: str): # noqa
+ return {"a": "b"}
+
+ result_default = NvmeofCLICommand.COMMANDS[test_cmd].call(MagicMock(), {})
+ assert isinstance(result_default, HandleCommandResult)
+ assert result_default.retval == 0
+ assert result_default.stdout == (
+ "+-+\n"
+ "|A|\n"
+ "+-+\n"
+ "|b|\n"
+ "+-+"
+ )
+ assert result_default.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_alias_inherits_success_message_template(self):
+ test_cmd = "nvmeof gateway set_log_level_main"
+ test_alias = "nvmeof gw set_log_level_alias"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ alias=test_alias,
+ success_message_template="set log level to {log_level}"
+ )
+ def set_log_level(self, log_level: str): # noqa
+ return {"status": 0}
+
+ result_main = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "debug"}
+ )
+ assert result_main.retval == 0
+ assert result_main.stdout == "set log level to debug"
+ assert result_main.stderr == ''
+
+ result_alias = NvmeofCLICommand.COMMANDS[test_alias].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "warn"}
+ )
+ assert result_alias.retval == 0
+ assert result_alias.stdout == "set log level to warn"
+ assert result_alias.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ del NvmeofCLICommand.COMMANDS[test_alias]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+ assert test_alias not in NvmeofCLICommand.COMMANDS
+
+ def test_plain_uses_success_message_map_callable(self):
+ test_cmd = "nvmeof gw set_log_level map callable"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="set log level to {log_level}{suffix}",
+ success_message_map={
+ "suffix": lambda _v, f: " for all hosts" if f.get("all_hosts") else ""
+ }
+ )
+ def fn(self, log_level: str, all_hosts: bool = False): # noqa
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "info", "all_hosts": True}
+ )
+ assert res.retval == 0
+ assert res.stdout == "set log level to info for all hosts"
+ assert res.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_dict_maps_exact_values(self):
+ test_cmd = "nvmeof ns change_visibility map dict"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template='visibility "{auto_visible}": Successful',
+ success_message_map={
+ "auto_visible": {
+ True: "visible to all hosts",
+ False: "visible to selected hosts",
+ }
+ }
+ )
+ def fn(self, auto_visible: bool): # noqa
+ return {"status": 0}
+
+ res_true = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "auto_visible": True}
+ )
+ assert res_true.retval == 0
+ assert res_true.stdout == 'visibility "visible to all hosts": Successful'
+
+ res_false = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "auto_visible": False}
+ )
+ assert res_false.retval == 0
+ assert res_false.stdout == 'visibility "visible to selected hosts": Successful'
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_dict_missing_key_leaves_raw_value(self):
+ test_cmd = "nvmeof map dict missing key"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="val {x}",
+ success_message_map={
+ "x": {1: "one"} # no mapping for 2
+ }
+ )
+ def fn(self, x: int): # noqa
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "x": 2}
+ )
+ assert res.retval == 0
+ assert res.stdout == "val 2"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_dict_value_callable(self):
+ test_cmd = "nvmeof map dict value callable"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="host {host_name}",
+ success_message_map={
+ "host_name": {
+ "*": "for all hosts",
+ "h1": (lambda v, _f: f"for host {v}"),
+ }
+ }
+ )
+ def fn(self, host_name: str): # noqa
+ return {"status": 0}
+
+ res_star = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "host_name": "*"}
+ )
+ assert res_star.stdout == "host for all hosts"
+
+ res_h1 = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "host_name": "h1"}
+ )
+ assert res_h1.stdout == "host for host h1"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_supports_derived_fields(self):
+ test_cmd = "nvmeof map derived field"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="msg {derived}",
+ success_message_map={
+ "derived": lambda _v, f: f"nqn={f.get('nqn')}",
+ }
+ )
+ def fn(self, nqn: str): # noqa
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "nqn": "subsys1"}
+ )
+ assert res.retval == 0
+ assert res.stdout == "msg nqn=subsys1"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_alias_inherits_success_message_map(self):
+ test_cmd = "nvmeof map alias main"
+ test_alias = "nvmeof map alias alias"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ alias=test_alias,
+ success_message_template="lvl {log_level}{suffix}",
+ success_message_map={
+ "suffix": lambda _v, f: "!" if f.get("urgent") else "."
+ }
+ )
+ def fn(self, log_level: str, urgent: bool = False): # noqa
+ return {"status": 0}
+
+ res_main = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "debug", "urgent": True}
+ )
+ assert isinstance(res_main, HandleCommandResult)
+ assert res_main.retval == 0
+ assert res_main.stdout == "lvl debug!"
+ assert res_main.stderr == ''
+
+ res_alias = NvmeofCLICommand.COMMANDS[test_alias].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "warn", "urgent": False}
+ )
+ assert isinstance(res_alias, HandleCommandResult)
+ assert res_alias.retval == 0
+ assert res_alias.stdout == "lvl warn."
+ assert res_alias.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ del NvmeofCLICommand.COMMANDS[test_alias]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+ assert test_alias not in NvmeofCLICommand.COMMANDS
+
+ def test_map_failure_does_not_break_template_rendering(self):
+ test_cmd = "nvmeof map failure fallback"
+
+ class Model(NamedTuple):
+ a: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="value {a}",
+ success_message_map={
+ "a": lambda _v, _f: 1 / 0, # force exception
+ }
+ )
+ def fn(self, a: str): # noqa
+ return {"a": "b"}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "a": "ignored"}
+ )
+ assert res.retval == 0
+ assert res.stdout == "value b"
+ assert res.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_template_formats_int_and_list_without_failure(self):
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ "nvmeof mixed params",
+ Model,
+ success_message_template="ns {nsid} hosts {host_nqn}"
+ )
+ def fn(self, nsid: int, host_nqn: list[str]): # noqa
+ return {"status": 1}
+
+ res = NvmeofCLICommand.COMMANDS["nvmeof mixed params"].call(
+ MagicMock(),
+ {"format": "plain", "nsid": 42, "host_nqn": ["a", "b"]}
+ )
+ assert res.retval == 0
+ assert res.stdout == "ns 42 hosts a,b"
+
+ del NvmeofCLICommand.COMMANDS["nvmeof mixed params"]
+ assert "nvmeof mixed params" not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_uses_default_when_cli_omits_param(self):
+ class Model(NamedTuple):
+ status: str
+
+ def create(mgr, nqn: str, host_name: str, traddr: str,
+ trsvcid: int = 4420, adrfam: int = 0, gw_group: Optional[str] = None):
+ return dict(status=1)
+
+ cmd = NvmeofCLICommand(
+ "nvmeof listener add",
+ model=Model,
+ success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful"
+ )
+ cmd(create)
+
+ cmd_dict = {
+ "nqn": "nqn.2014-08.org.nvmexpress:uuid:1234",
+ "host_name": "nvme-host-1",
+ "traddr": "10.0.0.5",
+ # 'trsvcid' omitted
+ # 'adrfam' omitted
+ }
+
+ result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None)
+ assert result.retval == 0
+ assert result.stderr == ""
+ assert result.stdout == (
+ "Adding nqn.2014-08.org.nvmexpress:uuid:1234 listener at 10.0.0.5:4420: Successful"
+ )
+
+ def test_success_message_cli_value_overrides_default(self):
+ class Model(NamedTuple):
+ status: str
+
+ def create(mgr, nqn: str, host_name: str, traddr: str,
+ trsvcid: int = 4420, adrfam: int = 0, gw_group: Optional[str] = None):
+ return dict(status=1)
+
+ cmd = NvmeofCLICommand(
+ "nvmeof listener add",
+ model=Model,
+ success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful"
+ )
+ cmd(create)
+
+ cmd_dict = {
+ "nqn": "nqn.2014-08.org.nvmexpress:uuid:abcd",
+ "host_name": "nvme-host-2",
+ "traddr": "192.168.1.10",
+ "trsvcid": 8009, # override default 4420
+ }
+
+ result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None)
+ assert result.retval == 0
+ assert result.stderr == ""
+ assert result.stdout == (
+ "Adding nqn.2014-08.org.nvmexpress:uuid:abcd listener at 192.168.1.10:8009: Successful"
+ )
+
+ def test_defaults_allow_none_and_template_does_not_crash(self):
+ class Model(NamedTuple):
+ status: str
+
+ def create_with_none(
+ mgr,
+ nqn: str,
+ traddr: str,
+ trsvcid: int = 4420,
+ gw_group: Optional[str] = None, # None default intentionally used
+ ):
+ return dict(status=1)
+
+ cmd = NvmeofCLICommand(
+ "nvmeof listener add",
+ model=Model,
+ success_message_template=(
+ "Adding {nqn} listener at {traddr}:{trsvcid} "
+ "gw={gw_group}: Successful"
+ ),
+ )
+ cmd(create_with_none)
+
+ cmd_dict = {
+ "nqn": "nqn.none.test",
+ "traddr": "127.0.0.1",
+ # 'gw_group' omitted; None default should be injected
+ }
+
+ result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None)
+ assert result.retval == 0
+ assert result.stderr == ""
+ assert result.stdout == (
+ "Adding nqn.none.test listener at 127.0.0.1:4420 gw=None: Successful"
+ )
+
+ def test_template_can_use_response_fields(self):
+ test_cmd = "nvmeof show op status"
+
+ class Model(NamedTuple):
+ status: str
+ message: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="operation {op} finished with status {message}"
+ )
+ def op(self, op: str):
+ return {"status": 1, "message": "done"}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "op": "rebuild"}
+ )
+ assert res.retval == 0
+ assert res.stdout == "operation rebuild finished with status done"
+ assert res.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_fn_overrides_template(self):
+ test_cmd = "nvmeof success fn overrides template"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="TEMPLATE {nqn}",
+ success_message_fn=lambda f: f"FN {f.get('nqn')}"
+ )
+ def fn(self, nqn: str):
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "nqn": "subsysA"}
+ )
+ assert res.retval == 0
+ assert res.stderr == ''
+ assert res.stdout == "FN subsysA"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_fn_can_use_response_fields(self):
+ test_cmd = "nvmeof success fn response fields"
+
+ class Model(NamedTuple):
+ status: str
+ message: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="TEMPLATE {message}",
+ success_message_fn=lambda f: f"FN message={f.get('message')}"
+ )
+ def fn(self, op: str): # noqa: ARG001
+ return {"status": 0, "message": "done"}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "op": "ignored"}
+ )
+ assert res.retval == 0
+ assert res.stderr == ''
+ assert res.stdout == "FN message=done"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_fn_failure_falls_back_to_default_formatting(self):
+ test_cmd = "nvmeof success fn failure fallback"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="TEMPLATE {nqn}",
+ success_message_fn=lambda _f: 1 / 0 # force exception
+ )
+ def fn(self, nqn: str):
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "nqn": "subsysB"}
+ )
+ assert res.retval == 0
+ assert res.stderr == ""
+ # since success_message_fn raises and get_success_msg returns None,
+ # code falls back to formatted output of the response
+ assert res.stdout == (
+ "++\n"
+ "||\n"
+ "++\n"
+ "\n"
+ "++"
+ )
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+
class TestNVMeoFConfCLI(unittest.TestCase, CLICommandTestMixin):
def setUp(self):
self.mock_kv_store()
with pytest.raises(ValueError):
assert convert_to_bytes('5') == 5368709120
assert convert_to_bytes('5', default_unit='GB') == 5368709120
+
+
+class TestFormatHostUpdates:
+ def test_single_wildcard(self):
+ args = {"nqn": "subsys1", "host_nqn": "*"}
+ out = format_host_updates(
+ args,
+ template_wildcard="Allowing open host access to {nqn}: Successful",
+ template_item="Adding host {host_nqn} to {nqn}: Successful",
+ )
+ assert out == "Allowing open host access to subsys1: Successful"
+
+ def test_single_host(self):
+ args = {"nqn": "subsys1", "host_nqn": "hostA"}
+ out = format_host_updates(
+ args,
+ template_wildcard="Allowing open host access to {nqn}: Successful",
+ template_item="Adding host {host_nqn} to {nqn}: Successful",
+ )
+ assert out == "Adding host hostA to subsys1: Successful"
+
+ def test_multiple_hosts_mixed_including_wildcard(self):
+ args = {"nqn": "subsys1", "host_nqn": ["hostA", "*", "hostB"]}
+ out = format_host_updates(
+ args,
+ template_wildcard="Disabling open host access to {nqn}: Successful",
+ template_item="Removing host {host_nqn} access from {nqn}: Successful",
+ )
+ assert out == (
+ "Removing host hostA access from subsys1: Successful\n"
+ "Disabling open host access to subsys1: Successful\n"
+ "Removing host hostB access from subsys1: Successful"
+ )
+
+ def test_none_host_arg_returns_empty_string(self):
+ args = {"nqn": "subsys1", "host_nqn": None}
+ out = format_host_updates(
+ args,
+ template_wildcard="Allowing open host access to {nqn}: Successful",
+ template_item="Adding host {host_nqn} to {nqn}: Successful",
+ )
+ assert out == ""
+
+ def test_missing_host_arg_returns_empty_string(self):
+ args = {"nqn": "subsys1"} # host_nqn omitted
+ out = format_host_updates(
+ args,
+ template_wildcard="Allowing open host access to {nqn}: Successful",
+ template_item="Adding host {host_nqn} to {nqn}: Successful",
+ )
+ assert out == ""
+
+ def test_missing_nqn_renders_as_none(self):
+ args = {"host_nqn": "*"} # nqn omitted
+ out = format_host_updates(
+ args,
+ template_wildcard="Allowing open host access to {nqn}: Successful",
+ template_item="Adding host {host_nqn} to {nqn}: Successful",
+ )
+ assert out == "Allowing open host access to None: Successful"
+
+ def test_custom_arg_names(self):
+ args = {"subsystem": "nqn.test", "host": ["h1", "*"]}
+ out = format_host_updates(
+ args,
+ nqn_arg="subsystem",
+ host_arg="host",
+ template_wildcard="W {nqn}",
+ template_item="H {host_nqn} {nqn}",
+ )
+ assert out == "H h1 nqn.test\nW nqn.test"