From: Rabinarayan Panigrahi Date: Fri, 24 Apr 2026 10:47:40 +0000 (+0530) Subject: ceph/deployment: Adding class SSLParameters for storing cert, key X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=176de7b9030919ee268c733de387170ea60ca454;p=ceph.git ceph/deployment: Adding class SSLParameters for storing cert, key and ca-cert for ssl and other features Signed-off-by: Rabinarayan Panigrahi --- diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index b5726820ee5..1d4b913c4c2 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -3959,6 +3959,68 @@ class SMBClusterBindIPSpec: return out +class SSLParameters: + def __init__( + self, + enabled: bool = False, + ssl_cert: Optional[str] = None, + ssl_key: Optional[str] = None, + ssl_ca_cert: Optional[str] = None, + certificate_source: Optional[str] = None, + ): + self.enabled = enabled + self.ssl_cert = ssl_cert + self.ssl_key = ssl_key + self.ssl_ca_cert = ssl_ca_cert + self.certificate_source = certificate_source + self.validate() + + def validate( + self, + component: str = "ssl", + ca_cert_required: bool = False, + ) -> None: + if not self.enabled: + return + missing: list[Any] = [] + if not self.certificate_source: + missing.append("certificate_source") + if self.certificate_source == 'inline': + if not self.ssl_cert: + missing.append("ssl_cert") + if not self.ssl_key: + missing.append("ssl_key") + if ca_cert_required and not self.ssl_ca_cert: + missing.append("ssl_ca_cert") + if missing: + raise ValueError( + f"[{component}] SSL is enabled " + f"but the following fields are missing: {', '.join(missing)}" + ) + + @classmethod + def from_dict(cls, data: Any) -> 'SSLParameters': + if not isinstance(data, dict): + return cls(enabled=False) + + return cls( + enabled=data.get('enabled', False), + ssl_cert=data.get('ssl_cert'), + ssl_key=data.get('ssl_key'), + ssl_ca_cert=data.get('ssl_ca_cert'), + certificate_source=data.get('certificate_source'), + ) + + def to_json(self) -> Dict[str, Any]: + return { + 'enabled': self.enabled, + 'ssl_cert': self.ssl_cert, + 'ssl_key': self.ssl_key, + 'ssl_ca_cert': self.ssl_ca_cert, + 'certificate_source': self.certificate_source, + } + + class SMBExternalCephCluster: """Configure access to a non-local Ceph cluster for SMB services.""" def __init__( @@ -4101,6 +4163,8 @@ class SMBSpec(ServiceSpec): # not listed the default port will be used. custom_ports: Optional[Dict[str, int]] = None, bind_addrs: Optional[List[SMBClusterBindIPSpec]] = None, + ssl: Optional[bool] = None, + ssl_certificates: Optional[Dict[str, SSLParameters]] = None, # === remote control server === remote_control_ssl_cert: Optional[str] = None, remote_control_ssl_key: Optional[str] = None, @@ -4124,12 +4188,23 @@ class SMBSpec(ServiceSpec): ) -> None: if service_type != self.service_type: raise ValueError(f'invalid service_type: {service_type!r}') + + self.ssl_certificates = { + name: ( + value + if isinstance(value, SSLParameters) + else SSLParameters.from_dict(value) + ) + for name, value in (ssl_certificates or {}).items() + } + any_ssl = any(p.enabled for p in self.ssl_certificates.values()) super().__init__( self.service_type, service_id=service_id, placement=placement, count=count, config=config, + ssl=any_ssl, unmanaged=unmanaged, preview_only=preview_only, networks=networks, @@ -4200,6 +4275,12 @@ class SMBSpec(ServiceSpec): for key in self.custom_ports or {}: if key not in self._valid_service_names: raise ValueError(f'{key} is not a valid service name') + # TLS certificate validation + for feature_name, ssl_params in self.ssl_certificates.items(): + ssl_params.validate( + component=feature_name, + ca_cert_required=feature_name in smbconst.CA_CERT_REQUIRED_FEATURES + ) def _derive_cluster_uri(self, uri: str, objname: str) -> str: if not uri.startswith(('rados://', 'mem:')): @@ -4253,6 +4334,10 @@ class SMBSpec(ServiceSpec): spec['ceph_cluster_configs'] = [ c.to_json() for c in spec['ceph_cluster_configs'] ] + if spec and spec.get('ssl_certificates'): + spec['ssl_certificates'] = { + k: v.to_json() for k, v in self.ssl_certificates.items() + } return obj