self.mgr.spec_store.all_specs.get(daemon_descrs[0].service_name(), None))
for dd in daemon_descrs:
- if dd.hostname is None:
- err_msg = ('Trying to config_dashboard nvmeof but no hostname is defined')
- logger.error(err_msg)
- raise OrchestratorError(err_msg)
+ assert dd.hostname is not None
+ service_name = dd.service_name()
if not spec:
- logger.warning(f'No ServiceSpec found for {dd.service_name()}')
+ logger.warning(f'No ServiceSpec found for {service_name}')
continue
ip = utils.resolve_ip(self.mgr.inventory.get_addr(dd.hostname))
cmd_dicts.append({
'prefix': 'dashboard nvmeof-gateway-add',
'inbuf': service_url,
- 'name': dd.hostname
+ 'name': service_name
})
return cmd_dicts
logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
# to clean the keyring up
super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ service_name = daemon.service_name()
# remove config for dashboard nvmeof gateways if any
ret, out, err = self.mgr.mon_command({
'prefix': 'dashboard nvmeof-gateway-rm',
- 'name': daemon.hostname,
+ 'name': service_name,
})
if not ret:
logger.info(f'{daemon.hostname} removed from nvmeof gateways dashboard config')
--- /dev/null
+import functools
+import logging
+from collections.abc import Iterable
+from typing import Any, Callable, Dict, List, NamedTuple, Optional, Type
+
+from ..exceptions import DashboardException
+from .nvmeof_conf import NvmeofGatewaysConfig
+
+logger = logging.getLogger("nvmeof_client")
+
+try:
+ import grpc # type: ignore
+ import grpc._channel # type: ignore
+ from google.protobuf.message import Message # type: ignore
+
+ from .proto import gateway_pb2 as pb2
+ from .proto import gateway_pb2_grpc as pb2_grpc
+except ImportError:
+ grpc = None
+else:
+
+ class NVMeoFClient(object):
+ pb2 = pb2
+
+ def __init__(self):
+ logger.info("Initiating nvmeof gateway connection...")
+ service_name, self.gateway_addr = NvmeofGatewaysConfig.get_service_info()
+
+ root_ca_cert = NvmeofGatewaysConfig.get_root_ca_cert(service_name)
+ client_key = NvmeofGatewaysConfig.get_client_key(service_name)
+ client_cert = NvmeofGatewaysConfig.get_client_cert(service_name)
+
+ if root_ca_cert and client_key and client_cert:
+ logger.info('Securely connecting to: %s', self.gateway_addr)
+ credentials = grpc.ssl_channel_credentials(
+ root_certificates=root_ca_cert,
+ private_key=client_key,
+ certificate_chain=client_cert,
+ )
+ self.channel = grpc.secure_channel(self.gateway_addr, credentials)
+ else:
+ logger.info("Insecurely connecting to: %s", self.gateway_addr)
+ self.channel = grpc.insecure_channel(self.gateway_addr)
+ self.stub = pb2_grpc.GatewayStub(self.channel)
+
+ def make_namedtuple_from_object(cls: Type[NamedTuple], obj: Any) -> NamedTuple:
+ return cls(
+ **{
+ field: getattr(obj, field)
+ for field in cls._fields
+ if hasattr(obj, field)
+ }
+ ) # type: ignore
+
+ Model = Dict[str, Any]
+
+ def map_model(
+ model: Type[NamedTuple],
+ first: Optional[str] = None,
+ ) -> Callable[..., Callable[..., Model]]:
+ def decorator(func: Callable[..., Message]) -> Callable[..., Model]:
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs) -> Model:
+ message = func(*args, **kwargs)
+ if first:
+ try:
+ message = getattr(message, first)[0]
+ except IndexError:
+ raise DashboardException(
+ msg="Not Found", http_status_code=404, component="nvmeof"
+ )
+
+ return make_namedtuple_from_object(model, message)._asdict()
+
+ return wrapper
+
+ return decorator
+
+ Collection = List[Model]
+
+ def map_collection(
+ model: Type[NamedTuple],
+ pick: str,
+ finalize: Optional[Callable[[Message, Collection], Collection]] = None,
+ ) -> Callable[..., Callable[..., Collection]]:
+ def decorator(func: Callable[..., Message]) -> Callable[..., Collection]:
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs) -> Collection:
+ message = func(*args, **kwargs)
+ collection: Iterable = getattr(message, pick)
+ out = [
+ make_namedtuple_from_object(model, i)._asdict() for i in collection
+ ]
+ if finalize:
+ return finalize(message, out)
+ return out
+
+ return wrapper
+
+ return decorator
+
+ import errno
+
+ NVMeoFError2HTTP = {
+ # errno errors
+ errno.EPERM: 403, # 1
+ errno.ENOENT: 404, # 2
+ errno.EACCES: 403, # 13
+ errno.EEXIST: 409, # 17
+ errno.ENODEV: 404, # 19
+ # JSONRPC Spec: https://www.jsonrpc.org/specification#error_object
+ -32602: 422, # Invalid Params
+ -32603: 500, # Internal Error
+ }
+
+ def handle_nvmeof_error(func: Callable[..., Message]) -> Callable[..., Message]:
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs) -> Message:
+ try:
+ response = func(*args, **kwargs)
+ except grpc._channel._InactiveRpcError as e: # pylint: disable=protected-access
+ raise DashboardException(
+ msg=e.details(),
+ code=e.code(),
+ http_status_code=504,
+ component="nvmeof",
+ )
+
+ if response.status != 0:
+ raise DashboardException(
+ msg=response.error_message,
+ code=response.status,
+ http_status_code=NVMeoFError2HTTP.get(response.status, 400),
+ component="nvmeof",
+ )
+ return response
+
+ return wrapper
+
+ def empty_response(func: Callable[..., Message]) -> Callable[..., None]:
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs) -> None:
+ func(*args, **kwargs)
+
+ return wrapper
--- /dev/null
+# -*- coding: utf-8 -*-
+
+import json
+
+from orchestrator import OrchestratorError
+
+from .. import mgr
+from ..exceptions import DashboardException
+from ..services.orchestrator import OrchClient
+
+
+class NvmeofGatewayAlreadyExists(Exception):
+ def __init__(self, gateway_name):
+ super(NvmeofGatewayAlreadyExists, self).__init__(
+ "NVMe-oF gateway '{}' already exists".format(gateway_name))
+
+
+class NvmeofGatewayDoesNotExist(Exception):
+ def __init__(self, hostname):
+ super(NvmeofGatewayDoesNotExist, self).__init__(
+ "NVMe-oF gateway '{}' does not exist".format(hostname))
+
+
+class ManagedByOrchestratorException(Exception):
+ def __init__(self):
+ super(ManagedByOrchestratorException, self).__init__(
+ "NVMe-oF configuration is managed by the orchestrator")
+
+
+_NVMEOF_STORE_KEY = "_nvmeof_config"
+
+
+class NvmeofGatewaysConfig(object):
+ @classmethod
+ def _load_config_from_store(cls):
+ json_db = mgr.get_store(_NVMEOF_STORE_KEY,
+ '{"gateways": {}}')
+ config = json.loads(json_db)
+ cls._save_config(config)
+ return config
+
+ @classmethod
+ def _save_config(cls, config):
+ mgr.set_store(_NVMEOF_STORE_KEY, json.dumps(config))
+
+ @classmethod
+ def get_gateways_config(cls):
+ return cls._load_config_from_store()
+
+ @classmethod
+ def add_gateway(cls, name, service_url):
+ config = cls.get_gateways_config()
+ if name in config:
+ raise NvmeofGatewayAlreadyExists(name)
+ config['gateways'][name] = {'service_url': service_url}
+ cls._save_config(config)
+
+ @classmethod
+ def remove_gateway(cls, name):
+ config = cls.get_gateways_config()
+ if name not in config['gateways']:
+ raise NvmeofGatewayDoesNotExist(name)
+ del config['gateways'][name]
+ cls._save_config(config)
+
+ @classmethod
+ def get_service_info(cls):
+ try:
+ config = cls.get_gateways_config()
+ service_name = list(config['gateways'].keys())[0]
+ addr = config['gateways'][service_name]['service_url']
+ return service_name, addr
+ except (KeyError, IndexError) as e:
+ raise DashboardException(
+ msg=f'NVMe-oF configuration is not set: {e}',
+ )
+
+ @classmethod
+ def get_client_cert(cls, service_name: str):
+ client_cert = cls.from_cert_store('nvmeof_client_cert', service_name)
+ return client_cert.encode() if client_cert else None
+
+ @classmethod
+ def get_client_key(cls, service_name: str):
+ client_key = cls.from_cert_store('nvmeof_client_key', service_name, key=True)
+ return client_key.encode() if client_key else None
+
+ @classmethod
+ def get_root_ca_cert(cls, service_name: str):
+ root_ca_cert = cls.from_cert_store('nvmeof_root_ca_cert', service_name)
+ return root_ca_cert.encode() if root_ca_cert else None
+
+ @classmethod
+ def from_cert_store(cls, entity: str, service_name: str, key=False):
+ try:
+ orch = OrchClient.instance()
+ if orch.available():
+ if key:
+ return orch.cert_store.get_key(entity, service_name)
+ return orch.cert_store.get_cert(entity, service_name)
+ return None
+ except OrchestratorError as e:
+ raise DashboardException(
+ msg=f'Failed to get {entity} for {service_name}: {e}',
+ )
return self.api.upgrade_stop()
+class HardwareManager(ResourceManager):
+
+ @wait_api_result
+ def common(self, category: str, hostname: Optional[List[str]] = None) -> str:
+ return self.api.node_proxy_common(category, hostname=hostname)
+
+
+class CertStoreManager(ResourceManager):
+
+ @wait_api_result
+ def get_cert(self, entity: str, service_name: Optional[str] = None,
+ hostname: Optional[str] = None) -> str:
+ return self.api.cert_store_get_cert(entity, service_name, hostname)
+
+ @wait_api_result
+ def get_key(self, entity: str, service_name: Optional[str] = None,
+ hostname: Optional[str] = None) -> str:
+ return self.api.cert_store_get_key(entity, service_name, hostname)
+
+
class OrchClient(object):
_instance = None
self.osds = OsdManager(self.api)
self.daemons = DaemonManager(self.api)
self.upgrades = UpgradeManager(self.api)
+ self.hardware = HardwareManager(self.api)
+ self.cert_store = CertStoreManager(self.api)
def available(self, features: Optional[List[str]] = None) -> bool:
available = self.status()['available']