import errno
import logging
import json
-from typing import List, cast, Optional
+from typing import List, cast, Optional, NamedTuple
from ipaddress import ip_address, IPv6Address
from mgr_module import HandleCommandResult
NVMEOF_CLIENT_CERT_LABEL = 'client'
+class NvmeofTLSBundle(NamedTuple):
+ server_cert: str = ''
+ server_key: str = ''
+ client_cert: str = ''
+ client_key: str = ''
+ ca_cert: str = ''
+
+
@register_cephadm_service
class NvmeofService(CephService):
TYPE = 'nvmeof'
for blocking_daemon in blocking_daemons if blocking_daemon.hostname is not None
]
return blocking_daemon_hosts
+
+ def _pick_running_daemon_host_for_service(self, service_name: str) -> Optional[str]:
+ """
+ Resolve a deterministic host for a service when HOST-scoped objects are needed.
+ Picks the first RUNNING daemon host from the orchestrator cache.
+ Returns None if none found.
+ """
+ try:
+ dds = self.mgr.cache.get_daemons_by_service(service_name)
+ except Exception:
+ return None
+
+ for dd in dds:
+ # dd.hostname is the short host name used for HOST-scoped certmgr objects
+ if dd.status == DaemonDescriptionStatus.running and dd.hostname:
+ return dd.hostname
+
+ # fallback: any host if nothing is RUNNING
+ for dd in dds:
+ if dd.hostname:
+ return dd.hostname
+
+ return None
+
+ def get_nvmeof_tls_bundle(self, service_name: str) -> Optional[NvmeofTLSBundle]:
+ """
+ Deterministic NVMeoF TLS bundle retrieval based on the NVMeoF spec's certificate_source.
+
+ - INLINE: read from spec fields (ssl_cert/ssl_key[/client_cert/client_key/root_ca_cert]).
+ - REFERENCE: read from certmgr store objects (nvmeof_* names).
+ - CEPHADM_SIGNED: read from cephadm-signed objects (self_signed_* names) + certmgr root CA.
+
+ """
+ spec = cast(NvmeofServiceSpec, self.mgr.spec_store.all_specs.get(service_name, None))
+ if spec is None:
+ return None
+
+ # NVMeoF TLS may be disabled at spec level
+ if not getattr(spec, 'ssl', False):
+ return NvmeofTLSBundle()
+
+ cert_source = getattr(spec, 'certificate_source', None)
+ valid_sources = [source.value for source in CertificateSource]
+ if cert_source not in valid_sources:
+ # Unknown / unset certificate_source
+ logger.error(f"Found unknown/invalid certificate_source='{cert_source}' for service '{service_name}'")
+ return None
+
+ server_cert = ''
+ server_key = ''
+ client_cert = ''
+ client_key = ''
+ ca_cert = ''
+ cert_mgr = self.mgr.cert_mgr
+ enable_mtls = getattr(spec, 'enable_auth', False)
+
+ # -------- INLINE --------
+ if cert_source == CertificateSource.INLINE.value:
+ server_cert = getattr(spec, 'server_cert', None) or getattr(spec, 'ssl_cert', '') or ''
+ server_key = getattr(spec, 'server_key', None) or getattr(spec, 'ssl_key', '') or ''
+ ca_cert = getattr(spec, 'root_ca_cert', '') or ''
+ if enable_mtls:
+ client_cert = getattr(spec, 'client_cert', '') or ''
+ client_key = getattr(spec, 'client_key', '') or ''
+
+ # -------- REFERENCE --------
+ elif cert_source == CertificateSource.REFERENCE.value:
+ server_cert = cert_mgr.get_cert(self.cert_name, service_name=service_name) or ''
+ server_key = cert_mgr.get_key(self.key_name, service_name=service_name) or ''
+ ca_cert = cert_mgr.get_cert(self.ca_cert_name, service_name=service_name) or ''
+ if enable_mtls:
+ client_cert = cert_mgr.get_cert(self.client_cert_name, service_name=service_name) or ''
+ client_key = cert_mgr.get_key(self.client_key_name, service_name=service_name) or ''
+
+ # -------- CEPHADM_SIGNED --------
+ elif cert_source == CertificateSource.CEPHADM_SIGNED.value:
+ hostname = self._pick_running_daemon_host_for_service(service_name)
+ if not hostname:
+ logger.error(f"certificate_source=cephadm-signed for '{service_name}' but no hostname could be resolved")
+ return None
+
+ server_creds = cert_mgr.get_self_signed_tls_credentials(service_name, hostname)
+ server_cert = server_creds.cert
+ server_key = server_creds.key
+ ca_cert = server_creds.ca_cert or ''
+ if enable_mtls:
+ client_creds = cert_mgr.get_self_signed_tls_credentials(service_name, hostname, NVMEOF_CLIENT_CERT_LABEL)
+ client_cert = client_creds.cert
+ client_key = client_creds.key
+
+ # -------- Build bundle --------
+ # Return bundle with client creds only if mTLS is enabled
+ if enable_mtls:
+ return NvmeofTLSBundle(
+ client_cert=client_cert,
+ client_key=client_key,
+ server_cert=server_cert,
+ server_key=server_key,
+ ca_cert=ca_cert
+ )
+ else:
+ return NvmeofTLSBundle(
+ server_cert=server_cert,
+ server_key=server_key,
+ ca_cert=ca_cert
+ )