git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Adding support for nvmeof
author Redouane Kachach <rkachach@redhat.com>
Tue, 7 Mar 2023 13:45:35 +0000 (14:45 +0100)
committer Adam King <adking@redhat.com>
Thu, 31 Aug 2023 17:36:09 +0000 (13:36 -0400)
Fixes: https://tracker.ceph.com/issues/61929
Signed-off-by: Redouane Kachach <rkachach@redhat.com>
(cherry picked from commit ea6eb5e3eec1b5be6a1d7d85f7affcad87123604)

src/cephadm/cephadm.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/nvmeof.py [new file with mode: 0644]
src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 [new file with mode: 0644]
src/pybind/mgr/cephadm/tests/test_services.py
src/pybind/mgr/cephadm/utils.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/python-common/ceph/deployment/service_spec.py

diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py
index 71d65d821de8b052aa971006e9a0a0d7d0e99a8d..268710f965c30c933a56676731790cb358d51bee 100755 (executable)
@@ -55,6 +55,7 @@ DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
 DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
 DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
 DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
 DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
 DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
 DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
@@ -389,7 +390,7 @@ class UnauthorizedRegistryError(Error):
 class Ceph(object):
     daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
                'crash', 'cephfs-mirror', 'ceph-exporter')
-    gateways = ('iscsi', 'nfs')
+    gateways = ('iscsi', 'nfs', 'nvmeof')
 
 ##################################
 
@@ -912,7 +913,8 @@ class CephIscsi(object):
         version = None
         out, err, code = call(ctx,
                               [ctx.container_engine.path, 'exec', container_id,
-                               '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
+                               '/usr/bin/python3', '-c',
+                               "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
                               verbosity=CallVerbosity.QUIET)
         if code == 0:
             version = out.strip()
@@ -979,6 +981,133 @@ class CephIscsi(object):
         tcmu_container.cname = self.get_container_name(desc='tcmu')
         return tcmu_container
 
+
+##################################
+
+
+class CephNvmeof(object):
+    """Defines a Ceph-Nvmeof container"""
+
+    daemon_type = 'nvmeof'
+    required_files = ['ceph-nvmeof.conf']
+    default_image = DEFAULT_NVMEOF_IMAGE
+
+    def __init__(self,
+                 ctx,
+                 fsid,
+                 daemon_id,
+                 config_json,
+                 image=DEFAULT_NVMEOF_IMAGE):
+        # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
+        self.ctx = ctx
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        self.image = image
+
+        # config-json options
+        self.files = dict_get(config_json, 'files', {})
+
+        # validate the supplied args
+        self.validate()
+
+    @classmethod
+    def init(cls, ctx, fsid, daemon_id):
+        # type: (CephadmContext, str, Union[int, str]) -> CephNvmeof
+        return cls(ctx, fsid, daemon_id,
+                   fetch_configs(ctx), ctx.image)
+
+    @staticmethod
+    def get_container_mounts(data_dir: str) -> Dict[str, str]:
+        mounts = dict()
+        mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
+        # mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
+        mounts['/etc/ceph/ceph.client.admin.keyring'] = '/etc/ceph/keyring:z'  # TODO: FIXME
+        mounts[os.path.join(data_dir, 'ceph-nvmeof.conf')] = '/src/ceph-nvmeof.conf:z'
+        mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+        # mounts[log_dir] = '/var/log:z' # TODO: would we need a logdir?
+        mounts['/dev'] = '/dev'
+        return mounts
+
+    @staticmethod
+    def get_container_binds():
+        # type: () -> List[List[str]]
+        binds = []
+        lib_modules = ['type=bind',
+                       'source=/lib/modules',
+                       'destination=/lib/modules',
+                       'ro=true']
+        binds.append(lib_modules)
+        return binds
+
+    @staticmethod
+    def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]:
+        out, err, ret = call_throws(ctx, [
+            ctx.container_engine.path, 'inspect',
+            '--format', '{{index .Config.Labels "io.ceph.version"}}',
+            ctx.image])
+        version = None
+        if ret == 0:
+            version = out.strip()
+        return version
+
+    def validate(self):
+        # type: () -> None
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+
+        # check for the required files
+        if self.required_files:
+            for fname in self.required_files:
+                if fname not in self.files:
+                    raise Error('required file missing from config-json: %s' % fname)
+
+    def get_daemon_name(self):
+        # type: () -> str
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc=None):
+        # type: (Optional[str]) -> str
+        cname = '%s-%s' % (self.fsid, self.get_daemon_name())
+        if desc:
+            cname = '%s-%s' % (cname, desc)
+        return cname
+
+    def create_daemon_dirs(self, data_dir, uid, gid):
+        # type: (str, int, int) -> None
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        logger.info('Creating ceph-nvmeof config...')
+        configfs_dir = os.path.join(data_dir, 'configfs')
+        makedirs(configfs_dir, uid, gid, 0o755)
+
+        # populate files from the config-json
+        populate_files(data_dir, self.files, uid, gid)
+
+    @staticmethod
+    def configfs_mount_umount(data_dir, mount=True):
+        # type: (str, bool) -> List[str]
+        mount_path = os.path.join(data_dir, 'configfs')
+        if mount:
+            cmd = 'if ! grep -qs {0} /proc/mounts; then ' \
+                  'mount -t configfs none {0}; fi'.format(mount_path)
+        else:
+            cmd = 'if grep -qs {0} /proc/mounts; then ' \
+                  'umount {0}; fi'.format(mount_path)
+        return cmd.split()
+
+    @staticmethod
+    def get_sysctl_settings() -> List[str]:
+        return [
+            'vm.nr_hugepages = 2048',
+        ]
+
+
 ##################################
 
 
@@ -1430,6 +1559,7 @@ def get_supported_daemons():
     supported_daemons.extend(Monitoring.components)
     supported_daemons.append(NFSGanesha.daemon_type)
     supported_daemons.append(CephIscsi.daemon_type)
+    supported_daemons.append(CephNvmeof.daemon_type)
     supported_daemons.append(CustomContainer.daemon_type)
     supported_daemons.append(HAproxy.daemon_type)
     supported_daemons.append(Keepalived.daemon_type)
@@ -2322,6 +2452,8 @@ def update_default_image(ctx: CephadmContext) -> None:
             ctx.image = Keepalived.default_image
         if type_ == SNMPGateway.daemon_type:
             ctx.image = SNMPGateway.default_image
+        if type_ == CephNvmeof.daemon_type:
+            ctx.image = CephNvmeof.default_image
         if type_ in Tracing.components:
             ctx.image = Tracing.components[type_]['image']
     if not ctx.image:
@@ -2961,6 +3093,10 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
         ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
         ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
 
+    elif daemon_type == CephNvmeof.daemon_type:
+        ceph_nvmeof = CephNvmeof.init(ctx, fsid, daemon_id)
+        ceph_nvmeof.create_daemon_dirs(data_dir, uid, gid)
+
     elif daemon_type == HAproxy.daemon_type:
         haproxy = HAproxy.init(ctx, fsid, daemon_id)
         haproxy.create_daemon_dirs(data_dir, uid, gid)
@@ -3139,6 +3275,8 @@ def get_container_binds(ctx, fsid, daemon_type, daemon_id):
 
     if daemon_type == CephIscsi.daemon_type:
         binds.extend(CephIscsi.get_container_binds())
+    if daemon_type == CephNvmeof.daemon_type:
+        binds.extend(CephNvmeof.get_container_binds())
     elif daemon_type == CustomContainer.daemon_type:
         assert daemon_id
         cc = CustomContainer.init(ctx, fsid, daemon_id)
@@ -3251,6 +3389,11 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
         data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
         mounts.update(HAproxy.get_container_mounts(data_dir))
 
+    if daemon_type == CephNvmeof.daemon_type:
+        assert daemon_id
+        data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+        mounts.update(CephNvmeof.get_container_mounts(data_dir))
+
     if daemon_type == CephIscsi.daemon_type:
         assert daemon_id
         data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
@@ -3385,6 +3528,11 @@ def get_container(ctx: CephadmContext,
         name = '%s.%s' % (daemon_type, daemon_id)
         envs.extend(Keepalived.get_container_envs())
         container_args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
+    elif daemon_type == CephNvmeof.daemon_type:
+        name = '%s.%s' % (daemon_type, daemon_id)
+        container_args.extend(['--ulimit', 'memlock=-1:-1'])
+        container_args.extend(['--ulimit', 'nofile=10240'])
+        container_args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE'])
     elif daemon_type == CephIscsi.daemon_type:
         entrypoint = CephIscsi.entrypoint
         name = '%s.%s' % (daemon_type, daemon_id)
@@ -3962,6 +4110,8 @@ def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
         lines = HAproxy.get_sysctl_settings()
     elif daemon_type == 'keepalived':
         lines = Keepalived.get_sysctl_settings()
+    elif daemon_type == 'nvmeof':
+        lines = CephNvmeof.get_sysctl_settings()
     lines = filter_sysctl_settings(ctx, lines)
 
     # apply the sysctl settings
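The gateway runs an SPDK target (nvmf_tgt, see the [spdk] settings in the new service spec and template), which relies on hugepages and locked memory; that is why get_container() above adds --ulimit memlock=-1:-1 and the SYS_ADMIN capability, and why the nvmeof sysctl profile reserves hugepages. A quick back-of-the-envelope check of that reservation, assuming the common 2 MiB default hugepage size on x86_64:

# vm.nr_hugepages = 2048 with 2 MiB hugepages (typical x86_64 default)
nr_hugepages = 2048
hugepage_size_mib = 2
print(f'{nr_hugepages * hugepage_size_mib // 1024} GiB reserved for hugepages')  # -> 4 GiB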
@@ -6435,6 +6585,14 @@ def _dispatch_deploy(
                       config=config, keyring=keyring,
                       deployment_type=deployment_type,
                       ports=daemon_ports)
+    elif daemon_type == CephNvmeof.daemon_type:
+        config, keyring = get_config_and_keyring(ctx)
+        uid, gid = 65534, 65534  # TODO: check this
+        c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
+        deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
+                      config=config, keyring=keyring,
+                      deployment_type=deployment_type,
+                      ports=daemon_ports)
     elif daemon_type in Tracing.components:
         uid, gid = 65534, 65534
         c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
@@ -6976,6 +7134,8 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                                 version = NFSGanesha.get_version(ctx, container_id)
                             if daemon_type == CephIscsi.daemon_type:
                                 version = CephIscsi.get_version(ctx, container_id)
+                            if daemon_type == CephNvmeof.daemon_type:
+                                version = CephNvmeof.get_version(ctx, container_id)
                             elif not version:
                                 if daemon_type in Ceph.daemons:
                                     out, err, code = call(ctx,
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index e12cda2ed3b3b6a1f4e8ee48db6f287a272faa45..47f2f2fc8f5639a87c8a3bbc9f49a1f5368da058 100644 (file)
@@ -58,6 +58,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ
 from .services.ingress import IngressService
 from .services.container import CustomContainerService
 from .services.iscsi import IscsiService
+from .services.nvmeof import NvmeofService
 from .services.nfs import NFSService
 from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
@@ -106,6 +107,7 @@ os._exit = os_exit_noop   # type: ignore
 DEFAULT_IMAGE = 'quay.io/ceph/ceph:v18'
 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
 DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
 DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
 DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
 DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
@@ -201,6 +203,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=DEFAULT_PROMETHEUS_IMAGE,
             desc='Prometheus container image',
         ),
+        Option(
+            'container_image_nvmeof',
+            default=DEFAULT_NVMEOF_IMAGE,
+            desc='Nvme-of container image',
+        ),
         Option(
             'container_image_grafana',
             default=DEFAULT_GRAFANA_IMAGE,
@@ -487,6 +494,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.mode = ''
             self.container_image_base = ''
             self.container_image_prometheus = ''
+            self.container_image_nvmeof = ''
             self.container_image_grafana = ''
             self.container_image_alertmanager = ''
             self.container_image_node_exporter = ''
@@ -603,7 +611,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             OSDService, NFSService, MonService, MgrService, MdsService,
             RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
             PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
-            IngressService, CustomContainerService, CephfsMirrorService,
+            IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
             CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
             JaegerQueryService, JaegerAgentService, JaegerCollectorService
         ]
@@ -615,6 +623,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
         self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
         self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
+        self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
 
         self.scheduled_async_actions: List[Callable] = []
 
@@ -1193,7 +1202,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             if code:
                 return 1, '', ('check-host failed:\n' + '\n'.join(err))
         except ssh.HostConnectionError as e:
-            self.log.exception(f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
+            self.log.exception(
+                f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
             return 1, '', ('check-host failed:\n'
                            + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
         except OrchestratorError:
@@ -1474,6 +1484,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             )).strip()
         elif daemon_type == 'prometheus':
             image = self.container_image_prometheus
+        elif daemon_type == 'nvmeof':
+            image = self.container_image_nvmeof
         elif daemon_type == 'grafana':
             image = self.container_image_grafana
         elif daemon_type == 'alertmanager':
@@ -1664,7 +1676,8 @@ Then run the following:
 
                 if d.daemon_type != 'osd':
                     self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
-                    self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
+                    self.cephadm_services[daemon_type_to_service(
+                        str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
                 else:
                     cmd_args = {
                         'prefix': 'osd purge-actual',
@@ -1682,7 +1695,8 @@ Then run the following:
         self.inventory.rm_host(host)
         self.cache.rm_host(host)
         self.ssh.reset_con(host)
-        self.offline_hosts_remove(host)  # if host was in offline host list, we should remove it now.
+        # if host was in offline host list, we should remove it now.
+        self.offline_hosts_remove(host)
         self.event.set()  # refresh stray health check
         self.log.info('Removed host %s' % host)
         return "Removed {} host '{}'".format('offline' if offline else '', host)
@@ -2270,7 +2284,7 @@ Then run the following:
 
         if action == 'rotate-key':
             if d.daemon_type not in ['mgr', 'osd', 'mds',
-                                     'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
+                                     'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi', 'nvmeof']:
                 raise OrchestratorError(
                     f'key rotation not supported for {d.daemon_type}'
                 )
@@ -2648,6 +2662,8 @@ Then run the following:
                 deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
             else:
                 deps = [self.get_mgr_ip()]
+        elif daemon_type == 'nvmeof':
+            deps = []  # TODO(redo)
         elif daemon_type == 'prometheus':
             # for prometheus we add the active mgr as an explicit dependency,
             # this way we force a redeploy after a mgr failover
@@ -2660,7 +2676,8 @@ Then run the following:
             # an explicit dependency is added for each service-type to force a reconfig
             # whenever the number of daemons for those service-type changes from 0 to greater
             # than zero and vice versa.
-            deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)]
+            deps += [s for s in ['node-exporter', 'alertmanager']
+                     if self.cache.get_daemons_by_service(s)]
             if len(self.cache.get_daemons_by_type('ingress')) > 0:
                 deps.append('ingress')
             # add dependency on ceph-exporter daemons
@@ -3009,6 +3026,7 @@ Then run the following:
                 'rgw': PlacementSpec(count=2),
                 'ingress': PlacementSpec(count=2),
                 'iscsi': PlacementSpec(count=1),
+                'nvmeof': PlacementSpec(count=1),
                 'rbd-mirror': PlacementSpec(count=2),
                 'cephfs-mirror': PlacementSpec(count=1),
                 'nfs': PlacementSpec(count=1),
@@ -3232,7 +3250,8 @@ Then run the following:
         if self.inventory.get_host_with_state("maintenance"):
             raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
         if self.offline_hosts:
-            raise OrchestratorError(f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
+            raise OrchestratorError(
+                f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
         if daemon_types is not None and services is not None:
             raise OrchestratorError('--daemon-types and --services are mutually exclusive')
         if daemon_types is not None:
diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py
index b34be195fa38c68d02a8b07cda50e009061f15a9..73c15e295a4bc14f3cb962ce3d3b73ecebc6e897 100644 (file)
@@ -39,7 +39,7 @@ def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEnt
     """
     # despite this mapping entity names to daemons, self.TYPE within
     # the CephService class refers to service types, not daemon types
-    if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress', 'ceph-exporter']:
+    if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']:
         return AuthEntity(f'client.{daemon_type}.{daemon_id}')
     elif daemon_type in ['crash', 'agent']:
         if host == "":
diff --git a/src/pybind/mgr/cephadm/services/nvmeof.py b/src/pybind/mgr/cephadm/services/nvmeof.py
new file mode 100644 (file)
index 0000000..00db7e5
--- /dev/null
@@ -0,0 +1,106 @@
+import errno
+import json
+import logging
+from typing import List, cast, Optional
+
+from mgr_module import HandleCommandResult
+from ceph.deployment.service_spec import NvmeofServiceSpec
+
+from orchestrator import DaemonDescription, DaemonDescriptionStatus
+from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .. import utils
+
+logger = logging.getLogger(__name__)
+
+
+class NvmeofService(CephService):
+    TYPE = 'nvmeof'
+
+    def config(self, spec: NvmeofServiceSpec) -> None:  # type: ignore
+        assert self.TYPE == spec.service_type
+        assert spec.pool
+        self.mgr._check_pool_exists(spec.pool, spec.service_name())
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+
+        spec = cast(NvmeofServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+        igw_id = daemon_spec.daemon_id
+
+        # TODO: fixme, we should restrict the permissions here to only the necessary ones
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(igw_id),
+                                             ['mon', 'allow *',
+                                              'mds', 'allow *',
+                                              'mgr', 'allow *',
+                                              'osd', 'allow *'])
+        context = {
+            'spec': spec,
+            'name': '{}.{}'.format(utils.name_to_config_section('nvmeof'), igw_id),
+            'addr': self.mgr.get_mgr_ip(),
+            'port': spec.port,
+            'tgt_cmd_extra_args': None,
+            'log_level': 'WARN',
+            'rpc_socket': '/var/tmp/spdk.sock',
+        }
+        gw_conf = self.mgr.template.render('services/nvmeof/ceph-nvmeof.conf.j2', context)
+
+        daemon_spec.keyring = keyring
+        daemon_spec.extra_files = {'ceph-nvmeof.conf': gw_conf}
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+        daemon_spec.deps = []  # TODO: which gw parameters will require a reconfig?
+        return daemon_spec
+
+    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+        # TODO: what integration do we need with the dashboard?
+        pass
+
+    def ok_to_stop(self,
+                   daemon_ids: List[str],
+                   force: bool = False,
+                   known: Optional[List[str]] = None) -> HandleCommandResult:
+        # if only 1 nvmeof, alert user (this is not passable with --force)
+        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Nvmeof', 1, True)
+        if warn:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+        # if reached here, there is > 1 nvmeof daemon. make sure none are down
+        warn_message = ('ALERT: 1 nvmeof daemon is already down. Please bring it back up before stopping this one')
+        nvmeof_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+        for i in nvmeof_daemons:
+            if i.status != DaemonDescriptionStatus.running:
+                return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+        warn_message = f'It is presumed safe to stop {names}'
+        return HandleCommandResult(0, warn_message, '')
+
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        """
+        Called after the daemon is removed.
+        """
+        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+        # TODO: remove config for dashboard nvmeof gateways if any
+        # needed to know if we have ssl stuff for nvmeof in ceph config
+        nvmeof_config_dict = {}
+        ret, nvmeof_config, err = self.mgr.mon_command({
+            'prefix': 'config-key dump',
+            'key': 'nvmeof',
+        })
+        if nvmeof_config:
+            nvmeof_config_dict = json.loads(nvmeof_config)
+
+        # remove nvmeof cert and key from ceph config
+        for nvmeof_key, value in nvmeof_config_dict.items():
+            if f'nvmeof/client.{daemon.name()}/' in nvmeof_key:
+                ret, out, err = self.mgr.mon_command({
+                    'prefix': 'config-key rm',
+                    'key': nvmeof_key,
+                })
+                logger.info(f'{nvmeof_key} removed from ceph config')
+
+    def purge(self, service_name: str) -> None:
+        """Removes configuration
+        """
+        #  TODO: what should we purge in this case (if any)?
+        pass
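post_remove() above walks the `config-key dump` output and deletes any nvmeof cert/key entries stored for the removed gateway. A small standalone sketch of that filtering, with made-up key names (real key names depend on what was actually stored for the gateway):

import json

daemon_name = 'nvmeof.testpool.test.qwert'
nvmeof_config = json.dumps({
    f'nvmeof/client.{daemon_name}/server.crt': '...',
    f'nvmeof/client.{daemon_name}/server.key': '...',
    'nvmeof/client.nvmeof.otherpool.host2.xyzabc/server.crt': '...',
})

to_remove = [key for key in json.loads(nvmeof_config)
             if f'nvmeof/client.{daemon_name}/' in key]
print(to_remove)  # only the two entries belonging to the removed daemon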
diff --git a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2
new file mode 100644 (file)
index 0000000..2b27e96
--- /dev/null
@@ -0,0 +1,30 @@
+# {{ cephadm_managed }}
+[gateway]
+name = {{ name }}
+group = {{ spec.group }}
+addr = {{ addr }}
+port = {{ port }}
+enable_auth = {{ spec.enable_auth }}
+state_update_notify = True
+state_update_interval_sec = 5
+
+[ceph]
+pool = {{ spec.pool }}
+config_file = /etc/ceph/ceph.conf
+
+[mtls]
+server_key = {{ spec.server_key }}
+client_key = {{ spec.client_key }}
+server_cert = {{ spec.server_cert }}
+client_cert = {{ spec.client_cert }}
+
+[spdk]
+tgt_path = {{ spec.tgt_path }}
+rpc_socket = {{ rpc_socket }}
+timeout = {{ spec.timeout }}
+log_level = {{ log_level }}
+conn_retries = {{ spec.conn_retries }}
+transports = {{ spec.transports }}
+{% if tgt_cmd_extra_args %}
+tgt_cmd_extra_args = {{ tgt_cmd_extra_args }}
+{% endif %}
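This template is rendered by NvmeofService.prepare_create() with the context shown in nvmeof.py (spec, name, addr, port, log_level, rpc_socket, tgt_cmd_extra_args) and shipped to the daemon as ceph-nvmeof.conf. A trimmed, self-contained rendering sketch covering only the [gateway] and [ceph] sections; it assumes jinja2 is installed and uses made-up values:

from types import SimpleNamespace
from jinja2 import Template

template = Template("""\
# {{ cephadm_managed }}
[gateway]
name = {{ name }}
group = {{ spec.group }}
addr = {{ addr }}
port = {{ port }}
enable_auth = {{ spec.enable_auth }}

[ceph]
pool = {{ spec.pool }}
config_file = /etc/ceph/ceph.conf
""")

spec = SimpleNamespace(group='mygroup', enable_auth=False, pool='testpool')
print(template.render(cephadm_managed='This file is generated by cephadm.',
                      spec=spec,
                      name='client.nvmeof.testpool.host1.abcdef',
                      addr='192.168.100.100',
                      port=5500))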
diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py
index 5b8863f578ce06f46e7f65fa58ea485b45e0d717..2551d196ddf89e3a6fa3f129cdc022b80d954a27 100644 (file)
@@ -13,13 +13,14 @@ from cephadm.services.cephadmservice import MonService, MgrService, MdsService,
     RbdMirrorService, CrashService, CephadmDaemonDeploySpec
 from cephadm.services.iscsi import IscsiService
 from cephadm.services.nfs import NFSService
+from cephadm.services.nvmeof import NvmeofService
 from cephadm.services.osd import OSDService
 from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService, LokiService, PromtailService
 from cephadm.module import CephadmOrchestrator
 from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec, \
     ServiceSpec, RGWSpec, GrafanaSpec, SNMPGatewaySpec, IngressSpec, PlacementSpec, TracingSpec, \
-    PrometheusSpec, CephExporterSpec, NFSServiceSpec
+    PrometheusSpec, CephExporterSpec, NFSServiceSpec, NvmeofServiceSpec
 from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect
 
 from ceph.utils import datetime_now
@@ -105,6 +106,7 @@ class TestCephadmService:
         promtail_service = PromtailService(mgr)
         crash_service = CrashService(mgr)
         iscsi_service = IscsiService(mgr)
+        nvmeof_service = NvmeofService(mgr)
         cephadm_services = {
             'mon': mon_service,
             'mgr': mgr_service,
@@ -121,6 +123,7 @@ class TestCephadmService:
             'promtail': promtail_service,
             'crash': crash_service,
             'iscsi': iscsi_service,
+            'nvmeof': nvmeof_service,
         }
         return cephadm_services
 
@@ -330,8 +333,108 @@ log_to_file = False"""
                 )
 
 
+class TestNVMEOFService:
+
+    mgr = FakeMgr()
+    nvmeof_service = NvmeofService(mgr)
+
+    nvmeof_spec = NvmeofServiceSpec(service_type='nvmeof', service_id="a")
+    nvmeof_spec.daemon_type = 'nvmeof'
+    nvmeof_spec.daemon_id = "a"
+    nvmeof_spec.spec = MagicMock()
+    nvmeof_spec.spec.daemon_type = 'nvmeof'
+
+    mgr.spec_store = MagicMock()
+    mgr.spec_store.all_specs.get.return_value = nvmeof_spec
+
+    def test_nvmeof_client_caps(self):
+        pass
+
+    @patch('cephadm.utils.resolve_ip')
+    def test_nvmeof_dashboard_config(self, mock_resolve_ip):
+        pass
+
+    @patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '192.168.100.100')
+    @patch("cephadm.serve.CephadmServe._run_cephadm")
+    @patch("cephadm.module.CephadmOrchestrator.get_unique_name")
+    def test_nvmeof_config(self, _get_name, _run_cephadm, cephadm_module: CephadmOrchestrator):
+
+        nvmeof_daemon_id = 'testpool.test.qwert'
+        pool = 'testpool'
+        default_port = 5500
+        group = 'mygroup'
+        _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+        _get_name.return_value = nvmeof_daemon_id
+
+        nvmeof_gateway_conf = f"""# This file is generated by cephadm.
+[gateway]
+name = client.nvmeof.{nvmeof_daemon_id}
+group = {group}
+addr = 192.168.100.100
+port = {default_port}
+enable_auth = False
+state_update_notify = True
+state_update_interval_sec = 5
+
+[ceph]
+pool = {pool}
+config_file = /etc/ceph/ceph.conf
+
+[mtls]
+server_key = ./server.key
+client_key = ./client.key
+server_cert = ./server.crt
+client_cert = ./client.crt
+
+[spdk]
+tgt_path = /usr/local/bin/nvmf_tgt
+rpc_socket = /var/tmp/spdk.sock
+timeout = 60
+log_level = WARN
+conn_retries = 10
+transports = tcp\n"""
+
+        with with_host(cephadm_module, 'test'):
+            with with_service(cephadm_module, NvmeofServiceSpec(service_id=pool,
+                                                                group=group,
+                                                                pool=pool)):
+                _run_cephadm.assert_called_with(
+                    'test',
+                    f'nvmeof.{nvmeof_daemon_id}',
+                    ['_orch', 'deploy'],
+                    [],
+                    stdin=json.dumps({
+                        "fsid": "fsid",
+                        "name": "nvmeof.testpool.test.qwert",
+                        "image": "",
+                        "deploy_arguments": [],
+                        "params": {
+                            "tcp_ports": [5500]
+                        },
+                        "meta": {
+                            "service_name": "nvmeof.testpool",
+                            "ports": [5500],
+                            "ip": None,
+                            "deployed_by": [],
+                            "rank": None,
+                            "rank_generation": None,
+                            "extra_container_args": None,
+                            "extra_entrypoint_args": None
+                        },
+                        "config_blobs": {
+                            "config": "",
+                            "keyring": "[client.nvmeof.testpool.test.qwert]\nkey = None\n",
+                            "files": {
+                                "ceph-nvmeof.conf": nvmeof_gateway_conf
+                            }
+                        }
+                    }),
+                )
+
+
 class TestMonitoring:
     def _get_config(self, url: str) -> str:
+
         return f"""
         # This file is generated by cephadm.
         # See https://prometheus.io/docs/alerting/configuration/ for documentation.
diff --git a/src/pybind/mgr/cephadm/utils.py b/src/pybind/mgr/cephadm/utils.py
index 091c378147d8b793079a7e91488d031ee3e90436..e920179710ce7953371bc0f552d31c0810779158 100644 (file)
@@ -23,7 +23,7 @@ class CephadmNoImage(Enum):
 # NOTE: order important here as these are used for upgrade order
 CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw',
               'rbd-mirror', 'cephfs-mirror', 'ceph-exporter']
-GATEWAY_TYPES = ['iscsi', 'nfs']
+GATEWAY_TYPES = ['iscsi', 'nfs', 'nvmeof']
 MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
                           'alertmanager', 'grafana', 'loki', 'promtail']
 RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['haproxy', 'nfs']
@@ -58,7 +58,7 @@ def name_to_config_section(name: str) -> ConfEntity:
     Map from daemon names to ceph entity names (as seen in config)
     """
     daemon_type = name.split('.', 1)[0]
-    if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi', 'ceph-exporter']:
+    if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi', 'ceph-exporter', 'nvmeof']:
         return ConfEntity('client.' + name)
     elif daemon_type in ['mon', 'osd', 'mds', 'mgr', 'client']:
         return ConfEntity(name)
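Adding 'nvmeof' to this list means a gateway daemon resolves to a client. entity for both config sections and cephx auth, which is also where the [gateway] name in ceph-nvmeof.conf comes from (prepare_create() concatenates name_to_config_section('nvmeof') with the daemon id). A simplified stand-in, for illustration only, with a made-up daemon name:

# Simplified stand-in for name_to_config_section()
def to_config_section(name: str) -> str:
    daemon_type = name.split('.', 1)[0]
    if daemon_type in ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi',
                       'ceph-exporter', 'nvmeof'):
        return 'client.' + name
    return name

print(to_config_section('nvmeof.testpool.host1.abcdef'))
# -> client.nvmeof.testpool.host1.abcdef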
diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py
index f9957e8049d7227056b6de41a69aaabe4719d854..581e925a0922a24791c07bee0cc916a777504672 100644 (file)
@@ -42,6 +42,7 @@ from ceph.deployment.service_spec import (
     SNMPGatewaySpec,
     ServiceSpec,
     TunedProfileSpec,
+    NvmeofServiceSpec
 )
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.hostspec import HostSpec, SpecValidationError
@@ -489,6 +490,7 @@ class Orchestrator(object):
             'crash': self.apply_crash,
             'grafana': self.apply_grafana,
             'iscsi': self.apply_iscsi,
+            'nvmeof': self.apply_nvmeof,
             'mds': self.apply_mds,
             'mgr': self.apply_mgr,
             'mon': self.apply_mon,
@@ -677,6 +679,10 @@ class Orchestrator(object):
         """Update iscsi cluster"""
         raise NotImplementedError()
 
+    def apply_nvmeof(self, spec: NvmeofServiceSpec) -> OrchResult[str]:
+        """Update nvmeof cluster"""
+        raise NotImplementedError()
+
     def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update prometheus cluster"""
         raise NotImplementedError()
@@ -807,6 +813,7 @@ def daemon_type_to_service(dtype: str) -> str:
         'haproxy': 'ingress',
         'keepalived': 'ingress',
         'iscsi': 'iscsi',
+        'nvmeof': 'nvmeof',
         'rbd-mirror': 'rbd-mirror',
         'cephfs-mirror': 'cephfs-mirror',
         'nfs': 'nfs',
@@ -839,6 +846,7 @@ def service_to_daemon_types(stype: str) -> List[str]:
         'osd': ['osd'],
         'ingress': ['haproxy', 'keepalived'],
         'iscsi': ['iscsi'],
+        'nvmeof': ['nvmeof'],
         'rbd-mirror': ['rbd-mirror'],
         'cephfs-mirror': ['cephfs-mirror'],
         'nfs': ['nfs'],
diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py
index a664b24773f29692a1c29bde66af9bc4b51c5910..26966aa8fcf29130dc33314732dbb016e4535038 100644 (file)
@@ -30,7 +30,8 @@ from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_comma
     NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
     RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
     ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, \
-    GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
+    GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec, TunedProfileSpec, \
+    NvmeofServiceSpec
 
 
 def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str:
@@ -150,6 +151,7 @@ class ServiceType(enum.Enum):
     rgw = 'rgw'
     nfs = 'nfs'
     iscsi = 'iscsi'
+    nvmeof = 'nvmeof'
     snmp_gateway = 'snmp-gateway'
     elasticsearch = 'elasticsearch'
     jaeger_agent = 'jaeger-agent'
@@ -1196,6 +1198,22 @@ Usage:
         )
         return self._daemon_add_misc(spec)
 
+    @_cli_write_command('orch daemon add nvmeof')
+    def _nvmeof_add(self,
+                    pool: str,
+                    placement: Optional[str] = None,
+                    inbuf: Optional[str] = None) -> HandleCommandResult:
+        """Start nvmeof daemon(s)"""
+        if inbuf:
+            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+        spec = NvmeofServiceSpec(
+            service_id='nvmeof',
+            pool=pool,
+            placement=PlacementSpec.from_string(placement),
+        )
+        return self._daemon_add_misc(spec)
+
     @_cli_write_command('orch')
     def _service_action(self, action: ServiceAction, service_name: str) -> HandleCommandResult:
         """Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)"""
@@ -1437,6 +1455,31 @@ Usage:
 
         return self._apply_misc([spec], dry_run, format, no_overwrite)
 
+    @_cli_write_command('orch apply nvmeof')
+    def _apply_nvmeof(self,
+                      pool: str,
+                      placement: Optional[str] = None,
+                      unmanaged: bool = False,
+                      dry_run: bool = False,
+                      format: Format = Format.plain,
+                      no_overwrite: bool = False,
+                      inbuf: Optional[str] = None) -> HandleCommandResult:
+        """Scale an nvmeof service"""
+        if inbuf:
+            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+        spec = NvmeofServiceSpec(
+            service_id=pool,
+            pool=pool,
+            placement=PlacementSpec.from_string(placement),
+            unmanaged=unmanaged,
+            preview_only=dry_run
+        )
+
+        spec.validate()  # force any validation exceptions to be caught correctly
+
+        return self._apply_misc([spec], dry_run, format, no_overwrite)
+
     @_cli_write_command('orch apply snmp-gateway')
     def _apply_snmp_gateway(self,
                             snmp_version: SNMPGatewaySpec.SNMPVersion,
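Together, the two handlers above add `ceph orch daemon add nvmeof <pool>` and `ceph orch apply nvmeof <pool> [--placement=...]`. Both build an NvmeofServiceSpec; for `orch apply nvmeof` the pool name doubles as the service id. A sketch of the equivalent spec construction, with a made-up pool and placement:

from ceph.deployment.service_spec import NvmeofServiceSpec, PlacementSpec

spec = NvmeofServiceSpec(
    service_id='mypool',            # _apply_nvmeof() uses the pool as service_id
    pool='mypool',
    placement=PlacementSpec.from_string('1 host1'),
)
spec.validate()                     # surface validation errors up front
print(spec.service_name())          # -> nvmeof.mypool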
diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py
index ffa7ea66ecec2adef937db2463055bc7f195c177..0b76f8215c718bf2c494e442daece83848c151e7 100644 (file)
@@ -617,16 +617,16 @@ class ServiceSpec(object):
     Details of service creation.
 
     Request to the orchestrator for a cluster of daemons
-    such as MDS, RGW, iscsi gateway, MONs, MGRs, Prometheus
+    such as MDS, RGW, iscsi gateway, nvmeof gateway, MONs, MGRs, Prometheus
 
     This structure is supposed to be enough information to
     start the services.
     """
-    KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi loki promtail mds mgr mon nfs ' \
+    KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \
                           'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \
                           'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \
                           'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split()
-    REQUIRES_SERVICE_ID = 'iscsi mds nfs rgw container ingress '.split()
+    REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split()
     MANAGED_CONFIG_OPTIONS = [
         'mds_join_fs',
     ]
@@ -642,6 +642,7 @@ class ServiceSpec(object):
             'osd': DriveGroupSpec,
             'mds': MDSSpec,
             'iscsi': IscsiServiceSpec,
+            'nvmeof': NvmeofServiceSpec,
             'alertmanager': AlertManagerSpec,
             'ingress': IngressSpec,
             'container': CustomContainerSpec,
@@ -702,8 +703,8 @@ class ServiceSpec(object):
         #: ``prometheus``) or (``container``) for custom containers.
         self.service_type = service_type
 
-        #: The name of the service. Required for ``iscsi``, ``mds``, ``nfs``, ``osd``, ``rgw``,
-        #: ``container``, ``ingress``
+        #: The name of the service. Required for ``iscsi``, ``nvmeof``, ``mds``, ``nfs``, ``osd``,
+        #: ``rgw``, ``container``, ``ingress``
         self.service_id = None
 
         if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd':
@@ -1095,6 +1096,93 @@ class RGWSpec(ServiceSpec):
 yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)
 
 
+class NvmeofServiceSpec(ServiceSpec):
+    def __init__(self,
+                 service_type: str = 'nvmeof',
+                 service_id: Optional[str] = None,
+                 name: Optional[str] = None,
+                 group: Optional[str] = None,
+                 port: Optional[int] = None,
+                 pool: Optional[str] = None,
+                 enable_auth: bool = False,
+                 server_key: Optional[str] = None,
+                 server_cert: Optional[str] = None,
+                 client_key: Optional[str] = None,
+                 client_cert: Optional[str] = None,
+                 spdk_path: Optional[str] = None,
+                 tgt_path: Optional[str] = None,
+                 timeout: Optional[int] = 60,
+                 conn_retries: Optional[int] = 10,
+                 transports: Optional[str] = "tcp",
+                 placement: Optional[PlacementSpec] = None,
+                 unmanaged: bool = False,
+                 preview_only: bool = False,
+                 config: Optional[Dict[str, str]] = None,
+                 networks: Optional[List[str]] = None,
+                 extra_container_args: Optional[List[str]] = None,
+                 extra_entrypoint_args: Optional[List[str]] = None,
+                 custom_configs: Optional[List[CustomConfig]] = None,
+                 ):
+        assert service_type == 'nvmeof'
+        super(NvmeofServiceSpec, self).__init__('nvmeof', service_id=service_id,
+                                                placement=placement, unmanaged=unmanaged,
+                                                preview_only=preview_only,
+                                                config=config, networks=networks,
+                                                extra_container_args=extra_container_args,
+                                                extra_entrypoint_args=extra_entrypoint_args,
+                                                custom_configs=custom_configs)
+
+        #: RADOS pool where ceph-nvmeof config data is stored.
+        self.pool = pool
+        #: ``port`` port of the nvmeof gateway
+        self.port = port or 5500
+        #: ``name`` name of the nvmeof gateway
+        self.name = name
+        #: ``group`` name of the nvmeof gateway
+        self.group = group
+        #: ``enable_auth`` enables user authentication on nvmeof gateway
+        self.enable_auth = enable_auth
+        #: ``server_key`` gateway server key
+        self.server_key = server_key or './server.key'
+        #: ``server_cert`` gateway server certificate
+        self.server_cert = server_cert or './server.crt'
+        #: ``client_key`` client key
+        self.client_key = client_key or './client.key'
+        #: ``client_cert`` client certificate
+        self.client_cert = client_cert or './client.crt'
+        #: ``spdk_path`` path to SPDK
+        self.spdk_path = spdk_path or '/usr/local/bin/nvmf_tgt'
+        #: ``tgt_path`` nvmeof target path
+        self.tgt_path = tgt_path or '/usr/local/bin/nvmf_tgt'
+        #: ``timeout`` ceph connectivity timeout
+        self.timeout = timeout
+        #: ``conn_retries`` ceph connection retries number
+        self.conn_retries = conn_retries
+        #: ``transports`` TODO
+        self.transports = transports
+
+    def get_port_start(self) -> List[int]:
+        return [self.port or 5500]
+
+    def validate(self) -> None:
+        #  TODO: what other parameters should be validated as part of this function?
+        super(NvmeofServiceSpec, self).validate()
+
+        if not self.pool:
+            raise SpecValidationError('Cannot add NVMEOF: No Pool specified')
+
+        if self.enable_auth:
+            if not any([self.server_key, self.server_cert, self.client_key, self.client_cert]):
+                raise SpecValidationError(
+                    'enable_auth is true but client/server certificates are missing')
+
+        if self.transports not in ['tcp']:
+            raise SpecValidationError('Invalid transport. Valid values are tcp')
+
+
+yaml.add_representer(NvmeofServiceSpec, ServiceSpec.yaml_representer)
+
+
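NvmeofServiceSpec fills in working defaults (port 5500, the SPDK target path, tcp transport, placeholder cert/key paths) and validate() insists on a pool. A hedged, standalone sketch of that behaviour (the pool name is a made-up example):

from ceph.deployment.service_spec import NvmeofServiceSpec
from ceph.deployment.hostspec import SpecValidationError

ok = NvmeofServiceSpec(service_id='mypool', pool='mypool')
ok.validate()
print(ok.port, ok.tgt_path, ok.transports)  # -> 5500 /usr/local/bin/nvmf_tgt tcp

try:
    NvmeofServiceSpec(service_id='mypool').validate()  # no pool given
except SpecValidationError as err:
    print(err)  # -> Cannot add NVMEOF: No Pool specified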
 class IscsiServiceSpec(ServiceSpec):
     def __init__(self,
                  service_type: str = 'iscsi',