]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/smb: update the handler to support external ceph cluster type
authorJohn Mulligan <jmulligan@redhat.com>
Fri, 28 Nov 2025 17:53:06 +0000 (12:53 -0500)
committerJohn Mulligan <jmulligan@redhat.com>
Tue, 17 Feb 2026 15:59:12 +0000 (10:59 -0500)
Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/smb/handler.py

index db77e9abccf97704c4f342abe1cce30a0b236356..ee0616573ffcd85da34ef54170366bd67857aba3 100644 (file)
@@ -13,10 +13,12 @@ from typing import (
 )
 
 import contextlib
+import dataclasses
 import logging
 import time
 
-from ceph.deployment.service_spec import SMBSpec
+from ceph.deployment.service_spec import SMBExternalCephCluster, SMBSpec
+from ceph.fs.earmarking import EarmarkTopScope
 
 from . import config_store, external, resources
 from .enums import (
@@ -31,6 +33,7 @@ from .enums import (
 )
 from .internal import (
     ClusterEntry,
+    ExternalCephClusterEntry,
     JoinAuthEntry,
     ShareEntry,
     TLSCredentialEntry,
@@ -43,6 +46,7 @@ from .proto import (
     EarmarkResolver,
     OrchSubmitter,
     PathResolver,
+    Self,
     Simplified,
 )
 from .resources import SMBResource
@@ -51,6 +55,7 @@ from .staging import (
     Staging,
     auth_refs,
     cross_check_resource,
+    ext_cluster_refs,
     tls_refs,
     ug_refs,
 )
@@ -82,12 +87,14 @@ class ClusterChangeGroup:
         join_auths: List[resources.JoinAuth],
         users_and_groups: List[resources.UsersAndGroups],
         tls_credentials: List[resources.TLSCredential],
+        ext_ceph_clusters: List[resources.ExternalCephCluster],
     ):
         self.cluster = cluster
         self.shares = shares
         self.join_auths = join_auths
         self.users_and_groups = users_and_groups
         self.tls_credentials = tls_credentials
+        self.ext_ceph_clusters = ext_ceph_clusters
         # a cache for modified entries
         self.cache = config_store.EntryCache()
 
@@ -120,6 +127,20 @@ class _FakePathResolver:
     resolve_exists = resolve
 
 
+class ExoResolver:
+    def __init__(self, cluster: resources.Cluster) -> None:
+        self._cluster = cluster
+
+    def resolve(
+        self, volume: str, subvolumegroup: str, subvolume: str, path: str
+    ) -> str:
+        assert not subvolumegroup
+        assert not subvolume
+        return path
+
+    resolve_exists = resolve
+
+
 class _FakeEarmarkResolver:
     """A stub EarmarkResolver for unit testing."""
 
@@ -132,7 +153,9 @@ class _FakeEarmarkResolver:
     def set_earmark(self, path: str, volume: str, earmark: str) -> None:
         pass
 
-    def check_earmark(self, earmark: str, top_level_scope: str) -> bool:
+    def check_earmark(
+        self, earmark: str, top_level_scope: EarmarkTopScope
+    ) -> bool:
         return True
 
 
@@ -152,6 +175,7 @@ class _Matcher:
         resources.JoinAuth,
         resources.UsersAndGroups,
         resources.TLSCredential,
+        resources.ExternalCephCluster,
     )
 
     def __init__(self) -> None:
@@ -406,11 +430,15 @@ class ClusterConfigHandler:
                     msg='a resource with the same ID already exists',
                 )
         try:
+            path_resolver = self._choose_path_resolver(resource, staging)
+            earmark_resolver = self._choose_earmark_resolver(
+                resource, staging
+            )
             cross_check_resource(
                 resource,
                 staging,
-                path_resolver=self._path_resolver,
-                earmark_resolver=self._earmark_resolver,
+                path_resolver=path_resolver,
+                earmark_resolver=earmark_resolver,
             )
         except ErrorResult as err:
             log.debug('rejected resource: %r', resource)
@@ -419,6 +447,30 @@ class ClusterConfigHandler:
         result = Result(resource, success=True, status={'checked': True})
         return result
 
+    def _choose_path_resolver(
+        self, resource: SMBResource, staging: Staging
+    ) -> PathResolver:
+        if isinstance(resource, resources.Share):
+            cluster = staging.get_cluster(resource.cluster_id)
+            if refs := ext_cluster_refs(cluster):
+                log.debug(
+                    "Selected external resolver for %s (ext refs: %r)",
+                    resource,
+                    refs,
+                )
+                return ExoResolver(cluster)
+        log.debug("Selected default path resolver for %s", resource)
+        return self._path_resolver
+
+    def _choose_earmark_resolver(
+        self, resource: SMBResource, staging: Staging
+    ) -> EarmarkResolver:
+        if isinstance(resource, resources.Share):
+            cluster = staging.get_cluster(resource.cluster_id)
+            if ext_cluster_refs(cluster):
+                return _FakeEarmarkResolver()
+        return self._earmark_resolver
+
     def _sync_clusters(
         self, modified_cluster_ids: Optional[Collection[str]] = None
     ) -> None:
@@ -466,6 +518,12 @@ class ClusterConfigHandler:
                     ).get_tls_credential()
                     for _id in tls_refs(cluster)
                 ],
+                [
+                    ExternalCephClusterEntry.from_store(
+                        self.internal_store, _id
+                    ).get_external_ceph_cluster()
+                    for _id in ext_cluster_refs(cluster)
+                ],
             )
             change_groups.append(change_group)
         for change_group in change_groups:
@@ -500,6 +558,7 @@ class ClusterConfigHandler:
         chg_join_ids: Set[str] = set()
         chg_ug_ids: Set[str] = set()
         chg_tls_ids: Set[str] = set()
+        chg_extc_ids: Set[str] = set()
         for result in updated:
             state = (result.status or {}).get('state', None)
             if state in (State.PRESENT, State.NOT_PRESENT):
@@ -520,11 +579,13 @@ class ClusterConfigHandler:
                 chg_ug_ids.add(result.src.users_groups_id)
             elif isinstance(result.src, resources.TLSCredential):
                 chg_tls_ids.add(result.src.tls_credential_id)
+            elif isinstance(result.src, resources.ExternalCephCluster):
+                chg_extc_ids.add(result.src.external_ceph_cluster_id)
 
         # TODO: here's a lazy bit. if any join auths or users/groups changed we
         # will regen all clusters because these can be shared by >1 cluster.
         # In future, make this only pick clusters using the named resources.
-        if chg_join_ids or chg_ug_ids or chg_tls_ids:
+        if chg_join_ids or chg_ug_ids or chg_tls_ids or chg_extc_ids:
             chg_cluster_ids.update(ClusterEntry.ids(self.internal_store))
         return chg_cluster_ids
 
@@ -536,10 +597,11 @@ class ClusterConfigHandler:
             'saving external store for cluster: %s',
             change_group.cluster.cluster_id,
         )
+        cluster = change_group.cluster
+        assert isinstance(cluster, resources.Cluster)
         # vols: hold the cephfs volumes our shares touch. some operations are
         # disabled/skipped unless we touch volumes.
         vols = {share.checked_cephfs.volume for share in change_group.shares}
-        data_entity = _cephx_data_entity(change_group.cluster.cluster_id)
         # save the various object types
         previous_info = _swap_pending_cluster_info(
             self.public_store,
@@ -549,12 +611,10 @@ class ClusterConfigHandler:
         _save_pending_join_auths(self.priv_store, change_group)
         _save_pending_users_and_groups(self.priv_store, change_group)
         _save_pending_tls_credentials(self.priv_store, change_group)
-        _save_pending_config(
-            self.public_store,
-            change_group,
-            self._path_resolver,
-            data_entity,
+        cluster_conf = _ClusterConf.assemble(
+            change_group, self._path_resolver, self._authorizer
         )
+        _save_pending_config(self.public_store, cluster_conf)
         # remove any stray objects
         external.rm_other_in_ns(
             self.priv_store,
@@ -562,17 +622,7 @@ class ClusterConfigHandler:
             set(change_group.cache),
         )
 
-        # ensure a entity exists with access to the volumes
-        for volume in vols:
-            self._authorizer.authorize_entity(volume, data_entity)
-        if not vols:
-            # there were no volumes, and thus nothing to authorize. set data_entity
-            # to an empty string to avoid adding it to the svc spec later.
-            data_entity = ''
-
         # build a service spec for smb cluster
-        cluster = change_group.cluster
-        assert isinstance(cluster, resources.Cluster)
         config_entries = [
             change_group.cache[external.config_key(cluster.cluster_id)],
             self.public_store[
@@ -601,14 +651,19 @@ class ClusterConfigHandler:
             ]
             for tc in change_group.tls_credentials
         }
+        ext_ceph_cluster = None
+        if change_group.ext_ceph_clusters:
+            assert len(change_group.ext_ceph_clusters) == 1
+            ext_ceph_cluster = change_group.ext_ceph_clusters[0]
         smb_spec = _generate_smb_service_spec(
             cluster,
             config_entries=config_entries,
             join_source_entries=join_source_entries,
             user_source_entries=user_source_entries,
             tls_credential_entries=tls_credential_entries,
-            data_entity=data_entity,
+            data_entity=cluster_conf.data_entity,
             needs_proxy=_has_proxied_vfs(change_group),
+            ext_ceph_cluster=ext_ceph_cluster,
         )
         _save_pending_spec_backup(self.public_store, change_group, smb_spec)
         # if orch was ever needed in the past we must "re-orch", but if we have
@@ -668,9 +723,68 @@ def order_resources(
     return sorted(resource_objs, key=_keyfunc)
 
 
-def _generate_share(
-    share: resources.Share, resolver: PathResolver, cephx_entity: str
-) -> Dict[str, Dict[str, str]]:
+@dataclasses.dataclass(frozen=True)
+class _ShareConf:
+    resource: resources.Share
+    resolver: PathResolver
+    cephx_entity: str
+    ceph_cluster: str
+
+
+@dataclasses.dataclass(frozen=True)
+class _ClusterConf:
+    resource: resources.Cluster
+    shares: Iterable[_ShareConf]
+    change_group: ClusterChangeGroup
+    data_entity: str
+
+    @classmethod
+    def assemble(
+        cls,
+        change_group: ClusterChangeGroup,
+        default_resolver: PathResolver,
+        authorizer: AccessAuthorizer,
+    ) -> Self:
+        extcc = None
+        assert isinstance(change_group.cluster, resources.Cluster)
+        if change_group.ext_ceph_clusters:
+            assert len(change_group.ext_ceph_clusters) == 1
+            extcc = change_group.ext_ceph_clusters[0]
+
+        resolver = default_resolver
+        cephx_entity = ''  # default to no entity for data access
+        cephadm_data_entity = ''  # passed to cephadm service spec
+        ceph_cluster = ''  # empty string means local cluster
+        if extcc:
+            log.debug('external ceph cluster')
+            resolver = ExoResolver(change_group.cluster)
+            ceph_cluster = 'exo'
+            cephx_entity = checked(extcc.cluster).cephfs_user.name
+        elif change_group.shares:
+            log.debug('local ceph cluster with shares')
+            cephx_entity = _cephx_data_entity(change_group.cluster)
+            # ensure an entity exists with access to the volumes
+            for share in change_group.shares:
+                authorizer.authorize_entity(
+                    share.checked_cephfs.volume, cephx_entity
+                )
+            cephadm_data_entity = cephx_entity
+        else:
+            log.debug('local cluster without shares: skipping ceph auth')
+        return cls(
+            change_group.cluster,
+            [
+                _ShareConf(s, resolver, cephx_entity, ceph_cluster)
+                for s in change_group.shares
+            ],
+            change_group,
+            cephadm_data_entity,
+        )
+
+
+def _generate_share(conf: _ShareConf) -> Dict[str, Dict[str, str]]:
+    share = conf.resource
+    cephx_entity = conf.cephx_entity
     cephfs = share.checked_cephfs
     assert cephfs.provider.is_vfs(), "not a vfs provider"
     assert cephx_entity, "cephx entity name missing"
@@ -681,7 +795,7 @@ def _generate_share(
     plen = len(_prefix)
     if cephx_entity.startswith(_prefix):
         cephx_entity = cephx_entity[plen:]
-    path = resolver.resolve(
+    path = conf.resolver.resolve(
         cephfs.volume,
         cephfs.subvolumegroup,
         cephfs.subvolume,
@@ -695,13 +809,18 @@ def _generate_share(
         }[cephfs.provider.expand()]
     except KeyError:
         raise ValueError(f'unsupported provider: {cephfs.provider}')
+    ceph_config_file = (
+        f'/etc/ceph/{conf.ceph_cluster}.ceph.conf'
+        if conf.ceph_cluster
+        else '/etc/ceph/ceph.conf'
+    )
     cfg = {
         # smb.conf options
         'options': {
             'path': path,
             "vfs objects": f"acl_xattr ceph_snapshots {ceph_vfs}",
             'acl_xattr:security_acl_name': 'user.NTACL',
-            f'{ceph_vfs}:config_file': '/etc/ceph/ceph.conf',
+            f'{ceph_vfs}:config_file': ceph_config_file,
             f'{ceph_vfs}:filesystem': cephfs.volume,
             f'{ceph_vfs}:user_id': cephx_entity,
             'read only': ynbool(share.readonly),
@@ -791,13 +910,9 @@ def _generate_share_hosts_access(
         cfg['options']['hosts deny'] = ', '.join(hosts_deny)
 
 
-def _generate_config(
-    cluster: resources.Cluster,
-    shares: Iterable[resources.Share],
-    resolver: PathResolver,
-    cephx_entity: str = "",
-) -> Dict[str, Any]:
+def _generate_config(conf: _ClusterConf) -> Dict[str, Any]:
     cluster_global_opts = {}
+    cluster = conf.resource
     if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
         assert cluster.domain_settings is not None
         cluster_global_opts['security'] = 'ads'
@@ -810,8 +925,7 @@ def _generate_config(
     cluster_global_opts['smb ports'] = str(_smb_port(cluster))
 
     share_configs = {
-        share.name: _generate_share(share, resolver, cephx_entity)
-        for share in shares
+        share.resource.name: _generate_share(share) for share in conf.shares
     }
 
     instance_features = []
@@ -862,6 +976,7 @@ def _generate_smb_service_spec(
     tls_credential_entries: Dict[str, ConfigEntry],
     data_entity: str = '',
     needs_proxy: bool = False,
+    ext_ceph_cluster: Optional[resources.ExternalCephCluster],
 ) -> SMBSpec:
     features = []
     if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
@@ -903,6 +1018,19 @@ def _generate_smb_service_spec(
         rc_ca_cert = _tls_uri(
             cluster.remote_control.ca_cert, tls_credential_entries
         )
+    ceph_cluster_configs = None
+    if ext_ceph_cluster:
+        exo = checked(ext_ceph_cluster.cluster)
+        ceph_cluster_configs = [
+            SMBExternalCephCluster(
+                alias='exo',
+                fsid=exo.fsid,
+                mon_host=exo.mon_host,
+                user=exo.cephfs_user.name,
+                key=exo.cephfs_user.key,
+            )
+        ]
+
     return SMBSpec(
         service_id=cluster.cluster_id,
         placement=cluster.placement,
@@ -919,6 +1047,7 @@ def _generate_smb_service_spec(
         remote_control_ssl_cert=rc_cert,
         remote_control_ssl_key=rc_key,
         remote_control_ca_cert=rc_ca_cert,
+        ceph_cluster_configs=ceph_cluster_configs,
     )
 
 
@@ -1024,18 +1153,13 @@ def _save_pending_tls_credentials(
 
 def _save_pending_config(
     store: ConfigStore,
-    change_group: ClusterChangeGroup,
-    resolver: PathResolver,
-    cephx_entity: str = "",
+    cluster_conf: _ClusterConf,
 ) -> None:
-    assert isinstance(change_group.cluster, resources.Cluster)
     # generate the cluster configuration and save it in the public store
-    cconfig = _generate_config(
-        change_group.cluster, change_group.shares, resolver, cephx_entity
-    )
-    centry = store[external.config_key(change_group.cluster.cluster_id)]
+    cconfig = _generate_config(cluster_conf)
+    centry = store[external.config_key(cluster_conf.resource.cluster_id)]
     centry.set(cconfig)
-    change_group.cache_updated_entry(centry)
+    cluster_conf.change_group.cache_updated_entry(centry)
 
 
 def _save_pending_spec_backup(
@@ -1046,11 +1170,13 @@ def _save_pending_spec_backup(
     change_group.cache_updated_entry(ssentry)
 
 
-def _cephx_data_entity(cluster_id: str) -> str:
+def _cephx_data_entity(cluster: resources.Cluster) -> str:
     """Generate a name for the (default?) cephx key that a cluster (smbd) will
     use for data access.
     """
-    return f'client.smb.fs.cluster.{cluster_id}'
+    if cluster.external_ceph_cluster:
+        return ''
+    return f'client.smb.fs.cluster.{cluster.cluster_id}'
 
 
 @contextlib.contextmanager