from unittest.mock import Mock, MagicMock, call, patch, ANY
+from cephadm.services.nvmeof import NVMEOF_CLIENT_CERT_LABEL
from cephadm.serve import CephadmServe
from cephadm.services.service_registry import service_registry
from cephadm.services.cephadmservice import MonService, CephadmDaemonDeploySpec
error_ok=True,
use_current_daemon_image=False,
)
+
+
+class TestNvmeofTLSBundle:
+ def _store_spec(self, cephadm_module: CephadmOrchestrator, spec: NvmeofServiceSpec) -> None:
+ # SpecStore in unit tests stores ServiceSpec objects directly.
+ cephadm_module.spec_store._specs[spec.service_name()] = spec
+ # Some SpecStore helpers expect spec_created to exist.
+ if hasattr(cephadm_module.spec_store, 'spec_created'):
+ cephadm_module.spec_store.spec_created[spec.service_name()] = datetime_now()
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_ssl_disabled(self, cephadm_module: CephadmOrchestrator):
+ """Test that SSL disabled returns empty bundle"""
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=False,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == ''
+ assert bundle.server_key == ''
+ assert bundle.client_cert == ''
+ assert bundle.client_key == ''
+ assert bundle.ca_cert == ''
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_inline_server_only(self, cephadm_module: CephadmOrchestrator):
+ """Test INLINE certificate source with server creds only (no mTLS)"""
+ server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----'
+ server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----'
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='inline',
+ ssl_cert=server_cert,
+ ssl_key=server_key,
+ enable_auth=False,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == server_cert
+ assert bundle.server_key == server_key
+ assert bundle.client_cert == ''
+ assert bundle.client_key == ''
+ assert bundle.ca_cert == ''
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_inline_with_mtls(self, cephadm_module: CephadmOrchestrator):
+ """Test INLINE certificate source with mTLS enabled"""
+ server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----'
+ server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----'
+ client_cert = '-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----'
+ client_key = '-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----'
+ ca_cert = '-----BEGIN CERTIFICATE-----\nCA_CERT\n-----END CERTIFICATE-----'
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='inline',
+ ssl_cert=server_cert,
+ ssl_key=server_key,
+ enable_auth=True,
+ client_cert=client_cert,
+ client_key=client_key,
+ root_ca_cert=ca_cert,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == server_cert
+ assert bundle.server_key == server_key
+ assert bundle.client_cert == client_cert
+ assert bundle.client_key == client_key
+ assert bundle.ca_cert == ca_cert
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_reference_server_only(self, cephadm_module: CephadmOrchestrator):
+ """Test REFERENCE certificate source with server creds only"""
+ server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----'
+ server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----'
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='reference',
+ enable_auth=False,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+
+ # Mock cert_mgr.get_cert and get_key for SERVICE-scoped lookups
+ def _get_cert(name, service_name=None, host=None):
+ if name == nvmeof_service.cert_name:
+ return server_cert
+ return None
+
+ def _get_key(name, service_name=None, host=None):
+ if name == nvmeof_service.key_name:
+ return server_key
+ return None
+
+ with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
+ patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == server_cert
+ assert bundle.server_key == server_key
+ assert bundle.client_cert == ''
+ assert bundle.client_key == ''
+ assert bundle.ca_cert == ''
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_reference_with_mtls(self, cephadm_module: CephadmOrchestrator):
+ """Test REFERENCE certificate source with mTLS enabled"""
+ server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----'
+ server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----'
+ client_cert = '-----BEGIN CERTIFICATE-----\nREF_CLIENT_CERT\n-----END CERTIFICATE-----'
+ client_key = '-----BEGIN PRIVATE KEY-----\nREF_CLIENT_KEY\n-----END PRIVATE KEY-----'
+ ca_cert = '-----BEGIN CERTIFICATE-----\nREF_CA_CERT\n-----END CERTIFICATE-----'
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='reference',
+ enable_auth=True,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+
+ def _get_cert(name, service_name=None, host=None):
+ if name == nvmeof_service.cert_name:
+ return server_cert
+ if name == nvmeof_service.client_cert_name:
+ return client_cert
+ if name == nvmeof_service.ca_cert_name:
+ return ca_cert
+ return None
+
+ def _get_key(name, service_name=None, host=None):
+ if name == nvmeof_service.key_name:
+ return server_key
+ if name == nvmeof_service.client_key_name:
+ return client_key
+ return None
+
+ with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
+ patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == server_cert
+ assert bundle.server_key == server_key
+ assert bundle.client_cert == client_cert
+ assert bundle.client_key == client_key
+ assert bundle.ca_cert == ca_cert
+
+ @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _pick_host, cephadm_module: CephadmOrchestrator):
+ """Test CEPHADM_SIGNED certificate source without mTLS"""
+ _pick_host.return_value = 'test-host'
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='cephadm-signed',
+ enable_auth=False,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ server_creds = TLSCredentials(
+ cert=ceph_generated_cert,
+ key=ceph_generated_key,
+ ca_cert=cephadm_root_ca,
+ )
+
+ def _get_self_signed(service_name, hostname, label=None):
+ assert label is None
+ return server_creds
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == ceph_generated_cert
+ assert bundle.server_key == ceph_generated_key
+ assert bundle.ca_cert == cephadm_root_ca
+ assert bundle.client_cert == ''
+ assert bundle.client_key == ''
+
+ @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _pick_host, cephadm_module: CephadmOrchestrator):
+ """Test CEPHADM_SIGNED certificate source with mTLS enabled"""
+ _pick_host.return_value = 'test-host'
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='cephadm-signed',
+ enable_auth=True,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ server_creds = TLSCredentials(
+ cert=ceph_generated_cert,
+ key=ceph_generated_key,
+ ca_cert=cephadm_root_ca,
+ )
+ client_creds = TLSCredentials(
+ cert='-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----',
+ key='-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----',
+ ca_cert=cephadm_root_ca,
+ )
+
+ def _get_self_signed(service_name, hostname, label=None):
+ if label == NVMEOF_CLIENT_CERT_LABEL:
+ return client_creds
+ assert label is None
+ return server_creds
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == ceph_generated_cert
+ assert bundle.server_key == ceph_generated_key
+ assert bundle.client_cert == client_creds.cert
+ assert bundle.client_key == client_creds.key
+ # API returns the CA from server_creds
+ assert bundle.ca_cert == cephadm_root_ca
+
+ @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _pick_host, cephadm_module: CephadmOrchestrator):
+ """Test CEPHADM_SIGNED returns None when no hostname can be resolved"""
+ _pick_host.return_value = None
+
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='cephadm-signed',
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ assert bundle is None
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_service_not_found(self, cephadm_module: CephadmOrchestrator):
+ """Test that None is returned when service doesn't exist"""
+ nvmeof_service = NvmeofService(cephadm_module)
+ bundle = nvmeof_service.get_nvmeof_tls_bundle('nvmeof.nonexistent')
+ assert bundle is None
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_unknown_cert_source(self, cephadm_module: CephadmOrchestrator):
+ """Test that None is returned for unknown certificate_source"""
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='unknown-source',
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ assert bundle is None
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_get_nvmeof_tls_bundle_reference_missing_certs(self, cephadm_module: CephadmOrchestrator):
+ """Test REFERENCE source when certs are not stored in certmgr"""
+ spec = NvmeofServiceSpec(
+ service_id='test.group',
+ pool='testpool',
+ group='group',
+ ssl=True,
+ certificate_source='reference',
+ enable_auth=True,
+ )
+ self._store_spec(cephadm_module, spec)
+
+ nvmeof_service = NvmeofService(cephadm_module)
+
+ # Mock get_cert and get_key to return None for all lookups
+ with patch.object(cephadm_module.cert_mgr, 'get_cert', return_value=None), \
+ patch.object(cephadm_module.cert_mgr, 'get_key', return_value=None):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+ assert bundle is not None
+ assert bundle.server_cert == ''
+ assert bundle.server_key == ''
+ assert bundle.client_cert == ''
+ assert bundle.client_key == ''
+ assert bundle.ca_cert == ''