From: Redouane Kachach Date: Wed, 14 Jan 2026 15:16:20 +0000 (+0100) Subject: mgr/cephadm: adding unit-test for nvmeof TLS bundle API X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=72fd48b679933152dafeb6b5a48d393a6cbc9d35;p=ceph-ci.git mgr/cephadm: adding unit-test for nvmeof TLS bundle API Signed-off-by: Redouane Kachach --- diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py index 5290328f2eb..4c32cc377ad 100644 --- a/src/pybind/mgr/cephadm/tests/test_services.py +++ b/src/pybind/mgr/cephadm/tests/test_services.py @@ -8,6 +8,7 @@ import pytest from unittest.mock import Mock, MagicMock, call, patch, ANY +from cephadm.services.nvmeof import NVMEOF_CLIENT_CERT_LABEL from cephadm.serve import CephadmServe from cephadm.services.service_registry import service_registry from cephadm.services.cephadmservice import MonService, CephadmDaemonDeploySpec @@ -5764,3 +5765,334 @@ class TestMgmtGateway: error_ok=True, use_current_daemon_image=False, ) + + +class TestNvmeofTLSBundle: + def _store_spec(self, cephadm_module: CephadmOrchestrator, spec: NvmeofServiceSpec) -> None: + # SpecStore in unit tests stores ServiceSpec objects directly. + cephadm_module.spec_store._specs[spec.service_name()] = spec + # Some SpecStore helpers expect spec_created to exist. + if hasattr(cephadm_module.spec_store, 'spec_created'): + cephadm_module.spec_store.spec_created[spec.service_name()] = datetime_now() + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_ssl_disabled(self, cephadm_module: CephadmOrchestrator): + """Test that SSL disabled returns empty bundle""" + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=False, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == '' + assert bundle.server_key == '' + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == '' + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_inline_server_only(self, cephadm_module: CephadmOrchestrator): + """Test INLINE certificate source with server creds only (no mTLS)""" + server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='inline', + ssl_cert=server_cert, + ssl_key=server_key, + enable_auth=False, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == '' + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_inline_with_mtls(self, cephadm_module: CephadmOrchestrator): + """Test INLINE certificate source with mTLS enabled""" + server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----' + client_cert = '-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----' + client_key = '-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----' + ca_cert = '-----BEGIN CERTIFICATE-----\nCA_CERT\n-----END CERTIFICATE-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='inline', + ssl_cert=server_cert, + ssl_key=server_key, + enable_auth=True, + client_cert=client_cert, + client_key=client_key, + root_ca_cert=ca_cert, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == client_cert + assert bundle.client_key == client_key + assert bundle.ca_cert == ca_cert + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_reference_server_only(self, cephadm_module: CephadmOrchestrator): + """Test REFERENCE certificate source with server creds only""" + server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='reference', + enable_auth=False, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + + # Mock cert_mgr.get_cert and get_key for SERVICE-scoped lookups + def _get_cert(name, service_name=None, host=None): + if name == nvmeof_service.cert_name: + return server_cert + return None + + def _get_key(name, service_name=None, host=None): + if name == nvmeof_service.key_name: + return server_key + return None + + with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \ + patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == '' + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_reference_with_mtls(self, cephadm_module: CephadmOrchestrator): + """Test REFERENCE certificate source with mTLS enabled""" + server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----' + client_cert = '-----BEGIN CERTIFICATE-----\nREF_CLIENT_CERT\n-----END CERTIFICATE-----' + client_key = '-----BEGIN PRIVATE KEY-----\nREF_CLIENT_KEY\n-----END PRIVATE KEY-----' + ca_cert = '-----BEGIN CERTIFICATE-----\nREF_CA_CERT\n-----END CERTIFICATE-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='reference', + enable_auth=True, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + + def _get_cert(name, service_name=None, host=None): + if name == nvmeof_service.cert_name: + return server_cert + if name == nvmeof_service.client_cert_name: + return client_cert + if name == nvmeof_service.ca_cert_name: + return ca_cert + return None + + def _get_key(name, service_name=None, host=None): + if name == nvmeof_service.key_name: + return server_key + if name == nvmeof_service.client_key_name: + return client_key + return None + + with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \ + patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == client_cert + assert bundle.client_key == client_key + assert bundle.ca_cert == ca_cert + + @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service") + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _pick_host, cephadm_module: CephadmOrchestrator): + """Test CEPHADM_SIGNED certificate source without mTLS""" + _pick_host.return_value = 'test-host' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='cephadm-signed', + enable_auth=False, + ) + self._store_spec(cephadm_module, spec) + + server_creds = TLSCredentials( + cert=ceph_generated_cert, + key=ceph_generated_key, + ca_cert=cephadm_root_ca, + ) + + def _get_self_signed(service_name, hostname, label=None): + assert label is None + return server_creds + + nvmeof_service = NvmeofService(cephadm_module) + with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == ceph_generated_cert + assert bundle.server_key == ceph_generated_key + assert bundle.ca_cert == cephadm_root_ca + assert bundle.client_cert == '' + assert bundle.client_key == '' + + @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service") + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _pick_host, cephadm_module: CephadmOrchestrator): + """Test CEPHADM_SIGNED certificate source with mTLS enabled""" + _pick_host.return_value = 'test-host' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='cephadm-signed', + enable_auth=True, + ) + self._store_spec(cephadm_module, spec) + + server_creds = TLSCredentials( + cert=ceph_generated_cert, + key=ceph_generated_key, + ca_cert=cephadm_root_ca, + ) + client_creds = TLSCredentials( + cert='-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----', + key='-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----', + ca_cert=cephadm_root_ca, + ) + + def _get_self_signed(service_name, hostname, label=None): + if label == NVMEOF_CLIENT_CERT_LABEL: + return client_creds + assert label is None + return server_creds + + nvmeof_service = NvmeofService(cephadm_module) + with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == ceph_generated_cert + assert bundle.server_key == ceph_generated_key + assert bundle.client_cert == client_creds.cert + assert bundle.client_key == client_creds.key + # API returns the CA from server_creds + assert bundle.ca_cert == cephadm_root_ca + + @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service") + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _pick_host, cephadm_module: CephadmOrchestrator): + """Test CEPHADM_SIGNED returns None when no hostname can be resolved""" + _pick_host.return_value = None + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='cephadm-signed', + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + assert bundle is None + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_service_not_found(self, cephadm_module: CephadmOrchestrator): + """Test that None is returned when service doesn't exist""" + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle('nvmeof.nonexistent') + assert bundle is None + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_unknown_cert_source(self, cephadm_module: CephadmOrchestrator): + """Test that None is returned for unknown certificate_source""" + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='unknown-source', + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + assert bundle is None + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_reference_missing_certs(self, cephadm_module: CephadmOrchestrator): + """Test REFERENCE source when certs are not stored in certmgr""" + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='reference', + enable_auth=True, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + + # Mock get_cert and get_key to return None for all lookups + with patch.object(cephadm_module.cert_mgr, 'get_cert', return_value=None), \ + patch.object(cephadm_module.cert_mgr, 'get_key', return_value=None): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == '' + assert bundle.server_key == '' + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == ''