]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Add function to get ssl certificate from ssl_certificates
authorRabinarayan Panigrahi <rapanigr@redhat.com>
Fri, 8 May 2026 05:28:09 +0000 (10:58 +0530)
committerRabinarayan Panigrahi <rapanigr@redhat.com>
Thu, 18 Jun 2026 13:50:17 +0000 (19:20 +0530)
A function is added _get_certificates_from_spec_ssl_certificates to get
certificates from ssl_certificates buffer. it returns TLSCredentials from
SSLParameters of ssl_certificates[feature]

Signed-off-by: Rabinarayan Panigrahi <rapanigr@redhat.com>
Signed-off-by: Avan Thakkar <athakkar@redhat.com>
src/pybind/mgr/cephadm/services/smb.py

index fbb4fcf57d8ae4543041a8ac91c7796beb11fa23..831b89dc1efb91761baefeda27e3fd4ab166a052 100644 (file)
@@ -16,6 +16,12 @@ from typing import (
 )
 
 from mgr_module import HandleCommandResult
+from cephadm.tlsobject_types import TLSObjectScope, TLSCredentials, EMPTY_TLS_CREDENTIALS
+from ceph.smb.constants import (
+    KEYBRIDGE, 
+    REMOTE_CONTROL,
+    FEATURES,
+)
 
 from ceph.deployment.service_spec import (
     SMBExternalCephCluster,
@@ -31,6 +37,7 @@ from .cephadmservice import (
     CephadmDaemonDeploySpec,
     simplified_keyring,
 )
+from ..tlsobject_types import TLSCredentials, EMPTY_TLS_CREDENTIALS
 from ..schedule import DaemonPlacement
 
 if TYPE_CHECKING:
@@ -145,14 +152,80 @@ class SMBService(CephService):
         )
         return daemon_spec
 
+    def _feature_tls_filename(self, feature: str) -> Optional[str]:
+        return {
+                KEYBRIDGE: "keybridge",
+                REMOTE_CONTROL: "remote_control"
+            }.get(feature)
+
+    # Flat SSL fields on SMBSpec per feature, used as a fallback when
+    # ssl_certificates is not set (backward compatibility).
+    _FEATURE_FLAT_ATTRS: Dict[str, Tuple[str, str, str]] = {
+        REMOTE_CONTROL: (
+            'remote_control_ssl_cert',
+            'remote_control_ssl_key',
+            'remote_control_ca_cert',
+        ),
+        KEYBRIDGE: (
+            'keybridge_kmip_ssl_cert',
+            'keybridge_kmip_ssl_key',
+            'keybridge_kmip_ca_cert',
+        ),
+    }
+
+    def _get_feature_certs(
+        self,
+        daemon_spec: CephadmDaemonDeploySpec,
+        smb_spec: SMBSpec,
+        support_feature: str,
+        feature: str,
+    ) -> TLSCredentials:
+        feature_creds = self.get_certificates(
+            daemon_spec, ca_cert_required=True, feature=feature
+        )
+        if feature_creds:
+            return feature_creds
+        flat_attrs = self._FEATURE_FLAT_ATTRS.get(support_feature)
+        if flat_attrs is None:
+            return EMPTY_TLS_CREDENTIALS
+        cert_attr, key_attr, ca_attr = flat_attrs
+        tc = TLSCredentials(
+            cert=getattr(smb_spec, cert_attr, None) or '',
+            key=getattr(smb_spec, key_attr, None) or '',
+            ca_cert=getattr(smb_spec, ca_attr, None) or '',
+        )
+        return tc if tc else EMPTY_TLS_CREDENTIALS
+
+    def _get_certificates_from_spec_ssl_certificates(
+        self,
+        svc_spec: ServiceSpec,
+        _daemon_spec: CephadmDaemonDeploySpec,
+        feature: Optional[str] = None,
+    ) -> TLSCredentials:
+        smb_spec = cast(SMBSpec, svc_spec)
+        ssl_certs = getattr(smb_spec, 'ssl_certificates', None)
+        if not ssl_certs:
+            return EMPTY_TLS_CREDENTIALS
+        feature_key = feature if feature is not None else ''
+        ssl_params = smb_spec.ssl_certificates.get(feature_key)
+        if not ssl_params:
+            return EMPTY_TLS_CREDENTIALS
+        return TLSCredentials(
+            cert=ssl_params.ssl_cert or '',
+            key=ssl_params.ssl_key or '',
+            ca_cert=ssl_params.ssl_ca_cert or '',
+        )
+
     def generate_config(
         self, daemon_spec: CephadmDaemonDeploySpec
     ) -> Tuple[Dict[str, Any], List[str]]:
         logger.debug('smb generate_config')
         assert self.TYPE == daemon_spec.daemon_type
+        super().register_for_certificates(daemon_spec)
         smb_spec = cast(
             SMBSpec, self.mgr.spec_store[daemon_spec.service_name].spec
         )
+        ssl_certificates = smb_spec.ssl_certificates or {}
         config_blobs: Dict[str, Any] = {}
 
         config_blobs['cluster_id'] = smb_spec.cluster_id
@@ -181,40 +254,34 @@ class SMBService(CephService):
         config_blobs['service_ports'] = smb_spec.service_ports()
         if smb_spec.bind_addrs:
             config_blobs['bind_networks'] = smb_spec.bind_networks()
-        if 'remote-control' in smb_spec.features:
-            files = config_blobs.setdefault('files', {})
-            _add_cfg(
-                files,
-                'remote_control.ssl.crt',
-                self._cert_or_uri(smb_spec.remote_control_ssl_cert),
-            )
-            _add_cfg(
-                files,
-                'remote_control.ssl.key',
-                self._cert_or_uri(smb_spec.remote_control_ssl_key),
-            )
-            _add_cfg(
-                files,
-                'remote_control.ca.crt',
-                self._cert_or_uri(smb_spec.remote_control_ca_cert),
-            )
-        if 'keybridge' in smb_spec.features:
-            files = config_blobs.setdefault('files', {})
-            _add_cfg(
-                files,
-                'keybridge.ssl.crt',
-                self._cert_or_uri(smb_spec.keybridge_kmip_ssl_cert),
-            )
-            _add_cfg(
-                files,
-                'keybridge.ssl.key',
-                self._cert_or_uri(smb_spec.keybridge_kmip_ssl_key),
-            )
-            _add_cfg(
-                files,
-                'keybridge.ca.crt',
-                self._cert_or_uri(smb_spec.keybridge_kmip_ca_cert),
-            )
+        for support_feature in smb_spec.features:
+            feature = self._feature_tls_filename(support_feature)
+            if feature is not None:
+                feature_creds = self._get_feature_certs(
+                    daemon_spec, smb_spec, support_feature, feature
+                )
+                if feature_creds:
+                    files = config_blobs.setdefault('files', {})
+                    cert_pem = self._cert_or_uri(feature_creds.cert)
+                    key_pem = self._cert_or_uri(feature_creds.key)
+                    ca_pem = self._cert_or_uri(feature_creds.ca_cert)
+                    _add_cfg(files, f'{feature}.ssl.crt', cert_pem)
+                    _add_cfg(files, f'{feature}.ssl.key', key_pem)
+                    _add_cfg(files, f'{feature}.ca.crt', ca_pem)
+                    svc_name = smb_spec.service_name()
+                    host = daemon_spec.host
+                    cert_name = f'smb_{feature}_ssl_cert'
+                    key_name = f'smb_{feature}_ssl_key'
+                    ca_cert_name = f'smb_{feature}_ca_cert'
+                    self.mgr.cert_mgr.register_cert_key_pair(
+                        'smb', cert_name, key_name, self.SCOPE, ca_cert_name
+                    )
+                    if cert_pem:
+                        self.mgr.cert_mgr.save_cert(cert_name, cert_pem, svc_name, host, user_made=True)
+                    if key_pem:
+                        self.mgr.cert_mgr.save_key(key_name, key_pem, svc_name, host, user_made=True)
+                    if ca_pem:
+                        self.mgr.cert_mgr.save_cert(ca_cert_name, ca_pem, svc_name, host, user_made=True)
         for ext_cluster in smb_spec.ceph_cluster_configs or []:
             files = config_blobs.setdefault('files', {})
             c_name = f'{ext_cluster.alias}.ceph.conf'