)
import contextlib
+import dataclasses
import logging
import time
-from ceph.deployment.service_spec import SMBSpec
+from ceph.deployment.service_spec import SMBExternalCephCluster, SMBSpec
+from ceph.fs.earmarking import EarmarkTopScope
from . import config_store, external, resources
from .enums import (
)
from .internal import (
ClusterEntry,
+ ExternalCephClusterEntry,
JoinAuthEntry,
ShareEntry,
TLSCredentialEntry,
EarmarkResolver,
OrchSubmitter,
PathResolver,
+ Self,
Simplified,
)
from .resources import SMBResource
Staging,
auth_refs,
cross_check_resource,
+ ext_cluster_refs,
tls_refs,
ug_refs,
)
join_auths: List[resources.JoinAuth],
users_and_groups: List[resources.UsersAndGroups],
tls_credentials: List[resources.TLSCredential],
+ ext_ceph_clusters: List[resources.ExternalCephCluster],
):
self.cluster = cluster
self.shares = shares
self.join_auths = join_auths
self.users_and_groups = users_and_groups
self.tls_credentials = tls_credentials
+ self.ext_ceph_clusters = ext_ceph_clusters
# a cache for modified entries
self.cache = config_store.EntryCache()
resolve_exists = resolve
+class ExoResolver:
+ def __init__(self, cluster: resources.Cluster) -> None:
+ self._cluster = cluster
+
+ def resolve(
+ self, volume: str, subvolumegroup: str, subvolume: str, path: str
+ ) -> str:
+ assert not subvolumegroup
+ assert not subvolume
+ return path
+
+ resolve_exists = resolve
+
+
class _FakeEarmarkResolver:
"""A stub EarmarkResolver for unit testing."""
def set_earmark(self, path: str, volume: str, earmark: str) -> None:
pass
- def check_earmark(self, earmark: str, top_level_scope: str) -> bool:
+ def check_earmark(
+ self, earmark: str, top_level_scope: EarmarkTopScope
+ ) -> bool:
return True
resources.JoinAuth,
resources.UsersAndGroups,
resources.TLSCredential,
+ resources.ExternalCephCluster,
)
def __init__(self) -> None:
msg='a resource with the same ID already exists',
)
try:
+ path_resolver = self._choose_path_resolver(resource, staging)
+ earmark_resolver = self._choose_earmark_resolver(
+ resource, staging
+ )
cross_check_resource(
resource,
staging,
- path_resolver=self._path_resolver,
- earmark_resolver=self._earmark_resolver,
+ path_resolver=path_resolver,
+ earmark_resolver=earmark_resolver,
)
except ErrorResult as err:
log.debug('rejected resource: %r', resource)
result = Result(resource, success=True, status={'checked': True})
return result
+ def _choose_path_resolver(
+ self, resource: SMBResource, staging: Staging
+ ) -> PathResolver:
+ if isinstance(resource, resources.Share):
+ cluster = staging.get_cluster(resource.cluster_id)
+ if refs := ext_cluster_refs(cluster):
+ log.debug(
+ "Selected external resolver for %s (ext refs: %r)",
+ resource,
+ refs,
+ )
+ return ExoResolver(cluster)
+ log.debug("Selected default path resolver for %s", resource)
+ return self._path_resolver
+
+ def _choose_earmark_resolver(
+ self, resource: SMBResource, staging: Staging
+ ) -> EarmarkResolver:
+ if isinstance(resource, resources.Share):
+ cluster = staging.get_cluster(resource.cluster_id)
+ if ext_cluster_refs(cluster):
+ return _FakeEarmarkResolver()
+ return self._earmark_resolver
+
def _sync_clusters(
self, modified_cluster_ids: Optional[Collection[str]] = None
) -> None:
).get_tls_credential()
for _id in tls_refs(cluster)
],
+ [
+ ExternalCephClusterEntry.from_store(
+ self.internal_store, _id
+ ).get_external_ceph_cluster()
+ for _id in ext_cluster_refs(cluster)
+ ],
)
change_groups.append(change_group)
for change_group in change_groups:
chg_join_ids: Set[str] = set()
chg_ug_ids: Set[str] = set()
chg_tls_ids: Set[str] = set()
+ chg_extc_ids: Set[str] = set()
for result in updated:
state = (result.status or {}).get('state', None)
if state in (State.PRESENT, State.NOT_PRESENT):
chg_ug_ids.add(result.src.users_groups_id)
elif isinstance(result.src, resources.TLSCredential):
chg_tls_ids.add(result.src.tls_credential_id)
+ elif isinstance(result.src, resources.ExternalCephCluster):
+ chg_extc_ids.add(result.src.external_ceph_cluster_id)
# TODO: here's a lazy bit. if any join auths or users/groups changed we
# will regen all clusters because these can be shared by >1 cluster.
# In future, make this only pick clusters using the named resources.
- if chg_join_ids or chg_ug_ids or chg_tls_ids:
+ if chg_join_ids or chg_ug_ids or chg_tls_ids or chg_extc_ids:
chg_cluster_ids.update(ClusterEntry.ids(self.internal_store))
return chg_cluster_ids
'saving external store for cluster: %s',
change_group.cluster.cluster_id,
)
+ cluster = change_group.cluster
+ assert isinstance(cluster, resources.Cluster)
# vols: hold the cephfs volumes our shares touch. some operations are
# disabled/skipped unless we touch volumes.
vols = {share.checked_cephfs.volume for share in change_group.shares}
- data_entity = _cephx_data_entity(change_group.cluster.cluster_id)
# save the various object types
previous_info = _swap_pending_cluster_info(
self.public_store,
_save_pending_join_auths(self.priv_store, change_group)
_save_pending_users_and_groups(self.priv_store, change_group)
_save_pending_tls_credentials(self.priv_store, change_group)
- _save_pending_config(
- self.public_store,
- change_group,
- self._path_resolver,
- data_entity,
+ cluster_conf = _ClusterConf.assemble(
+ change_group, self._path_resolver, self._authorizer
)
+ _save_pending_config(self.public_store, cluster_conf)
# remove any stray objects
external.rm_other_in_ns(
self.priv_store,
set(change_group.cache),
)
- # ensure a entity exists with access to the volumes
- for volume in vols:
- self._authorizer.authorize_entity(volume, data_entity)
- if not vols:
- # there were no volumes, and thus nothing to authorize. set data_entity
- # to an empty string to avoid adding it to the svc spec later.
- data_entity = ''
-
# build a service spec for smb cluster
- cluster = change_group.cluster
- assert isinstance(cluster, resources.Cluster)
config_entries = [
change_group.cache[external.config_key(cluster.cluster_id)],
self.public_store[
]
for tc in change_group.tls_credentials
}
+ ext_ceph_cluster = None
+ if change_group.ext_ceph_clusters:
+ assert len(change_group.ext_ceph_clusters) == 1
+ ext_ceph_cluster = change_group.ext_ceph_clusters[0]
smb_spec = _generate_smb_service_spec(
cluster,
config_entries=config_entries,
join_source_entries=join_source_entries,
user_source_entries=user_source_entries,
tls_credential_entries=tls_credential_entries,
- data_entity=data_entity,
+ data_entity=cluster_conf.data_entity,
needs_proxy=_has_proxied_vfs(change_group),
+ ext_ceph_cluster=ext_ceph_cluster,
)
_save_pending_spec_backup(self.public_store, change_group, smb_spec)
# if orch was ever needed in the past we must "re-orch", but if we have
return sorted(resource_objs, key=_keyfunc)
-def _generate_share(
- share: resources.Share, resolver: PathResolver, cephx_entity: str
-) -> Dict[str, Dict[str, str]]:
+@dataclasses.dataclass(frozen=True)
+class _ShareConf:
+ resource: resources.Share
+ resolver: PathResolver
+ cephx_entity: str
+ ceph_cluster: str
+
+
+@dataclasses.dataclass(frozen=True)
+class _ClusterConf:
+ resource: resources.Cluster
+ shares: Iterable[_ShareConf]
+ change_group: ClusterChangeGroup
+ data_entity: str
+
+ @classmethod
+ def assemble(
+ cls,
+ change_group: ClusterChangeGroup,
+ default_resolver: PathResolver,
+ authorizer: AccessAuthorizer,
+ ) -> Self:
+ extcc = None
+ assert isinstance(change_group.cluster, resources.Cluster)
+ if change_group.ext_ceph_clusters:
+ assert len(change_group.ext_ceph_clusters) == 1
+ extcc = change_group.ext_ceph_clusters[0]
+
+ resolver = default_resolver
+ cephx_entity = '' # default to no entity for data access
+ cephadm_data_entity = '' # passed to cephadm service spec
+ ceph_cluster = '' # empty string means local cluster
+ if extcc:
+ log.debug('external ceph cluster')
+ resolver = ExoResolver(change_group.cluster)
+ ceph_cluster = 'exo'
+ cephx_entity = checked(extcc.cluster).cephfs_user.name
+ elif change_group.shares:
+ log.debug('local ceph cluster with shares')
+ cephx_entity = _cephx_data_entity(change_group.cluster)
+ # ensure an entity exists with access to the volumes
+ for share in change_group.shares:
+ authorizer.authorize_entity(
+ share.checked_cephfs.volume, cephx_entity
+ )
+ cephadm_data_entity = cephx_entity
+ else:
+ log.debug('local cluster without shares: skipping ceph auth')
+ return cls(
+ change_group.cluster,
+ [
+ _ShareConf(s, resolver, cephx_entity, ceph_cluster)
+ for s in change_group.shares
+ ],
+ change_group,
+ cephadm_data_entity,
+ )
+
+
+def _generate_share(conf: _ShareConf) -> Dict[str, Dict[str, str]]:
+ share = conf.resource
+ cephx_entity = conf.cephx_entity
cephfs = share.checked_cephfs
assert cephfs.provider.is_vfs(), "not a vfs provider"
assert cephx_entity, "cephx entity name missing"
plen = len(_prefix)
if cephx_entity.startswith(_prefix):
cephx_entity = cephx_entity[plen:]
- path = resolver.resolve(
+ path = conf.resolver.resolve(
cephfs.volume,
cephfs.subvolumegroup,
cephfs.subvolume,
}[cephfs.provider.expand()]
except KeyError:
raise ValueError(f'unsupported provider: {cephfs.provider}')
+ ceph_config_file = (
+ f'/etc/ceph/{conf.ceph_cluster}.ceph.conf'
+ if conf.ceph_cluster
+ else '/etc/ceph/ceph.conf'
+ )
cfg = {
# smb.conf options
'options': {
'path': path,
"vfs objects": f"acl_xattr ceph_snapshots {ceph_vfs}",
'acl_xattr:security_acl_name': 'user.NTACL',
- f'{ceph_vfs}:config_file': '/etc/ceph/ceph.conf',
+ f'{ceph_vfs}:config_file': ceph_config_file,
f'{ceph_vfs}:filesystem': cephfs.volume,
f'{ceph_vfs}:user_id': cephx_entity,
'read only': ynbool(share.readonly),
cfg['options']['hosts deny'] = ', '.join(hosts_deny)
-def _generate_config(
- cluster: resources.Cluster,
- shares: Iterable[resources.Share],
- resolver: PathResolver,
- cephx_entity: str = "",
-) -> Dict[str, Any]:
+def _generate_config(conf: _ClusterConf) -> Dict[str, Any]:
cluster_global_opts = {}
+ cluster = conf.resource
if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
assert cluster.domain_settings is not None
cluster_global_opts['security'] = 'ads'
cluster_global_opts['smb ports'] = str(_smb_port(cluster))
share_configs = {
- share.name: _generate_share(share, resolver, cephx_entity)
- for share in shares
+ share.resource.name: _generate_share(share) for share in conf.shares
}
instance_features = []
tls_credential_entries: Dict[str, ConfigEntry],
data_entity: str = '',
needs_proxy: bool = False,
+ ext_ceph_cluster: Optional[resources.ExternalCephCluster],
) -> SMBSpec:
features = []
if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
rc_ca_cert = _tls_uri(
cluster.remote_control.ca_cert, tls_credential_entries
)
+ ceph_cluster_configs = None
+ if ext_ceph_cluster:
+ exo = checked(ext_ceph_cluster.cluster)
+ ceph_cluster_configs = [
+ SMBExternalCephCluster(
+ alias='exo',
+ fsid=exo.fsid,
+ mon_host=exo.mon_host,
+ user=exo.cephfs_user.name,
+ key=exo.cephfs_user.key,
+ )
+ ]
+
return SMBSpec(
service_id=cluster.cluster_id,
placement=cluster.placement,
remote_control_ssl_cert=rc_cert,
remote_control_ssl_key=rc_key,
remote_control_ca_cert=rc_ca_cert,
+ ceph_cluster_configs=ceph_cluster_configs,
)
def _save_pending_config(
store: ConfigStore,
- change_group: ClusterChangeGroup,
- resolver: PathResolver,
- cephx_entity: str = "",
+ cluster_conf: _ClusterConf,
) -> None:
- assert isinstance(change_group.cluster, resources.Cluster)
# generate the cluster configuration and save it in the public store
- cconfig = _generate_config(
- change_group.cluster, change_group.shares, resolver, cephx_entity
- )
- centry = store[external.config_key(change_group.cluster.cluster_id)]
+ cconfig = _generate_config(cluster_conf)
+ centry = store[external.config_key(cluster_conf.resource.cluster_id)]
centry.set(cconfig)
- change_group.cache_updated_entry(centry)
+ cluster_conf.change_group.cache_updated_entry(centry)
def _save_pending_spec_backup(
change_group.cache_updated_entry(ssentry)
-def _cephx_data_entity(cluster_id: str) -> str:
+def _cephx_data_entity(cluster: resources.Cluster) -> str:
"""Generate a name for the (default?) cephx key that a cluster (smbd) will
use for data access.
"""
- return f'client.smb.fs.cluster.{cluster_id}'
+ if cluster.external_ceph_cluster:
+ return ''
+ return f'client.smb.fs.cluster.{cluster.cluster_id}'
@contextlib.contextmanager