DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
class Ceph(object):
daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
'crash', 'cephfs-mirror', 'ceph-exporter')
- gateways = ('iscsi', 'nfs')
+ gateways = ('iscsi', 'nfs', 'nvmeof')
##################################
version = None
out, err, code = call(ctx,
[ctx.container_engine.path, 'exec', container_id,
- '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
+ '/usr/bin/python3', '-c',
+ "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
verbosity=CallVerbosity.QUIET)
if code == 0:
version = out.strip()
tcmu_container.cname = self.get_container_name(desc='tcmu')
return tcmu_container
+
+##################################
+
+
+class CephNvmeof(object):
+ """Defines a Ceph-Nvmeof container"""
+
+ daemon_type = 'nvmeof'
+ required_files = ['ceph-nvmeof.conf']
+ default_image = DEFAULT_NVMEOF_IMAGE
+
+ def __init__(self,
+ ctx,
+ fsid,
+ daemon_id,
+ config_json,
+ image=DEFAULT_NVMEOF_IMAGE):
+ # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
+ self.ctx = ctx
+ self.fsid = fsid
+ self.daemon_id = daemon_id
+ self.image = image
+
+ # config-json options
+ self.files = dict_get(config_json, 'files', {})
+
+ # validate the supplied args
+ self.validate()
+
+ @classmethod
+ def init(cls, ctx, fsid, daemon_id):
+ # type: (CephadmContext, str, Union[int, str]) -> CephNvmeof
+ return cls(ctx, fsid, daemon_id,
+ fetch_configs(ctx), ctx.image)
+
+ @staticmethod
+ def get_container_mounts(data_dir: str) -> Dict[str, str]:
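+ # expose the cluster ceph.conf, the rendered ceph-nvmeof.conf, configfs and /dev
+ # inside the container (the admin keyring bind below is a temporary workaround, see FIXME)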
+ mounts = dict()
+ mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
+ # mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
+ mounts['/etc/ceph/ceph.client.admin.keyring'] = '/etc/ceph/keyring:z' # TODO: FIXME
+ mounts[os.path.join(data_dir, 'ceph-nvmeof.conf')] = '/src/ceph-nvmeof.conf:z'
+ mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+ # mounts[log_dir] = '/var/log:z' # TODO: would we need a logdir?
+ mounts['/dev'] = '/dev'
+ return mounts
+
+ @staticmethod
+ def get_container_binds():
+ # type: () -> List[List[str]]
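+ # bind the host's /lib/modules into the container read-only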
+ binds = []
+ lib_modules = ['type=bind',
+ 'source=/lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true']
+ binds.append(lib_modules)
+ return binds
+
+ @staticmethod
+ def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]:
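+ # the version is read from the image's io.ceph.version label; container_id is unused here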
+ out, err, ret = call_throws(ctx, [
+ ctx.container_engine.path, 'inspect',
+ '--format', '{{index .Config.Labels "io.ceph.version"}}',
+ ctx.image])
+ version = None
+ if ret == 0:
+ version = out.strip()
+ return version
+
+ def validate(self):
+ # type: () -> None
+ if not is_fsid(self.fsid):
+ raise Error('not an fsid: %s' % self.fsid)
+ if not self.daemon_id:
+ raise Error('invalid daemon_id: %s' % self.daemon_id)
+ if not self.image:
+ raise Error('invalid image: %s' % self.image)
+
+ # check for the required files
+ if self.required_files:
+ for fname in self.required_files:
+ if fname not in self.files:
+ raise Error('required file missing from config-json: %s' % fname)
+
+ def get_daemon_name(self):
+ # type: () -> str
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+ def get_container_name(self, desc=None):
+ # type: (Optional[str]) -> str
+ cname = '%s-%s' % (self.fsid, self.get_daemon_name())
+ if desc:
+ cname = '%s-%s' % (cname, desc)
+ return cname
+
+ def create_daemon_dirs(self, data_dir, uid, gid):
+ # type: (str, int, int) -> None
+ """Create files under the container data dir"""
+ if not os.path.isdir(data_dir):
+ raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+ logger.info('Creating ceph-nvmeof config...')
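+ # configfs mount point; bind-mounted to /sys/kernel/config by get_container_mounts()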
+ configfs_dir = os.path.join(data_dir, 'configfs')
+ makedirs(configfs_dir, uid, gid, 0o755)
+
+ # populate files from the config-json
+ populate_files(data_dir, self.files, uid, gid)
+
+ @staticmethod
+ def configfs_mount_umount(data_dir, mount=True):
+ # type: (str, bool) -> List[str]
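+ # return the tokens of a shell command that mounts (or unmounts) configfs only if needed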
+ mount_path = os.path.join(data_dir, 'configfs')
+ if mount:
+ cmd = 'if ! grep -qs {0} /proc/mounts; then ' \
+ 'mount -t configfs none {0}; fi'.format(mount_path)
+ else:
+ cmd = 'if grep -qs {0} /proc/mounts; then ' \
+ 'umount {0}; fi'.format(mount_path)
+ return cmd.split()
+
+ @staticmethod
+ def get_sysctl_settings() -> List[str]:
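+ # the SPDK-based target allocates its memory from hugepages, so reserve them on the host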
+ return [
+ 'vm.nr_hugepages = 2048',
+ ]
+
+
##################################
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
+ supported_daemons.append(CephNvmeof.daemon_type)
supported_daemons.append(CustomContainer.daemon_type)
supported_daemons.append(HAproxy.daemon_type)
supported_daemons.append(Keepalived.daemon_type)
ctx.image = Keepalived.default_image
if type_ == SNMPGateway.daemon_type:
ctx.image = SNMPGateway.default_image
+ if type_ == CephNvmeof.daemon_type:
+ ctx.image = CephNvmeof.default_image
if type_ in Tracing.components:
ctx.image = Tracing.components[type_]['image']
if not ctx.image:
ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
+ elif daemon_type == CephNvmeof.daemon_type:
+ ceph_nvmeof = CephNvmeof.init(ctx, fsid, daemon_id)
+ ceph_nvmeof.create_daemon_dirs(data_dir, uid, gid)
+
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(ctx, fsid, daemon_id)
haproxy.create_daemon_dirs(data_dir, uid, gid)
if daemon_type == CephIscsi.daemon_type:
binds.extend(CephIscsi.get_container_binds())
+ if daemon_type == CephNvmeof.daemon_type:
+ binds.extend(CephNvmeof.get_container_binds())
elif daemon_type == CustomContainer.daemon_type:
assert daemon_id
cc = CustomContainer.init(ctx, fsid, daemon_id)
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
mounts.update(HAproxy.get_container_mounts(data_dir))
+ if daemon_type == CephNvmeof.daemon_type:
+ assert daemon_id
+ data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ mounts.update(CephNvmeof.get_container_mounts(data_dir))
+
if daemon_type == CephIscsi.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(Keepalived.get_container_envs())
container_args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
+ elif daemon_type == CephNvmeof.daemon_type:
+ name = '%s.%s' % (daemon_type, daemon_id)
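+ # raise the memlock and open-file limits and add the capabilities used by the SPDK target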
+ container_args.extend(['--ulimit', 'memlock=-1:-1'])
+ container_args.extend(['--ulimit', 'nofile=10240'])
+ container_args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE'])
elif daemon_type == CephIscsi.daemon_type:
entrypoint = CephIscsi.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
lines = HAproxy.get_sysctl_settings()
elif daemon_type == 'keepalived':
lines = Keepalived.get_sysctl_settings()
+ elif daemon_type == 'nvmeof':
+ lines = CephNvmeof.get_sysctl_settings()
lines = filter_sysctl_settings(ctx, lines)
# apply the sysctl settings
config=config, keyring=keyring,
deployment_type=deployment_type,
ports=daemon_ports)
+ elif daemon_type == CephNvmeof.daemon_type:
+ config, keyring = get_config_and_keyring(ctx)
+ uid, gid = 65534, 65534 # TODO: check this
+ c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
+ config=config, keyring=keyring,
+ deployment_type=deployment_type,
+ ports=daemon_ports)
elif daemon_type in Tracing.components:
uid, gid = 65534, 65534
c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
version = NFSGanesha.get_version(ctx, container_id)
if daemon_type == CephIscsi.daemon_type:
version = CephIscsi.get_version(ctx, container_id)
+ if daemon_type == CephNvmeof.daemon_type:
+ version = CephNvmeof.get_version(ctx, container_id)
elif not version:
if daemon_type in Ceph.daemons:
out, err, code = call(ctx,
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
+from .services.nvmeof import NvmeofService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
DEFAULT_IMAGE = 'quay.io/ceph/ceph:v18'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
default=DEFAULT_PROMETHEUS_IMAGE,
desc='Prometheus container image',
),
+ Option(
+ 'container_image_nvmeof',
+ default=DEFAULT_NVMEOF_IMAGE,
+ desc='Nvme-oF container image',
+ ),
Option(
'container_image_grafana',
default=DEFAULT_GRAFANA_IMAGE,
self.mode = ''
self.container_image_base = ''
self.container_image_prometheus = ''
+ self.container_image_nvmeof = ''
self.container_image_grafana = ''
self.container_image_alertmanager = ''
self.container_image_node_exporter = ''
OSDService, NFSService, MonService, MgrService, MdsService,
RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
- IngressService, CustomContainerService, CephfsMirrorService,
+ IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
JaegerQueryService, JaegerAgentService, JaegerCollectorService
]
self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
+ self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
self.scheduled_async_actions: List[Callable] = []
if code:
return 1, '', ('check-host failed:\n' + '\n'.join(err))
except ssh.HostConnectionError as e:
- self.log.exception(f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
+ self.log.exception(
+ f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
return 1, '', ('check-host failed:\n'
+ f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
except OrchestratorError:
)).strip()
elif daemon_type == 'prometheus':
image = self.container_image_prometheus
+ elif daemon_type == 'nvmeof':
+ image = self.container_image_nvmeof
elif daemon_type == 'grafana':
image = self.container_image_grafana
elif daemon_type == 'alertmanager':
if d.daemon_type != 'osd':
self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
- self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
+ self.cephadm_services[daemon_type_to_service(
+ str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
else:
cmd_args = {
'prefix': 'osd purge-actual',
self.inventory.rm_host(host)
self.cache.rm_host(host)
self.ssh.reset_con(host)
- self.offline_hosts_remove(host) # if host was in offline host list, we should remove it now.
+ # if host was in offline host list, we should remove it now.
+ self.offline_hosts_remove(host)
self.event.set() # refresh stray health check
self.log.info('Removed host %s' % host)
return "Removed {} host '{}'".format('offline' if offline else '', host)
if action == 'rotate-key':
if d.daemon_type not in ['mgr', 'osd', 'mds',
- 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
+ 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi', 'nvmeof']:
raise OrchestratorError(
f'key rotation not supported for {d.daemon_type}'
)
deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
else:
deps = [self.get_mgr_ip()]
+ elif daemon_type == 'nvmeof':
+ deps = [] # TODO(redo)
elif daemon_type == 'prometheus':
# for prometheus we add the active mgr as an explicit dependency,
# this way we force a redeploy after a mgr failover
# an explicit dependency is added for each service-type to force a reconfig
# whenever the number of daemons for those service-type changes from 0 to greater
# than zero and vice versa.
- deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)]
+ deps += [s for s in ['node-exporter', 'alertmanager']
+ if self.cache.get_daemons_by_service(s)]
if len(self.cache.get_daemons_by_type('ingress')) > 0:
deps.append('ingress')
# add dependency on ceph-exporter daemons
'rgw': PlacementSpec(count=2),
'ingress': PlacementSpec(count=2),
'iscsi': PlacementSpec(count=1),
+ 'nvmeof': PlacementSpec(count=1),
'rbd-mirror': PlacementSpec(count=2),
'cephfs-mirror': PlacementSpec(count=1),
'nfs': PlacementSpec(count=1),
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
if self.offline_hosts:
- raise OrchestratorError(f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
+ raise OrchestratorError(
+ f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
if daemon_types is not None and services is not None:
raise OrchestratorError('--daemon-types and --services are mutually exclusive')
if daemon_types is not None:
"""
# despite this mapping entity names to daemons, self.TYPE within
# the CephService class refers to service types, not daemon types
- if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress', 'ceph-exporter']:
+ if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']:
return AuthEntity(f'client.{daemon_type}.{daemon_id}')
elif daemon_type in ['crash', 'agent']:
if host == "":
--- /dev/null
+import errno
+import json
+import logging
+from typing import List, cast, Optional
+
+from mgr_module import HandleCommandResult
+from ceph.deployment.service_spec import NvmeofServiceSpec
+
+from orchestrator import DaemonDescription, DaemonDescriptionStatus
+from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .. import utils
+
+logger = logging.getLogger(__name__)
+
+
+class NvmeofService(CephService):
+ TYPE = 'nvmeof'
+
+ def config(self, spec: NvmeofServiceSpec) -> None: # type: ignore
+ assert self.TYPE == spec.service_type
+ assert spec.pool
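+ # make sure the pool backing the gateway exists before any daemon is deployed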
+ self.mgr._check_pool_exists(spec.pool, spec.service_name())
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+
+ spec = cast(NvmeofServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ igw_id = daemon_spec.daemon_id
+
+ # TODO: fixme, we should restrict the permissions here to only the necessary ones
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(igw_id),
+ ['mon', 'allow *',
+ 'mds', 'allow *',
+ 'mgr', 'allow *',
+ 'osd', 'allow *'])
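+ # template context for services/nvmeof/ceph-nvmeof.conf.j2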
+ context = {
+ 'spec': spec,
+ 'name': '{}.{}'.format(utils.name_to_config_section('nvmeof'), igw_id),
+ 'addr': self.mgr.get_mgr_ip(),
+ 'port': spec.port,
+ 'tgt_cmd_extra_args': None,
+ 'log_level': 'WARN',
+ 'rpc_socket': '/var/tmp/spdk.sock',
+ }
+ gw_conf = self.mgr.template.render('services/nvmeof/ceph-nvmeof.conf.j2', context)
+
+ daemon_spec.keyring = keyring
+ daemon_spec.extra_files = {'ceph-nvmeof.conf': gw_conf}
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ daemon_spec.deps = [] # TODO: which gw parameters will require a reconfig?
+ return daemon_spec
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ # TODO: what integration do we need with the dashboard?
+ pass
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ # if there is only one nvmeof daemon, alert the user (this cannot be bypassed with --force)
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Nvmeof', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ # if reached here, there is > 1 nvmeof daemon. make sure none are down
+ warn_message = ('ALERT: 1 nvmeof daemon is already down. Please bring it back up before stopping this one')
+ nvmeof_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+ for i in nvmeof_daemons:
+ if i.status != DaemonDescriptionStatus.running:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+ warn_message = f'It is presumed safe to stop {names}'
+ return HandleCommandResult(0, warn_message, '')
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ """
+ Called after the daemon is removed.
+ """
+ logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+ # TODO: remove config for dashboard nvmeof gateways if any
+ # needed to know whether any nvmeof cert/key material is stored in the ceph config
+ nvmeof_config_dict = {}
+ ret, nvmeof_config, err = self.mgr.mon_command({
+ 'prefix': 'config-key dump',
+ 'key': 'nvmeof',
+ })
+ if nvmeof_config:
+ nvmeof_config_dict = json.loads(nvmeof_config)
+
+ # remove nvmeof cert and key from ceph config
+ for nvmeof_key, value in nvmeof_config_dict.items():
+ if f'nvmeof/client.{daemon.name()}/' in nvmeof_key:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'config-key rm',
+ 'key': nvmeof_key,
+ })
+ logger.info(f'{nvmeof_key} removed from ceph config')
+
+ def purge(self, service_name: str) -> None:
+ """Removes configuration
+ """
+ # TODO: what should we purge in this case (if any)?
+ pass
--- /dev/null
+# {{ cephadm_managed }}
+[gateway]
+name = {{ name }}
+group = {{ spec.group }}
+addr = {{ addr }}
+port = {{ port }}
+enable_auth = {{ spec.enable_auth }}
+state_update_notify = True
+state_update_interval_sec = 5
+
+[ceph]
+pool = {{ spec.pool }}
+config_file = /etc/ceph/ceph.conf
+
+[mtls]
+server_key = {{ spec.server_key }}
+client_key = {{ spec.client_key }}
+server_cert = {{ spec.server_cert }}
+client_cert = {{ spec.client_cert }}
+
+[spdk]
+tgt_path = {{ spec.tgt_path }}
+rpc_socket = {{ rpc_socket }}
+timeout = {{ spec.timeout }}
+log_level = {{ log_level }}
+conn_retries = {{ spec.conn_retries }}
+transports = {{ spec.transports }}
+{% if tgt_cmd_extra_args %}
+tgt_cmd_extra_args = {{ tgt_cmd_extra_args }}
+{% endif %}
RbdMirrorService, CrashService, CephadmDaemonDeploySpec
from cephadm.services.iscsi import IscsiService
from cephadm.services.nfs import NFSService
+from cephadm.services.nvmeof import NvmeofService
from cephadm.services.osd import OSDService
from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService, LokiService, PromtailService
from cephadm.module import CephadmOrchestrator
from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec, \
ServiceSpec, RGWSpec, GrafanaSpec, SNMPGatewaySpec, IngressSpec, PlacementSpec, TracingSpec, \
- PrometheusSpec, CephExporterSpec, NFSServiceSpec
+ PrometheusSpec, CephExporterSpec, NFSServiceSpec, NvmeofServiceSpec
from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect
from ceph.utils import datetime_now
promtail_service = PromtailService(mgr)
crash_service = CrashService(mgr)
iscsi_service = IscsiService(mgr)
+ nvmeof_service = NvmeofService(mgr)
cephadm_services = {
'mon': mon_service,
'mgr': mgr_service,
'promtail': promtail_service,
'crash': crash_service,
'iscsi': iscsi_service,
+ 'nvmeof': nvmeof_service,
}
return cephadm_services
)
+class TestNVMEOFService:
+
+ mgr = FakeMgr()
+ nvmeof_service = NvmeofService(mgr)
+
+ nvmeof_spec = NvmeofServiceSpec(service_type='nvmeof', service_id="a")
+ nvmeof_spec.daemon_type = 'nvmeof'
+ nvmeof_spec.daemon_id = "a"
+ nvmeof_spec.spec = MagicMock()
+ nvmeof_spec.spec.daemon_type = 'nvmeof'
+
+ mgr.spec_store = MagicMock()
+ mgr.spec_store.all_specs.get.return_value = nvmeof_spec
+
+ def test_nvmeof_client_caps(self):
+ pass
+
+ @patch('cephadm.utils.resolve_ip')
+ def test_nvmeof_dashboard_config(self, mock_resolve_ip):
+ pass
+
+ @patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '192.168.100.100')
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("cephadm.module.CephadmOrchestrator.get_unique_name")
+ def test_nvmeof_config(self, _get_name, _run_cephadm, cephadm_module: CephadmOrchestrator):
+
+ nvmeof_daemon_id = 'testpool.test.qwert'
+ pool = 'testpool'
+ default_port = 5500
+ group = 'mygroup'
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ _get_name.return_value = nvmeof_daemon_id
+
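+ # expected gateway config as rendered from ceph-nvmeof.conf.j2 with the spec defaults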
+ nvmeof_gateway_conf = f"""# This file is generated by cephadm.
+[gateway]
+name = client.nvmeof.{nvmeof_daemon_id}
+group = {group}
+addr = 192.168.100.100
+port = {default_port}
+enable_auth = False
+state_update_notify = True
+state_update_interval_sec = 5
+
+[ceph]
+pool = {pool}
+config_file = /etc/ceph/ceph.conf
+
+[mtls]
+server_key = ./server.key
+client_key = ./client.key
+server_cert = ./server.crt
+client_cert = ./client.crt
+
+[spdk]
+tgt_path = /usr/local/bin/nvmf_tgt
+rpc_socket = /var/tmp/spdk.sock
+timeout = 60
+log_level = WARN
+conn_retries = 10
+transports = tcp\n"""
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, NvmeofServiceSpec(service_id=pool,
+ group=group,
+ pool=pool)):
+ _run_cephadm.assert_called_with(
+ 'test',
+ f'nvmeof.{nvmeof_daemon_id}',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "nvmeof.testpool.test.qwert",
+ "image": "",
+ "deploy_arguments": [],
+ "params": {
+ "tcp_ports": [5500]
+ },
+ "meta": {
+ "service_name": "nvmeof.testpool",
+ "ports": [5500],
+ "ip": None,
+ "deployed_by": [],
+ "rank": None,
+ "rank_generation": None,
+ "extra_container_args": None,
+ "extra_entrypoint_args": None
+ },
+ "config_blobs": {
+ "config": "",
+ "keyring": "[client.nvmeof.testpool.test.qwert]\nkey = None\n",
+ "files": {
+ "ceph-nvmeof.conf": nvmeof_gateway_conf
+ }
+ }
+ }),
+ )
+
+
class TestMonitoring:
def _get_config(self, url: str) -> str:
+
return f"""
# This file is generated by cephadm.
# See https://prometheus.io/docs/alerting/configuration/ for documentation.
# NOTE: order important here as these are used for upgrade order
CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw',
'rbd-mirror', 'cephfs-mirror', 'ceph-exporter']
-GATEWAY_TYPES = ['iscsi', 'nfs']
+GATEWAY_TYPES = ['iscsi', 'nfs', 'nvmeof']
MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
'alertmanager', 'grafana', 'loki', 'promtail']
RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['haproxy', 'nfs']
Map from daemon names to ceph entity names (as seen in config)
"""
daemon_type = name.split('.', 1)[0]
- if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi', 'ceph-exporter']:
+ if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi', 'ceph-exporter', 'nvmeof']:
return ConfEntity('client.' + name)
elif daemon_type in ['mon', 'osd', 'mds', 'mgr', 'client']:
return ConfEntity(name)
SNMPGatewaySpec,
ServiceSpec,
TunedProfileSpec,
+ NvmeofServiceSpec
)
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
'crash': self.apply_crash,
'grafana': self.apply_grafana,
'iscsi': self.apply_iscsi,
+ 'nvmeof': self.apply_nvmeof,
'mds': self.apply_mds,
'mgr': self.apply_mgr,
'mon': self.apply_mon,
"""Update iscsi cluster"""
raise NotImplementedError()
+ def apply_nvmeof(self, spec: NvmeofServiceSpec) -> OrchResult[str]:
+ """Update nvmeof cluster"""
+ raise NotImplementedError()
+
def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update prometheus cluster"""
raise NotImplementedError()
'haproxy': 'ingress',
'keepalived': 'ingress',
'iscsi': 'iscsi',
+ 'nvmeof': 'nvmeof',
'rbd-mirror': 'rbd-mirror',
'cephfs-mirror': 'cephfs-mirror',
'nfs': 'nfs',
'osd': ['osd'],
'ingress': ['haproxy', 'keepalived'],
'iscsi': ['iscsi'],
+ 'nvmeof': ['nvmeof'],
'rbd-mirror': ['rbd-mirror'],
'cephfs-mirror': ['cephfs-mirror'],
'nfs': ['nfs'],
NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, \
- GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
+ GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec, TunedProfileSpec, \
+ NvmeofServiceSpec
def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str:
rgw = 'rgw'
nfs = 'nfs'
iscsi = 'iscsi'
+ nvmeof = 'nvmeof'
snmp_gateway = 'snmp-gateway'
elasticsearch = 'elasticsearch'
jaeger_agent = 'jaeger-agent'
)
return self._daemon_add_misc(spec)
+ @_cli_write_command('orch daemon add nvmeof')
+ def _nvmeof_add(self,
+ pool: str,
+ placement: Optional[str] = None,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Start nvmeof daemon(s)"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = NvmeofServiceSpec(
+ service_id='nvmeof',
+ pool=pool,
+ placement=PlacementSpec.from_string(placement),
+ )
+ return self._daemon_add_misc(spec)
+
@_cli_write_command('orch')
def _service_action(self, action: ServiceAction, service_name: str) -> HandleCommandResult:
"""Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)"""
return self._apply_misc([spec], dry_run, format, no_overwrite)
+ @_cli_write_command('orch apply nvmeof')
+ def _apply_nvmeof(self,
+ pool: str,
+ placement: Optional[str] = None,
+ unmanaged: bool = False,
+ dry_run: bool = False,
+ format: Format = Format.plain,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Scale an nvmeof service"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = NvmeofServiceSpec(
+ service_id=pool,
+ pool=pool,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
@_cli_write_command('orch apply snmp-gateway')
def _apply_snmp_gateway(self,
snmp_version: SNMPGatewaySpec.SNMPVersion,
Details of service creation.
Request to the orchestrator for a cluster of daemons
- such as MDS, RGW, iscsi gateway, MONs, MGRs, Prometheus
+ such as MDS, RGW, iscsi gateway, nvmeof gateway, MONs, MGRs, Prometheus
This structure is supposed to be enough information to
start the services.
"""
- KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi loki promtail mds mgr mon nfs ' \
+ KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \
'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \
'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split()
- REQUIRES_SERVICE_ID = 'iscsi mds nfs rgw container ingress '.split()
+ REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',
]
'osd': DriveGroupSpec,
'mds': MDSSpec,
'iscsi': IscsiServiceSpec,
+ 'nvmeof': NvmeofServiceSpec,
'alertmanager': AlertManagerSpec,
'ingress': IngressSpec,
'container': CustomContainerSpec,
#: ``prometheus``) or (``container``) for custom containers.
self.service_type = service_type
- #: The name of the service. Required for ``iscsi``, ``mds``, ``nfs``, ``osd``, ``rgw``,
- #: ``container``, ``ingress``
+ #: The name of the service. Required for ``iscsi``, ``nvmeof``, ``mds``, ``nfs``, ``osd``,
+ #: ``rgw``, ``container``, ``ingress``
self.service_id = None
if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd':
yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)
+class NvmeofServiceSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'nvmeof',
+ service_id: Optional[str] = None,
+ name: Optional[str] = None,
+ group: Optional[str] = None,
+ port: Optional[int] = None,
+ pool: Optional[str] = None,
+ enable_auth: bool = False,
+ server_key: Optional[str] = None,
+ server_cert: Optional[str] = None,
+ client_key: Optional[str] = None,
+ client_cert: Optional[str] = None,
+ spdk_path: Optional[str] = None,
+ tgt_path: Optional[str] = None,
+ timeout: Optional[int] = 60,
+ conn_retries: Optional[int] = 10,
+ transports: Optional[str] = "tcp",
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[List[str]] = None,
+ extra_entrypoint_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'nvmeof'
+ super(NvmeofServiceSpec, self).__init__('nvmeof', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only,
+ config=config, networks=networks,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
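+ # defaults below line up with the settings consumed by the ceph-nvmeof.conf.j2 template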
+ #: RADOS pool where ceph-nvmeof config data is stored.
+ self.pool = pool
+ #: ``port`` port of the nvmeof gateway
+ self.port = port or 5500
+ #: ``name`` name of the nvmeof gateway
+ self.name = name
+ #: ``group`` name of the nvmeof gateway group
+ self.group = group
+ #: ``enable_auth`` enables user authentication on nvmeof gateway
+ self.enable_auth = enable_auth
+ #: ``server_key`` gateway server key
+ self.server_key = server_key or './server.key'
+ #: ``server_cert`` gateway server certificate
+ self.server_cert = server_cert or './server.crt'
+ #: ``client_key`` client key
+ self.client_key = client_key or './client.key'
+ #: ``client_cert`` client certificate
+ self.client_cert = client_cert or './client.crt'
+ #: ``spdk_path`` path to SPDK
+ self.spdk_path = spdk_path or '/usr/local/bin/nvmf_tgt'
+ #: ``tgt_path`` nvmeof target path
+ self.tgt_path = tgt_path or '/usr/local/bin/nvmf_tgt'
+ #: ``timeout`` ceph connectivity timeout
+ self.timeout = timeout
+ #: ``conn_retries`` ceph connection retries number
+ self.conn_retries = conn_retries
+ #: ``transports`` TODO
+ self.transports = transports
+
+ def get_port_start(self) -> List[int]:
+ return [self.port or 5500]
+
+ def validate(self) -> None:
+ # TODO: what other parameters should be validated as part of this function?
+ super(NvmeofServiceSpec, self).validate()
+
+ if not self.pool:
+ raise SpecValidationError('Cannot add NVMEOF: No Pool specified')
+
+ if self.enable_auth:
+ if not any([self.server_key, self.server_cert, self.client_key, self.client_cert]):
+ raise SpecValidationError(
+ 'enable_auth is true but client/server certificates are missing')
+
+ if self.transports not in ['tcp']:
+ raise SpecValidationError('Invalid transport. Valid values are tcp')
+
+
+yaml.add_representer(NvmeofServiceSpec, ServiceSpec.yaml_representer)
+
+
class IscsiServiceSpec(ServiceSpec):
def __init__(self,
service_type: str = 'iscsi',