)
from mgr_module import HandleCommandResult
+from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
+from ceph.smb.constants import (
+ KEYBRIDGE,
+ REMOTE_CONTROL,
+ FEATURES,
+)
from ceph.deployment.service_spec import (
SMBExternalCephCluster,
CephadmDaemonDeploySpec,
simplified_keyring,
)
+from ..tlsobject_types import TLSCredentials, EMPTY_TLS_CREDENTIALS
from ..schedule import DaemonPlacement
if TYPE_CHECKING:
)
return daemon_spec
+ def _feature_tls_filename(self, feature: str) -> Optional[str]:
+ return {
+ KEYBRIDGE: "keybridge",
+ REMOTE_CONTROL: "remote_control"
+ }.get(feature)
+
+ # Flat SSL fields on SMBSpec per feature, used as a fallback when
+ # ssl_certificates is not set (backward compatibility).
+ _FEATURE_FLAT_ATTRS: Dict[str, Tuple[str, str, str]] = {
+ REMOTE_CONTROL: (
+ 'remote_control_ssl_cert',
+ 'remote_control_ssl_key',
+ 'remote_control_ca_cert',
+ ),
+ KEYBRIDGE: (
+ 'keybridge_kmip_ssl_cert',
+ 'keybridge_kmip_ssl_key',
+ 'keybridge_kmip_ca_cert',
+ ),
+ }
+
+ def _get_feature_certs(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ smb_spec: SMBSpec,
+ support_feature: str,
+ feature: str,
+ ) -> TLSCredentials:
+ feature_creds = self.get_certificates(
+ daemon_spec, ca_cert_required=True, feature=feature
+ )
+ if feature_creds:
+ return feature_creds
+ flat_attrs = self._FEATURE_FLAT_ATTRS.get(support_feature)
+ if flat_attrs is None:
+ return EMPTY_TLS_CREDENTIALS
+ cert_attr, key_attr, ca_attr = flat_attrs
+ tc = TLSCredentials(
+ cert=getattr(smb_spec, cert_attr, None) or '',
+ key=getattr(smb_spec, key_attr, None) or '',
+ ca_cert=getattr(smb_spec, ca_attr, None) or '',
+ )
+ return tc if tc else EMPTY_TLS_CREDENTIALS
+
+ def _get_certificates_from_spec_ssl_certificates(
+ self,
+ svc_spec: ServiceSpec,
+ _daemon_spec: CephadmDaemonDeploySpec,
+ feature: Optional[str] = None,
+ ) -> TLSCredentials:
+ smb_spec = cast(SMBSpec, svc_spec)
+ ssl_certs = getattr(smb_spec, 'ssl_certificates', None)
+ if not ssl_certs:
+ return EMPTY_TLS_CREDENTIALS
+ feature_key = feature if feature is not None else ''
+ ssl_params = smb_spec.ssl_certificates.get(feature_key)
+ if not ssl_params:
+ return EMPTY_TLS_CREDENTIALS
+ return TLSCredentials(
+ cert=ssl_params.ssl_cert or '',
+ key=ssl_params.ssl_key or '',
+ ca_cert=ssl_params.ssl_ca_cert or '',
+ )
+
def generate_config(
self, daemon_spec: CephadmDaemonDeploySpec
) -> Tuple[Dict[str, Any], List[str]]:
logger.debug('smb generate_config')
assert self.TYPE == daemon_spec.daemon_type
+ super().register_for_certificates(daemon_spec)
smb_spec = cast(
SMBSpec, self.mgr.spec_store[daemon_spec.service_name].spec
)
+ ssl_certificates = smb_spec.ssl_certificates or {}
config_blobs: Dict[str, Any] = {}
config_blobs['cluster_id'] = smb_spec.cluster_id
config_blobs['service_ports'] = smb_spec.service_ports()
if smb_spec.bind_addrs:
config_blobs['bind_networks'] = smb_spec.bind_networks()
- if 'remote-control' in smb_spec.features:
- files = config_blobs.setdefault('files', {})
- _add_cfg(
- files,
- 'remote_control.ssl.crt',
- self._cert_or_uri(smb_spec.remote_control_ssl_cert),
- )
- _add_cfg(
- files,
- 'remote_control.ssl.key',
- self._cert_or_uri(smb_spec.remote_control_ssl_key),
- )
- _add_cfg(
- files,
- 'remote_control.ca.crt',
- self._cert_or_uri(smb_spec.remote_control_ca_cert),
- )
- if 'keybridge' in smb_spec.features:
- files = config_blobs.setdefault('files', {})
- _add_cfg(
- files,
- 'keybridge.ssl.crt',
- self._cert_or_uri(smb_spec.keybridge_kmip_ssl_cert),
- )
- _add_cfg(
- files,
- 'keybridge.ssl.key',
- self._cert_or_uri(smb_spec.keybridge_kmip_ssl_key),
- )
- _add_cfg(
- files,
- 'keybridge.ca.crt',
- self._cert_or_uri(smb_spec.keybridge_kmip_ca_cert),
- )
+ for support_feature in smb_spec.features:
+ feature = self._feature_tls_filename(support_feature)
+ if feature is not None:
+ feature_creds = self._get_feature_certs(
+ daemon_spec, smb_spec, support_feature, feature
+ )
+ if feature_creds:
+ files = config_blobs.setdefault('files', {})
+ cert_pem = self._cert_or_uri(feature_creds.cert)
+ key_pem = self._cert_or_uri(feature_creds.key)
+ ca_pem = self._cert_or_uri(feature_creds.ca_cert)
+ _add_cfg(files, f'{feature}.ssl.crt', cert_pem)
+ _add_cfg(files, f'{feature}.ssl.key', key_pem)
+ _add_cfg(files, f'{feature}.ca.crt', ca_pem)
+ svc_name = smb_spec.service_name()
+ host = daemon_spec.host
+ cert_name = f'smb_{feature}_ssl_cert'
+ key_name = f'smb_{feature}_ssl_key'
+ ca_cert_name = f'smb_{feature}_ca_cert'
+ self.mgr.cert_mgr.register_cert_key_pair(
+ 'smb', cert_name, key_name, self.SCOPE, ca_cert_name
+ )
+ if cert_pem:
+ self.mgr.cert_mgr.save_cert(cert_name, cert_pem, svc_name, host, user_made=True)
+ if key_pem:
+ self.mgr.cert_mgr.save_key(key_name, key_pem, svc_name, host, user_made=True)
+ if ca_pem:
+ self.mgr.cert_mgr.save_cert(ca_cert_name, ca_pem, svc_name, host, user_made=True)
for ext_cluster in smb_spec.ceph_cluster_configs or []:
files = config_blobs.setdefault('files', {})
c_name = f'{ext_cluster.alias}.ceph.conf'