From: Rabinarayan Panigrahi Date: Fri, 8 May 2026 05:28:09 +0000 (+0530) Subject: mgr/cephadm: Add function to get ssl certificate from ssl_certificates X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ee6fd847b03e9d2c68c9f2f7644b8b9bede1437a;p=ceph.git mgr/cephadm: Add function to get ssl certificate from ssl_certificates A function is added _get_certificates_from_spec_ssl_certificates to get certificates from ssl_certificates buffer. it returns TLSCredentials from SSLParameters of ssl_certificates[feature] Signed-off-by: Rabinarayan Panigrahi Signed-off-by: Avan Thakkar --- diff --git a/src/pybind/mgr/cephadm/services/smb.py b/src/pybind/mgr/cephadm/services/smb.py index fbb4fcf57d8..831b89dc1ef 100644 --- a/src/pybind/mgr/cephadm/services/smb.py +++ b/src/pybind/mgr/cephadm/services/smb.py @@ -16,6 +16,12 @@ from typing import ( ) from mgr_module import HandleCommandResult +from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS +from ceph.smb.constants import ( + KEYBRIDGE, + REMOTE_CONTROL, + FEATURES, +) from ceph.deployment.service_spec import ( SMBExternalCephCluster, @@ -31,6 +37,7 @@ from .cephadmservice import ( CephadmDaemonDeploySpec, simplified_keyring, ) +from ..tlsobject_types import TLSCredentials, EMPTY_TLS_CREDENTIALS from ..schedule import DaemonPlacement if TYPE_CHECKING: @@ -145,14 +152,80 @@ class SMBService(CephService): ) return daemon_spec + def _feature_tls_filename(self, feature: str) -> Optional[str]: + return { + KEYBRIDGE: "keybridge", + REMOTE_CONTROL: "remote_control" + }.get(feature) + + # Flat SSL fields on SMBSpec per feature, used as a fallback when + # ssl_certificates is not set (backward compatibility). + _FEATURE_FLAT_ATTRS: Dict[str, Tuple[str, str, str]] = { + REMOTE_CONTROL: ( + 'remote_control_ssl_cert', + 'remote_control_ssl_key', + 'remote_control_ca_cert', + ), + KEYBRIDGE: ( + 'keybridge_kmip_ssl_cert', + 'keybridge_kmip_ssl_key', + 'keybridge_kmip_ca_cert', + ), + } + + def _get_feature_certs( + self, + daemon_spec: CephadmDaemonDeploySpec, + smb_spec: SMBSpec, + support_feature: str, + feature: str, + ) -> TLSCredentials: + feature_creds = self.get_certificates( + daemon_spec, ca_cert_required=True, feature=feature + ) + if feature_creds: + return feature_creds + flat_attrs = self._FEATURE_FLAT_ATTRS.get(support_feature) + if flat_attrs is None: + return EMPTY_TLS_CREDENTIALS + cert_attr, key_attr, ca_attr = flat_attrs + tc = TLSCredentials( + cert=getattr(smb_spec, cert_attr, None) or '', + key=getattr(smb_spec, key_attr, None) or '', + ca_cert=getattr(smb_spec, ca_attr, None) or '', + ) + return tc if tc else EMPTY_TLS_CREDENTIALS + + def _get_certificates_from_spec_ssl_certificates( + self, + svc_spec: ServiceSpec, + _daemon_spec: CephadmDaemonDeploySpec, + feature: Optional[str] = None, + ) -> TLSCredentials: + smb_spec = cast(SMBSpec, svc_spec) + ssl_certs = getattr(smb_spec, 'ssl_certificates', None) + if not ssl_certs: + return EMPTY_TLS_CREDENTIALS + feature_key = feature if feature is not None else '' + ssl_params = smb_spec.ssl_certificates.get(feature_key) + if not ssl_params: + return EMPTY_TLS_CREDENTIALS + return TLSCredentials( + cert=ssl_params.ssl_cert or '', + key=ssl_params.ssl_key or '', + ca_cert=ssl_params.ssl_ca_cert or '', + ) + def generate_config( self, daemon_spec: CephadmDaemonDeploySpec ) -> Tuple[Dict[str, Any], List[str]]: logger.debug('smb generate_config') assert self.TYPE == daemon_spec.daemon_type + super().register_for_certificates(daemon_spec) smb_spec = cast( SMBSpec, self.mgr.spec_store[daemon_spec.service_name].spec ) + ssl_certificates = smb_spec.ssl_certificates or {} config_blobs: Dict[str, Any] = {} config_blobs['cluster_id'] = smb_spec.cluster_id @@ -181,40 +254,34 @@ class SMBService(CephService): config_blobs['service_ports'] = smb_spec.service_ports() if smb_spec.bind_addrs: config_blobs['bind_networks'] = smb_spec.bind_networks() - if 'remote-control' in smb_spec.features: - files = config_blobs.setdefault('files', {}) - _add_cfg( - files, - 'remote_control.ssl.crt', - self._cert_or_uri(smb_spec.remote_control_ssl_cert), - ) - _add_cfg( - files, - 'remote_control.ssl.key', - self._cert_or_uri(smb_spec.remote_control_ssl_key), - ) - _add_cfg( - files, - 'remote_control.ca.crt', - self._cert_or_uri(smb_spec.remote_control_ca_cert), - ) - if 'keybridge' in smb_spec.features: - files = config_blobs.setdefault('files', {}) - _add_cfg( - files, - 'keybridge.ssl.crt', - self._cert_or_uri(smb_spec.keybridge_kmip_ssl_cert), - ) - _add_cfg( - files, - 'keybridge.ssl.key', - self._cert_or_uri(smb_spec.keybridge_kmip_ssl_key), - ) - _add_cfg( - files, - 'keybridge.ca.crt', - self._cert_or_uri(smb_spec.keybridge_kmip_ca_cert), - ) + for support_feature in smb_spec.features: + feature = self._feature_tls_filename(support_feature) + if feature is not None: + feature_creds = self._get_feature_certs( + daemon_spec, smb_spec, support_feature, feature + ) + if feature_creds: + files = config_blobs.setdefault('files', {}) + cert_pem = self._cert_or_uri(feature_creds.cert) + key_pem = self._cert_or_uri(feature_creds.key) + ca_pem = self._cert_or_uri(feature_creds.ca_cert) + _add_cfg(files, f'{feature}.ssl.crt', cert_pem) + _add_cfg(files, f'{feature}.ssl.key', key_pem) + _add_cfg(files, f'{feature}.ca.crt', ca_pem) + svc_name = smb_spec.service_name() + host = daemon_spec.host + cert_name = f'smb_{feature}_ssl_cert' + key_name = f'smb_{feature}_ssl_key' + ca_cert_name = f'smb_{feature}_ca_cert' + self.mgr.cert_mgr.register_cert_key_pair( + 'smb', cert_name, key_name, self.SCOPE, ca_cert_name + ) + if cert_pem: + self.mgr.cert_mgr.save_cert(cert_name, cert_pem, svc_name, host, user_made=True) + if key_pem: + self.mgr.cert_mgr.save_key(key_name, key_pem, svc_name, host, user_made=True) + if ca_pem: + self.mgr.cert_mgr.save_cert(ca_cert_name, ca_pem, svc_name, host, user_made=True) for ext_cluster in smb_spec.ceph_cluster_configs or []: files = config_blobs.setdefault('files', {}) c_name = f'{ext_cluster.alias}.ceph.conf'