)
assert rm_mock.call_count == 2
+ def test_cleanup_leftovers_after_cert_source_switch(self, cephadm_module: CephadmOrchestrator, caplog):
+ """Validate the combined cleanup used by post_remove after cert-source switches.
+
+ Scenario:
+ - Service previously used cephadm-signed certs (leftover host-scoped cephadm-signed pair exists)
+ - Service previously used inline certs (leftover non-editable service-scoped objects exist)
+
+ After a cert-source switch, post_remove should be able to clean both kinds of
+ cephadm-managed leftovers regardless of the *current* cert source.
+ """
+ cephadm_module._init_cert_mgr()
+ cm: CertMgr = cephadm_module.cert_mgr
+
+ svc = 'ingress.foo'
+ host = 'host1'
+
+ # Seed a cephadm-signed (host-scoped) pair
+ cm.register_self_signed_cert_key_pair(svc)
+ cm.save_self_signed_cert_key_pair(
+ svc,
+ TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+ host=host,
+ )
+
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+ assert cm.get_cert(cephadm_cert, host=host) == CEPHADM_SELF_GENERATED_CERT_1
+ assert cm.get_key(cephadm_key, host=host) == CEPHADM_SELF_GENERATED_KEY_2048
+
+ # Seed inline-saved (service-scoped) objects for the same service
+ cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+ cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'inline-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'inline-key'
+
+ # Combined cleanup (as in post_remove)
+ with caplog.at_level(logging.INFO):
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+ cm.rm_inline_saved_cert_key_pair(
+ 'ingress_ssl_cert',
+ 'ingress_ssl_key',
+ service_name=svc,
+ host=host,
+ )
+
+ # Both categories removed
+ assert cm.get_cert(cephadm_cert, host=host) is None
+ assert cm.get_key(cephadm_key, host=host) is None
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+ assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+
+ # The INFO log should have been emitted for the cephadm-signed cleanup
+ assert any(
+ f"Removing cephadm-signed cert/key for service: {svc}, host: {host}" in r.getMessage()
+ for r in caplog.records
+ )
+
+ # If referenced creds are present (editable=True), inline cleanup must not remove them.
+ cm.save_cert('ingress_ssl_cert', 'ref-cert', service_name=svc, user_made=True, editable=True)
+ cm.save_key('ingress_ssl_key', 'ref-key', service_name=svc, user_made=True, editable=True)
+ cm.rm_inline_saved_cert_key_pair(
+ 'ingress_ssl_cert',
+ 'ingress_ssl_key',
+ service_name=svc,
+ host=host,
+ )
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+ def test_try_rm_self_signed_cert_key_pair_best_effort_and_logs(self, cephadm_module: CephadmOrchestrator, caplog):
+ cephadm_module._init_cert_mgr()
+ cm: CertMgr = cephadm_module.cert_mgr
+
+ svc = "rgw.foo"
+ host = "host1"
+
+ cm.register_self_signed_cert_key_pair(svc)
+ cm.save_self_signed_cert_key_pair(
+ svc,
+ TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+ host=host,
+ )
+
+ cert_name = cm.self_signed_cert(svc)
+ key_name = cm.self_signed_key(svc)
+ assert cm.get_cert(cert_name, host=host) == CEPHADM_SELF_GENERATED_CERT_1
+ assert cm.get_key(key_name, host=host) == CEPHADM_SELF_GENERATED_KEY_2048
+
+ with caplog.at_level(logging.INFO):
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+ assert cm.get_cert(cert_name, host=host) is None
+ assert cm.get_key(key_name, host=host) is None
+ assert any("Removing cephadm-signed cert/key" in r.getMessage() for r in caplog.records)
+
+ caplog.clear()
+ with caplog.at_level(logging.INFO):
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+ assert not any("Removing cephadm-signed cert/key" in r.getMessage() for r in caplog.records)
+
+ def test_rm_inline_saved_cert_key_pair_best_effort_and_editable(self, cephadm_module: CephadmOrchestrator):
+
+ cephadm_module._init_cert_mgr()
+ cm: CertMgr = cephadm_module.cert_mgr
+
+ svc = "nfs.foo"
+
+ # 1) unknown/unregistered names should never raise (best-effort cleanup)
+ cm.rm_inline_saved_cert_key_pair(
+ "totally_unknown_cert",
+ "totally_unknown_key",
+ service_name=svc,
+ host="host1",
+ ca_cert_name="totally_unknown_ca",
+ )
+
+ # 2) inline-saved objects (user_made=True, editable=False) should be removed
+ cm.save_cert("nfs_ssl_cert", "inline-cert", service_name=svc, user_made=True, editable=False)
+ cm.save_key("nfs_ssl_key", "inline-key", service_name=svc, user_made=True, editable=False)
+ cm.save_cert("nfs_ssl_ca_cert", "inline-ca", service_name=svc, user_made=True, editable=False)
+
+ cm.rm_inline_saved_cert_key_pair(
+ "nfs_ssl_cert",
+ "nfs_ssl_key",
+ service_name=svc,
+ host="host1", # host passed even though SERVICE scope (should be ignored by store)
+ ca_cert_name="nfs_ssl_ca_cert",
+ )
+ assert cm.get_cert("nfs_ssl_cert", service_name=svc) is None
+ assert cm.get_key("nfs_ssl_key", service_name=svc) is None
+ assert cm.get_cert("nfs_ssl_ca_cert", service_name=svc) is None
+
+ # 3) referenced/user-managed entries (editable=True) must NOT be removed
+ cm.save_cert("nfs_ssl_cert", "ref-cert", service_name=svc, user_made=True, editable=True)
+ cm.save_key("nfs_ssl_key", "ref-key", service_name=svc, user_made=True, editable=True)
+ cm.save_cert("nfs_ssl_ca_cert", "ref-ca", service_name=svc, user_made=True, editable=True)
+
+ cm.rm_inline_saved_cert_key_pair(
+ "nfs_ssl_cert",
+ "nfs_ssl_key",
+ service_name=svc,
+ host="host1",
+ ca_cert_name="nfs_ssl_ca_cert",
+ )
+ assert cm.get_cert("nfs_ssl_cert", service_name=svc) == "ref-cert"
+ assert cm.get_key("nfs_ssl_key", service_name=svc) == "ref-key"
+ assert cm.get_cert("nfs_ssl_ca_cert", service_name=svc) == "ref-ca"
+
+ # ------------------------------------------------------------------ #
+ # Tests for certificate_source as a dependency + user warning #
+ # ------------------------------------------------------------------ #
+
+ def test_base_get_dependencies_includes_certificate_source(self, cephadm_module: CephadmOrchestrator):
+ """CephadmService.get_dependencies should include certificate_source in deps when ssl=True."""
+ from cephadm.services.cephadmservice import CephadmService
+
+ spec = mock.MagicMock()
+ spec.ssl = True
+ spec.certificate_source = 'cephadm-signed'
+ spec.ssl_cert = None
+ spec.ssl_key = None
+ spec.ssl_ca_cert = None
+
+ deps = CephadmService.get_dependencies(cephadm_module, spec)
+ assert 'certificate_source: cephadm-signed' in deps
+
+ def test_base_get_dependencies_no_certificate_source_when_ssl_false(self, cephadm_module: CephadmOrchestrator):
+ """When ssl=False, get_dependencies should return [] — no certificate_source dep."""
+ from cephadm.services.cephadmservice import CephadmService
+
+ spec = mock.MagicMock()
+ spec.ssl = False
+ spec.certificate_source = 'inline'
+
+ deps = CephadmService.get_dependencies(cephadm_module, spec)
+ assert deps == []
+
+ def test_base_get_dependencies_no_spec(self, cephadm_module: CephadmOrchestrator):
+ """When spec is None, get_dependencies should return []."""
+ from cephadm.services.cephadmservice import CephadmService
+
+ deps = CephadmService.get_dependencies(cephadm_module, None)
+ assert deps == []
+
+ def test_base_get_dependencies_certificate_source_changes_detected(self, cephadm_module: CephadmOrchestrator):
+ """Changing certificate_source should produce different deps, triggering reconfig."""
+ from cephadm.services.cephadmservice import CephadmService
+
+ spec_signed = mock.MagicMock()
+ spec_signed.ssl = True
+ spec_signed.certificate_source = 'cephadm-signed'
+ spec_signed.ssl_cert = None
+ spec_signed.ssl_key = None
+ spec_signed.ssl_ca_cert = None
+
+ spec_ref = mock.MagicMock()
+ spec_ref.ssl = True
+ spec_ref.certificate_source = 'reference'
+ spec_ref.ssl_cert = None
+ spec_ref.ssl_key = None
+ spec_ref.ssl_ca_cert = None
+
+ spec_inline = mock.MagicMock()
+ spec_inline.ssl = True
+ spec_inline.certificate_source = 'inline'
+ spec_inline.ssl_cert = 'some-cert'
+ spec_inline.ssl_key = 'some-key'
+ spec_inline.ssl_ca_cert = None
+
+ deps_signed = CephadmService.get_dependencies(cephadm_module, spec_signed)
+ deps_ref = CephadmService.get_dependencies(cephadm_module, spec_ref)
+ deps_inline = CephadmService.get_dependencies(cephadm_module, spec_inline)
+
+ # All three should differ from each other
+ assert deps_signed != deps_ref
+ assert deps_signed != deps_inline
+ assert deps_ref != deps_inline
+
+ # cephadm-signed <-> reference: previously had identical (empty) deps, now differ
+ assert 'certificate_source: cephadm-signed' in deps_signed
+ assert 'certificate_source: reference' in deps_ref
+
+ def test_base_get_dependencies_includes_cert_key_hashes_with_certificate_source(self, cephadm_module: CephadmOrchestrator):
+ """When ssl=True with inline cert/key, deps should include both certificate_source AND hashes."""
+ from cephadm.services.cephadmservice import CephadmService
+ from cephadm import utils
+
+ spec = mock.MagicMock()
+ spec.ssl = True
+ spec.certificate_source = 'inline'
+ spec.ssl_cert = 'my-cert-data'
+ spec.ssl_key = 'my-key-data'
+ spec.ssl_ca_cert = None
+
+ deps = CephadmService.get_dependencies(cephadm_module, spec)
+ assert 'certificate_source: inline' in deps
+ assert f'ssl_cert: {utils.md5_hash("my-cert-data")}' in deps
+ assert f'ssl_key: {utils.md5_hash("my-key-data")}' in deps
+
+ def test_grafana_get_dependencies_includes_parent_tls_deps(self, cephadm_module: CephadmOrchestrator):
+ """GrafanaService.get_dependencies should include parent TLS deps (certificate_source)
+ since Grafana has user_cert_allowed=True."""
+ from cephadm.services.monitoring import GrafanaService
+
+ spec = mock.MagicMock()
+ spec.ssl = True
+ spec.certificate_source = 'inline'
+ spec.ssl_cert = 'grafana-cert'
+ spec.ssl_key = 'grafana-key'
+ spec.ssl_ca_cert = None
+
+ deps = GrafanaService.get_dependencies(cephadm_module, spec)
+ assert 'certificate_source: inline' in deps
+
+ def test_grafana_get_dependencies_no_tls_deps_when_ssl_false(self, cephadm_module: CephadmOrchestrator):
+ """When ssl=False, GrafanaService should not include any TLS deps from parent."""
+ from cephadm.services.monitoring import GrafanaService
+
+ spec = mock.MagicMock()
+ spec.ssl = False
+ spec.certificate_source = 'cephadm-signed'
+
+ deps = GrafanaService.get_dependencies(cephadm_module, spec)
+ assert not any('certificate_source' in d for d in deps)
+
+ def test_grafana_get_dependencies_consistent_with_and_without_spec(self, cephadm_module: CephadmOrchestrator):
+ """Calling get_dependencies without spec should return no TLS deps (spec=None → [])
+ while calling with spec and ssl=True should include TLS deps."""
+ from cephadm.services.monitoring import GrafanaService
+
+ # Without spec — should not crash and should not include TLS deps
+ deps_no_spec = GrafanaService.get_dependencies(cephadm_module)
+ assert not any('certificate_source' in d for d in deps_no_spec)
+
+ # With spec and ssl=True
+ spec = mock.MagicMock()
+ spec.ssl = True
+ spec.certificate_source = 'reference'
+ spec.ssl_cert = None
+ spec.ssl_key = None
+ spec.ssl_ca_cert = None
+
+ deps_with_spec = GrafanaService.get_dependencies(cephadm_module, spec)
+ assert 'certificate_source: reference' in deps_with_spec
+
+ def test_check_cert_source_warns_on_certificate_source_change(self, cephadm_module: CephadmOrchestrator):
+ """_check_cert_source should return a warning when certificate_source changes."""
+ import datetime
+
+ old_spec = mock.MagicMock()
+ old_spec.certificate_source = 'cephadm-signed'
+ old_spec.service_name.return_value = 'grafana'
+ old_spec.service_type = 'grafana'
+
+ new_spec = mock.MagicMock()
+ new_spec.certificate_source = 'inline'
+ new_spec.service_name.return_value = 'grafana'
+ new_spec.service_type = 'grafana'
+ new_spec.is_using_certificates_source.return_value = False
+
+ # Seed the spec_store so _check_cert_source can find the old spec
+ cephadm_module.spec_store._specs['grafana'] = old_spec
+ cephadm_module.spec_store.spec_created['grafana'] = datetime.datetime.utcnow()
+
+ warning = cephadm_module._check_cert_source(new_spec)
+ assert "certificate_source" in warning
+ assert "cephadm-signed" in warning
+ assert "inline" in warning
+ assert "reconfiguration" in warning
+
+ def test_check_cert_source_no_warning_when_source_unchanged(self, cephadm_module: CephadmOrchestrator):
+ """_check_cert_source should return no warning when certificate_source stays the same."""
+ import datetime
+
+ old_spec = mock.MagicMock()
+ old_spec.certificate_source = 'cephadm-signed'
+ old_spec.service_name.return_value = 'grafana'
+ old_spec.service_type = 'grafana'
+
+ new_spec = mock.MagicMock()
+ new_spec.certificate_source = 'cephadm-signed'
+ new_spec.service_name.return_value = 'grafana'
+ new_spec.service_type = 'grafana'
+ new_spec.is_using_certificates_source.return_value = False
+
+ cephadm_module.spec_store._specs['grafana'] = old_spec
+ cephadm_module.spec_store.spec_created['grafana'] = datetime.datetime.utcnow()
+
+ warning = cephadm_module._check_cert_source(new_spec)
+ assert "certificate_source" not in warning
+
+ def test_check_cert_source_no_warning_for_new_service(self, cephadm_module: CephadmOrchestrator):
+ """_check_cert_source should return no source-change warning for a brand new service."""
+ new_spec = mock.MagicMock()
+ new_spec.certificate_source = 'inline'
+ new_spec.service_name.return_value = 'grafana'
+ new_spec.service_type = 'grafana'
+ new_spec.is_using_certificates_source.return_value = False
+
+ # spec_store does NOT contain 'grafana'
+ warning = cephadm_module._check_cert_source(new_spec)
+ assert "certificate_source" not in warning
+
+ def test_check_cert_source_warning_stacks_with_reference_warning(self, cephadm_module: CephadmOrchestrator):
+ """When switching to 'reference' on a per-host service, both the source-change
+ warning and the reference provisioning instructions should be present (+=)."""
+ import datetime
+ from cephadm.services.service_registry import service_registry
+
+ old_spec = mock.MagicMock()
+ old_spec.certificate_source = 'cephadm-signed'
+ old_spec.service_name.return_value = 'grafana'
+ old_spec.service_type = 'grafana'
+
+ new_spec = mock.MagicMock()
+ new_spec.certificate_source = 'reference'
+ new_spec.service_name.return_value = 'grafana'
+ new_spec.service_type = 'grafana'
+ new_spec.is_using_certificates_source.return_value = True
+
+ cephadm_module.spec_store._specs['grafana'] = old_spec
+ cephadm_module.spec_store.spec_created['grafana'] = datetime.datetime.utcnow()
+
+ # Mock the service registry instance to return a service with HOST scope
+ svc_mock = mock.MagicMock()
+ svc_mock.SCOPE = TLSObjectScope.HOST
+ svc_mock.cert_name = 'grafana_ssl_cert'
+ svc_mock.key_name = 'grafana_ssl_key'
+ with mock.patch.object(service_registry, 'get_service', return_value=svc_mock):
+ warning = cephadm_module._check_cert_source(new_spec)
+
+ # Both warnings should be present
+ assert "certificate_source" in warning
+ assert "cephadm-signed" in warning
+ assert "reference" in warning
+ assert "per-host certificates" in warning
+ assert "ceph orch certmgr cert set" in warning
+
+ def test_check_cert_source_all_transitions_warn(self, cephadm_module: CephadmOrchestrator):
+ """Every cross-source transition should produce a certificate_source warning."""
+ import datetime
+
+ sources = ['cephadm-signed', 'inline', 'reference']
+
+ for old_source in sources:
+ for new_source in sources:
+ if old_source == new_source:
+ continue
+
+ old_spec = mock.MagicMock()
+ old_spec.certificate_source = old_source
+ old_spec.service_name.return_value = 'rgw.foo'
+ old_spec.service_type = 'rgw'
+
+ new_spec = mock.MagicMock()
+ new_spec.certificate_source = new_source
+ new_spec.service_name.return_value = 'rgw.foo'
+ new_spec.service_type = 'rgw'
+ new_spec.is_using_certificates_source.return_value = False
+
+ cephadm_module.spec_store._specs['rgw.foo'] = old_spec
+ cephadm_module.spec_store.spec_created['rgw.foo'] = datetime.datetime.utcnow()
+
+ warning = cephadm_module._check_cert_source(new_spec)
+ assert "certificate_source" in warning, \
+ f"Expected warning for {old_source} -> {new_source}, got: {warning!r}"
+ assert old_source in warning
+ assert new_source in warning
+
+ # ------------------------------------------------------------------ #
+ # Cleanup tests for certificate_source transitions #
+ # #
+ # These tests verify the reconciliation logic in #
+ # get_certificates_generic: for each cross-source transition we #
+ # seed the "old" cert artifacts and then call the cleanup routines #
+ # exactly as get_certificates_generic does, asserting that: #
+ # - stale artifacts are removed #
+ # - user-provisioned reference certs (editable=True) are NEVER #
+ # removed #
+ # ------------------------------------------------------------------ #
+
+ def _seed_inline_certs(self, cm, svc, host):
+ """Seed inline-saved certs (user_made=True, editable=False) for a service-scoped service."""
+ cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+ cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+
+ def _seed_cephadm_signed_certs(self, cm, svc, host):
+ """Seed cephadm-signed certs for a service."""
+ cm.register_self_signed_cert_key_pair(svc)
+ cm.save_self_signed_cert_key_pair(
+ svc,
+ TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+ host=host,
+ )
+
+ def _seed_reference_certs(self, cm, svc):
+ """Seed user-provisioned reference certs (editable=True) for a service."""
+ cm.save_cert('ingress_ssl_cert', 'ref-cert', service_name=svc, user_made=True, editable=True)
+ cm.save_key('ingress_ssl_key', 'ref-key', service_name=svc, user_made=True, editable=True)
+
+ def _run_reconciliation_cleanup(self, cm, cert_source, svc, host,
+ cert_name='ingress_ssl_cert', key_name='ingress_ssl_key'):
+ """Simulate the reconciliation cleanup from get_certificates_generic."""
+ if cert_source in ('reference', 'cephadm-signed'):
+ cm.rm_inline_saved_cert_key_pair(
+ cert_name, key_name,
+ service_name=svc, host=host,
+ )
+ if cert_source != 'cephadm-signed':
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+
+ def test_cleanup_inline_to_cephadm_signed(self, cephadm_module: CephadmOrchestrator):
+ """inline -> cephadm-signed: inline-saved certs removed, reference certs untouched."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ self._seed_inline_certs(cm, svc, host)
+ self._seed_reference_certs(cm, svc) # should survive
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert' # editable overwrites non-editable
+
+ # Re-seed inline on top (simulating actual inline-saved state before switch)
+ cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+ cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+
+ self._run_reconciliation_cleanup(cm, 'cephadm-signed', svc, host)
+
+ # Inline-saved removed
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+ assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+
+ def test_cleanup_inline_to_reference(self, cephadm_module: CephadmOrchestrator):
+ """inline -> reference: inline-saved removed, cephadm-signed removed, reference survives."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ self._seed_inline_certs(cm, svc, host)
+ self._seed_cephadm_signed_certs(cm, svc, host)
+
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+
+ self._run_reconciliation_cleanup(cm, 'reference', svc, host)
+
+ # Inline-saved removed
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+ assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+ # Cephadm-signed removed
+ assert cm.get_cert(cephadm_cert, host=host) is None
+ assert cm.get_key(cephadm_key, host=host) is None
+
+ def test_cleanup_cephadm_signed_to_inline(self, cephadm_module: CephadmOrchestrator):
+ """cephadm-signed -> inline: cephadm-signed removed."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ self._seed_cephadm_signed_certs(cm, svc, host)
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+
+ self._run_reconciliation_cleanup(cm, 'inline', svc, host)
+
+ # Cephadm-signed removed
+ assert cm.get_cert(cephadm_cert, host=host) is None
+ assert cm.get_key(cephadm_key, host=host) is None
+
+ def test_cleanup_cephadm_signed_to_reference(self, cephadm_module: CephadmOrchestrator):
+ """cephadm-signed -> reference: cephadm-signed removed, reference untouched."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ self._seed_cephadm_signed_certs(cm, svc, host)
+ self._seed_reference_certs(cm, svc)
+
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+
+ self._run_reconciliation_cleanup(cm, 'reference', svc, host)
+
+ # Cephadm-signed removed
+ assert cm.get_cert(cephadm_cert, host=host) is None
+ assert cm.get_key(cephadm_key, host=host) is None
+ # Reference survives
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+ def test_cleanup_reference_to_inline(self, cephadm_module: CephadmOrchestrator):
+ """reference -> inline: cephadm-signed removed, reference certs NOT removed."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ self._seed_reference_certs(cm, svc)
+ self._seed_cephadm_signed_certs(cm, svc, host)
+
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+
+ self._run_reconciliation_cleanup(cm, 'inline', svc, host)
+
+ # Cephadm-signed removed
+ assert cm.get_cert(cephadm_cert, host=host) is None
+ assert cm.get_key(cephadm_key, host=host) is None
+ # Reference NOT removed (editable=True)
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+ def test_cleanup_reference_to_cephadm_signed(self, cephadm_module: CephadmOrchestrator):
+ """reference -> cephadm-signed: reference certs NOT removed (editable=True preserved)."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ self._seed_reference_certs(cm, svc)
+
+ self._run_reconciliation_cleanup(cm, 'cephadm-signed', svc, host)
+
+ # Reference NOT removed — rm_inline_saved skips editable=True
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+ def test_cleanup_reference_never_removed_in_any_transition(self, cephadm_module: CephadmOrchestrator):
+ """Reference certs (editable=True) must survive ALL transitions."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ for target_source in ['inline', 'cephadm-signed', 'reference']:
+ # Reset: seed fresh reference certs
+ cm.save_cert('ingress_ssl_cert', 'ref-cert', service_name=svc, user_made=True, editable=True)
+ cm.save_key('ingress_ssl_key', 'ref-key', service_name=svc, user_made=True, editable=True)
+
+ self._run_reconciliation_cleanup(cm, target_source, svc, host)
+
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert', \
+ f"Reference cert removed during transition to {target_source}"
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key', \
+ f"Reference key removed during transition to {target_source}"
+
+ # ------------------------------------------------------------------ #
+ # post_remove cleanup tests #
+ # #
+ # These tests verify that CephadmService.post_remove correctly #
+ # cleans up TLS artifacts when a daemon is removed, respecting #
+ # scope, remaining daemons, and reference cert preservation. #
+ # ------------------------------------------------------------------ #
+
+ def test_post_remove_cleans_cephadm_signed_regardless_of_source(self, cephadm_module: CephadmOrchestrator):
+ """post_remove always calls try_rm_self_signed_cert_key_pair, even when
+ the current certificate_source is 'inline' or 'reference'."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ for current_source in ['inline', 'reference', 'cephadm-signed']:
+ # Seed cephadm-signed leftovers
+ self._seed_cephadm_signed_certs(cm, svc, host)
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+ assert cm.get_cert(cephadm_cert, host=host) is not None
+
+ # post_remove always does try_rm_self_signed unconditionally
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+
+ assert cm.get_cert(cephadm_cert, host=host) is None, \
+ f"cephadm-signed cert not cleaned when source={current_source}"
+ assert cm.get_key(cephadm_key, host=host) is None, \
+ f"cephadm-signed key not cleaned when source={current_source}"
+
+ def test_post_remove_inline_saved_cleanup_service_scope(self, cephadm_module: CephadmOrchestrator):
+ """For SERVICE-scoped services, inline-saved certs are only removed when
+ it's the last daemon (other_daemons_in_service=False)."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc = 'ingress.foo'
+
+ # Seed inline-saved (service scope: service_name, host=None)
+ cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+ cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+
+ # Simulate: other daemons still exist → cleanup should NOT happen
+ # (post_remove returns early if other_daemons_in_service=True for SERVICE scope)
+ # Just verify the certs are still there (no cleanup call made)
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'inline-cert'
+
+ # Simulate: last daemon removed → cleanup should happen (service_name scope, host=None)
+ cm.rm_inline_saved_cert_key_pair(
+ 'ingress_ssl_cert', 'ingress_ssl_key',
+ service_name=svc, host=None,
+ )
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+ assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+
+ def test_post_remove_inline_saved_cleanup_host_scope(self, cephadm_module: CephadmOrchestrator):
+ """For HOST-scoped services (like grafana), inline-saved certs are removed
+ per-host when the last daemon on that host is removed."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc = 'grafana'
+ host1, host2 = 'host1', 'host2'
+
+ # Seed inline-saved per host
+ cm.save_cert('grafana_ssl_cert', 'cert-host1', host=host1, user_made=True, editable=False)
+ cm.save_cert('grafana_ssl_cert', 'cert-host2', host=host2, user_made=True, editable=False)
+
+ # Remove from host1 only
+ cm.rm_inline_saved_cert_key_pair(
+ 'grafana_ssl_cert', 'grafana_ssl_key',
+ service_name=svc, host=host1,
+ )
+ # host1 removed, host2 untouched
+ assert cm.get_cert('grafana_ssl_cert', host=host1) is None
+ assert cm.get_cert('grafana_ssl_cert', host=host2) == 'cert-host2'
+
+ def test_post_remove_preserves_reference_certs(self, cephadm_module: CephadmOrchestrator):
+ """post_remove must never remove user-provisioned reference certs (editable=True),
+ even when the service is fully removed."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ # Seed reference certs
+ self._seed_reference_certs(cm, svc)
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+
+ # Simulate full post_remove cleanup: both try_rm_self_signed + rm_inline_saved
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+ cm.rm_inline_saved_cert_key_pair(
+ 'ingress_ssl_cert', 'ingress_ssl_key',
+ service_name=svc, host=None,
+ )
+
+ # Reference certs must survive
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+ def test_post_remove_cleans_all_stale_artifacts_on_full_removal(self, cephadm_module: CephadmOrchestrator):
+ """When the last daemon is removed, both cephadm-signed and inline-saved
+ artifacts should be cleaned up, but reference certs should survive."""
+ cephadm_module._init_cert_mgr()
+ cm = cephadm_module.cert_mgr
+ svc, host = 'ingress.foo', 'host1'
+
+ # Seed all three types of artifacts
+ self._seed_cephadm_signed_certs(cm, svc, host)
+ self._seed_inline_certs(cm, svc, host)
+ self._seed_reference_certs(cm, svc)
+
+ cephadm_cert = cm.self_signed_cert(svc)
+ cephadm_key = cm.self_signed_key(svc)
+
+ # The reference save (editable=True) overwrites the inline save (editable=False)
+ # for the same key. Re-seed inline to have both in a testable state:
+ # First verify reference is in place
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+
+ # Simulate full post_remove (as the code does):
+ # 1) Always clean cephadm-signed
+ cm.try_rm_self_signed_cert_key_pair(svc, host)
+ # 2) Clean inline-saved (service scope, last daemon)
+ cm.rm_inline_saved_cert_key_pair(
+ 'ingress_ssl_cert', 'ingress_ssl_key',
+ service_name=svc, host=None,
+ )
+
+ # Cephadm-signed removed
+ assert cm.get_cert(cephadm_cert, host=host) is None
+ assert cm.get_key(cephadm_key, host=host) is None
+
+ # Reference certs survive (editable=True → rm_inline_saved skips them)
+ assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+ assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
class MockTLSObject(TLSObjectProtocol):
STORAGE_PREFIX = "mocktls"
self.assertEqual(scope, TLSObjectScope.GLOBAL)
self.assertEqual(target, None)
+ def test_get_tlsobject_if_exists(self):
+ """get_tlsobject_if_exists should never raise and return None on missing/invalid access."""
+ # Unknown entity
+ assert self.store.get_tlsobject_if_exists("unknown_entity") is None
+
+ # Known but missing required target
+ assert self.store.get_tlsobject_if_exists("per_host1") is None
+ assert self.store.get_tlsobject_if_exists("per_service1") is None
+
+ # Existing object returns normally
+ self.store.save_tlsobject("per_host1", "cert_data", host="my_host")
+ obj = self.store.get_tlsobject_if_exists("per_host1", host="my_host")
+ assert obj is not None
+ assert obj.data == "cert_data"
+
def test_list_tlsobjects(self):
self.store.save_tlsobject("global_cert_1", "cert_data1")
self.store.save_tlsobject("global_cert_2", "cert_data2")