)
@empty_response
- @NvmeofCLICommand("nvmeof listener del", model.RequestStatus,
- success_message_fn=build_listener_del_success_message)
+ @NvmeofCLICommand(
+ "nvmeof listener del",
+ model.RequestStatus,
+ success_message_template=(
+ "Deleting listener {traddr}:{trsvcid} from {nqn} {host_msg}: Successful"
+ ),
+ success_message_map={
+ "traddr": lambda v, _f: escape_address_if_ipv6(v) if v is not None else "",
+ "host_msg": lambda _v, f: (
+ "for all hosts" if f.get("host_name") == "*"
+ else f"for host {f.get('host_name')}"
+ ),
+ }
+ )
@EndpointDoc(
"Delete an existing NVMeoF listener",
parameters={
@ReadPermission
@Endpoint('PUT', '{nsid}/change_visibility')
@NvmeofCLICommand(
- "nvmeof namespace change_visibility", model=model.RequestStatus,
+ "nvmeof namespace change_visibility",
+ model=model.RequestStatus,
alias="nvmeof ns change_visibility",
- success_message_fn=build_ns_change_visibility_success_message
+ success_message_template=(
+ 'Changing visibility of namespace {nsid} in {nqn} to "{auto_visible}": Successful'
+ ),
+ success_message_map={
+ "auto_visible": lambda v, _f: (
+ "visible to all hosts" if str_to_bool(v)
+ else "visible to selected hosts"
+ )
+ }
)
@EndpointDoc(
"changes the visibility of the specified NVMeoF namespace to all or selected hosts",
@ReadPermission
@Endpoint('PUT', '{nsid}/set_auto_resize')
@NvmeofCLICommand(
- "nvmeof namespace set_auto_resize", model=model.RequestStatus,
+ "nvmeof namespace set_auto_resize",
+ model=model.RequestStatus,
alias="nvmeof ns set_auto_resize",
- success_message_fn=build_ns_set_auto_resize_success_message
+ success_message_template=(
+ 'Setting auto resize flag for namespace {nsid} in {nqn} to "{auto_resize_text}": Successful'
+ ),
+ success_message_map={
+ "auto_resize_text": lambda _v, f: (
+ "auto resize namespace" if str_to_bool(f.get("auto_resize_enabled"))
+ else "do not auto resize namespace"
+ )
+ }
)
@EndpointDoc(
"Enable or disable namespace auto resize when RBD image is resized",
@ReadPermission
@Endpoint('PUT', '{nsid}/set_rbd_trash_image')
@NvmeofCLICommand(
- "nvmeof namespace set_rbd_trash_image", model=model.RequestStatus,
+ "nvmeof namespace set_rbd_trash_image",
+ model=model.RequestStatus,
alias="nvmeof ns set_rbd_trash_image",
- success_message_fn=build_ns_set_rbd_trash_image_success_message
+ success_message_template=(
+ 'Setting RBD trash image flag for namespace {nsid} in {nqn} to "{trash_text}": Successful'
+ ),
+ success_message_map={
+ "trash_text": lambda _v, f: (
+ "trash on namespace deletion" if str_to_bool(f.get("rbd_trash_image_on_delete"))
+ else "do not trash on namespace deletion"
+ )
+ }
)
@EndpointDoc(
"changes the trash image on delete of the specified NVMeoF \
)
@empty_response
- @NvmeofCLICommand("nvmeof host add", model.RequestStatus,
- success_message_fn=build_host_add_success_message)
+ @NvmeofCLICommand(
+ "nvmeof host add",
+ model.RequestStatus,
+ success_message_template="{messages}",
+ success_message_map={
+ "messages": lambda _v, f: (
+ (lambda host_list, nqn:
+ "\n".join(
+ (
+ f"Allowing open host access to {nqn}: Successful"
+ if h == "*"
+ else f"Adding host {h} to {nqn}: Successful"
+ ) for h in host_list
+ )
+ )(
+ f.get("host_nqn") if isinstance(f.get("host_nqn"), list)
+ else ([f.get("host_nqn")] if f.get("host_nqn") else []),
+ f.get("nqn"),
+ )
+ )
+ }
+ )
@EndpointDoc(
"Allow hosts to access an NVMeoF subsystem",
parameters={
)
@empty_response
- @NvmeofCLICommand("nvmeof host del", model.RequestStatus)
+ @NvmeofCLICommand(
+ "nvmeof host del",
+ model.RequestStatus,
+ success_message_template="{messages}",
+ success_message_map={
+ "messages": lambda _v, f: (
+ (lambda host_list, nqn:
+ "\n".join(
+ (
+ f"Disabling open host access to {nqn}: Successful"
+ if h == "*"
+ else f"Removing host {h} access from {nqn}: Successful"
+ ) for h in host_list
+ )
+ )(
+ f.get("host_nqn") if isinstance(f.get("host_nqn"), list)
+ else ([f.get("host_nqn")] if f.get("host_nqn") else []),
+ f.get("nqn"),
+ )
+ )
+ }
+ )
@EndpointDoc(
"Disallow hosts from accessing an NVMeoF subsystem",
parameters={
return self._convert_to_text_output(data, model)
+DEFAULT_MAP_KEY = "__default__" # you can delete this if you want
+
class NvmeofCLICommand(CLICommand):
desc: str
perm: str = 'rw',
poll: bool = False,
success_message_template: Optional[str] = None,
- success_message_fn: Optional[Callable[[Dict[str, Any]], str]] = None
- ):
+ success_message_map: Optional[Dict[str, Any]] = None):
super().__init__(prefix, perm, poll)
self._output_formatter = AnnotatedDataTextOutputFormatter()
self._model = model
self._alias_cmd: Optional[NvmeofCLICommand] = None
self._success_message_template = success_message_template
- self._success_message_fn = success_message_fn
+ self._success_message_map = success_message_map or {}
self._func_defaults: Dict[str, Any] = {}
- def _use_api_endpoint_desc_if_available(self, func):
- if not self.desc and hasattr(func, 'doc_info'):
- self.desc = func.doc_info.get('summary', '')
-
- def __call__(self, func) -> HandlerFuncType: # type: ignore
+ def __call__(self, func):
resp = super().__call__(func)
self._func_defaults = self._compute_func_defaults()
+
if self._alias:
self._alias_cmd = NvmeofCLICommand(
self._alias,
model=self._model,
success_message_template=self._success_message_template,
- success_message_fn=self._success_message_fn
+ success_message_map=self._success_message_map,
)
- assert self._alias_cmd is not None
self._alias_cmd(func)
self._alias_cmd._func_defaults = self._alias_cmd._compute_func_defaults()
self._use_api_endpoint_desc_if_available(func)
return resp
-
- def _compute_func_defaults(self) -> Dict[str, Any]:
- defaults: Dict[str, Any] = {}
- sig = inspect.signature(self.func)
-
- for name, param in sig.parameters.items():
- if name in CLICommand.KNOWN_ARGS:
- continue
- if name not in self.arg_spec:
- continue
- if param.default is not inspect.Parameter.empty:
- defaults[name] = param.default
-
- return defaults
-
- def _args_map_from_argspec(self,
- cmd_dict: Dict[str, Any],
- inbuf: Optional[str] = None) -> Dict[str, Any]:
- kwargs, specials = self._collect_args_by_argspec(cmd_dict)
- if inbuf and 'inbuf' in specials:
- kwargs['inbuf'] = inbuf
-
-
- return {**self._func_defaults, **kwargs}
-
-
- def _stringify(self, value: Any) -> str:
- if isinstance(value, (bytes, bytearray)):
- try:
- return value.decode('utf-8', errors='replace')
- except Exception:
- return str(value)
-
- if isinstance(value, (list, tuple)):
+ def _apply_single_map_spec(self, spec: Any, raw: Any, fields: Dict[str, Any]) -> Any:
+ """
+ spec can be:
+ - callable(value, fields)
+ - literal value (e.g. string)
+ - dict mapping exact raw values to literal/callable
+ """
+ if callable(spec):
+ return spec(raw, fields)
+
+ if isinstance(spec, dict):
+ if raw in spec:
+ val = spec[raw]
+ return val(raw, fields) if callable(val) else val
+ # No default behavior — if key missing → return raw unchanged
+ return raw
+
+ # simple literal replacement
+ return spec
+
+ def _apply_success_message_map(self, fields: Dict[str, Any]) -> Dict[str, Any]:
+ out = dict(fields)
+ for field, spec in self._success_message_map.items():
+ raw = out.get(field)
try:
- return ','.join(self._stringify(v) for v in value)
+ out[field] = self._apply_single_map_spec(spec, raw, out)
except Exception:
- return str(value)
-
- return str(value)
+ logger.warning("Failed applying success_message_map for field %s on %s",
+ field, self.prefix, exc_info=True)
+ return out
- def _format_success_message_from_args(self,
- args_map: Dict[str, Any],
- response: Dict[str, Any]) -> Optional[str]:
- if not self._success_message_template and not self._success_message_fn:
+ def _format_success_message_from_args(self, args_map, response):
+ if not self._success_message_template:
return None
-
- if self._success_message_fn:
- try:
- msg = self._success_message_fn(args_map, response)
- if msg:
- return msg
- except Exception:
- logger.warning("Success message function failed for %s", self.prefix, exc_info=True)
-
- if self._success_message_template:
- try:
- fields_dict = {**args_map, **response}
- str_map = {k: self._stringify(v) for k, v in fields_dict.items()}
- return self._success_message_template.format(**str_map)
- except Exception:
- logger.warning("Success message template failed for %s", self.prefix, exc_info=True)
-
- return None
-
- def call(self,
- mgr: Any,
- cmd_dict: Dict[str, Any],
- inbuf: Optional[str] = None) -> HandleCommandResult:
- try:
- out_format = cmd_dict.get('format')
- args_map = self._args_map_from_argspec(cmd_dict, inbuf)
- ret = super().call(mgr, cmd_dict, inbuf)
- if out_format == 'plain' or not out_format:
- message: Optional[str] = None
- try:
- message = self._format_success_message_from_args(args_map, ret)
- except Exception:
- logger.warning("Formatting of success message failed for %s",
- self.prefix, exc_info=True)
- if message:
- out = message
- else:
- out = self._output_formatter.format_output(ret, self._model)
-
- elif out_format == 'json':
- out = json.dumps(ret, indent=4)
- elif out_format == 'yaml':
- out = yaml.dump(ret)
- else:
- return HandleCommandResult(-errno.EINVAL, '',
- f"format '{out_format}' is not implemented")
-
- return HandleCommandResult(0, out, '')
-
- except Exception as e: # pylint: disable=broad-except
- return HandleCommandResult(-errno.EINVAL, '', str(e))
+ fields = {**args_map, **response}
+ fields = self._apply_success_message_map(fields)
+ return self._success_message_template.format(
+ **{k: self._stringify(v) for k, v in fields.items()}
+ )
\ No newline at end of file
assert test_cmd not in NvmeofCLICommand.COMMANDS
assert test_alias not in NvmeofCLICommand.COMMANDS
- def test_plain_uses_success_message_fn(self):
- test_cmd = "nvmeof gw set_log_level fn"
+ def test_plain_uses_success_message_map_callable(self):
+ test_cmd = "nvmeof gw set_log_level map callable"
class Model(NamedTuple):
status: str
@NvmeofCLICommand(
test_cmd,
Model,
- success_message_fn=lambda args, response: (
- f"set log level to {args.get('log_level', '')}"
- + (" for all hosts" if args.get('all_hosts') else "")
- )
+ success_message_template="set log level to {log_level}{suffix}",
+ success_message_map={
+ "suffix": lambda _v, f: " for all hosts" if f.get("all_hosts") else ""
+ }
)
def fn(self, log_level: str, all_hosts: bool = False): # noqa
return {"status": 0}
del NvmeofCLICommand.COMMANDS[test_cmd]
assert test_cmd not in NvmeofCLICommand.COMMANDS
+ def test_success_message_map_dict_maps_exact_values(self):
+ test_cmd = "nvmeof ns change_visibility map dict"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template='visibility "{auto_visible}": Successful',
+ success_message_map={
+ "auto_visible": {
+ True: "visible to all hosts",
+ False: "visible to selected hosts",
+ }
+ }
+ )
+ def fn(self, auto_visible: bool): # noqa
+ return {"status": 0}
+
+ res_true = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "auto_visible": True}
+ )
+ assert res_true.retval == 0
+ assert res_true.stdout == 'visibility "visible to all hosts": Successful'
+
+ res_false = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "auto_visible": False}
+ )
+ assert res_false.retval == 0
+ assert res_false.stdout == 'visibility "visible to selected hosts": Successful'
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_dict_missing_key_leaves_raw_value(self):
+ test_cmd = "nvmeof map dict missing key"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="val {x}",
+ success_message_map={
+ "x": {1: "one"} # no mapping for 2
+ }
+ )
+ def fn(self, x: int): # noqa
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "x": 2}
+ )
+ assert res.retval == 0
+ assert res.stdout == "val 2"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_dict_value_callable(self):
+ test_cmd = "nvmeof map dict value callable"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="host {host_name}",
+ success_message_map={
+ "host_name": {
+ "*": "for all hosts",
+ "h1": (lambda v, _f: f"for host {v}"),
+ }
+ }
+ )
+ def fn(self, host_name: str): # noqa
+ return {"status": 0}
+
+ res_star = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "host_name": "*"}
+ )
+ assert res_star.stdout == "host for all hosts"
+
+ res_h1 = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "host_name": "h1"}
+ )
+ assert res_h1.stdout == "host for host h1"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_success_message_map_supports_derived_fields(self):
+ test_cmd = "nvmeof map derived field"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="msg {derived}",
+ success_message_map={
+ "derived": lambda _v, f: f"nqn={f.get('nqn')}",
+ }
+ )
+ def fn(self, nqn: str): # noqa
+ return {"status": 0}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "nqn": "subsys1"}
+ )
+ assert res.retval == 0
+ assert res.stdout == "msg nqn=subsys1"
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+ def test_alias_inherits_success_message_map(self):
+ test_cmd = "nvmeof map alias main"
+ test_alias = "nvmeof map alias alias"
+
+ class Model(NamedTuple):
+ status: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ alias=test_alias,
+ success_message_template="lvl {log_level}{suffix}",
+ success_message_map={
+ "suffix": lambda _v, f: "!" if f.get("urgent") else "."
+ }
+ )
+ def fn(self, log_level: str, urgent: bool = False): # noqa
+ return {"status": 0}
+
+ res_main = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "debug", "urgent": True}
+ )
+ assert isinstance(res_main, HandleCommandResult)
+ assert res_main.retval == 0
+ assert res_main.stdout == "lvl debug!"
+ assert res_main.stderr == ''
+
+ res_alias = NvmeofCLICommand.COMMANDS[test_alias].call(
+ MagicMock(),
+ {"format": "plain", "log_level": "warn", "urgent": False}
+ )
+ assert isinstance(res_alias, HandleCommandResult)
+ assert res_alias.retval == 0
+ assert res_alias.stdout == "lvl warn."
+ assert res_alias.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ del NvmeofCLICommand.COMMANDS[test_alias]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+ assert test_alias not in NvmeofCLICommand.COMMANDS
+
+ def test_map_failure_falls_back_to_default_output(self):
+ test_cmd = "nvmeof map failure fallback"
+
+ class Model(NamedTuple):
+ a: str
+
+ @NvmeofCLICommand(
+ test_cmd,
+ Model,
+ success_message_template="value {a}",
+ success_message_map={
+ "a": lambda _v, _f: 1 / 0, # force exception
+ }
+ )
+ def fn(self, a: str): # noqa
+ return {"a": "b"}
+
+ res = NvmeofCLICommand.COMMANDS[test_cmd].call(
+ MagicMock(),
+ {"format": "plain", "a": "ignored"}
+ )
+ assert res.retval == 0
+ # falls back to default output formatter, same style as your other test
+ assert res.stdout == (
+ "+-+\n"
+ "|A|\n"
+ "+-+\n"
+ "|b|\n"
+ "+-+"
+ )
+ assert res.stderr == ''
+
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
def test_template_formats_int_and_list_without_failure(self):
class Model(NamedTuple):
status: str