from .. import mgr
from ..model import nvmeof as model
from ..security import Scope
+from ..services.nvmeof_cli import NvmeofCLICommand
from ..services.orchestrator import OrchClient
from ..tools import str_to_bool
from . import APIDoc, APIRouter, BaseController, CreatePermission, \
@APIDoc("NVMe-oF Gateway Management API", "NVMe-oF Gateway")
class NVMeoFGateway(RESTController):
@EndpointDoc("Get information about the NVMeoF gateway")
+ @NvmeofCLICommand("nvmeof gw info")
@map_model(model.GatewayInfo)
@handle_nvmeof_error
def list(self, gw_group: Optional[str] = None):
@APIDoc("NVMe-oF Subsystem Management API", "NVMe-oF Subsystem")
class NVMeoFSubsystem(RESTController):
@EndpointDoc("List all NVMeoF subsystems")
+ @NvmeofCLICommand("nvmeof subsystem list")
@map_collection(model.Subsystem, pick="subsystems")
@handle_nvmeof_error
def list(self, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof subsystem get")
@map_model(model.Subsystem, first="subsystems")
@handle_nvmeof_error
def get(self, nqn: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof subsystem add")
@empty_response
@handle_nvmeof_error
def create(self, nqn: str, enable_ha: bool, max_namespaces: int = 1024,
"Delete an existing NVMeoF subsystem",
parameters={
"nqn": Param(str, "NVMeoF subsystem NQN"),
- "force": Param(bool, "Force delete", "false"),
+ "force": Param(bool, "Force delete", True, False),
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof subsystem del")
@empty_response
@handle_nvmeof_error
def delete(self, nqn: str, force: Optional[str] = "false", gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof listener list")
@map_collection(model.Listener, pick="listeners")
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof listener add")
@empty_response
@handle_nvmeof_error
def create(
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof listener del")
@empty_response
@handle_nvmeof_error
def delete(
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof ns list")
@map_collection(model.Namespace, pick="namespaces")
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof ns get")
@map_model(model.Namespace, first="namespaces")
@handle_nvmeof_error
def get(self, nqn: str, nsid: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof ns get_io_stats")
@map_model(model.NamespaceIOStats)
@handle_nvmeof_error
def io_stats(self, nqn: str, nsid: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof ns add")
@map_model(model.NamespaceCreation)
@handle_nvmeof_error
def create(
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof ns update")
@empty_response
@handle_nvmeof_error
def update(
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof ns del")
@empty_response
@handle_nvmeof_error
def delete(self, nqn: str, nsid: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof host list")
@map_collection(
model.Host,
pick="hosts",
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof host add")
@empty_response
@handle_nvmeof_error
def create(self, nqn: str, host_nqn: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof host del")
@empty_response
@handle_nvmeof_error
def delete(self, nqn: str, host_nqn: str, gw_group: Optional[str] = None):
"gw_group": Param(str, "NVMeoF gateway group", True, None),
},
)
+ @NvmeofCLICommand("nvmeof connection list")
@map_collection(model.Connection, pick="connections")
@handle_nvmeof_error
def list(self, nqn: str, gw_group: Optional[str] = None):
create_self_signed_cert, get_default_addr, verify_tls_files
from . import mgr
+from .controllers import nvmeof # noqa # pylint: disable=unused-import
from .controllers import Router, json_error_page
from .grafana import push_local_dashboards
from .services import nvmeof_cli # noqa # pylint: disable=unused-import
# -*- coding: utf-8 -*-
import errno
import json
+from typing import Any, Dict, Optional
-from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
+import yaml
+from mgr_module import CLICheckNonemptyFileInput, CLICommand, CLIReadCommand, \
+ CLIWriteCommand, HandleCommandResult, HandlerFuncType
+from ..exceptions import DashboardException
from ..rest_client import RequestException
from .nvmeof_conf import ManagedByOrchestratorException, \
NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig
return 0, 'Success', ''
except ManagedByOrchestratorException as ex:
return -errno.EINVAL, '', str(ex)
+
+
+class NvmeofCLICommand(CLICommand):
+ def __call__(self, func) -> HandlerFuncType: # type: ignore
+ # pylint: disable=useless-super-delegation
+ """
+ This method is being overriden solely to be able to disable the linters checks for typing.
+ The NvmeofCLICommand decorator assumes a different type returned from the
+ function it wraps compared to CLICmmand, breaking a Liskov substitution principal,
+ hence triggering linters alerts.
+ """
+ return super().__call__(func)
+
+ def call(self,
+ mgr: Any,
+ cmd_dict: Dict[str, Any],
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ try:
+ ret = super().call(mgr, cmd_dict, inbuf)
+ out_format = cmd_dict.get('format')
+ if out_format == 'json' or not out_format:
+ if ret is None:
+ out = ''
+ else:
+ out = json.dumps(ret)
+ elif out_format == 'yaml':
+ if ret is None:
+ out = ''
+ else:
+ out = yaml.dump(ret)
+ else:
+ return HandleCommandResult(-errno.EINVAL, '',
+ f"format '{out_format}' is not implemented")
+ return HandleCommandResult(0, out, '')
+ except DashboardException as e:
+ return HandleCommandResult(-errno.EINVAL, '', str(e))
import grpc._channel # type: ignore
from google.protobuf.message import Message # type: ignore
- from .proto import gateway_pb2 as pb2
- from .proto import gateway_pb2_grpc as pb2_grpc
+ from .proto import gateway_pb2 as pb2 # type: ignore
+ from .proto import gateway_pb2_grpc as pb2_grpc # type: ignore
except ImportError:
grpc = None
else:
--- /dev/null
+import errno
+from unittest.mock import MagicMock
+
+import pytest
+from mgr_module import CLICommand, HandleCommandResult
+
+from ..services.nvmeof_cli import NvmeofCLICommand
+
+
+@pytest.fixture(scope="class", name="sample_command")
+def fixture_sample_command():
+ test_cmd = "test command"
+
+ @NvmeofCLICommand(test_cmd)
+ def func(_): # noqa # pylint: disable=unused-variable
+ return {'a': '1', 'b': 2}
+ yield test_cmd
+ del NvmeofCLICommand.COMMANDS[test_cmd]
+ assert test_cmd not in NvmeofCLICommand.COMMANDS
+
+
+@pytest.fixture(name='base_call_mock')
+def fixture_base_call_mock(monkeypatch):
+ mock_result = {'a': 'b'}
+ super_mock = MagicMock()
+ super_mock.return_value = mock_result
+ monkeypatch.setattr(CLICommand, 'call', super_mock)
+ return super_mock
+
+
+@pytest.fixture(name='base_call_return_none_mock')
+def fixture_base_call_return_none_mock(monkeypatch):
+ mock_result = None
+ super_mock = MagicMock()
+ super_mock.return_value = mock_result
+ monkeypatch.setattr(CLICommand, 'call', super_mock)
+ return super_mock
+
+
+class TestNvmeofCLICommand:
+ def test_command_being_added(self, sample_command):
+ assert sample_command in NvmeofCLICommand.COMMANDS
+ assert isinstance(NvmeofCLICommand.COMMANDS[sample_command], NvmeofCLICommand)
+
+ def test_command_return_cmd_result_default_format(self, base_call_mock, sample_command):
+ result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
+ assert isinstance(result, HandleCommandResult)
+ assert result.retval == 0
+ assert result.stdout == '{"a": "b"}'
+ assert result.stderr == ''
+ base_call_mock.assert_called_once()
+
+ def test_command_return_cmd_result_json_format(self, base_call_mock, sample_command):
+ result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {'format': 'json'})
+ assert isinstance(result, HandleCommandResult)
+ assert result.retval == 0
+ assert result.stdout == '{"a": "b"}'
+ assert result.stderr == ''
+ base_call_mock.assert_called_once()
+
+ def test_command_return_cmd_result_yaml_format(self, base_call_mock, sample_command):
+ result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {'format': 'yaml'})
+ assert isinstance(result, HandleCommandResult)
+ assert result.retval == 0
+ assert result.stdout == 'a: b\n'
+ assert result.stderr == ''
+ base_call_mock.assert_called_once()
+
+ def test_command_return_cmd_result_invalid_format(self, base_call_mock, sample_command):
+ mock_result = {'a': 'b'}
+ super_mock = MagicMock()
+ super_mock.call.return_value = mock_result
+
+ result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {'format': 'invalid'})
+ assert isinstance(result, HandleCommandResult)
+ assert result.retval == -errno.EINVAL
+ assert result.stdout == ''
+ assert result.stderr
+ base_call_mock.assert_called_once()
+
+ def test_command_return_empty_cmd_result(self, base_call_return_none_mock, sample_command):
+ result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
+ assert isinstance(result, HandleCommandResult)
+ assert result.retval == 0
+ assert result.stdout == ''
+ assert result.stderr == ''
+ base_call_return_none_mock.assert_called_once()