return out
+class SSLParameters:
+ def __init__(
+ self,
+ enabled: bool = False,
+ ssl_cert: Optional[str] = None,
+ ssl_key: Optional[str] = None,
+ ssl_ca_cert: Optional[str] = None,
+ certificate_source: Optional[str] = None,
+ ):
+ self.enabled = enabled
+ self.ssl_cert = ssl_cert
+ self.ssl_key = ssl_key
+ self.ssl_ca_cert = ssl_ca_cert
+ self.certificate_source = certificate_source
+ self.validate()
+
+ def validate(
+ self,
+ component: str = "ssl",
+ ca_cert_required: bool = False,
+ ) -> None:
+ if not self.enabled:
+ return
+ missing: list[Any] = []
+ if not self.certificate_source:
+ missing.append("certificate_source")
+ if self.certificate_source == 'inline':
+ if not self.ssl_cert:
+ missing.append("ssl_cert")
+ if not self.ssl_key:
+ missing.append("ssl_key")
+ if ca_cert_required and not self.ssl_ca_cert:
+ missing.append("ssl_ca_cert")
+ if missing:
+ raise ValueError(
+ f"[{component}] SSL is enabled "
+ f"but the following fields are missing: {', '.join(missing)}"
+ )
+
+ @classmethod
+ def from_dict(cls, data: Any) -> 'SSLParameters':
+ if not isinstance(data, dict):
+ return cls(enabled=False)
+
+ return cls(
+ enabled=data.get('enabled', False),
+ ssl_cert=data.get('ssl_cert'),
+ ssl_key=data.get('ssl_key'),
+ ssl_ca_cert=data.get('ssl_ca_cert'),
+ certificate_source=data.get('certificate_source'),
+ )
+
+ def to_json(self) -> Dict[str, Any]:
+ return {
+ 'enabled': self.enabled,
+ 'ssl_cert': self.ssl_cert,
+ 'ssl_key': self.ssl_key,
+ 'ssl_ca_cert': self.ssl_ca_cert,
+ 'certificate_source': self.certificate_source,
+ }
+
+
class SMBExternalCephCluster:
"""Configure access to a non-local Ceph cluster for SMB services."""
def __init__(
# not listed the default port will be used.
custom_ports: Optional[Dict[str, int]] = None,
bind_addrs: Optional[List[SMBClusterBindIPSpec]] = None,
+ ssl: Optional[bool] = None,
+ ssl_certificates: Optional[Dict[str, SSLParameters]] = None,
# === remote control server ===
remote_control_ssl_cert: Optional[str] = None,
remote_control_ssl_key: Optional[str] = None,
) -> None:
if service_type != self.service_type:
raise ValueError(f'invalid service_type: {service_type!r}')
+
+ self.ssl_certificates = {
+ name: (
+ value
+ if isinstance(value, SSLParameters)
+ else SSLParameters.from_dict(value)
+ )
+ for name, value in (ssl_certificates or {}).items()
+ }
+ any_ssl = any(p.enabled for p in self.ssl_certificates.values())
super().__init__(
self.service_type,
service_id=service_id,
placement=placement,
count=count,
config=config,
+ ssl=any_ssl,
unmanaged=unmanaged,
preview_only=preview_only,
networks=networks,
for key in self.custom_ports or {}:
if key not in self._valid_service_names:
raise ValueError(f'{key} is not a valid service name')
+ # TLS certificate validation
+ for feature_name, ssl_params in self.ssl_certificates.items():
+ ssl_params.validate(
+ component=feature_name,
+ ca_cert_required=feature_name in smbconst.CA_CERT_REQUIRED_FEATURES
+ )
def _derive_cluster_uri(self, uri: str, objname: str) -> str:
if not uri.startswith(('rados://', 'mem:')):
spec['ceph_cluster_configs'] = [
c.to_json() for c in spec['ceph_cluster_configs']
]
+ if spec and spec.get('ssl_certificates'):
+ spec['ssl_certificates'] = {
+ k: v.to_json() for k, v in self.ssl_certificates.items()
+ }
return obj