import time
import ceph.smb.constants
-from ceph.deployment.service_spec import SMBExternalCephCluster, SMBSpec
+from ceph.deployment.service_spec import (
+ SMBExternalCephCluster,
+ SMBSpec,
+ SSLParameters,
+)
from ceph.fs.earmarking import EarmarkTopScope
from . import config_store, external, resources
change_group.cluster.cluster_id,
)
cluster = change_group.cluster
+ ssl_certificates: Dict[str, SSLParameters] = {}
assert isinstance(cluster, resources.Cluster)
# vols: hold the cephfs volumes our shares touch. some operations are
# disabled/skipped unless we touch volumes.
if change_group.ext_ceph_clusters:
assert len(change_group.ext_ceph_clusters) == 1
ext_ceph_cluster = change_group.ext_ceph_clusters[0]
+
smb_spec = _generate_smb_service_spec(
cluster,
config_entries=config_entries,
data_entity=cluster_conf.data_entity,
needs_proxy=_has_proxied_vfs(change_group),
ext_ceph_cluster=ext_ceph_cluster,
+ ssl_certificates=ssl_certificates,
)
_save_pending_spec_backup(self.public_store, change_group, smb_spec)
# if orch was ever needed in the past we must "re-orch", but if we have
data_entity: str = '',
needs_proxy: bool = False,
ext_ceph_cluster: Optional[resources.ExternalCephCluster],
+ ssl_certificates: Dict[str, SSLParameters],
) -> SMBSpec:
features = []
if cluster.auth_mode == AuthMode.ACTIVE_DIRECTORY:
user_entities: Optional[List[str]] = None
if data_entity:
user_entities = [data_entity]
+
rc_cert = rc_key = rc_ca_cert = None
- if cluster.remote_control_is_enabled:
+ if cluster.is_feature_enabled(_REMOTE_CONTROL):
assert cluster.remote_control
rc_cert = _tls_uri(
cluster.remote_control.cert, tls_credential_entries
rc_ca_cert = _tls_uri(
cluster.remote_control.ca_cert, tls_credential_entries
)
+ feature = ceph.smb.constants.FEATURE_FILE_NAMES.get(_REMOTE_CONTROL)
+ if feature is not None:
+ ssl_certificates[feature] = SSLParameters(
+ enabled=bool(rc_cert and rc_key),
+ ssl_cert=rc_cert,
+ ssl_key=rc_key,
+ ssl_ca_cert=rc_ca_cert,
+ certificate_source='uri',
+ )
+
kb_cert = kb_key = kb_ca_cert = None
- if cluster.keybridge_is_enabled:
+ if cluster.is_feature_enabled(_KEYBRIDGE):
assert cluster.keybridge # type narrow
# TODO: current all KMIP scopes must share the same tls creds
# and that sucks. need to update keybridge to fetch cert URIs directly
kb_ca_cert = _tls_uri(
kmip_scope.kmip_ca_cert, tls_credential_entries
)
+ feature = ceph.smb.constants.FEATURE_FILE_NAMES.get(_KEYBRIDGE)
+ if feature is not None:
+ ssl_certificates[feature] = SSLParameters(
+ enabled=bool(kb_cert and kb_key),
+ ssl_cert=kb_cert,
+ ssl_key=kb_key,
+ ssl_ca_cert=kb_ca_cert,
+ certificate_source='uri',
+ )
+
ceph_cluster_configs = None
if ext_ceph_cluster:
exo = checked(ext_ceph_cluster.cluster)
key=exo.cephfs_user.key,
)
]
-
return SMBSpec(
service_id=cluster.cluster_id,
placement=cluster.placement,
cluster_public_addrs=cluster.service_spec_public_addrs(),
custom_ports=cluster.custom_ports,
bind_addrs=cluster.service_spec_bind_addrs(),
- remote_control_ssl_cert=rc_cert,
- remote_control_ssl_key=rc_key,
- remote_control_ca_cert=rc_ca_cert,
- keybridge_kmip_ssl_cert=kb_cert,
- keybridge_kmip_ssl_key=kb_key,
- keybridge_kmip_ca_cert=kb_ca_cert,
+ ssl_certificates=ssl_certificates or None,
ceph_cluster_configs=ceph_cluster_configs,
tunables=_service_spec_tunables(cluster),
)