From: Tomer Haskalovitch Date: Thu, 29 Jan 2026 20:27:23 +0000 (+0200) Subject: pr fixes X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cc908c81ff4d5e1e1682e4d876fa4b656d0d5b8a;p=ceph-ci.git pr fixes --- diff --git a/src/pybind/mgr/dashboard/controllers/nvmeof.py b/src/pybind/mgr/dashboard/controllers/nvmeof.py index ad190a637e7..84289996cd9 100644 --- a/src/pybind/mgr/dashboard/controllers/nvmeof.py +++ b/src/pybind/mgr/dashboard/controllers/nvmeof.py @@ -534,8 +534,20 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof listener del", model.RequestStatus, - success_message_fn=build_listener_del_success_message) + @NvmeofCLICommand( + "nvmeof listener del", + model.RequestStatus, + success_message_template=( + "Deleting listener {traddr}:{trsvcid} from {nqn} {host_msg}: Successful" + ), + success_message_map={ + "traddr": lambda v, _f: escape_address_if_ipv6(v) if v is not None else "", + "host_msg": lambda _v, f: ( + "for all hosts" if f.get("host_name") == "*" + else f"for host {f.get('host_name')}" + ), + } + ) @EndpointDoc( "Delete an existing NVMeoF listener", parameters={ @@ -1062,9 +1074,18 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/change_visibility') @NvmeofCLICommand( - "nvmeof namespace change_visibility", model=model.RequestStatus, + "nvmeof namespace change_visibility", + model=model.RequestStatus, alias="nvmeof ns change_visibility", - success_message_fn=build_ns_change_visibility_success_message + success_message_template=( + 'Changing visibility of namespace {nsid} in {nqn} to "{auto_visible}": Successful' + ), + success_message_map={ + "auto_visible": lambda v, _f: ( + "visible to all hosts" if str_to_bool(v) + else "visible to selected hosts" + ) + } ) @EndpointDoc( "changes the visibility of the specified NVMeoF namespace to all or selected hosts", @@ -1103,9 +1124,18 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/set_auto_resize') @NvmeofCLICommand( - "nvmeof namespace set_auto_resize", model=model.RequestStatus, + "nvmeof namespace set_auto_resize", + model=model.RequestStatus, alias="nvmeof ns set_auto_resize", - success_message_fn=build_ns_set_auto_resize_success_message + success_message_template=( + 'Setting auto resize flag for namespace {nsid} in {nqn} to "{auto_resize_text}": Successful' + ), + success_message_map={ + "auto_resize_text": lambda _v, f: ( + "auto resize namespace" if str_to_bool(f.get("auto_resize_enabled")) + else "do not auto resize namespace" + ) + } ) @EndpointDoc( "Enable or disable namespace auto resize when RBD image is resized", @@ -1145,9 +1175,18 @@ else: @ReadPermission @Endpoint('PUT', '{nsid}/set_rbd_trash_image') @NvmeofCLICommand( - "nvmeof namespace set_rbd_trash_image", model=model.RequestStatus, + "nvmeof namespace set_rbd_trash_image", + model=model.RequestStatus, alias="nvmeof ns set_rbd_trash_image", - success_message_fn=build_ns_set_rbd_trash_image_success_message + success_message_template=( + 'Setting RBD trash image flag for namespace {nsid} in {nqn} to "{trash_text}": Successful' + ), + success_message_map={ + "trash_text": lambda _v, f: ( + "trash on namespace deletion" if str_to_bool(f.get("rbd_trash_image_on_delete")) + else "do not trash on namespace deletion" + ) + } ) @EndpointDoc( "changes the trash image on delete of the specified NVMeoF \ @@ -1388,8 +1427,28 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof host add", model.RequestStatus, - success_message_fn=build_host_add_success_message) + @NvmeofCLICommand( + "nvmeof host add", + model.RequestStatus, + success_message_template="{messages}", + success_message_map={ + "messages": lambda _v, f: ( + (lambda host_list, nqn: + "\n".join( + ( + f"Allowing open host access to {nqn}: Successful" + if h == "*" + else f"Adding host {h} to {nqn}: Successful" + ) for h in host_list + ) + )( + f.get("host_nqn") if isinstance(f.get("host_nqn"), list) + else ([f.get("host_nqn")] if f.get("host_nqn") else []), + f.get("nqn"), + ) + ) + } + ) @EndpointDoc( "Allow hosts to access an NVMeoF subsystem", parameters={ @@ -1415,7 +1474,28 @@ else: ) @empty_response - @NvmeofCLICommand("nvmeof host del", model.RequestStatus) + @NvmeofCLICommand( + "nvmeof host del", + model.RequestStatus, + success_message_template="{messages}", + success_message_map={ + "messages": lambda _v, f: ( + (lambda host_list, nqn: + "\n".join( + ( + f"Disabling open host access to {nqn}: Successful" + if h == "*" + else f"Removing host {h} access from {nqn}: Successful" + ) for h in host_list + ) + )( + f.get("host_nqn") if isinstance(f.get("host_nqn"), list) + else ([f.get("host_nqn")] if f.get("host_nqn") else []), + f.get("nqn"), + ) + ) + } + ) @EndpointDoc( "Disallow hosts from accessing an NVMeoF subsystem", parameters={ diff --git a/src/pybind/mgr/dashboard/services/nvmeof_cli.py b/src/pybind/mgr/dashboard/services/nvmeof_cli.py index dce57b104ef..418630c8867 100644 --- a/src/pybind/mgr/dashboard/services/nvmeof_cli.py +++ b/src/pybind/mgr/dashboard/services/nvmeof_cli.py @@ -249,6 +249,8 @@ class AnnotatedDataTextOutputFormatter(OutputFormatter): return self._convert_to_text_output(data, model) +DEFAULT_MAP_KEY = "__default__" # you can delete this if you want + class NvmeofCLICommand(CLICommand): desc: str @@ -259,8 +261,7 @@ class NvmeofCLICommand(CLICommand): perm: str = 'rw', poll: bool = False, success_message_template: Optional[str] = None, - success_message_fn: Optional[Callable[[Dict[str, Any]], str]] = None - ): + success_message_map: Optional[Dict[str, Any]] = None): super().__init__(prefix, perm, poll) self._output_formatter = AnnotatedDataTextOutputFormatter() self._model = model @@ -268,124 +269,62 @@ class NvmeofCLICommand(CLICommand): self._alias_cmd: Optional[NvmeofCLICommand] = None self._success_message_template = success_message_template - self._success_message_fn = success_message_fn + self._success_message_map = success_message_map or {} self._func_defaults: Dict[str, Any] = {} - def _use_api_endpoint_desc_if_available(self, func): - if not self.desc and hasattr(func, 'doc_info'): - self.desc = func.doc_info.get('summary', '') - - def __call__(self, func) -> HandlerFuncType: # type: ignore + def __call__(self, func): resp = super().__call__(func) self._func_defaults = self._compute_func_defaults() + if self._alias: self._alias_cmd = NvmeofCLICommand( self._alias, model=self._model, success_message_template=self._success_message_template, - success_message_fn=self._success_message_fn + success_message_map=self._success_message_map, ) - assert self._alias_cmd is not None self._alias_cmd(func) self._alias_cmd._func_defaults = self._alias_cmd._compute_func_defaults() self._use_api_endpoint_desc_if_available(func) return resp - - def _compute_func_defaults(self) -> Dict[str, Any]: - defaults: Dict[str, Any] = {} - sig = inspect.signature(self.func) - - for name, param in sig.parameters.items(): - if name in CLICommand.KNOWN_ARGS: - continue - if name not in self.arg_spec: - continue - if param.default is not inspect.Parameter.empty: - defaults[name] = param.default - - return defaults - - def _args_map_from_argspec(self, - cmd_dict: Dict[str, Any], - inbuf: Optional[str] = None) -> Dict[str, Any]: - kwargs, specials = self._collect_args_by_argspec(cmd_dict) - if inbuf and 'inbuf' in specials: - kwargs['inbuf'] = inbuf - - - return {**self._func_defaults, **kwargs} - - - def _stringify(self, value: Any) -> str: - if isinstance(value, (bytes, bytearray)): - try: - return value.decode('utf-8', errors='replace') - except Exception: - return str(value) - - if isinstance(value, (list, tuple)): + def _apply_single_map_spec(self, spec: Any, raw: Any, fields: Dict[str, Any]) -> Any: + """ + spec can be: + - callable(value, fields) + - literal value (e.g. string) + - dict mapping exact raw values to literal/callable + """ + if callable(spec): + return spec(raw, fields) + + if isinstance(spec, dict): + if raw in spec: + val = spec[raw] + return val(raw, fields) if callable(val) else val + # No default behavior — if key missing → return raw unchanged + return raw + + # simple literal replacement + return spec + + def _apply_success_message_map(self, fields: Dict[str, Any]) -> Dict[str, Any]: + out = dict(fields) + for field, spec in self._success_message_map.items(): + raw = out.get(field) try: - return ','.join(self._stringify(v) for v in value) + out[field] = self._apply_single_map_spec(spec, raw, out) except Exception: - return str(value) - - return str(value) + logger.warning("Failed applying success_message_map for field %s on %s", + field, self.prefix, exc_info=True) + return out - def _format_success_message_from_args(self, - args_map: Dict[str, Any], - response: Dict[str, Any]) -> Optional[str]: - if not self._success_message_template and not self._success_message_fn: + def _format_success_message_from_args(self, args_map, response): + if not self._success_message_template: return None - - if self._success_message_fn: - try: - msg = self._success_message_fn(args_map, response) - if msg: - return msg - except Exception: - logger.warning("Success message function failed for %s", self.prefix, exc_info=True) - - if self._success_message_template: - try: - fields_dict = {**args_map, **response} - str_map = {k: self._stringify(v) for k, v in fields_dict.items()} - return self._success_message_template.format(**str_map) - except Exception: - logger.warning("Success message template failed for %s", self.prefix, exc_info=True) - - return None - - def call(self, - mgr: Any, - cmd_dict: Dict[str, Any], - inbuf: Optional[str] = None) -> HandleCommandResult: - try: - out_format = cmd_dict.get('format') - args_map = self._args_map_from_argspec(cmd_dict, inbuf) - ret = super().call(mgr, cmd_dict, inbuf) - if out_format == 'plain' or not out_format: - message: Optional[str] = None - try: - message = self._format_success_message_from_args(args_map, ret) - except Exception: - logger.warning("Formatting of success message failed for %s", - self.prefix, exc_info=True) - if message: - out = message - else: - out = self._output_formatter.format_output(ret, self._model) - - elif out_format == 'json': - out = json.dumps(ret, indent=4) - elif out_format == 'yaml': - out = yaml.dump(ret) - else: - return HandleCommandResult(-errno.EINVAL, '', - f"format '{out_format}' is not implemented") - - return HandleCommandResult(0, out, '') - - except Exception as e: # pylint: disable=broad-except - return HandleCommandResult(-errno.EINVAL, '', str(e)) + fields = {**args_map, **response} + fields = self._apply_success_message_map(fields) + return self._success_message_template.format( + **{k: self._stringify(v) for k, v in fields.items()} + ) \ No newline at end of file diff --git a/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py b/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py index 10621f912e8..914971fe797 100644 --- a/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py +++ b/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py @@ -344,8 +344,8 @@ class TestNvmeofCLICommandSuccessMessage: assert test_cmd not in NvmeofCLICommand.COMMANDS assert test_alias not in NvmeofCLICommand.COMMANDS - def test_plain_uses_success_message_fn(self): - test_cmd = "nvmeof gw set_log_level fn" + def test_plain_uses_success_message_map_callable(self): + test_cmd = "nvmeof gw set_log_level map callable" class Model(NamedTuple): status: str @@ -353,10 +353,10 @@ class TestNvmeofCLICommandSuccessMessage: @NvmeofCLICommand( test_cmd, Model, - success_message_fn=lambda args, response: ( - f"set log level to {args.get('log_level', '')}" - + (" for all hosts" if args.get('all_hosts') else "") - ) + success_message_template="set log level to {log_level}{suffix}", + success_message_map={ + "suffix": lambda _v, f: " for all hosts" if f.get("all_hosts") else "" + } ) def fn(self, log_level: str, all_hosts: bool = False): # noqa return {"status": 0} @@ -372,6 +372,209 @@ class TestNvmeofCLICommandSuccessMessage: del NvmeofCLICommand.COMMANDS[test_cmd] assert test_cmd not in NvmeofCLICommand.COMMANDS + def test_success_message_map_dict_maps_exact_values(self): + test_cmd = "nvmeof ns change_visibility map dict" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template='visibility "{auto_visible}": Successful', + success_message_map={ + "auto_visible": { + True: "visible to all hosts", + False: "visible to selected hosts", + } + } + ) + def fn(self, auto_visible: bool): # noqa + return {"status": 0} + + res_true = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "auto_visible": True} + ) + assert res_true.retval == 0 + assert res_true.stdout == 'visibility "visible to all hosts": Successful' + + res_false = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "auto_visible": False} + ) + assert res_false.retval == 0 + assert res_false.stdout == 'visibility "visible to selected hosts": Successful' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_success_message_map_dict_missing_key_leaves_raw_value(self): + test_cmd = "nvmeof map dict missing key" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="val {x}", + success_message_map={ + "x": {1: "one"} # no mapping for 2 + } + ) + def fn(self, x: int): # noqa + return {"status": 0} + + res = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "x": 2} + ) + assert res.retval == 0 + assert res.stdout == "val 2" + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_success_message_map_dict_value_callable(self): + test_cmd = "nvmeof map dict value callable" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="host {host_name}", + success_message_map={ + "host_name": { + "*": "for all hosts", + "h1": (lambda v, _f: f"for host {v}"), + } + } + ) + def fn(self, host_name: str): # noqa + return {"status": 0} + + res_star = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "host_name": "*"} + ) + assert res_star.stdout == "host for all hosts" + + res_h1 = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "host_name": "h1"} + ) + assert res_h1.stdout == "host for host h1" + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_success_message_map_supports_derived_fields(self): + test_cmd = "nvmeof map derived field" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="msg {derived}", + success_message_map={ + "derived": lambda _v, f: f"nqn={f.get('nqn')}", + } + ) + def fn(self, nqn: str): # noqa + return {"status": 0} + + res = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "nqn": "subsys1"} + ) + assert res.retval == 0 + assert res.stdout == "msg nqn=subsys1" + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + + def test_alias_inherits_success_message_map(self): + test_cmd = "nvmeof map alias main" + test_alias = "nvmeof map alias alias" + + class Model(NamedTuple): + status: str + + @NvmeofCLICommand( + test_cmd, + Model, + alias=test_alias, + success_message_template="lvl {log_level}{suffix}", + success_message_map={ + "suffix": lambda _v, f: "!" if f.get("urgent") else "." + } + ) + def fn(self, log_level: str, urgent: bool = False): # noqa + return {"status": 0} + + res_main = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "log_level": "debug", "urgent": True} + ) + assert isinstance(res_main, HandleCommandResult) + assert res_main.retval == 0 + assert res_main.stdout == "lvl debug!" + assert res_main.stderr == '' + + res_alias = NvmeofCLICommand.COMMANDS[test_alias].call( + MagicMock(), + {"format": "plain", "log_level": "warn", "urgent": False} + ) + assert isinstance(res_alias, HandleCommandResult) + assert res_alias.retval == 0 + assert res_alias.stdout == "lvl warn." + assert res_alias.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + del NvmeofCLICommand.COMMANDS[test_alias] + assert test_cmd not in NvmeofCLICommand.COMMANDS + assert test_alias not in NvmeofCLICommand.COMMANDS + + def test_map_failure_falls_back_to_default_output(self): + test_cmd = "nvmeof map failure fallback" + + class Model(NamedTuple): + a: str + + @NvmeofCLICommand( + test_cmd, + Model, + success_message_template="value {a}", + success_message_map={ + "a": lambda _v, _f: 1 / 0, # force exception + } + ) + def fn(self, a: str): # noqa + return {"a": "b"} + + res = NvmeofCLICommand.COMMANDS[test_cmd].call( + MagicMock(), + {"format": "plain", "a": "ignored"} + ) + assert res.retval == 0 + # falls back to default output formatter, same style as your other test + assert res.stdout == ( + "+-+\n" + "|A|\n" + "+-+\n" + "|b|\n" + "+-+" + ) + assert res.stderr == '' + + del NvmeofCLICommand.COMMANDS[test_cmd] + assert test_cmd not in NvmeofCLICommand.COMMANDS + def test_template_formats_int_and_list_without_failure(self): class Model(NamedTuple): status: str