@APIDoc("NVMe-oF Gateway Management API", "NVMe-oF Gateway")
class NVMeoFGateway(RESTController):
@EndpointDoc("Get information about the NVMeoF gateway")
- @NvmeofCLICommand("nvmeof gw info")
+ @NvmeofCLICommand("nvmeof gw info", model.GatewayInfo)
@convert_to_model(model.GatewayInfo)
@handle_nvmeof_error
def list(self, gw_group: Optional[str] = None, traddr: Optional[str] = None):
@ReadPermission
@Endpoint('GET', '/version')
- @NvmeofCLICommand("nvmeof gw version")
+ @NvmeofCLICommand("nvmeof gw version", model.GatewayVersion)
@convert_to_model(model.GatewayVersion)
@handle_nvmeof_error
def version(self, gw_group: Optional[str] = None, traddr: Optional[str] = None):
@ReadPermission
@Endpoint('GET', '/log_level')
- @NvmeofCLICommand("nvmeof gw get_log_level")
+ @NvmeofCLICommand("nvmeof gw get_log_level", model.GatewayLogLevelInfo)
@convert_to_model(model.GatewayLogLevelInfo)
@handle_nvmeof_error
def get_log_level(self, gw_group: Optional[str] = None, traddr: Optional[str] = None):
@ReadPermission
@Endpoint('PUT', '/log_level')
- @NvmeofCLICommand("nvmeof gw set_log_level")
+ @NvmeofCLICommand("nvmeof gw set_log_level", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def set_log_level(self, log_level: str, gw_group: Optional[str] = None,
class NVMeoFSpdk(RESTController):
@ReadPermission
@Endpoint('GET', '/log_level')
- @NvmeofCLICommand("nvmeof spdk_log_level get")
+ @NvmeofCLICommand("nvmeof spdk_log_level get", model.SpdkNvmfLogFlagsAndLevelInfo)
@convert_to_model(model.SpdkNvmfLogFlagsAndLevelInfo)
@handle_nvmeof_error
def get_spdk_log_level(self, gw_group: Optional[str] = None, traddr: Optional[str] = None):
@ReadPermission
@Endpoint('PUT', '/log_level')
- @NvmeofCLICommand("nvmeof spdk_log_level set")
+ @NvmeofCLICommand("nvmeof spdk_log_level set", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def set_spdk_log_level(self, log_level: Optional[str] = None,
@ReadPermission
@Endpoint('PUT', '/log_level/disable')
- @NvmeofCLICommand("nvmeof spdk_log_level disable")
+ @NvmeofCLICommand("nvmeof spdk_log_level disable", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def disable_spdk_log_level(self, gw_group: Optional[str] = None,
class NVMeoFSubsystem(RESTController):
@EndpointDoc("List all NVMeoF subsystems")
@pick(field="subsystems")
- @NvmeofCLICommand("nvmeof subsystem list")
+ @NvmeofCLICommand("nvmeof subsystem list", model.SubsystemList)
@convert_to_model(model.SubsystemList)
@handle_nvmeof_error
def list(self, gw_group: Optional[str] = None, traddr: Optional[str] = None):
},
)
@pick(field="subsystems", first=True)
- @NvmeofCLICommand("nvmeof subsystem get")
+ @NvmeofCLICommand("nvmeof subsystem get", model.SubsystemList)
@convert_to_model(model.SubsystemList)
@handle_nvmeof_error
def get(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None):
},
)
@empty_response
- @NvmeofCLICommand("nvmeof subsystem add")
+ @NvmeofCLICommand("nvmeof subsystem add", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def create(self, nqn: str, enable_ha: bool = True, max_namespaces: int = 1024,
},
)
@empty_response
- @NvmeofCLICommand("nvmeof subsystem del")
+ @NvmeofCLICommand("nvmeof subsystem del", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def delete(self, nqn: str, force: Optional[str] = "false", gw_group: Optional[str] = None,
},
)
@pick("listeners")
- @NvmeofCLICommand("nvmeof listener list")
+ @NvmeofCLICommand("nvmeof listener list", model.ListenerList)
@convert_to_model(model.ListenerList)
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None):
},
)
@empty_response
- @NvmeofCLICommand("nvmeof listener add")
+ @NvmeofCLICommand("nvmeof listener add", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def create(
},
)
@empty_response
- @NvmeofCLICommand("nvmeof listener del")
+ @NvmeofCLICommand("nvmeof listener del", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def delete(
},
)
@pick("namespaces")
- @NvmeofCLICommand("nvmeof ns list")
+ @NvmeofCLICommand("nvmeof ns list", model.NamespaceList)
@convert_to_model(model.NamespaceList)
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None):
},
)
@pick("namespaces", first=True)
- @NvmeofCLICommand("nvmeof ns get")
+ @NvmeofCLICommand("nvmeof ns get", model.NamespaceList)
@convert_to_model(model.NamespaceList)
@handle_nvmeof_error
def get(self, nqn: str, nsid: str, gw_group: Optional[str] = None,
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
- @NvmeofCLICommand("nvmeof ns get_io_stats")
+ @NvmeofCLICommand("nvmeof ns get_io_stats", model.NamespaceIOStats)
@convert_to_model(model.NamespaceIOStats)
@handle_nvmeof_error
def io_stats(self, nqn: str, nsid: str, gw_group: Optional[str] = None,
)
},
)
- @NvmeofCLICommand("nvmeof ns add")
+ @NvmeofCLICommand("nvmeof ns add", model.NamespaceCreation)
@convert_to_model(model.NamespaceCreation)
@handle_nvmeof_error
def create(
},
)
@pick("namespaces", first=True)
- @NvmeofCLICommand("nvmeof ns update")
+ @NvmeofCLICommand("nvmeof ns update", model.NamespaceList)
@convert_to_model(model.NamespaceList)
@handle_nvmeof_error
def update(
},
)
@empty_response
- @NvmeofCLICommand("nvmeof ns del")
+ @NvmeofCLICommand("nvmeof ns del", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def delete(
},
)
@pick('hosts')
- @NvmeofCLICommand("nvmeof host list")
+ @NvmeofCLICommand("nvmeof host list", model.HostsInfo)
@convert_to_model(model.HostsInfo, finalize=_update_hosts)
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None):
},
)
@empty_response
- @NvmeofCLICommand("nvmeof host add")
+ @NvmeofCLICommand("nvmeof host add", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def create(self, nqn: str, host_nqn: str, gw_group: Optional[str] = None,
},
)
@empty_response
- @NvmeofCLICommand("nvmeof host del")
+ @NvmeofCLICommand("nvmeof host del", model.RequestStatus)
@convert_to_model(model.RequestStatus)
@handle_nvmeof_error
def delete(self, nqn: str, host_nqn: str, gw_group: Optional[str] = None,
},
)
@pick("connections")
- @NvmeofCLICommand("nvmeof connection list")
+ @NvmeofCLICommand("nvmeof connection list", model.ConnectionList)
@convert_to_model(model.ConnectionList)
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None, traddr: Optional[str] = None):
-from typing import List, NamedTuple, Optional
+from enum import Flag, auto
+from typing import Annotated, List, NamedTuple, Optional
+
+
+class CliFlags(Flag):
+ DROP = auto()
+ EXCLUSIVE_LIST = auto()
+ EXCLUSIVE_RESULT = auto()
+ SIZE = auto()
+
+
+class CliHeader:
+ def __init__(self, label: str):
+ self.label = label
class GatewayInfo(NamedTuple):
status: int
error_message: str
hostname: str
- cli_version: str
+ cli_version: Annotated[str, CliFlags.DROP]
version: str
name: str
group: str
addr: str
port: int
- load_balancing_group: int
+ load_balancing_group: Annotated[int, CliHeader('LB Group')]
max_hosts: int
max_hosts_per_subsystem: int
max_namespaces: int
class Subsystem(NamedTuple):
nqn: str
- enable_ha: bool
+ enable_ha: Annotated[bool, CliFlags.DROP]
serial_number: str
model_number: str
- min_cntlid: int
- max_cntlid: int
+ min_cntlid: Annotated[int, CliFlags.DROP]
+ max_cntlid: Annotated[int, CliFlags.DROP]
namespace_count: int
subtype: str
max_namespaces: int
class SubsystemList(NamedTuple):
status: int
error_message: str
- subsystems: List[Subsystem]
+ subsystems: Annotated[List[Subsystem], CliFlags.EXCLUSIVE_LIST]
class Connection(NamedTuple):
status: int
error_message: str
subsystem_nqn: str
- connections: List[Connection]
+ connections: Annotated[List[Connection], CliFlags.EXCLUSIVE_LIST]
class NamespaceCreation(NamedTuple):
- status: int
+ status: Annotated[int, CliFlags.EXCLUSIVE_RESULT]
error_message: str
nsid: int
class Namespace(NamedTuple):
bdev_name: str
- rbd_image_name: str
- rbd_pool_name: str
- load_balancing_group: int
- rbd_image_size: int
- block_size: int
- rw_ios_per_second: int
- rw_mbytes_per_second: int
- r_mbytes_per_second: int
- w_mbytes_per_second: int
+ rbd_image_name: Annotated[str, CliHeader("RBD Image")]
+ rbd_pool_name: Annotated[str, CliHeader("RBD Pool")]
+ load_balancing_group: Annotated[int, CliHeader('LB Group')]
+ rbd_image_size: Annotated[int, CliFlags.SIZE]
+ block_size: Annotated[int, CliFlags.SIZE]
+ rw_ios_per_second: Annotated[int, CliHeader('R/W IOs/sec')]
+ rw_mbytes_per_second: Annotated[int, CliHeader('R/W MBs/sec')]
+ r_mbytes_per_second: Annotated[int, CliHeader('Read MBs/sec')]
+ w_mbytes_per_second: Annotated[int, CliHeader('Write MBs/sec')]
auto_visible: bool
hosts: List[str]
nsid: Optional[int]
class NamespaceList(NamedTuple):
status: int
error_message: str
- namespaces: List[Namespace]
+ namespaces: Annotated[List[Namespace], CliFlags.EXCLUSIVE_LIST]
class NamespaceIOStats(NamedTuple):
- status: int
- error_message: str
+ status: Annotated[int, CliFlags.DROP]
+ error_message: Annotated[str, CliFlags.DROP]
subsystem_nqn: str
nsid: int
uuid: str
bdev_name: str
tick_rate: int
ticks: int
- bytes_read: int
+ bytes_read: Annotated[int, CliFlags.SIZE]
num_read_ops: int
- bytes_written: int
+ bytes_written: Annotated[int, CliFlags.SIZE]
num_write_ops: int
- bytes_unmapped: int
+ bytes_unmapped: Annotated[int, CliFlags.SIZE]
num_unmap_ops: int
read_latency_ticks: int
max_read_latency_ticks: int
class ListenerList(NamedTuple):
status: int
error_message: str
- listeners: List[Listener]
+ listeners: Annotated[List[Listener], CliFlags.EXCLUSIVE_LIST]
class Host(NamedTuple):
class RequestStatus(NamedTuple):
- status: int
+ status: Annotated[int, CliFlags.EXCLUSIVE_RESULT]
error_message: str
# -*- coding: utf-8 -*-
import errno
import json
-from typing import Any, Dict, Optional
+from abc import ABC, abstractmethod
+from typing import Annotated, Any, Dict, List, NamedTuple, Optional, Type, \
+ Union, get_args, get_origin, get_type_hints
import yaml
from mgr_module import CLICheckNonemptyFileInput, CLICommand, CLIReadCommand, \
CLIWriteCommand, HandleCommandResult, HandlerFuncType
+from prettytable import PrettyTable
from ..exceptions import DashboardException
+from ..model.nvmeof import CliFlags, CliHeader
from ..rest_client import RequestException
from .nvmeof_conf import ManagedByOrchestratorException, \
NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig
return -errno.EINVAL, '', str(ex)
+def convert_from_bytes(num_in_bytes):
+ units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
+ size = float(num_in_bytes)
+ unit_index = 0
+
+ while size >= 1024 and unit_index < len(units) - 1:
+ size /= 1024
+ unit_index += 1
+
+ # Round to no decimal if it's an integer, otherwise show 1 decimal place
+ if size.is_integer():
+ size_str = f"{int(size)}"
+ else:
+ size_str = f"{size:.1f}"
+
+ return f"{size_str}{units[unit_index]}"
+
+
+class OutputFormatter(ABC):
+ @abstractmethod
+ def format_output(self, data, model):
+ """Format the given data for output."""
+ raise NotImplementedError()
+
+
+class AnnotatedDataTextOutputFormatter(OutputFormatter):
+ def _snake_case_to_title(self, s):
+ return s.replace('_', ' ').title()
+
+ def _create_table(self, field_names):
+ table = PrettyTable(border=True)
+ titles = [self._snake_case_to_title(field) for field in field_names]
+ table.field_names = titles
+ table.align = 'l'
+ table.padding_width = 0
+ return table
+
+ def _get_text_output(self, data):
+ if isinstance(data, list):
+ return self._get_list_text_output(data)
+ return self._get_object_text_output(data)
+
+ def _get_list_text_output(self, data):
+ columns = list(dict.fromkeys([key for obj in data for key in obj.keys()]))
+ table = self._create_table(columns)
+ for d in data:
+ row = []
+ for col in columns:
+ row.append(str(d.get(col)))
+ table.add_row(row)
+ return table.get_string()
+
+ def _get_object_text_output(self, data):
+ columns = [k for k in data.keys() if k not in ["status", "error_message"]]
+ table = self._create_table(columns)
+ row = []
+ for col in columns:
+ row.append(str(data.get(col)))
+ table.add_row(row)
+ return table.get_string()
+
+ def _is_list_of_complex_type(self, value):
+ if not isinstance(value, list):
+ return False
+
+ if not value:
+ return None
+
+ primitives = (int, float, str, bool, bytes)
+
+ return not isinstance(value[0], primitives)
+
+ def _select_list_field(self, data: Dict) -> Optional[str]:
+ for key, value in data.items():
+ if self._is_list_of_complex_type(value):
+ return key
+ return None
+
+ def is_namedtuple_type(self, obj):
+ return isinstance(obj, type) and issubclass(obj, tuple) and hasattr(obj, '_fields')
+
+ # pylint: disable=too-many-branches
+ def process_dict(self, input_dict: dict,
+ nt_class: Type[NamedTuple],
+ is_top_level: bool) -> Union[Dict, str, List]:
+ result: Dict = {}
+ if not input_dict:
+ return result
+ hints = get_type_hints(nt_class, include_extras=True)
+
+ for field, type_hint in hints.items():
+ if field not in input_dict:
+ continue
+
+ value = input_dict[field]
+ origin = get_origin(type_hint)
+
+ actual_type = type_hint
+ annotations = []
+ output_name = field
+ skip = False
+
+ if origin is Annotated:
+ actual_type, *annotations = get_args(type_hint)
+ for annotation in annotations:
+ if annotation == CliFlags.DROP:
+ skip = True
+ break
+ if isinstance(annotation, CliHeader):
+ output_name = annotation.label
+ elif is_top_level and annotation == CliFlags.EXCLUSIVE_LIST:
+ assert get_origin(actual_type) == list
+ assert len(get_args(actual_type)) == 1
+ return [self.process_dict(item, get_args(actual_type)[0],
+ False) for item in value]
+ elif is_top_level and annotation == CliFlags.EXCLUSIVE_RESULT:
+ return f"Failure: {input_dict.get('error_message')}" if bool(
+ input_dict[field]) else "Success"
+ elif annotation == CliFlags.SIZE:
+ value = convert_from_bytes(int(input_dict[field]))
+
+ if skip:
+ continue
+
+ # If it's a nested namedtuple and value is a dict, recurse
+ if self.is_namedtuple_type(actual_type) and isinstance(value, dict):
+ result[output_name] = self.process_dict(value, actual_type, False)
+ else:
+ result[output_name] = value
+
+ return result
+
+ def _convert_to_text_output(self, data, model):
+ data = self.process_dict(data, model, True)
+ if isinstance(data, str):
+ return data
+ return self._get_text_output(data)
+
+ def format_output(self, data, model):
+ return self._convert_to_text_output(data, model)
+
+
class NvmeofCLICommand(CLICommand):
+ def __init__(self, prefix, model: Type[NamedTuple], perm='rw', poll=False):
+ super().__init__(prefix, perm, poll)
+ self._output_formatter = AnnotatedDataTextOutputFormatter()
+ self._model = model
+
def __call__(self, func) -> HandlerFuncType: # type: ignore
# pylint: disable=useless-super-delegation
"""
try:
ret = super().call(mgr, cmd_dict, inbuf)
out_format = cmd_dict.get('format')
- if out_format == 'json' or not out_format:
- if ret is None:
- out = ''
- else:
- out = json.dumps(ret)
+ if ret is None:
+ out = ''
+ if out_format == 'plain' or not out_format:
+ out = self._output_formatter.format_output(ret, self._model)
+ elif out_format == 'json':
+ out = json.dumps(ret)
elif out_format == 'yaml':
- if ret is None:
- out = ''
- else:
- out = yaml.dump(ret)
+ out = yaml.dump(ret)
else:
return HandleCommandResult(-errno.EINVAL, '',
f"format '{out_format}' is not implemented")
import functools
import logging
-from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional, Type
+from typing import Annotated, Any, Callable, Dict, Generator, List, \
+ NamedTuple, Optional, Type, get_args, get_origin
from ..exceptions import DashboardException
from .nvmeof_conf import NvmeofGatewaysConfig
field_values = {}
for field, field_type in zip(target_type._fields,
target_type.__annotations__.values()):
+ if get_origin(field_type) == Annotated:
+ field_type = get_args(field_type)[0]
# these conditions are complex since we need to navigate between dicts,
# empty dicts and objects
if isinstance(data, dict) and data.get(field) is not None:
import errno
import json
import unittest
+from typing import Annotated, List, NamedTuple
from unittest.mock import MagicMock
import pytest
from mgr_module import CLICommand, HandleCommandResult
-from ..services.nvmeof_cli import NvmeofCLICommand
+from ..model.nvmeof import CliFlags, CliHeader
+from ..services.nvmeof_cli import AnnotatedDataTextOutputFormatter, \
+ NvmeofCLICommand, convert_from_bytes
from ..tests import CLICommandTestMixin
def fixture_sample_command():
test_cmd = "test command"
- @NvmeofCLICommand(test_cmd)
+ class Model(NamedTuple):
+ a: str
+ b: int
+
+ @NvmeofCLICommand(test_cmd, Model)
def func(_): # noqa # pylint: disable=unused-variable
return {'a': '1', 'b': 2}
yield test_cmd
result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
assert isinstance(result, HandleCommandResult)
assert result.retval == 0
- assert result.stdout == '{"a": "b"}'
+ assert result.stdout == (
+ "+-+\n"
+ "|A|\n"
+ "+-+\n"
+ "|b|\n"
+ "+-+"
+ )
+ assert result.stderr == ''
+ base_call_mock.assert_called_once()
+
+ def test_command_return_cmd_result_plain_format(self, base_call_mock, sample_command):
+ result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {'format': 'plain'})
+ assert isinstance(result, HandleCommandResult)
+ assert result.retval == 0
+ assert result.stdout == (
+ "+-+\n"
+ "|A|\n"
+ "+-+\n"
+ "|b|\n"
+ "+-+"
+ )
assert result.stderr == ''
base_call_mock.assert_called_once()
result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
assert isinstance(result, HandleCommandResult)
assert result.retval == 0
- assert result.stdout == ''
+ assert result.stdout == (
+ "++\n"
+ "||\n"
+ "++\n"
+ "\n"
+ "++"
+ )
assert result.stderr == ''
base_call_return_none_mock.assert_called_once()
self.assertEqual(
config['gateways'], {}
)
+
+
+class TestAnnotatedDataTextOutputFormatter():
+ def test_size_bytes_annotation(self):
+ class Sample(NamedTuple):
+ name: str
+ age: int
+ byte: Annotated[int, CliFlags.SIZE]
+
+ data = {'name': 'Alice', 'age': 30, "byte": 20971520}
+
+ formatter = AnnotatedDataTextOutputFormatter()
+ output = formatter.format_output(data, Sample)
+ assert output == (
+ '+-----+---+----+\n'
+ '|Name |Age|Byte|\n'
+ '+-----+---+----+\n'
+ '|Alice|30 |20MB|\n'
+ '+-----+---+----+'
+ )
+
+ def test_drop_annotation(self):
+ class Sample(NamedTuple):
+ name: str
+ age: Annotated[int, CliFlags.DROP]
+
+ data = {'name': 'Alice', 'age': 30}
+
+ formatter = AnnotatedDataTextOutputFormatter()
+ output = formatter.format_output(data, Sample)
+ assert output == (
+ '+-----+\n'
+ '|Name |\n'
+ '+-----+\n'
+ '|Alice|\n'
+ '+-----+'
+ )
+
+ def test_multiple_annotations(self):
+ class Sample(NamedTuple):
+ name: str
+ age: Annotated[int, CliFlags.SIZE, CliHeader('test')]
+
+ data = {'name': 'Alice', 'age': 1024*1024}
+
+ formatter = AnnotatedDataTextOutputFormatter()
+ output = formatter.format_output(data, Sample)
+ assert output == (
+ '+-----+----+\n'
+ '|Name |Test|\n'
+ '+-----+----+\n'
+ '|Alice|1MB |\n'
+ '+-----+----+'
+ )
+
+ def test_override_header_annotation(self):
+ class Sample(NamedTuple):
+ name: str
+ age: Annotated[int, CliHeader('test')]
+
+ data = {'name': 'Alice', 'age': 30}
+
+ formatter = AnnotatedDataTextOutputFormatter()
+ output = formatter.format_output(data, Sample)
+ assert output == (
+ '+-----+----+\n'
+ '|Name |Test|\n'
+ '+-----+----+\n'
+ '|Alice|30 |\n'
+ '+-----+----+'
+ )
+
+ def test_override_exclusive_list_field_annotation(self):
+ class Sample(NamedTuple):
+ name: str
+ age: int
+
+ class SampleList(NamedTuple):
+ status: int
+ error_message: str
+ samples: Annotated[List[Sample], CliFlags.EXCLUSIVE_LIST]
+
+ data = {"status": 0, "error_message": '',
+ "samples": [{'name': 'Alice', 'age': 30},
+ {'name': 'Bob', 'age': 40}]}
+
+ formatter = AnnotatedDataTextOutputFormatter()
+ output = formatter.format_output(data, SampleList)
+ assert output == (
+ '+-----+---+\n'
+ '|Name |Age|\n'
+ '+-----+---+\n'
+ '|Alice|30 |\n'
+ '|Bob |40 |\n'
+ '+-----+---+'
+ )
+
+ def test_exclusive_result_indicator_annotation(self):
+ class Sample(NamedTuple):
+ name: str
+ status: Annotated[int, CliFlags.EXCLUSIVE_RESULT]
+ error_message: str
+
+ data = {'name': 'Alice', 'status': 30, "error_message": 'bla'}
+
+ formatter = AnnotatedDataTextOutputFormatter()
+ output = formatter.format_output(data, Sample)
+ assert output == 'Failure: bla'
+
+ data = {'name': 'Alice', 'status': 0}
+ output = formatter.format_output(data, Sample)
+
+ assert output == 'Success'
+
+
+class TestConverFromBytes:
+ def test_valid_inputs(self):
+ assert convert_from_bytes(209715200) == '200MB'
+ assert convert_from_bytes(219715200) == '209.5MB'
+ assert convert_from_bytes(1048576) == '1MB'
+ assert convert_from_bytes(123) == '123B'
+ assert convert_from_bytes(5368709120) == '5GB'