]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/dashboard: introduce NvmeofCLICommand's success_message_template and success_mess... wip-tomer-responses-centos9-only
authorTomer Haskalovitch <tomer.haska@ibm.com>
Tue, 23 Dec 2025 21:21:15 +0000 (23:21 +0200)
committerTomer Haskalovitch <tomer.haska@ibm.com>
Wed, 25 Feb 2026 10:44:22 +0000 (12:44 +0200)
Signed-off-by: Tomer Haskalovitch <tomer.haska@ibm.com>
pr fixes

isort

src/pybind/mgr/dashboard/controllers/nvmeof.py
src/pybind/mgr/dashboard/services/nvmeof_cli.py
src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py

index 893560486ef0825254e26158dad6d35384bff66b..7655e989477b48fb0f55950a016e30566bd8d6ee 100644 (file)
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # pylint: disable=too-many-lines
 import logging
+from functools import partial
 from typing import Any, Dict, List, Optional
 
 import cherrypy
@@ -10,7 +11,8 @@ from .. import mgr
 from ..exceptions import DashboardException
 from ..model import nvmeof as model
 from ..security import Scope
-from ..services.nvmeof_cli import NvmeofCLICommand, convert_to_bytes
+from ..services.nvmeof_cli import NvmeofCLICommand, convert_to_bytes, \
+    escape_address_if_ipv6, format_host_updates
 from ..services.orchestrator import OrchClient
 from ..tools import str_to_bool
 from . import APIDoc, APIRouter, BaseController, CreatePermission, \
@@ -19,7 +21,6 @@ from . import APIDoc, APIRouter, BaseController, CreatePermission, \
 
 logger = logging.getLogger(__name__)
 
-
 NVME_SCHEMA = {
     "available": (bool, "Is NVMe/TCP available?"),
     "message": (str, "Descriptions")
@@ -102,7 +103,8 @@ else:
         @ReadPermission
         @Endpoint('PUT', '/log_level')
         @NvmeofCLICommand(
-            "nvmeof gateway set_log_level", model.RequestStatus, alias="nvmeof gw set_log_level")
+            "nvmeof gateway set_log_level", model.RequestStatus, alias="nvmeof gw set_log_level",
+            success_message_template="Set gateway log level to {log_level}: Successful")
         @EndpointDoc("Set NVMeoF gateway log levels")
         @convert_to_model(model.RequestStatus)
         @handle_nvmeof_error
@@ -173,7 +175,11 @@ else:
 
         @ReadPermission
         @Endpoint('PUT', '/log_level')
-        @NvmeofCLICommand("nvmeof spdk_log_level set", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof spdk_log_level set",
+            model.RequestStatus,
+            success_message_template="Set SPDK log levels and nvmf log flags: Successful"
+        )
         @EndpointDoc("Set NVMeoF gateway spdk log levels")
         @convert_to_model(model.RequestStatus)
         @handle_nvmeof_error
@@ -196,7 +202,8 @@ else:
 
         @ReadPermission
         @Endpoint('PUT', '/log_level/disable')
-        @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus)
+        @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus,
+                          success_message_template="Disable SPDK log flags: Successful")
         @EndpointDoc("Disable NVMeoF gateway spdk log")
         @convert_to_model(model.RequestStatus)
         @handle_nvmeof_error
@@ -251,7 +258,8 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof subsystem add", model.RequestStatus)
+        @NvmeofCLICommand("nvmeof subsystem add", model.SubsystemStatus,
+                          success_message_template="Adding subsystem {nqn}: Successful")
         @EndpointDoc(
             "Create a new NVMeoF subsystem",
             parameters={
@@ -285,7 +293,8 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus)
+        @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus,
+                          success_message_template="Deleting subsystem {nqn}: Successful")
         @EndpointDoc(
             "Delete an existing NVMeoF subsystem",
             parameters={
@@ -318,7 +327,8 @@ else:
             },
         )
         @empty_response
-        @NvmeofCLICommand("nvmeof subsystem change_key", model.RequestStatus)
+        @NvmeofCLICommand("nvmeof subsystem change_key", model.RequestStatus,
+                          success_message_template="Changing key for subsystem {nqn}: Successful")
         @convert_to_model(model.RequestStatus)
         @handle_nvmeof_error
         def change_key(self, nqn: str, dhchap_key: str, gw_group: Optional[str] = None,
@@ -342,7 +352,8 @@ else:
             },
         )
         @empty_response
-        @NvmeofCLICommand("nvmeof subsystem del_key", model.RequestStatus)
+        @NvmeofCLICommand("nvmeof subsystem del_key", model.RequestStatus,
+                          success_message_template="Deleting key for subsystem {nqn}: Successful")
         @convert_to_model(model.RequestStatus)
         @handle_nvmeof_error
         def del_key(self, nqn: str, gw_group: Optional[str] = None,
@@ -396,7 +407,11 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof listener add", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof listener add",
+            model.RequestStatus,
+            success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful"
+        )
         @EndpointDoc(
             "Create a new NVMeoF listener",
             parameters={
@@ -436,7 +451,20 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof listener del", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof listener del",
+            model.RequestStatus,
+            success_message_template=(
+                "Deleting listener {traddr}:{trsvcid} from {nqn} {host_msg}: Successful"
+            ),
+            success_message_map={
+                "traddr": lambda v, _f: escape_address_if_ipv6(v) if v is not None else "",
+                "host_msg": lambda _v, f: (
+                    "for all hosts" if f.get("host_name") == "*"
+                    else f"for host {f.get('host_name')}"
+                ),
+            }
+        )
         @EndpointDoc(
             "Delete an existing NVMeoF listener",
             parameters={
@@ -628,7 +656,10 @@ else:
             )
 
         @NvmeofCLICommand(
-            "nvmeof namespace add", model.NamespaceCreation, alias="nvmeof ns add"
+            "nvmeof namespace add",
+            model.NamespaceCreation,
+            alias="nvmeof ns add",
+            success_message_template="Adding namespace {nsid} to {nqn}: Successful"
         )
         @EndpointDoc(
             "Create a new NVMeoF namespace.",
@@ -718,7 +749,8 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/set_qos')
         @NvmeofCLICommand(
-            "nvmeof namespace set_qos", model=model.RequestStatus, alias="nvmeof ns set_qos")
+            "nvmeof namespace set_qos", model=model.RequestStatus, alias="nvmeof ns set_qos",
+            success_message_template="Setting QOS limits of namespace {nsid} in {nqn}: Successful")
         @EndpointDoc(
             "set QOS for specified NVMeoF namespace",
             parameters={
@@ -768,8 +800,11 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/change_load_balancing_group')
         @NvmeofCLICommand(
-            "nvmeof namespace change_load_balancing_group", model=model.RequestStatus,
-            alias="nvmeof ns change_load_balancing_group"
+            "nvmeof namespace change_load_balancing_group",
+            model=model.RequestStatus,
+            alias="nvmeof ns change_load_balancing_group",
+            success_message_template=("Changing load balancing group of namespace {nsid} "
+                                      "in {nqn} to {load_balancing_group}: Successful")
         )
         @EndpointDoc(
             "set the load balancing group for specified NVMeoF namespace",
@@ -834,8 +869,13 @@ else:
                 )
             )
 
-        @NvmeofCLICommand("nvmeof namespace resize", model=model.RequestStatus,
-                          alias="nvmeof ns resize")
+        @NvmeofCLICommand(
+            "nvmeof namespace resize",
+            model=model.RequestStatus,
+            alias="nvmeof ns resize",
+            success_message_template=("Resizing namespace {nsid} in {nqn} "
+                                      "to {rbd_image_size}: Successful")
+        )
         @EndpointDoc(
             "resize the specified NVMeoF namespace",
             parameters={
@@ -873,7 +913,11 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/add_host')
         @NvmeofCLICommand(
-            "nvmeof namespace add_host", model=model.RequestStatus, alias="nvmeof ns add_host"
+            "nvmeof namespace add_host",
+            model=model.RequestStatus,
+            alias="nvmeof ns add_host",
+            success_message_template=("Adding host {host_nqn} to "
+                                      "namespace {nsid} on {nqn}: Successful")
         )
         @EndpointDoc(
             "Adds a host to the specified NVMeoF namespace",
@@ -914,7 +958,11 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/del_host')
         @NvmeofCLICommand(
-            "nvmeof namespace del_host", model=model.RequestStatus, alias="nvmeof ns del_host"
+            "nvmeof namespace del_host",
+            model=model.RequestStatus,
+            alias="nvmeof ns del_host",
+            success_message_template=("Deleting host {host_nqn} from "
+                                      "namespace {nsid} on {nqn}: Successful")
         )
         @EndpointDoc(
             "Removes a host from the specified NVMeoF namespace",
@@ -950,8 +998,18 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/change_visibility')
         @NvmeofCLICommand(
-            "nvmeof namespace change_visibility", model=model.RequestStatus,
-            alias="nvmeof ns change_visibility"
+            "nvmeof namespace change_visibility",
+            model=model.RequestStatus,
+            alias="nvmeof ns change_visibility",
+            success_message_template=(
+                'Changing visibility of namespace {nsid} in {nqn} to "{auto_visible}": Successful'
+            ),
+            success_message_map={
+                "auto_visible": lambda v, _f: (
+                    "visible to all hosts" if str_to_bool(v)
+                    else "visible to selected hosts"
+                )
+            }
         )
         @EndpointDoc(
             "changes the visibility of the specified NVMeoF namespace to all or selected hosts",
@@ -990,8 +1048,19 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/set_auto_resize')
         @NvmeofCLICommand(
-            "nvmeof namespace set_auto_resize", model=model.RequestStatus,
-            alias="nvmeof ns set_auto_resize"
+            "nvmeof namespace set_auto_resize",
+            model=model.RequestStatus,
+            alias="nvmeof ns set_auto_resize",
+            success_message_template=(
+                'Setting auto resize flag for namespace {nsid} '
+                'in {nqn} to "{auto_resize_text}": Successful'
+            ),
+            success_message_map={
+                "auto_resize_text": lambda _v, f: (
+                    "auto resize namespace" if str_to_bool(f.get("auto_resize_enabled"))
+                    else "do not auto resize namespace"
+                )
+            }
         )
         @EndpointDoc(
             "Enable or disable namespace auto resize when RBD image is resized",
@@ -1031,8 +1100,19 @@ else:
         @ReadPermission
         @Endpoint('PUT', '{nsid}/set_rbd_trash_image')
         @NvmeofCLICommand(
-            "nvmeof namespace set_rbd_trash_image", model=model.RequestStatus,
-            alias="nvmeof ns set_rbd_trash_image"
+            "nvmeof namespace set_rbd_trash_image",
+            model=model.RequestStatus,
+            alias="nvmeof ns set_rbd_trash_image",
+            success_message_template=(
+                'Setting RBD trash image flag for namespace {nsid} '
+                'in {nqn} to "{trash_text}": Successful'
+            ),
+            success_message_map={
+                "trash_text": lambda _v, f: (
+                    "trash on namespace deletion" if str_to_bool(f.get("rbd_trash_image_on_delete"))
+                    else "do not trash on namespace deletion"
+                )
+            }
         )
         @EndpointDoc(
             "changes the trash image on delete of the specified NVMeoF \
@@ -1070,7 +1150,8 @@ else:
         @Endpoint('PUT', '{nsid}/refresh_size')
         @NvmeofCLICommand(
             "nvmeof namespace refresh_size", model=model.RequestStatus,
-            alias="nvmeof ns refresh_size"
+            alias="nvmeof ns refresh_size",
+            success_message_template="Refreshing size for namespace {nsid} in {nqn}: Successful"
         )
         @EndpointDoc(
             "refresh the specified NVMeoF namespace to current RBD image size",
@@ -1201,7 +1282,11 @@ else:
             return response
 
         @empty_response
-        @NvmeofCLICommand("nvmeof namespace del", model.RequestStatus, alias="nvmeof ns del")
+        @NvmeofCLICommand(
+            "nvmeof namespace del",
+            model.RequestStatus,
+            alias="nvmeof ns del",
+            success_message_template="Deleting namespace {nsid} from {nqn}: Successful")
         @EndpointDoc(
             "Delete an existing NVMeoF namespace",
             parameters={
@@ -1268,7 +1353,15 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof host add", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof host add",
+            model.RequestStatus,
+            success_message_fn=partial(
+                format_host_updates,
+                template_wildcard="Allowing open host access to {nqn}: Successful",
+                template_item="Adding host {host_nqn} to {nqn}: Successful",
+            ),
+        )
         @EndpointDoc(
             "Allow hosts to access an NVMeoF subsystem",
             parameters={
@@ -1295,7 +1388,15 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof host del", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof host del",
+            model.RequestStatus,
+            success_message_fn=partial(
+                format_host_updates,
+                template_wildcard="Disabling open host access to {nqn}: Successful",
+                template_item="Removing host {host_nqn} access from {nqn}: Successful",
+            ),
+        )
         @EndpointDoc(
             "Disallow hosts from accessing an NVMeoF subsystem",
             parameters={
@@ -1319,7 +1420,12 @@ else:
         @Endpoint('PUT', '{host_nqn}/change_key')
         @UpdatePermission
         @empty_response
-        @NvmeofCLICommand("nvmeof host change_key", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof host change_key",
+            model.RequestStatus,
+            success_message_template=("Changing key for host {host_nqn} "
+                                      "on subsystem {nqn}: Successful")
+        )
         @EndpointDoc(
             "Change host DH-HMAC-CHAP key",
             parameters={
@@ -1347,7 +1453,12 @@ else:
             )
 
         @empty_response
-        @NvmeofCLICommand("nvmeof host del_key", model.RequestStatus)
+        @NvmeofCLICommand(
+            "nvmeof host del_key",
+            model.RequestStatus,
+            success_message_template=("Deleting key for host {host_nqn} "
+                                      "on subsystem {nqn}: Successful")
+        )
         @EndpointDoc(
             "Delete host DH-HMAC-CHAP key",
             parameters={
index 9343ddbb377fb9f3d641d5493d6f1528e5e94a11..cd09305efe548f9cc8bfb9c28ae29d728d1c74d5 100644 (file)
@@ -1,13 +1,15 @@
 # -*- coding: utf-8 -*-
 import errno
+import inspect
 import json
+import logging
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Annotated, Any, Dict, List, NamedTuple, Optional, Type, \
-    Union, get_args, get_origin, get_type_hints
+from typing import Annotated, Any, Callable, Dict, List, Mapping, NamedTuple, \
+    Optional, Type, Union, get_args, get_origin, get_type_hints
 
 import yaml
-from mgr_module import CLICheckNonemptyFileInput, HandleCommandResult, HandlerFuncType
+from mgr_module import CLICheckNonemptyFileInput, HandleCommandResult
 from prettytable import PrettyTable
 
 from ..cli import DBCLICommand
@@ -16,6 +18,8 @@ from ..rest_client import RequestException
 from .nvmeof_conf import ManagedByOrchestratorException, \
     NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig
 
+logger = logging.getLogger(__name__)
+
 
 @DBCLICommand.Read('dashboard nvmeof-gateway-list')
 def list_nvmeof_gateways(_):
@@ -64,6 +68,39 @@ UNITS = {
 }
 
 
+def escape_address_if_ipv6(addr: str) -> str:
+    ret_addr = addr
+    if ":" in addr and not addr.strip().startswith("["):
+        ret_addr = f"[{addr}]"
+    return ret_addr
+
+
+def _normalize_to_list(value: Any) -> List[Any]:
+    if value is None:
+        return []
+    if isinstance(value, list):
+        return value
+    return [value]
+
+
+def format_host_updates(args: Dict[str, Any],
+                        template_wildcard: str,
+                        template_item: str,
+                        host_arg: str = 'host_nqn',
+                        nqn_arg: str = 'nqn',
+                        wildcard_value: str = '*') -> str:
+    nqn = args.get(nqn_arg)
+    hosts = _normalize_to_list(args.get(host_arg))
+
+    messages: List[str] = []
+    for host in hosts:
+        if host == wildcard_value:
+            messages.append(template_wildcard.format(nqn=nqn))
+        else:
+            messages.append(template_item.format(nqn=nqn, host_nqn=host))
+    return "\n".join(messages)
+
+
 def convert_to_bytes(size: Union[int, str], default_unit=None):
     if isinstance(size, int):
         number = size
@@ -248,38 +285,184 @@ class AnnotatedDataTextOutputFormatter(OutputFormatter):
 class NvmeofCLICommand(DBCLICommand):
     desc: str
 
-    def __init__(self, prefix, model: Type[NamedTuple], alias=None, perm='rw', poll=False):
+    def __init__(self,
+                 prefix,
+                 model: Type[NamedTuple],
+                 alias: Optional[str] = None,
+                 perm: str = 'rw',
+                 poll: bool = False,
+                 success_message_template: Optional[str] = None,
+                 success_message_map: Optional[Dict[str, Any]] = None,
+                 success_message_fn: Optional[Callable[[Dict[str, Any]], str]] = None):
         super().__init__(prefix, perm, poll)
         self._output_formatter = AnnotatedDataTextOutputFormatter()
         self._model = model
         self._alias = alias
         self._alias_cmd: Optional[NvmeofCLICommand] = None
 
-    def _use_api_endpoint_desc_if_available(self, func):
-        if not self.desc and hasattr(func, 'doc_info'):
-            self.desc = func.doc_info.get('summary', '')
+        self._success_message_template = success_message_template
+        self._success_message_map = success_message_map or {}
+        self._success_message_fn = success_message_fn
+        self._func_defaults: Dict[str, Any] = {}
+
+    def __call__(self, func):
+        resp = super().__call__(func)
+        self._func_defaults = self._compute_func_defaults()
 
-    def __call__(self, func) -> HandlerFuncType:  # type: ignore
         if self._alias:
-            self._alias_cmd = NvmeofCLICommand(self._alias, model=self._model)
-            assert self._alias_cmd is not None
+            self._alias_cmd = NvmeofCLICommand(
+                self._alias,
+                model=self._model,
+                success_message_template=self._success_message_template,
+                success_message_map=self._success_message_map,
+                success_message_fn=self._success_message_fn,
+            )
             self._alias_cmd(func)
+            self._alias_cmd._func_defaults = self._alias_cmd._compute_func_defaults()
 
-        resp = super().__call__(func)
         self._use_api_endpoint_desc_if_available(func)
         return resp
 
+    def _use_api_endpoint_desc_if_available(self, func):
+        if not self.desc and hasattr(func, 'doc_info'):
+            self.desc = func.doc_info.get('summary', '')
+
+    def _compute_func_defaults(self) -> Dict[str, Any]:
+        defaults: Dict[str, Any] = {}
+        sig = inspect.signature(self.func)
+
+        for name, param in sig.parameters.items():
+            if name in DBCLICommand.KNOWN_ARGS:
+                continue
+            if name not in self.arg_spec:
+                continue
+            if param.default is not inspect.Parameter.empty:
+                defaults[name] = param.default
+
+        return defaults
+
+    def _stringify(self, value: Any) -> str:
+        if isinstance(value, (bytes, bytearray)):
+            try:
+                return value.decode('utf-8', errors='replace')
+            except Exception:
+                return str(value)
+
+        if isinstance(value, (list, tuple)):
+            try:
+                return ','.join(self._stringify(v) for v in value)
+            except Exception:
+                return str(value)
+
+        return str(value)
+
+    def _args_map_from_argspec(self,
+                               cmd_dict: Dict[str, Any],
+                               inbuf: Optional[str] = None) -> Dict[str, Any]:
+        kwargs, specials = self._collect_args_by_argspec(cmd_dict)
+        kwargs = kwargs or {}
+        specials = specials or {}
+
+        if inbuf and 'inbuf' in specials:
+            kwargs['inbuf'] = inbuf
+
+        return {**self._func_defaults, **kwargs}
+
+    def _apply_single_map_spec(
+        self,
+        spec: Union[str, dict, Callable],
+        raw: Any,
+        fields: Dict[str, Any],
+    ) -> Any:
+        """
+        spec can be:
+          - dict mapping exact raw values to literal/callable
+          - callable(value, fields)
+          - literal value (e.g. string)
+        """
+        if callable(spec):
+            return spec(raw, fields)
+
+        if isinstance(spec, dict):
+            if raw in spec:
+                val = spec[raw]
+                return val(raw, fields) if callable(val) else val
+            return raw
+        return spec
+
+    def _apply_success_message_map(self, fields: Dict[str, Any]) -> Dict[str, Any]:
+        out = dict(fields)
+        for field, spec in self._success_message_map.items():
+            raw = out.get(field)
+            try:
+                out[field] = self._apply_single_map_spec(spec, raw, out)
+            except Exception:
+                logger.warning("Failed applying success_message_map for field %s on %s",
+                               field, self.prefix, exc_info=True)
+        return out
+
+    def get_success_msg(self, args_map: Dict[str, Any], response: Any) -> Optional[str]:
+        merged_fields: Dict[str, Any]
+
+        if isinstance(response, Mapping):
+            merged_fields = {**args_map, **dict(response)}
+        else:
+            merged_fields = dict(args_map)
+
+        if self._success_message_fn:
+            try:
+                return self._success_message_fn(merged_fields)
+            except Exception:
+                logger.warning("success_message_fn failed for %s", self.prefix, exc_info=True)
+                return None
+
+        return self._format_success_message_from_args(args_map, response)
+
+    def _format_success_message_from_args(self,
+                                          args_map: Dict[str, Any],
+                                          response: Any) -> Optional[str]:
+        if not self._success_message_template:
+            return None
+
+        resp_map: Dict[str, Any]
+        if isinstance(response, Mapping):
+            resp_map = dict(response)
+        else:
+            resp_map = {}
+
+        try:
+            fields_dict = {**args_map, **resp_map}
+            fields_dict = self._apply_success_message_map(fields_dict)
+            str_map = {k: self._stringify(v) for k, v in fields_dict.items()}
+            return self._success_message_template.format(**str_map)
+        except Exception:
+            logger.warning("Success message template failed for %s", self.prefix, exc_info=True)
+            return None
+
     def call(self,
              mgr: Any,
              cmd_dict: Dict[str, Any],
              inbuf: Optional[str] = None) -> HandleCommandResult:
         try:
-            ret = super().call(mgr, cmd_dict, inbuf)
             out_format = cmd_dict.get('format')
+            args_map = self._args_map_from_argspec(cmd_dict, inbuf)
+            ret = super().call(mgr, cmd_dict, inbuf)
             if ret is None:
-                out = ''
+                ret = {}
+
             if out_format == 'plain' or not out_format:
-                out = self._output_formatter.format_output(ret, self._model)
+                message: Optional[str] = None
+                try:
+                    message = self.get_success_msg(args_map, ret)
+                except Exception:
+                    logger.warning("Formatting of success message failed for %s",
+                                   self.prefix, exc_info=True)
+
+                if message:
+                    out = message
+                else:
+                    out = self._output_formatter.format_output(ret, self._model)
+
             elif out_format == 'json':
                 out = json.dumps(ret, indent=4)
             elif out_format == 'yaml':
@@ -287,6 +470,8 @@ class NvmeofCLICommand(DBCLICommand):
             else:
                 return HandleCommandResult(-errno.EINVAL, '',
                                            f"format '{out_format}' is not implemented")
+
             return HandleCommandResult(0, out, '')
+
         except Exception as e:  # pylint: disable=broad-except
             return HandleCommandResult(-errno.EINVAL, '', str(e))
index 8b98016065ad4815f3a66c783dc8fd72ec858ed5..4895a300bc942cd2a1259b92084ad27ab46a13ef 100644 (file)
@@ -1,17 +1,18 @@
 import errno
 import json
-import logging
 import unittest
-from typing import Annotated, List, NamedTuple
+from typing import Annotated, List, NamedTuple, Optional
 from unittest.mock import MagicMock
 
 import pytest
-from mgr_module import CLICommandBase, HandleCommandResult
+from mgr_module import HandleCommandResult
 
+from ..cli import DBCLICommand
 from ..controllers import EndpointDoc
 from ..model.nvmeof import CliFieldTransformer, CliFlags, CliHeader
 from ..services.nvmeof_cli import AnnotatedDataTextOutputFormatter, \
-    NvmeofCLICommand, convert_from_bytes, convert_to_bytes
+    NvmeofCLICommand, convert_from_bytes, convert_to_bytes, \
+    format_host_updates
 from ..tests import CLICommandTestMixin
 
 
@@ -36,7 +37,7 @@ def fixture_base_call_mock(monkeypatch):
     mock_result = {'a': 'b'}
     super_mock = MagicMock()
     super_mock.return_value = mock_result
-    monkeypatch.setattr(CLICommandBase, 'call', super_mock)
+    monkeypatch.setattr(DBCLICommand, 'call', super_mock)
     return super_mock
 
 
@@ -45,7 +46,7 @@ def fixture_base_call_return_none_mock(monkeypatch):
     mock_result = None
     super_mock = MagicMock()
     super_mock.return_value = mock_result
-    monkeypatch.setattr(CLICommandBase, 'call', super_mock)
+    monkeypatch.setattr(DBCLICommand, 'call', super_mock)
     return super_mock
 
 
@@ -56,7 +57,6 @@ class TestNvmeofCLICommand:
 
     def test_command_return_cmd_result_default_format(self, base_call_mock, sample_command):
         result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
-        logging.getLogger().error(result)
         assert isinstance(result, HandleCommandResult)
         assert result.retval == 0
         assert result.stdout == (
@@ -211,6 +211,596 @@ class TestNvmeofCLICommand:
         assert test_alias not in NvmeofCLICommand.COMMANDS
 
 
+class TestNvmeofCLICommandSuccessMessage:
+
+    def test_plain_output_uses_success_message_template(self):
+        test_cmd = "nvmeof set_log_level"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="set log level to {log_level}"
+        )
+        def set_log_level(self, log_level: str, gw_group: Optional[str] = None, traddr: Optional[str] = None):  # noqa
+            return {"status": 0}
+
+        result_default = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"log_level": "info"}
+        )
+        assert isinstance(result_default, HandleCommandResult)
+        assert result_default.retval == 0
+        assert result_default.stdout == "set log level to info"
+        assert result_default.stderr == ''
+
+        result_plain = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "log_level": "info"}
+        )
+        assert isinstance(result_plain, HandleCommandResult)
+        assert result_plain.retval == 0
+        assert result_plain.stdout == "set log level to info"
+        assert result_plain.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_plain_output_falls_back_when_template_unresolvable(self):
+        test_cmd = "nvmeof gateway set_log_level_fallback"
+
+        class Model(NamedTuple):
+            a: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="set log level to {log_level}"
+        )
+        def set_log_level(self, a: str):  # noqa
+            return {"a": "b"}
+
+        result_plain = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain"}
+        )
+        assert isinstance(result_plain, HandleCommandResult)
+        assert result_plain.retval == 0
+        assert result_plain.stdout == (
+            "+-+\n"
+            "|A|\n"
+            "+-+\n"
+            "|b|\n"
+            "+-+"
+        )
+        assert result_plain.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_default_output_falls_back_when_template_unresolvable(self):
+        test_cmd = "nvmeof gateway set_log_level_fallback_default"
+
+        class Model(NamedTuple):
+            a: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="set log level to {log_level}"
+        )
+        def set_log_level(self, a: str):  # noqa
+            return {"a": "b"}
+
+        result_default = NvmeofCLICommand.COMMANDS[test_cmd].call(MagicMock(), {})
+        assert isinstance(result_default, HandleCommandResult)
+        assert result_default.retval == 0
+        assert result_default.stdout == (
+            "+-+\n"
+            "|A|\n"
+            "+-+\n"
+            "|b|\n"
+            "+-+"
+        )
+        assert result_default.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_alias_inherits_success_message_template(self):
+        test_cmd = "nvmeof gateway set_log_level_main"
+        test_alias = "nvmeof gw set_log_level_alias"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            alias=test_alias,
+            success_message_template="set log level to {log_level}"
+        )
+        def set_log_level(self, log_level: str):  # noqa
+            return {"status": 0}
+
+        result_main = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "log_level": "debug"}
+        )
+        assert result_main.retval == 0
+        assert result_main.stdout == "set log level to debug"
+        assert result_main.stderr == ''
+
+        result_alias = NvmeofCLICommand.COMMANDS[test_alias].call(
+            MagicMock(),
+            {"format": "plain", "log_level": "warn"}
+        )
+        assert result_alias.retval == 0
+        assert result_alias.stdout == "set log level to warn"
+        assert result_alias.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        del NvmeofCLICommand.COMMANDS[test_alias]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+        assert test_alias not in NvmeofCLICommand.COMMANDS
+
+    def test_plain_uses_success_message_map_callable(self):
+        test_cmd = "nvmeof gw set_log_level map callable"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="set log level to {log_level}{suffix}",
+            success_message_map={
+                "suffix": lambda _v, f: " for all hosts" if f.get("all_hosts") else ""
+            }
+        )
+        def fn(self, log_level: str, all_hosts: bool = False):  # noqa
+            return {"status": 0}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "log_level": "info", "all_hosts": True}
+        )
+        assert res.retval == 0
+        assert res.stdout == "set log level to info for all hosts"
+        assert res.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_map_dict_maps_exact_values(self):
+        test_cmd = "nvmeof ns change_visibility map dict"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template='visibility "{auto_visible}": Successful',
+            success_message_map={
+                "auto_visible": {
+                    True: "visible to all hosts",
+                    False: "visible to selected hosts",
+                }
+            }
+        )
+        def fn(self, auto_visible: bool):  # noqa
+            return {"status": 0}
+
+        res_true = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "auto_visible": True}
+        )
+        assert res_true.retval == 0
+        assert res_true.stdout == 'visibility "visible to all hosts": Successful'
+
+        res_false = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "auto_visible": False}
+        )
+        assert res_false.retval == 0
+        assert res_false.stdout == 'visibility "visible to selected hosts": Successful'
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_map_dict_missing_key_leaves_raw_value(self):
+        test_cmd = "nvmeof map dict missing key"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="val {x}",
+            success_message_map={
+                "x": {1: "one"}  # no mapping for 2
+            }
+        )
+        def fn(self, x: int):  # noqa
+            return {"status": 0}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "x": 2}
+        )
+        assert res.retval == 0
+        assert res.stdout == "val 2"
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_map_dict_value_callable(self):
+        test_cmd = "nvmeof map dict value callable"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="host {host_name}",
+            success_message_map={
+                "host_name": {
+                    "*": "for all hosts",
+                    "h1": (lambda v, _f: f"for host {v}"),
+                }
+            }
+        )
+        def fn(self, host_name: str):  # noqa
+            return {"status": 0}
+
+        res_star = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "host_name": "*"}
+        )
+        assert res_star.stdout == "host for all hosts"
+
+        res_h1 = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "host_name": "h1"}
+        )
+        assert res_h1.stdout == "host for host h1"
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_map_supports_derived_fields(self):
+        test_cmd = "nvmeof map derived field"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="msg {derived}",
+            success_message_map={
+                "derived": lambda _v, f: f"nqn={f.get('nqn')}",
+            }
+        )
+        def fn(self, nqn: str):  # noqa
+            return {"status": 0}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "nqn": "subsys1"}
+        )
+        assert res.retval == 0
+        assert res.stdout == "msg nqn=subsys1"
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_alias_inherits_success_message_map(self):
+        test_cmd = "nvmeof map alias main"
+        test_alias = "nvmeof map alias alias"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            alias=test_alias,
+            success_message_template="lvl {log_level}{suffix}",
+            success_message_map={
+                "suffix": lambda _v, f: "!" if f.get("urgent") else "."
+            }
+        )
+        def fn(self, log_level: str, urgent: bool = False):  # noqa
+            return {"status": 0}
+
+        res_main = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "log_level": "debug", "urgent": True}
+        )
+        assert isinstance(res_main, HandleCommandResult)
+        assert res_main.retval == 0
+        assert res_main.stdout == "lvl debug!"
+        assert res_main.stderr == ''
+
+        res_alias = NvmeofCLICommand.COMMANDS[test_alias].call(
+            MagicMock(),
+            {"format": "plain", "log_level": "warn", "urgent": False}
+        )
+        assert isinstance(res_alias, HandleCommandResult)
+        assert res_alias.retval == 0
+        assert res_alias.stdout == "lvl warn."
+        assert res_alias.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        del NvmeofCLICommand.COMMANDS[test_alias]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+        assert test_alias not in NvmeofCLICommand.COMMANDS
+
+    def test_map_failure_does_not_break_template_rendering(self):
+        test_cmd = "nvmeof map failure fallback"
+
+        class Model(NamedTuple):
+            a: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="value {a}",
+            success_message_map={
+                "a": lambda _v, _f: 1 / 0,  # force exception
+            }
+        )
+        def fn(self, a: str):  # noqa
+            return {"a": "b"}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "a": "ignored"}
+        )
+        assert res.retval == 0
+        assert res.stdout == "value b"
+        assert res.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_template_formats_int_and_list_without_failure(self):
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            "nvmeof mixed params",
+            Model,
+            success_message_template="ns {nsid} hosts {host_nqn}"
+        )
+        def fn(self, nsid: int, host_nqn: list[str]):  # noqa
+            return {"status": 1}
+
+        res = NvmeofCLICommand.COMMANDS["nvmeof mixed params"].call(
+            MagicMock(),
+            {"format": "plain", "nsid": 42, "host_nqn": ["a", "b"]}
+        )
+        assert res.retval == 0
+        assert res.stdout == "ns 42 hosts a,b"
+
+        del NvmeofCLICommand.COMMANDS["nvmeof mixed params"]
+        assert "nvmeof mixed params" not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_uses_default_when_cli_omits_param(self):
+        class Model(NamedTuple):
+            status: str
+
+        def create(mgr, nqn: str, host_name: str, traddr: str,
+                   trsvcid: int = 4420, adrfam: int = 0, gw_group: Optional[str] = None):
+            return dict(status=1)
+
+        cmd = NvmeofCLICommand(
+            "nvmeof listener add",
+            model=Model,
+            success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful"
+        )
+        cmd(create)
+
+        cmd_dict = {
+            "nqn": "nqn.2014-08.org.nvmexpress:uuid:1234",
+            "host_name": "nvme-host-1",
+            "traddr": "10.0.0.5",
+            # 'trsvcid' omitted
+            # 'adrfam' omitted
+        }
+
+        result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None)
+        assert result.retval == 0
+        assert result.stderr == ""
+        assert result.stdout == (
+            "Adding nqn.2014-08.org.nvmexpress:uuid:1234 listener at 10.0.0.5:4420: Successful"
+        )
+
+    def test_success_message_cli_value_overrides_default(self):
+        class Model(NamedTuple):
+            status: str
+
+        def create(mgr, nqn: str, host_name: str, traddr: str,
+                   trsvcid: int = 4420, adrfam: int = 0, gw_group: Optional[str] = None):
+            return dict(status=1)
+
+        cmd = NvmeofCLICommand(
+            "nvmeof listener add",
+            model=Model,
+            success_message_template="Adding {nqn} listener at {traddr}:{trsvcid}: Successful"
+        )
+        cmd(create)
+
+        cmd_dict = {
+            "nqn": "nqn.2014-08.org.nvmexpress:uuid:abcd",
+            "host_name": "nvme-host-2",
+            "traddr": "192.168.1.10",
+            "trsvcid": 8009,  # override default 4420
+        }
+
+        result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None)
+        assert result.retval == 0
+        assert result.stderr == ""
+        assert result.stdout == (
+            "Adding nqn.2014-08.org.nvmexpress:uuid:abcd listener at 192.168.1.10:8009: Successful"
+        )
+
+    def test_defaults_allow_none_and_template_does_not_crash(self):
+        class Model(NamedTuple):
+            status: str
+
+        def create_with_none(
+            mgr,
+            nqn: str,
+            traddr: str,
+            trsvcid: int = 4420,
+            gw_group: Optional[str] = None,  # None default intentionally used
+        ):
+            return dict(status=1)
+
+        cmd = NvmeofCLICommand(
+            "nvmeof listener add",
+            model=Model,
+            success_message_template=(
+                "Adding {nqn} listener at {traddr}:{trsvcid} "
+                "gw={gw_group}: Successful"
+            ),
+        )
+        cmd(create_with_none)
+
+        cmd_dict = {
+            "nqn": "nqn.none.test",
+            "traddr": "127.0.0.1",
+            # 'gw_group' omitted; None default should be injected
+        }
+
+        result = cmd.call(mgr=None, cmd_dict=cmd_dict, inbuf=None)
+        assert result.retval == 0
+        assert result.stderr == ""
+        assert result.stdout == (
+            "Adding nqn.none.test listener at 127.0.0.1:4420 gw=None: Successful"
+        )
+
+    def test_template_can_use_response_fields(self):
+        test_cmd = "nvmeof show op status"
+
+        class Model(NamedTuple):
+            status: str
+            message: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="operation {op} finished with status {message}"
+        )
+        def op(self, op: str):
+            return {"status": 1, "message": "done"}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "op": "rebuild"}
+        )
+        assert res.retval == 0
+        assert res.stdout == "operation rebuild finished with status done"
+        assert res.stderr == ''
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_fn_overrides_template(self):
+        test_cmd = "nvmeof success fn overrides template"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="TEMPLATE {nqn}",
+            success_message_fn=lambda f: f"FN {f.get('nqn')}"
+        )
+        def fn(self, nqn: str):
+            return {"status": 0}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "nqn": "subsysA"}
+        )
+        assert res.retval == 0
+        assert res.stderr == ''
+        assert res.stdout == "FN subsysA"
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_fn_can_use_response_fields(self):
+        test_cmd = "nvmeof success fn response fields"
+
+        class Model(NamedTuple):
+            status: str
+            message: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="TEMPLATE {message}",
+            success_message_fn=lambda f: f"FN message={f.get('message')}"
+        )
+        def fn(self, op: str):  # noqa: ARG001
+            return {"status": 0, "message": "done"}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "op": "ignored"}
+        )
+        assert res.retval == 0
+        assert res.stderr == ''
+        assert res.stdout == "FN message=done"
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+    def test_success_message_fn_failure_falls_back_to_default_formatting(self):
+        test_cmd = "nvmeof success fn failure fallback"
+
+        class Model(NamedTuple):
+            status: str
+
+        @NvmeofCLICommand(
+            test_cmd,
+            Model,
+            success_message_template="TEMPLATE {nqn}",
+            success_message_fn=lambda _f: 1 / 0  # force exception
+        )
+        def fn(self, nqn: str):
+            return {"status": 0}
+
+        res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+            MagicMock(),
+            {"format": "plain", "nqn": "subsysB"}
+        )
+        assert res.retval == 0
+        assert res.stderr == ""
+        # since success_message_fn raises and get_success_msg returns None,
+        # code falls back to formatted output of the response
+        assert res.stdout == (
+            "++\n"
+            "||\n"
+            "++\n"
+            "\n"
+            "++"
+        )
+
+        del NvmeofCLICommand.COMMANDS[test_cmd]
+        assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+
 class TestNVMeoFConfCLI(unittest.TestCase, CLICommandTestMixin):
     def setUp(self):
         self.mock_kv_store()
@@ -642,3 +1232,74 @@ class TestConvertToBytes:
         with pytest.raises(ValueError):
             assert convert_to_bytes('5') == 5368709120
         assert convert_to_bytes('5', default_unit='GB') == 5368709120
+
+
+class TestFormatHostUpdates:
+    def test_single_wildcard(self):
+        args = {"nqn": "subsys1", "host_nqn": "*"}
+        out = format_host_updates(
+            args,
+            template_wildcard="Allowing open host access to {nqn}: Successful",
+            template_item="Adding host {host_nqn} to {nqn}: Successful",
+        )
+        assert out == "Allowing open host access to subsys1: Successful"
+
+    def test_single_host(self):
+        args = {"nqn": "subsys1", "host_nqn": "hostA"}
+        out = format_host_updates(
+            args,
+            template_wildcard="Allowing open host access to {nqn}: Successful",
+            template_item="Adding host {host_nqn} to {nqn}: Successful",
+        )
+        assert out == "Adding host hostA to subsys1: Successful"
+
+    def test_multiple_hosts_mixed_including_wildcard(self):
+        args = {"nqn": "subsys1", "host_nqn": ["hostA", "*", "hostB"]}
+        out = format_host_updates(
+            args,
+            template_wildcard="Disabling open host access to {nqn}: Successful",
+            template_item="Removing host {host_nqn} access from {nqn}: Successful",
+        )
+        assert out == (
+            "Removing host hostA access from subsys1: Successful\n"
+            "Disabling open host access to subsys1: Successful\n"
+            "Removing host hostB access from subsys1: Successful"
+        )
+
+    def test_none_host_arg_returns_empty_string(self):
+        args = {"nqn": "subsys1", "host_nqn": None}
+        out = format_host_updates(
+            args,
+            template_wildcard="Allowing open host access to {nqn}: Successful",
+            template_item="Adding host {host_nqn} to {nqn}: Successful",
+        )
+        assert out == ""
+
+    def test_missing_host_arg_returns_empty_string(self):
+        args = {"nqn": "subsys1"}  # host_nqn omitted
+        out = format_host_updates(
+            args,
+            template_wildcard="Allowing open host access to {nqn}: Successful",
+            template_item="Adding host {host_nqn} to {nqn}: Successful",
+        )
+        assert out == ""
+
+    def test_missing_nqn_renders_as_none(self):
+        args = {"host_nqn": "*"}  # nqn omitted
+        out = format_host_updates(
+            args,
+            template_wildcard="Allowing open host access to {nqn}: Successful",
+            template_item="Adding host {host_nqn} to {nqn}: Successful",
+        )
+        assert out == "Allowing open host access to None: Successful"
+
+    def test_custom_arg_names(self):
+        args = {"subsystem": "nqn.test", "host": ["h1", "*"]}
+        out = format_host_updates(
+            args,
+            nqn_arg="subsystem",
+            host_arg="host",
+            template_wildcard="W {nqn}",
+            template_item="H {host_nqn} {nqn}",
+        )
+        assert out == "H h1 nqn.test\nW nqn.test"