From 4bf952b210d90abd071405ae6fe66f64c0b70f6f Mon Sep 17 00:00:00 2001 From: Tomer Haskalovitch Date: Thu, 8 May 2025 11:54:39 +0300 Subject: [PATCH] mgr/dashboard: add plain output type support to nvme cli Signed-off-by: Tomer Haskalovitch (cherry picked from commit 27698aec2b1aedf9124725709e85803213633232) --- .../mgr/dashboard/controllers/nvmeof.py | 48 ++--- src/pybind/mgr/dashboard/model/nvmeof.py | 65 ++++--- .../mgr/dashboard/services/nvmeof_cli.py | 169 ++++++++++++++++-- .../mgr/dashboard/services/nvmeof_client.py | 5 +- .../mgr/dashboard/tests/test_nvmeof_cli.py | 163 ++++++++++++++++- 5 files changed, 385 insertions(+), 65 deletions(-) diff --git a/src/pybind/mgr/dashboard/controllers/nvmeof.py b/src/pybind/mgr/dashboard/controllers/nvmeof.py index 700fc30daecd1..a0e1fb9404f88 100644 --- a/src/pybind/mgr/dashboard/controllers/nvmeof.py +++ b/src/pybind/mgr/dashboard/controllers/nvmeof.py @@ -32,7 +32,7 @@ else: @APIDoc("NVMe-oF Gateway Management API", "NVMe-oF Gateway") class NVMeoFGateway(RESTController): @EndpointDoc("Get information about the NVMeoF gateway") - @NvmeofCLICommand("nvmeof gw info") + @NvmeofCLICommand("nvmeof gw info", model.GatewayInfo) @convert_to_model(model.GatewayInfo) @handle_nvmeof_error def list(self, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -55,7 +55,7 @@ else: @ReadPermission @Endpoint('GET', '/version') - @NvmeofCLICommand("nvmeof gw version") + @NvmeofCLICommand("nvmeof gw version", model.GatewayVersion) @convert_to_model(model.GatewayVersion) @handle_nvmeof_error def version(self, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -68,7 +68,7 @@ else: @ReadPermission @Endpoint('GET', '/log_level') - @NvmeofCLICommand("nvmeof gw get_log_level") + @NvmeofCLICommand("nvmeof gw get_log_level", model.GatewayLogLevelInfo) @convert_to_model(model.GatewayLogLevelInfo) @handle_nvmeof_error def get_log_level(self, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -80,7 +80,7 @@ else: @ReadPermission @Endpoint('PUT', '/log_level') - @NvmeofCLICommand("nvmeof gw set_log_level") + @NvmeofCLICommand("nvmeof gw set_log_level", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def set_log_level(self, log_level: str, gw_group: Optional[str] = None, @@ -97,7 +97,7 @@ else: class NVMeoFSpdk(RESTController): @ReadPermission @Endpoint('GET', '/log_level') - @NvmeofCLICommand("nvmeof spdk_log_level get") + @NvmeofCLICommand("nvmeof spdk_log_level get", model.SpdkNvmfLogFlagsAndLevelInfo) @convert_to_model(model.SpdkNvmfLogFlagsAndLevelInfo) @handle_nvmeof_error def get_spdk_log_level(self, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -109,7 +109,7 @@ else: @ReadPermission @Endpoint('PUT', '/log_level') - @NvmeofCLICommand("nvmeof spdk_log_level set") + @NvmeofCLICommand("nvmeof spdk_log_level set", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def set_spdk_log_level(self, log_level: Optional[str] = None, @@ -126,7 +126,7 @@ else: @ReadPermission @Endpoint('PUT', '/log_level/disable') - @NvmeofCLICommand("nvmeof spdk_log_level disable") + @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def disable_spdk_log_level(self, gw_group: Optional[str] = None, @@ -142,7 +142,7 @@ else: class NVMeoFSubsystem(RESTController): @EndpointDoc("List all NVMeoF subsystems") @pick(field="subsystems") - @NvmeofCLICommand("nvmeof subsystem list") + @NvmeofCLICommand("nvmeof subsystem list", model.SubsystemList) @convert_to_model(model.SubsystemList) @handle_nvmeof_error def list(self, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -158,7 +158,7 @@ else: }, ) @pick(field="subsystems", first=True) - @NvmeofCLICommand("nvmeof subsystem get") + @NvmeofCLICommand("nvmeof subsystem get", model.SubsystemList) @convert_to_model(model.SubsystemList) @handle_nvmeof_error def get(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -176,7 +176,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof subsystem add") + @NvmeofCLICommand("nvmeof subsystem add", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def create(self, nqn: str, enable_ha: bool = True, max_namespaces: int = 1024, @@ -196,7 +196,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof subsystem del") + @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def delete(self, nqn: str, force: Optional[str] = "false", gw_group: Optional[str] = None, @@ -218,7 +218,7 @@ else: }, ) @pick("listeners") - @NvmeofCLICommand("nvmeof listener list") + @NvmeofCLICommand("nvmeof listener list", model.ListenerList) @convert_to_model(model.ListenerList) @handle_nvmeof_error def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -238,7 +238,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof listener add") + @NvmeofCLICommand("nvmeof listener add", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def create( @@ -272,7 +272,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof listener del") + @NvmeofCLICommand("nvmeof listener del", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def delete( @@ -307,7 +307,7 @@ else: }, ) @pick("namespaces") - @NvmeofCLICommand("nvmeof ns list") + @NvmeofCLICommand("nvmeof ns list", model.NamespaceList) @convert_to_model(model.NamespaceList) @handle_nvmeof_error def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -324,7 +324,7 @@ else: }, ) @pick("namespaces", first=True) - @NvmeofCLICommand("nvmeof ns get") + @NvmeofCLICommand("nvmeof ns get", model.NamespaceList) @convert_to_model(model.NamespaceList) @handle_nvmeof_error def get(self, nqn: str, nsid: str, gw_group: Optional[str] = None, @@ -343,7 +343,7 @@ else: "gw_group": Param(str, "NVMeoF gateway group", True, None), }, ) - @NvmeofCLICommand("nvmeof ns get_io_stats") + @NvmeofCLICommand("nvmeof ns get_io_stats", model.NamespaceIOStats) @convert_to_model(model.NamespaceIOStats) @handle_nvmeof_error def io_stats(self, nqn: str, nsid: str, gw_group: Optional[str] = None, @@ -376,7 +376,7 @@ else: ) }, ) - @NvmeofCLICommand("nvmeof ns add") + @NvmeofCLICommand("nvmeof ns add", model.NamespaceCreation) @convert_to_model(model.NamespaceCreation) @handle_nvmeof_error def create( @@ -426,7 +426,7 @@ else: }, ) @pick("namespaces", first=True) - @NvmeofCLICommand("nvmeof ns update") + @NvmeofCLICommand("nvmeof ns update", model.NamespaceList) @convert_to_model(model.NamespaceList) @handle_nvmeof_error def update( @@ -510,7 +510,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof ns del") + @NvmeofCLICommand("nvmeof ns del", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def delete( @@ -546,7 +546,7 @@ else: }, ) @pick('hosts') - @NvmeofCLICommand("nvmeof host list") + @NvmeofCLICommand("nvmeof host list", model.HostsInfo) @convert_to_model(model.HostsInfo, finalize=_update_hosts) @handle_nvmeof_error def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): @@ -563,7 +563,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof host add") + @NvmeofCLICommand("nvmeof host add", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def create(self, nqn: str, host_nqn: str, gw_group: Optional[str] = None, @@ -581,7 +581,7 @@ else: }, ) @empty_response - @NvmeofCLICommand("nvmeof host del") + @NvmeofCLICommand("nvmeof host del", model.RequestStatus) @convert_to_model(model.RequestStatus) @handle_nvmeof_error def delete(self, nqn: str, host_nqn: str, gw_group: Optional[str] = None, @@ -601,7 +601,7 @@ else: }, ) @pick("connections") - @NvmeofCLICommand("nvmeof connection list") + @NvmeofCLICommand("nvmeof connection list", model.ConnectionList) @convert_to_model(model.ConnectionList) @handle_nvmeof_error def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None): diff --git a/src/pybind/mgr/dashboard/model/nvmeof.py b/src/pybind/mgr/dashboard/model/nvmeof.py index 05cd785605e89..0c3cf4c7907d7 100644 --- a/src/pybind/mgr/dashboard/model/nvmeof.py +++ b/src/pybind/mgr/dashboard/model/nvmeof.py @@ -1,4 +1,17 @@ -from typing import List, NamedTuple, Optional +from enum import Flag, auto +from typing import Annotated, List, NamedTuple, Optional + + +class CliFlags(Flag): + DROP = auto() + EXCLUSIVE_LIST = auto() + EXCLUSIVE_RESULT = auto() + SIZE = auto() + + +class CliHeader: + def __init__(self, label: str): + self.label = label class GatewayInfo(NamedTuple): @@ -6,13 +19,13 @@ class GatewayInfo(NamedTuple): status: int error_message: str hostname: str - cli_version: str + cli_version: Annotated[str, CliFlags.DROP] version: str name: str group: str addr: str port: int - load_balancing_group: int + load_balancing_group: Annotated[int, CliHeader('LB Group')] max_hosts: int max_hosts_per_subsystem: int max_namespaces: int @@ -48,11 +61,11 @@ class SpdkNvmfLogFlagsAndLevelInfo(NamedTuple): class Subsystem(NamedTuple): nqn: str - enable_ha: bool + enable_ha: Annotated[bool, CliFlags.DROP] serial_number: str model_number: str - min_cntlid: int - max_cntlid: int + min_cntlid: Annotated[int, CliFlags.DROP] + max_cntlid: Annotated[int, CliFlags.DROP] namespace_count: int subtype: str max_namespaces: int @@ -64,7 +77,7 @@ class Subsystem(NamedTuple): class SubsystemList(NamedTuple): status: int error_message: str - subsystems: List[Subsystem] + subsystems: Annotated[List[Subsystem], CliFlags.EXCLUSIVE_LIST] class Connection(NamedTuple): @@ -81,26 +94,26 @@ class ConnectionList(NamedTuple): status: int error_message: str subsystem_nqn: str - connections: List[Connection] + connections: Annotated[List[Connection], CliFlags.EXCLUSIVE_LIST] class NamespaceCreation(NamedTuple): - status: int + status: Annotated[int, CliFlags.EXCLUSIVE_RESULT] error_message: str nsid: int class Namespace(NamedTuple): bdev_name: str - rbd_image_name: str - rbd_pool_name: str - load_balancing_group: int - rbd_image_size: int - block_size: int - rw_ios_per_second: int - rw_mbytes_per_second: int - r_mbytes_per_second: int - w_mbytes_per_second: int + rbd_image_name: Annotated[str, CliHeader("RBD Image")] + rbd_pool_name: Annotated[str, CliHeader("RBD Pool")] + load_balancing_group: Annotated[int, CliHeader('LB Group')] + rbd_image_size: Annotated[int, CliFlags.SIZE] + block_size: Annotated[int, CliFlags.SIZE] + rw_ios_per_second: Annotated[int, CliHeader('R/W IOs/sec')] + rw_mbytes_per_second: Annotated[int, CliHeader('R/W MBs/sec')] + r_mbytes_per_second: Annotated[int, CliHeader('Read MBs/sec')] + w_mbytes_per_second: Annotated[int, CliHeader('Write MBs/sec')] auto_visible: bool hosts: List[str] nsid: Optional[int] @@ -112,23 +125,23 @@ class Namespace(NamedTuple): class NamespaceList(NamedTuple): status: int error_message: str - namespaces: List[Namespace] + namespaces: Annotated[List[Namespace], CliFlags.EXCLUSIVE_LIST] class NamespaceIOStats(NamedTuple): - status: int - error_message: str + status: Annotated[int, CliFlags.DROP] + error_message: Annotated[str, CliFlags.DROP] subsystem_nqn: str nsid: int uuid: str bdev_name: str tick_rate: int ticks: int - bytes_read: int + bytes_read: Annotated[int, CliFlags.SIZE] num_read_ops: int - bytes_written: int + bytes_written: Annotated[int, CliFlags.SIZE] num_write_ops: int - bytes_unmapped: int + bytes_unmapped: Annotated[int, CliFlags.SIZE] num_unmap_ops: int read_latency_ticks: int max_read_latency_ticks: int @@ -157,7 +170,7 @@ class Listener(NamedTuple): class ListenerList(NamedTuple): status: int error_message: str - listeners: List[Listener] + listeners: Annotated[List[Listener], CliFlags.EXCLUSIVE_LIST] class Host(NamedTuple): @@ -175,5 +188,5 @@ class HostsInfo(NamedTuple): class RequestStatus(NamedTuple): - status: int + status: Annotated[int, CliFlags.EXCLUSIVE_RESULT] error_message: str diff --git a/src/pybind/mgr/dashboard/services/nvmeof_cli.py b/src/pybind/mgr/dashboard/services/nvmeof_cli.py index b181fb0f5fe1d..f1887d86da219 100644 --- a/src/pybind/mgr/dashboard/services/nvmeof_cli.py +++ b/src/pybind/mgr/dashboard/services/nvmeof_cli.py @@ -1,13 +1,17 @@ # -*- coding: utf-8 -*- import errno import json -from typing import Any, Dict, Optional +from abc import ABC, abstractmethod +from typing import Annotated, Any, Dict, List, NamedTuple, Optional, Type, \ + Union, get_args, get_origin, get_type_hints import yaml from mgr_module import CLICheckNonemptyFileInput, CLICommand, CLIReadCommand, \ CLIWriteCommand, HandleCommandResult, HandlerFuncType +from prettytable import PrettyTable from ..exceptions import DashboardException +from ..model.nvmeof import CliFlags, CliHeader from ..rest_client import RequestException from .nvmeof_conf import ManagedByOrchestratorException, \ NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig @@ -51,7 +55,154 @@ def remove_nvmeof_gateway(_, name: str, daemon_name: str = ''): return -errno.EINVAL, '', str(ex) +def convert_from_bytes(num_in_bytes): + units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] + size = float(num_in_bytes) + unit_index = 0 + + while size >= 1024 and unit_index < len(units) - 1: + size /= 1024 + unit_index += 1 + + # Round to no decimal if it's an integer, otherwise show 1 decimal place + if size.is_integer(): + size_str = f"{int(size)}" + else: + size_str = f"{size:.1f}" + + return f"{size_str}{units[unit_index]}" + + +class OutputFormatter(ABC): + @abstractmethod + def format_output(self, data, model): + """Format the given data for output.""" + raise NotImplementedError() + + +class AnnotatedDataTextOutputFormatter(OutputFormatter): + def _snake_case_to_title(self, s): + return s.replace('_', ' ').title() + + def _create_table(self, field_names): + table = PrettyTable(border=True) + titles = [self._snake_case_to_title(field) for field in field_names] + table.field_names = titles + table.align = 'l' + table.padding_width = 0 + return table + + def _get_text_output(self, data): + if isinstance(data, list): + return self._get_list_text_output(data) + return self._get_object_text_output(data) + + def _get_list_text_output(self, data): + columns = list(dict.fromkeys([key for obj in data for key in obj.keys()])) + table = self._create_table(columns) + for d in data: + row = [] + for col in columns: + row.append(str(d.get(col))) + table.add_row(row) + return table.get_string() + + def _get_object_text_output(self, data): + columns = [k for k in data.keys() if k not in ["status", "error_message"]] + table = self._create_table(columns) + row = [] + for col in columns: + row.append(str(data.get(col))) + table.add_row(row) + return table.get_string() + + def _is_list_of_complex_type(self, value): + if not isinstance(value, list): + return False + + if not value: + return None + + primitives = (int, float, str, bool, bytes) + + return not isinstance(value[0], primitives) + + def _select_list_field(self, data: Dict) -> Optional[str]: + for key, value in data.items(): + if self._is_list_of_complex_type(value): + return key + return None + + def is_namedtuple_type(self, obj): + return isinstance(obj, type) and issubclass(obj, tuple) and hasattr(obj, '_fields') + + # pylint: disable=too-many-branches + def process_dict(self, input_dict: dict, + nt_class: Type[NamedTuple], + is_top_level: bool) -> Union[Dict, str, List]: + result: Dict = {} + if not input_dict: + return result + hints = get_type_hints(nt_class, include_extras=True) + + for field, type_hint in hints.items(): + if field not in input_dict: + continue + + value = input_dict[field] + origin = get_origin(type_hint) + + actual_type = type_hint + annotations = [] + output_name = field + skip = False + + if origin is Annotated: + actual_type, *annotations = get_args(type_hint) + for annotation in annotations: + if annotation == CliFlags.DROP: + skip = True + break + if isinstance(annotation, CliHeader): + output_name = annotation.label + elif is_top_level and annotation == CliFlags.EXCLUSIVE_LIST: + assert get_origin(actual_type) == list + assert len(get_args(actual_type)) == 1 + return [self.process_dict(item, get_args(actual_type)[0], + False) for item in value] + elif is_top_level and annotation == CliFlags.EXCLUSIVE_RESULT: + return f"Failure: {input_dict.get('error_message')}" if bool( + input_dict[field]) else "Success" + elif annotation == CliFlags.SIZE: + value = convert_from_bytes(int(input_dict[field])) + + if skip: + continue + + # If it's a nested namedtuple and value is a dict, recurse + if self.is_namedtuple_type(actual_type) and isinstance(value, dict): + result[output_name] = self.process_dict(value, actual_type, False) + else: + result[output_name] = value + + return result + + def _convert_to_text_output(self, data, model): + data = self.process_dict(data, model, True) + if isinstance(data, str): + return data + return self._get_text_output(data) + + def format_output(self, data, model): + return self._convert_to_text_output(data, model) + + class NvmeofCLICommand(CLICommand): + def __init__(self, prefix, model: Type[NamedTuple], perm='rw', poll=False): + super().__init__(prefix, perm, poll) + self._output_formatter = AnnotatedDataTextOutputFormatter() + self._model = model + def __call__(self, func) -> HandlerFuncType: # type: ignore # pylint: disable=useless-super-delegation """ @@ -69,16 +220,14 @@ class NvmeofCLICommand(CLICommand): try: ret = super().call(mgr, cmd_dict, inbuf) out_format = cmd_dict.get('format') - if out_format == 'json' or not out_format: - if ret is None: - out = '' - else: - out = json.dumps(ret) + if ret is None: + out = '' + if out_format == 'plain' or not out_format: + out = self._output_formatter.format_output(ret, self._model) + elif out_format == 'json': + out = json.dumps(ret) elif out_format == 'yaml': - if ret is None: - out = '' - else: - out = yaml.dump(ret) + out = yaml.dump(ret) else: return HandleCommandResult(-errno.EINVAL, '', f"format '{out_format}' is not implemented") diff --git a/src/pybind/mgr/dashboard/services/nvmeof_client.py b/src/pybind/mgr/dashboard/services/nvmeof_client.py index 46ff2493927b3..fb079cfca6468 100644 --- a/src/pybind/mgr/dashboard/services/nvmeof_client.py +++ b/src/pybind/mgr/dashboard/services/nvmeof_client.py @@ -2,7 +2,8 @@ import functools import logging -from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional, Type +from typing import Annotated, Any, Callable, Dict, Generator, List, \ + NamedTuple, Optional, Type, get_args, get_origin from ..exceptions import DashboardException from .nvmeof_conf import NvmeofGatewaysConfig @@ -166,6 +167,8 @@ else: field_values = {} for field, field_type in zip(target_type._fields, target_type.__annotations__.values()): + if get_origin(field_type) == Annotated: + field_type = get_args(field_type)[0] # these conditions are complex since we need to navigate between dicts, # empty dicts and objects if isinstance(data, dict) and data.get(field) is not None: diff --git a/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py b/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py index 1dfbb0af3b015..bceaa7f04c127 100644 --- a/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py +++ b/src/pybind/mgr/dashboard/tests/test_nvmeof_cli.py @@ -1,12 +1,15 @@ import errno import json import unittest +from typing import Annotated, List, NamedTuple from unittest.mock import MagicMock import pytest from mgr_module import CLICommand, HandleCommandResult -from ..services.nvmeof_cli import NvmeofCLICommand +from ..model.nvmeof import CliFlags, CliHeader +from ..services.nvmeof_cli import AnnotatedDataTextOutputFormatter, \ + NvmeofCLICommand, convert_from_bytes from ..tests import CLICommandTestMixin @@ -14,7 +17,11 @@ from ..tests import CLICommandTestMixin def fixture_sample_command(): test_cmd = "test command" - @NvmeofCLICommand(test_cmd) + class Model(NamedTuple): + a: str + b: int + + @NvmeofCLICommand(test_cmd, Model) def func(_): # noqa # pylint: disable=unused-variable return {'a': '1', 'b': 2} yield test_cmd @@ -49,7 +56,27 @@ class TestNvmeofCLICommand: result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {}) assert isinstance(result, HandleCommandResult) assert result.retval == 0 - assert result.stdout == '{"a": "b"}' + assert result.stdout == ( + "+-+\n" + "|A|\n" + "+-+\n" + "|b|\n" + "+-+" + ) + assert result.stderr == '' + base_call_mock.assert_called_once() + + def test_command_return_cmd_result_plain_format(self, base_call_mock, sample_command): + result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {'format': 'plain'}) + assert isinstance(result, HandleCommandResult) + assert result.retval == 0 + assert result.stdout == ( + "+-+\n" + "|A|\n" + "+-+\n" + "|b|\n" + "+-+" + ) assert result.stderr == '' base_call_mock.assert_called_once() @@ -85,7 +112,13 @@ class TestNvmeofCLICommand: result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {}) assert isinstance(result, HandleCommandResult) assert result.retval == 0 - assert result.stdout == '' + assert result.stdout == ( + "++\n" + "||\n" + "++\n" + "\n" + "++" + ) assert result.stderr == '' base_call_return_none_mock.assert_called_once() @@ -261,3 +294,125 @@ class TestNVMeoFConfCLI(unittest.TestCase, CLICommandTestMixin): self.assertEqual( config['gateways'], {} ) + + +class TestAnnotatedDataTextOutputFormatter(): + def test_size_bytes_annotation(self): + class Sample(NamedTuple): + name: str + age: int + byte: Annotated[int, CliFlags.SIZE] + + data = {'name': 'Alice', 'age': 30, "byte": 20971520} + + formatter = AnnotatedDataTextOutputFormatter() + output = formatter.format_output(data, Sample) + assert output == ( + '+-----+---+----+\n' + '|Name |Age|Byte|\n' + '+-----+---+----+\n' + '|Alice|30 |20MB|\n' + '+-----+---+----+' + ) + + def test_drop_annotation(self): + class Sample(NamedTuple): + name: str + age: Annotated[int, CliFlags.DROP] + + data = {'name': 'Alice', 'age': 30} + + formatter = AnnotatedDataTextOutputFormatter() + output = formatter.format_output(data, Sample) + assert output == ( + '+-----+\n' + '|Name |\n' + '+-----+\n' + '|Alice|\n' + '+-----+' + ) + + def test_multiple_annotations(self): + class Sample(NamedTuple): + name: str + age: Annotated[int, CliFlags.SIZE, CliHeader('test')] + + data = {'name': 'Alice', 'age': 1024*1024} + + formatter = AnnotatedDataTextOutputFormatter() + output = formatter.format_output(data, Sample) + assert output == ( + '+-----+----+\n' + '|Name |Test|\n' + '+-----+----+\n' + '|Alice|1MB |\n' + '+-----+----+' + ) + + def test_override_header_annotation(self): + class Sample(NamedTuple): + name: str + age: Annotated[int, CliHeader('test')] + + data = {'name': 'Alice', 'age': 30} + + formatter = AnnotatedDataTextOutputFormatter() + output = formatter.format_output(data, Sample) + assert output == ( + '+-----+----+\n' + '|Name |Test|\n' + '+-----+----+\n' + '|Alice|30 |\n' + '+-----+----+' + ) + + def test_override_exclusive_list_field_annotation(self): + class Sample(NamedTuple): + name: str + age: int + + class SampleList(NamedTuple): + status: int + error_message: str + samples: Annotated[List[Sample], CliFlags.EXCLUSIVE_LIST] + + data = {"status": 0, "error_message": '', + "samples": [{'name': 'Alice', 'age': 30}, + {'name': 'Bob', 'age': 40}]} + + formatter = AnnotatedDataTextOutputFormatter() + output = formatter.format_output(data, SampleList) + assert output == ( + '+-----+---+\n' + '|Name |Age|\n' + '+-----+---+\n' + '|Alice|30 |\n' + '|Bob |40 |\n' + '+-----+---+' + ) + + def test_exclusive_result_indicator_annotation(self): + class Sample(NamedTuple): + name: str + status: Annotated[int, CliFlags.EXCLUSIVE_RESULT] + error_message: str + + data = {'name': 'Alice', 'status': 30, "error_message": 'bla'} + + formatter = AnnotatedDataTextOutputFormatter() + output = formatter.format_output(data, Sample) + assert output == 'Failure: bla' + + data = {'name': 'Alice', 'status': 0} + output = formatter.format_output(data, Sample) + + assert output == 'Success' + + +class TestConverFromBytes: + def test_valid_inputs(self): + assert convert_from_bytes(209715200) == '200MB' + assert convert_from_bytes(219715200) == '209.5MB' + assert convert_from_bytes(1048576) == '1MB' + assert convert_from_bytes(123) == '123B' + assert convert_from_bytes(5368709120) == '5GB' -- 2.39.5