if hasattr(cephadm_module.spec_store, 'spec_created'):
cephadm_module.spec_store.spec_created[spec.service_name()] = datetime_now()
+ def _daemon_name(self, spec: NvmeofServiceSpec) -> str:
+ return f'{spec.service_name()}.test-daemon'
+
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_get_nvmeof_tls_bundle_ssl_disabled(self, cephadm_module: CephadmOrchestrator):
"""Test that SSL disabled returns empty bundle"""
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(
+ spec.service_name(), self._daemon_name(spec)
+ )
assert bundle is not None
assert bundle.server_cert == ''
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(
+ spec.service_name(), self._daemon_name(spec)
+ )
assert bundle is not None
assert bundle.server_cert == server_cert
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(
+ spec.service_name(), self._daemon_name(spec)
+ )
assert bundle is not None
assert bundle.server_cert == server_cert
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
+ daemon_name = self._daemon_name(spec)
# Mock cert_mgr.get_cert and get_key for SERVICE-scoped lookups
def _get_cert(name, service_name=None, host=None):
with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name(), daemon_name)
assert bundle is not None
assert bundle.server_cert == server_cert
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
+ daemon_name = self._daemon_name(spec)
def _get_cert(name, service_name=None, host=None):
if name == nvmeof_service.cert_name:
with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name(), daemon_name)
assert bundle is not None
assert bundle.server_cert == server_cert
assert bundle.client_key == client_key
assert bundle.ca_cert == ca_cert
- @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+ @patch("cephadm.services.nvmeof.NvmeofService._get_daemon_hostname")
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
- def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _pick_host, cephadm_module: CephadmOrchestrator):
+ def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _get_daemon_hostname, cephadm_module: CephadmOrchestrator):
"""Test CEPHADM_SIGNED certificate source without mTLS"""
- _pick_host.return_value = 'test-host'
+ _get_daemon_hostname.return_value = 'test-host'
spec = NvmeofServiceSpec(
service_id='test.group',
return server_creds
nvmeof_service = NvmeofService(cephadm_module)
- with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ daemon_name = self._daemon_name(spec)
+ with patch.object(
+ cephadm_module.cert_mgr,
+ 'get_self_signed_tls_credentials',
+ side_effect=_get_self_signed,
+ ):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name(), daemon_name)
assert bundle is not None
assert bundle.server_cert == ceph_generated_cert
assert bundle.client_cert == ''
assert bundle.client_key == ''
- @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+ @patch("cephadm.services.nvmeof.NvmeofService._get_daemon_hostname")
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
- def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _pick_host, cephadm_module: CephadmOrchestrator):
+ def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _get_daemon_hostname, cephadm_module: CephadmOrchestrator):
"""Test CEPHADM_SIGNED certificate source with mTLS enabled"""
- _pick_host.return_value = 'test-host'
+ _get_daemon_hostname.return_value = 'test-host'
spec = NvmeofServiceSpec(
service_id='test.group',
return server_creds
nvmeof_service = NvmeofService(cephadm_module)
- with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ daemon_name = self._daemon_name(spec)
+ with patch.object(
+ cephadm_module.cert_mgr,
+ 'get_self_signed_tls_credentials',
+ side_effect=_get_self_signed,
+ ):
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name(), daemon_name)
assert bundle is not None
assert bundle.server_cert == ceph_generated_cert
# API returns the CA from server_creds
assert bundle.ca_cert == cephadm_root_ca
- @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+ @patch("cephadm.services.nvmeof.NvmeofService._get_daemon_hostname")
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
- def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _pick_host, cephadm_module: CephadmOrchestrator):
+ def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _get_daemon_hostname, cephadm_module: CephadmOrchestrator):
"""Test CEPHADM_SIGNED returns None when no hostname can be resolved"""
- _pick_host.return_value = None
+ _get_daemon_hostname.return_value = None
spec = NvmeofServiceSpec(
service_id='test.group',
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(
+ spec.service_name(), self._daemon_name(spec)
+ )
assert bundle is None
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_get_nvmeof_tls_bundle_service_not_found(self, cephadm_module: CephadmOrchestrator):
"""Test that None is returned when service doesn't exist"""
nvmeof_service = NvmeofService(cephadm_module)
- bundle = nvmeof_service.get_nvmeof_tls_bundle('nvmeof.nonexistent')
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(
+ 'nvmeof.nonexistent', 'nvmeof.nonexistent.test-daemon'
+ )
assert bundle is None
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(
+ spec.service_name(), self._daemon_name(spec)
+ )
assert bundle is None
@patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
self._store_spec(cephadm_module, spec)
nvmeof_service = NvmeofService(cephadm_module)
+ daemon_name = self._daemon_name(spec)
# Mock get_cert and get_key to return None for all lookups
with patch.object(cephadm_module.cert_mgr, 'get_cert', return_value=None), \
patch.object(cephadm_module.cert_mgr, 'get_key', return_value=None):
- bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+ bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name(), daemon_name)
assert bundle is not None
assert bundle.server_cert == ''