From: John Mulligan Date: Fri, 28 Nov 2025 17:53:06 +0000 (-0500) Subject: mgr/smb: update the handler to support external ceph cluster type X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5712016c2133870da3f704d8457358ad06efc87f;p=ceph.git mgr/smb: update the handler to support external ceph cluster type Signed-off-by: John Mulligan --- diff --git a/src/pybind/mgr/smb/handler.py b/src/pybind/mgr/smb/handler.py index db77e9abccf..ee0616573ff 100644 --- a/src/pybind/mgr/smb/handler.py +++ b/src/pybind/mgr/smb/handler.py @@ -13,10 +13,12 @@ from typing import ( ) import contextlib +import dataclasses import logging import time -from ceph.deployment.service_spec import SMBSpec +from ceph.deployment.service_spec import SMBExternalCephCluster, SMBSpec +from ceph.fs.earmarking import EarmarkTopScope from . import config_store, external, resources from .enums import ( @@ -31,6 +33,7 @@ from .enums import ( ) from .internal import ( ClusterEntry, + ExternalCephClusterEntry, JoinAuthEntry, ShareEntry, TLSCredentialEntry, @@ -43,6 +46,7 @@ from .proto import ( EarmarkResolver, OrchSubmitter, PathResolver, + Self, Simplified, ) from .resources import SMBResource @@ -51,6 +55,7 @@ from .staging import ( Staging, auth_refs, cross_check_resource, + ext_cluster_refs, tls_refs, ug_refs, ) @@ -82,12 +87,14 @@ class ClusterChangeGroup: join_auths: List[resources.JoinAuth], users_and_groups: List[resources.UsersAndGroups], tls_credentials: List[resources.TLSCredential], + ext_ceph_clusters: List[resources.ExternalCephCluster], ): self.cluster = cluster self.shares = shares self.join_auths = join_auths self.users_and_groups = users_and_groups self.tls_credentials = tls_credentials + self.ext_ceph_clusters = ext_ceph_clusters # a cache for modified entries self.cache = config_store.EntryCache() @@ -120,6 +127,20 @@ class _FakePathResolver: resolve_exists = resolve +class ExoResolver: + def __init__(self, cluster: resources.Cluster) -> None: + self._cluster = cluster + + def resolve( + self, volume: str, subvolumegroup: str, subvolume: str, path: str + ) -> str: + assert not subvolumegroup + assert not subvolume + return path + + resolve_exists = resolve + + class _FakeEarmarkResolver: """A stub EarmarkResolver for unit testing.""" @@ -132,7 +153,9 @@ class _FakeEarmarkResolver: def set_earmark(self, path: str, volume: str, earmark: str) -> None: pass - def check_earmark(self, earmark: str, top_level_scope: str) -> bool: + def check_earmark( + self, earmark: str, top_level_scope: EarmarkTopScope + ) -> bool: return True @@ -152,6 +175,7 @@ class _Matcher: resources.JoinAuth, resources.UsersAndGroups, resources.TLSCredential, + resources.ExternalCephCluster, ) def __init__(self) -> None: @@ -406,11 +430,15 @@ class ClusterConfigHandler: msg='a resource with the same ID already exists', ) try: + path_resolver = self._choose_path_resolver(resource, staging) + earmark_resolver = self._choose_earmark_resolver( + resource, staging + ) cross_check_resource( resource, staging, - path_resolver=self._path_resolver, - earmark_resolver=self._earmark_resolver, + path_resolver=path_resolver, + earmark_resolver=earmark_resolver, ) except ErrorResult as err: log.debug('rejected resource: %r', resource) @@ -419,6 +447,30 @@ class ClusterConfigHandler: result = Result(resource, success=True, status={'checked': True}) return result + def _choose_path_resolver( + self, resource: SMBResource, staging: Staging + ) -> PathResolver: + if isinstance(resource, resources.Share): + cluster = staging.get_cluster(resource.cluster_id) + if refs := ext_cluster_refs(cluster): + log.debug( + "Selected external resolver for %s (ext refs: %r)", + resource, + refs, + ) + return ExoResolver(cluster) + log.debug("Selected default path resolver for %s", resource) + return self._path_resolver + + def _choose_earmark_resolver( + self, resource: SMBResource, staging: Staging + ) -> EarmarkResolver: + if isinstance(resource, resources.Share): + cluster = staging.get_cluster(resource.cluster_id) + if ext_cluster_refs(cluster): + return _FakeEarmarkResolver() + return self._earmark_resolver + def _sync_clusters( self, modified_cluster_ids: Optional[Collection[str]] = None ) -> None: @@ -466,6 +518,12 @@ class ClusterConfigHandler: ).get_tls_credential() for _id in tls_refs(cluster) ], + [ + ExternalCephClusterEntry.from_store( + self.internal_store, _id + ).get_external_ceph_cluster() + for _id in ext_cluster_refs(cluster) + ], ) change_groups.append(change_group) for change_group in change_groups: @@ -500,6 +558,7 @@ class ClusterConfigHandler: chg_join_ids: Set[str] = set() chg_ug_ids: Set[str] = set() chg_tls_ids: Set[str] = set() + chg_extc_ids: Set[str] = set() for result in updated: state = (result.status or {}).get('state', None) if state in (State.PRESENT, State.NOT_PRESENT): @@ -520,11 +579,13 @@ class ClusterConfigHandler: chg_ug_ids.add(result.src.users_groups_id) elif isinstance(result.src, resources.TLSCredential): chg_tls_ids.add(result.src.tls_credential_id) + elif isinstance(result.src, resources.ExternalCephCluster): + chg_extc_ids.add(result.src.external_ceph_cluster_id) # TODO: here's a lazy bit. if any join auths or users/groups changed we # will regen all clusters because these can be shared by >1 cluster. # In future, make this only pick clusters using the named resources. - if chg_join_ids or chg_ug_ids or chg_tls_ids: + if chg_join_ids or chg_ug_ids or chg_tls_ids or chg_extc_ids: chg_cluster_ids.update(ClusterEntry.ids(self.internal_store)) return chg_cluster_ids @@ -536,10 +597,11 @@ class ClusterConfigHandler: 'saving external store for cluster: %s', change_group.cluster.cluster_id, ) + cluster = change_group.cluster + assert isinstance(cluster, resources.Cluster) # vols: hold the cephfs volumes our shares touch. some operations are # disabled/skipped unless we touch volumes. vols = {share.checked_cephfs.volume for share in change_group.shares} - data_entity = _cephx_data_entity(change_group.cluster.cluster_id) # save the various object types previous_info = _swap_pending_cluster_info( self.public_store, @@ -549,12 +611,10 @@ class ClusterConfigHandler: _save_pending_join_auths(self.priv_store, change_group) _save_pending_users_and_groups(self.priv_store, change_group) _save_pending_tls_credentials(self.priv_store, change_group) - _save_pending_config( - self.public_store, - change_group, - self._path_resolver, - data_entity, + cluster_conf = _ClusterConf.assemble( + change_group, self._path_resolver, self._authorizer ) + _save_pending_config(self.public_store, cluster_conf) # remove any stray objects external.rm_other_in_ns( self.priv_store, @@ -562,17 +622,7 @@ class ClusterConfigHandler: set(change_group.cache), ) - # ensure a entity exists with access to the volumes - for volume in vols: - self._authorizer.authorize_entity(volume, data_entity) - if not vols: - # there were no volumes, and thus nothing to authorize. set data_entity - # to an empty string to avoid adding it to the svc spec later. - data_entity = '' - # build a service spec for smb cluster - cluster = change_group.cluster - assert isinstance(cluster, resources.Cluster) config_entries = [ change_group.cache[external.config_key(cluster.cluster_id)], self.public_store[ @@ -601,14 +651,19 @@ class ClusterConfigHandler: ] for tc in change_group.tls_credentials } + ext_ceph_cluster = None + if change_group.ext_ceph_clusters: + assert len(change_group.ext_ceph_clusters) == 1 + ext_ceph_cluster = change_group.ext_ceph_clusters[0] smb_spec = _generate_smb_service_spec( cluster, config_entries=config_entries, join_source_entries=join_source_entries, user_source_entries=user_source_entries, tls_credential_entries=tls_credential_entries, - data_entity=data_entity, + data_entity=cluster_conf.data_entity, needs_proxy=_has_proxied_vfs(change_group), + ext_ceph_cluster=ext_ceph_cluster, ) _save_pending_spec_backup(self.public_store, change_group, smb_spec) # if orch was ever needed in the past we must "re-orch", but if we have @@ -668,9 +723,68 @@ def order_resources( return sorted(resource_objs, key=_keyfunc) -def _generate_share( - share: resources.Share, resolver: PathResolver, cephx_entity: str -) -> Dict[str, Dict[str, str]]: +@dataclasses.dataclass(frozen=True) +class _ShareConf: + resource: resources.Share + resolver: PathResolver + cephx_entity: str + ceph_cluster: str + + +@dataclasses.dataclass(frozen=True) +class _ClusterConf: + resource: resources.Cluster + shares: Iterable[_ShareConf] + change_group: ClusterChangeGroup + data_entity: str + + @classmethod + def assemble( + cls, + change_group: ClusterChangeGroup, + default_resolver: PathResolver, + authorizer: AccessAuthorizer, + ) -> Self: + extcc = None + assert isinstance(change_group.cluster, resources.Cluster) + if change_group.ext_ceph_clusters: + assert len(change_group.ext_ceph_clusters) == 1 + extcc = change_group.ext_ceph_clusters[0] + + resolver = default_resolver + cephx_entity = '' # default to no entity for data access + cephadm_data_entity = '' # passed to cephadm service spec + ceph_cluster = '' # empty string means local cluster + if extcc: + log.debug('external ceph cluster') + resolver = ExoResolver(change_group.cluster) + ceph_cluster = 'exo' + cephx_entity = checked(extcc.cluster).cephfs_user.name + elif change_group.shares: + log.debug('local ceph cluster with shares') + cephx_entity = _cephx_data_entity(change_group.cluster) + # ensure an entity exists with access to the volumes + for share in change_group.shares: + authorizer.authorize_entity( + share.checked_cephfs.volume, cephx_entity + ) + cephadm_data_entity = cephx_entity + else: + log.debug('local cluster without shares: skipping ceph auth') + return cls( + change_group.cluster, + [ + _ShareConf(s, resolver, cephx_entity, ceph_cluster) + for s in change_group.shares + ], + change_group, + cephadm_data_entity, + ) + + +def _generate_share(conf: _ShareConf) -> Dict[str, Dict[str, str]]: + share = conf.resource + cephx_entity = conf.cephx_entity cephfs = share.checked_cephfs assert cephfs.provider.is_vfs(), "not a vfs provider" assert cephx_entity, "cephx entity name missing" @@ -681,7 +795,7 @@ def _generate_share( plen = len(_prefix) if cephx_entity.startswith(_prefix): cephx_entity = cephx_entity[plen:] - path = resolver.resolve( + path = conf.resolver.resolve( cephfs.volume, cephfs.subvolumegroup, cephfs.subvolume, @@ -695,13 +809,18 @@ def _generate_share( }[cephfs.provider.expand()] except KeyError: raise ValueError(f'unsupported provider: {cephfs.provider}') + ceph_config_file = ( + f'/etc/ceph/{conf.ceph_cluster}.ceph.conf' + if conf.ceph_cluster + else '/etc/ceph/ceph.conf' + ) cfg = { # smb.conf options 'options': { 'path': path, "vfs objects": f"acl_xattr ceph_snapshots {ceph_vfs}", 'acl_xattr:security_acl_name': 'user.NTACL', - f'{ceph_vfs}:config_file': '/etc/ceph/ceph.conf', + f'{ceph_vfs}:config_file': ceph_config_file, f'{ceph_vfs}:filesystem': cephfs.volume, f'{ceph_vfs}:user_id': cephx_entity, 'read only': ynbool(share.readonly), @@ -791,13 +910,9 @@ def _generate_share_hosts_access( cfg['options']['hosts deny'] = ', '.join(hosts_deny) -def _generate_config( - cluster: resources.Cluster, - shares: Iterable[resources.Share], - resolver: PathResolver, - cephx_entity: str = "", -) -> Dict[str, Any]: +def _generate_config(conf: _ClusterConf) -> Dict[str, Any]: cluster_global_opts = {} + cluster = conf.resource if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY: assert cluster.domain_settings is not None cluster_global_opts['security'] = 'ads' @@ -810,8 +925,7 @@ def _generate_config( cluster_global_opts['smb ports'] = str(_smb_port(cluster)) share_configs = { - share.name: _generate_share(share, resolver, cephx_entity) - for share in shares + share.resource.name: _generate_share(share) for share in conf.shares } instance_features = [] @@ -862,6 +976,7 @@ def _generate_smb_service_spec( tls_credential_entries: Dict[str, ConfigEntry], data_entity: str = '', needs_proxy: bool = False, + ext_ceph_cluster: Optional[resources.ExternalCephCluster], ) -> SMBSpec: features = [] if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY: @@ -903,6 +1018,19 @@ def _generate_smb_service_spec( rc_ca_cert = _tls_uri( cluster.remote_control.ca_cert, tls_credential_entries ) + ceph_cluster_configs = None + if ext_ceph_cluster: + exo = checked(ext_ceph_cluster.cluster) + ceph_cluster_configs = [ + SMBExternalCephCluster( + alias='exo', + fsid=exo.fsid, + mon_host=exo.mon_host, + user=exo.cephfs_user.name, + key=exo.cephfs_user.key, + ) + ] + return SMBSpec( service_id=cluster.cluster_id, placement=cluster.placement, @@ -919,6 +1047,7 @@ def _generate_smb_service_spec( remote_control_ssl_cert=rc_cert, remote_control_ssl_key=rc_key, remote_control_ca_cert=rc_ca_cert, + ceph_cluster_configs=ceph_cluster_configs, ) @@ -1024,18 +1153,13 @@ def _save_pending_tls_credentials( def _save_pending_config( store: ConfigStore, - change_group: ClusterChangeGroup, - resolver: PathResolver, - cephx_entity: str = "", + cluster_conf: _ClusterConf, ) -> None: - assert isinstance(change_group.cluster, resources.Cluster) # generate the cluster configuration and save it in the public store - cconfig = _generate_config( - change_group.cluster, change_group.shares, resolver, cephx_entity - ) - centry = store[external.config_key(change_group.cluster.cluster_id)] + cconfig = _generate_config(cluster_conf) + centry = store[external.config_key(cluster_conf.resource.cluster_id)] centry.set(cconfig) - change_group.cache_updated_entry(centry) + cluster_conf.change_group.cache_updated_entry(centry) def _save_pending_spec_backup( @@ -1046,11 +1170,13 @@ def _save_pending_spec_backup( change_group.cache_updated_entry(ssentry) -def _cephx_data_entity(cluster_id: str) -> str: +def _cephx_data_entity(cluster: resources.Cluster) -> str: """Generate a name for the (default?) cephx key that a cluster (smbd) will use for data access. """ - return f'client.smb.fs.cluster.{cluster_id}' + if cluster.external_ceph_cluster: + return '' + return f'client.smb.fs.cluster.{cluster.cluster_id}' @contextlib.contextmanager