]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: adding UT for this new functionality (leftover cleanup)
authorRedouane Kachach <rkachach@ibm.com>
Wed, 11 Feb 2026 15:55:21 +0000 (16:55 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Thu, 14 May 2026 09:08:17 +0000 (11:08 +0200)
Fixes: https://tracker.ceph.com/issues/75009
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/tests/test_certmgr.py

index 2fc1c06baa9017f2faf013ae4a27ce231a14385d..e4283300667436c7faa0d7c552d120d096f9b0ab 100644 (file)
@@ -1117,6 +1117,721 @@ class TestCertMgr(object):
             )
             assert rm_mock.call_count == 2
 
+    def test_cleanup_leftovers_after_cert_source_switch(self, cephadm_module: CephadmOrchestrator, caplog):
+        """Validate the combined cleanup used by post_remove after cert-source switches.
+
+        Scenario:
+        - Service previously used cephadm-signed certs (leftover host-scoped cephadm-signed pair exists)
+        - Service previously used inline certs (leftover non-editable service-scoped objects exist)
+
+        After a cert-source switch, post_remove should be able to clean both kinds of
+        cephadm-managed leftovers regardless of the *current* cert source.
+        """
+        cephadm_module._init_cert_mgr()
+        cm: CertMgr = cephadm_module.cert_mgr
+
+        svc = 'ingress.foo'
+        host = 'host1'
+
+        # Seed a cephadm-signed (host-scoped) pair
+        cm.register_self_signed_cert_key_pair(svc)
+        cm.save_self_signed_cert_key_pair(
+            svc,
+            TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+            host=host,
+        )
+
+        cephadm_cert = cm.self_signed_cert(svc)
+        cephadm_key = cm.self_signed_key(svc)
+        assert cm.get_cert(cephadm_cert, host=host) == CEPHADM_SELF_GENERATED_CERT_1
+        assert cm.get_key(cephadm_key, host=host) == CEPHADM_SELF_GENERATED_KEY_2048
+
+        # Seed inline-saved (service-scoped) objects for the same service
+        cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+        cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'inline-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'inline-key'
+
+        # Combined cleanup (as in post_remove)
+        with caplog.at_level(logging.INFO):
+            cm.try_rm_self_signed_cert_key_pair(svc, host)
+        cm.rm_inline_saved_cert_key_pair(
+            'ingress_ssl_cert',
+            'ingress_ssl_key',
+            service_name=svc,
+            host=host,
+        )
+
+        # Both categories removed
+        assert cm.get_cert(cephadm_cert, host=host) is None
+        assert cm.get_key(cephadm_key, host=host) is None
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+        assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+
+        # The INFO log should have been emitted for the cephadm-signed cleanup
+        assert any(
+            f"Removing cephadm-signed cert/key for service: {svc}, host: {host}" in r.getMessage()
+            for r in caplog.records
+        )
+
+        # If referenced creds are present (editable=True), inline cleanup must not remove them.
+        cm.save_cert('ingress_ssl_cert', 'ref-cert', service_name=svc, user_made=True, editable=True)
+        cm.save_key('ingress_ssl_key', 'ref-key', service_name=svc, user_made=True, editable=True)
+        cm.rm_inline_saved_cert_key_pair(
+            'ingress_ssl_cert',
+            'ingress_ssl_key',
+            service_name=svc,
+            host=host,
+        )
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+    def test_try_rm_self_signed_cert_key_pair_best_effort_and_logs(self, cephadm_module: CephadmOrchestrator, caplog):
+        cephadm_module._init_cert_mgr()
+        cm: CertMgr = cephadm_module.cert_mgr
+
+        svc = "rgw.foo"
+        host = "host1"
+
+        cm.register_self_signed_cert_key_pair(svc)
+        cm.save_self_signed_cert_key_pair(
+            svc,
+            TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+            host=host,
+        )
+
+        cert_name = cm.self_signed_cert(svc)
+        key_name = cm.self_signed_key(svc)
+        assert cm.get_cert(cert_name, host=host) == CEPHADM_SELF_GENERATED_CERT_1
+        assert cm.get_key(key_name, host=host) == CEPHADM_SELF_GENERATED_KEY_2048
+
+        with caplog.at_level(logging.INFO):
+            cm.try_rm_self_signed_cert_key_pair(svc, host)
+        assert cm.get_cert(cert_name, host=host) is None
+        assert cm.get_key(key_name, host=host) is None
+        assert any("Removing cephadm-signed cert/key" in r.getMessage() for r in caplog.records)
+
+        caplog.clear()
+        with caplog.at_level(logging.INFO):
+            cm.try_rm_self_signed_cert_key_pair(svc, host)
+        assert not any("Removing cephadm-signed cert/key" in r.getMessage() for r in caplog.records)
+
+    def test_rm_inline_saved_cert_key_pair_best_effort_and_editable(self, cephadm_module: CephadmOrchestrator):
+
+        cephadm_module._init_cert_mgr()
+        cm: CertMgr = cephadm_module.cert_mgr
+
+        svc = "nfs.foo"
+
+        # 1) unknown/unregistered names should never raise (best-effort cleanup)
+        cm.rm_inline_saved_cert_key_pair(
+            "totally_unknown_cert",
+            "totally_unknown_key",
+            service_name=svc,
+            host="host1",
+            ca_cert_name="totally_unknown_ca",
+        )
+
+        # 2) inline-saved objects (user_made=True, editable=False) should be removed
+        cm.save_cert("nfs_ssl_cert", "inline-cert", service_name=svc, user_made=True, editable=False)
+        cm.save_key("nfs_ssl_key", "inline-key", service_name=svc, user_made=True, editable=False)
+        cm.save_cert("nfs_ssl_ca_cert", "inline-ca", service_name=svc, user_made=True, editable=False)
+
+        cm.rm_inline_saved_cert_key_pair(
+            "nfs_ssl_cert",
+            "nfs_ssl_key",
+            service_name=svc,
+            host="host1",  # host passed even though SERVICE scope (should be ignored by store)
+            ca_cert_name="nfs_ssl_ca_cert",
+        )
+        assert cm.get_cert("nfs_ssl_cert", service_name=svc) is None
+        assert cm.get_key("nfs_ssl_key", service_name=svc) is None
+        assert cm.get_cert("nfs_ssl_ca_cert", service_name=svc) is None
+
+        # 3) referenced/user-managed entries (editable=True) must NOT be removed
+        cm.save_cert("nfs_ssl_cert", "ref-cert", service_name=svc, user_made=True, editable=True)
+        cm.save_key("nfs_ssl_key", "ref-key", service_name=svc, user_made=True, editable=True)
+        cm.save_cert("nfs_ssl_ca_cert", "ref-ca", service_name=svc, user_made=True, editable=True)
+
+        cm.rm_inline_saved_cert_key_pair(
+            "nfs_ssl_cert",
+            "nfs_ssl_key",
+            service_name=svc,
+            host="host1",
+            ca_cert_name="nfs_ssl_ca_cert",
+        )
+        assert cm.get_cert("nfs_ssl_cert", service_name=svc) == "ref-cert"
+        assert cm.get_key("nfs_ssl_key", service_name=svc) == "ref-key"
+        assert cm.get_cert("nfs_ssl_ca_cert", service_name=svc) == "ref-ca"
+
+    # ------------------------------------------------------------------ #
+    #  Tests for certificate_source as a dependency + user warning        #
+    # ------------------------------------------------------------------ #
+
+    def test_base_get_dependencies_includes_certificate_source(self, cephadm_module: CephadmOrchestrator):
+        """CephadmService.get_dependencies should include certificate_source in deps when ssl=True."""
+        from cephadm.services.cephadmservice import CephadmService
+
+        spec = mock.MagicMock()
+        spec.ssl = True
+        spec.certificate_source = 'cephadm-signed'
+        spec.ssl_cert = None
+        spec.ssl_key = None
+        spec.ssl_ca_cert = None
+
+        deps = CephadmService.get_dependencies(cephadm_module, spec)
+        assert 'certificate_source: cephadm-signed' in deps
+
+    def test_base_get_dependencies_no_certificate_source_when_ssl_false(self, cephadm_module: CephadmOrchestrator):
+        """When ssl=False, get_dependencies should return [] — no certificate_source dep."""
+        from cephadm.services.cephadmservice import CephadmService
+
+        spec = mock.MagicMock()
+        spec.ssl = False
+        spec.certificate_source = 'inline'
+
+        deps = CephadmService.get_dependencies(cephadm_module, spec)
+        assert deps == []
+
+    def test_base_get_dependencies_no_spec(self, cephadm_module: CephadmOrchestrator):
+        """When spec is None, get_dependencies should return []."""
+        from cephadm.services.cephadmservice import CephadmService
+
+        deps = CephadmService.get_dependencies(cephadm_module, None)
+        assert deps == []
+
+    def test_base_get_dependencies_certificate_source_changes_detected(self, cephadm_module: CephadmOrchestrator):
+        """Changing certificate_source should produce different deps, triggering reconfig."""
+        from cephadm.services.cephadmservice import CephadmService
+
+        spec_signed = mock.MagicMock()
+        spec_signed.ssl = True
+        spec_signed.certificate_source = 'cephadm-signed'
+        spec_signed.ssl_cert = None
+        spec_signed.ssl_key = None
+        spec_signed.ssl_ca_cert = None
+
+        spec_ref = mock.MagicMock()
+        spec_ref.ssl = True
+        spec_ref.certificate_source = 'reference'
+        spec_ref.ssl_cert = None
+        spec_ref.ssl_key = None
+        spec_ref.ssl_ca_cert = None
+
+        spec_inline = mock.MagicMock()
+        spec_inline.ssl = True
+        spec_inline.certificate_source = 'inline'
+        spec_inline.ssl_cert = 'some-cert'
+        spec_inline.ssl_key = 'some-key'
+        spec_inline.ssl_ca_cert = None
+
+        deps_signed = CephadmService.get_dependencies(cephadm_module, spec_signed)
+        deps_ref = CephadmService.get_dependencies(cephadm_module, spec_ref)
+        deps_inline = CephadmService.get_dependencies(cephadm_module, spec_inline)
+
+        # All three should differ from each other
+        assert deps_signed != deps_ref
+        assert deps_signed != deps_inline
+        assert deps_ref != deps_inline
+
+        # cephadm-signed <-> reference: previously had identical (empty) deps, now differ
+        assert 'certificate_source: cephadm-signed' in deps_signed
+        assert 'certificate_source: reference' in deps_ref
+
+    def test_base_get_dependencies_includes_cert_key_hashes_with_certificate_source(self, cephadm_module: CephadmOrchestrator):
+        """When ssl=True with inline cert/key, deps should include both certificate_source AND hashes."""
+        from cephadm.services.cephadmservice import CephadmService
+        from cephadm import utils
+
+        spec = mock.MagicMock()
+        spec.ssl = True
+        spec.certificate_source = 'inline'
+        spec.ssl_cert = 'my-cert-data'
+        spec.ssl_key = 'my-key-data'
+        spec.ssl_ca_cert = None
+
+        deps = CephadmService.get_dependencies(cephadm_module, spec)
+        assert 'certificate_source: inline' in deps
+        assert f'ssl_cert: {utils.md5_hash("my-cert-data")}' in deps
+        assert f'ssl_key: {utils.md5_hash("my-key-data")}' in deps
+
+    def test_grafana_get_dependencies_includes_parent_tls_deps(self, cephadm_module: CephadmOrchestrator):
+        """GrafanaService.get_dependencies should include parent TLS deps (certificate_source)
+        since Grafana has user_cert_allowed=True."""
+        from cephadm.services.monitoring import GrafanaService
+
+        spec = mock.MagicMock()
+        spec.ssl = True
+        spec.certificate_source = 'inline'
+        spec.ssl_cert = 'grafana-cert'
+        spec.ssl_key = 'grafana-key'
+        spec.ssl_ca_cert = None
+
+        deps = GrafanaService.get_dependencies(cephadm_module, spec)
+        assert 'certificate_source: inline' in deps
+
+    def test_grafana_get_dependencies_no_tls_deps_when_ssl_false(self, cephadm_module: CephadmOrchestrator):
+        """When ssl=False, GrafanaService should not include any TLS deps from parent."""
+        from cephadm.services.monitoring import GrafanaService
+
+        spec = mock.MagicMock()
+        spec.ssl = False
+        spec.certificate_source = 'cephadm-signed'
+
+        deps = GrafanaService.get_dependencies(cephadm_module, spec)
+        assert not any('certificate_source' in d for d in deps)
+
+    def test_grafana_get_dependencies_consistent_with_and_without_spec(self, cephadm_module: CephadmOrchestrator):
+        """Calling get_dependencies without spec should return no TLS deps (spec=None → [])
+        while calling with spec and ssl=True should include TLS deps."""
+        from cephadm.services.monitoring import GrafanaService
+
+        # Without spec — should not crash and should not include TLS deps
+        deps_no_spec = GrafanaService.get_dependencies(cephadm_module)
+        assert not any('certificate_source' in d for d in deps_no_spec)
+
+        # With spec and ssl=True
+        spec = mock.MagicMock()
+        spec.ssl = True
+        spec.certificate_source = 'reference'
+        spec.ssl_cert = None
+        spec.ssl_key = None
+        spec.ssl_ca_cert = None
+
+        deps_with_spec = GrafanaService.get_dependencies(cephadm_module, spec)
+        assert 'certificate_source: reference' in deps_with_spec
+
+    def test_check_cert_source_warns_on_certificate_source_change(self, cephadm_module: CephadmOrchestrator):
+        """_check_cert_source should return a warning when certificate_source changes."""
+        import datetime
+
+        old_spec = mock.MagicMock()
+        old_spec.certificate_source = 'cephadm-signed'
+        old_spec.service_name.return_value = 'grafana'
+        old_spec.service_type = 'grafana'
+
+        new_spec = mock.MagicMock()
+        new_spec.certificate_source = 'inline'
+        new_spec.service_name.return_value = 'grafana'
+        new_spec.service_type = 'grafana'
+        new_spec.is_using_certificates_source.return_value = False
+
+        # Seed the spec_store so _check_cert_source can find the old spec
+        cephadm_module.spec_store._specs['grafana'] = old_spec
+        cephadm_module.spec_store.spec_created['grafana'] = datetime.datetime.utcnow()
+
+        warning = cephadm_module._check_cert_source(new_spec)
+        assert "certificate_source" in warning
+        assert "cephadm-signed" in warning
+        assert "inline" in warning
+        assert "reconfiguration" in warning
+
+    def test_check_cert_source_no_warning_when_source_unchanged(self, cephadm_module: CephadmOrchestrator):
+        """_check_cert_source should return no warning when certificate_source stays the same."""
+        import datetime
+
+        old_spec = mock.MagicMock()
+        old_spec.certificate_source = 'cephadm-signed'
+        old_spec.service_name.return_value = 'grafana'
+        old_spec.service_type = 'grafana'
+
+        new_spec = mock.MagicMock()
+        new_spec.certificate_source = 'cephadm-signed'
+        new_spec.service_name.return_value = 'grafana'
+        new_spec.service_type = 'grafana'
+        new_spec.is_using_certificates_source.return_value = False
+
+        cephadm_module.spec_store._specs['grafana'] = old_spec
+        cephadm_module.spec_store.spec_created['grafana'] = datetime.datetime.utcnow()
+
+        warning = cephadm_module._check_cert_source(new_spec)
+        assert "certificate_source" not in warning
+
+    def test_check_cert_source_no_warning_for_new_service(self, cephadm_module: CephadmOrchestrator):
+        """_check_cert_source should return no source-change warning for a brand new service."""
+        new_spec = mock.MagicMock()
+        new_spec.certificate_source = 'inline'
+        new_spec.service_name.return_value = 'grafana'
+        new_spec.service_type = 'grafana'
+        new_spec.is_using_certificates_source.return_value = False
+
+        # spec_store does NOT contain 'grafana'
+        warning = cephadm_module._check_cert_source(new_spec)
+        assert "certificate_source" not in warning
+
+    def test_check_cert_source_warning_stacks_with_reference_warning(self, cephadm_module: CephadmOrchestrator):
+        """When switching to 'reference' on a per-host service, both the source-change
+        warning and the reference provisioning instructions should be present (+=)."""
+        import datetime
+        from cephadm.services.service_registry import service_registry
+
+        old_spec = mock.MagicMock()
+        old_spec.certificate_source = 'cephadm-signed'
+        old_spec.service_name.return_value = 'grafana'
+        old_spec.service_type = 'grafana'
+
+        new_spec = mock.MagicMock()
+        new_spec.certificate_source = 'reference'
+        new_spec.service_name.return_value = 'grafana'
+        new_spec.service_type = 'grafana'
+        new_spec.is_using_certificates_source.return_value = True
+
+        cephadm_module.spec_store._specs['grafana'] = old_spec
+        cephadm_module.spec_store.spec_created['grafana'] = datetime.datetime.utcnow()
+
+        # Mock the service registry instance to return a service with HOST scope
+        svc_mock = mock.MagicMock()
+        svc_mock.SCOPE = TLSObjectScope.HOST
+        svc_mock.cert_name = 'grafana_ssl_cert'
+        svc_mock.key_name = 'grafana_ssl_key'
+        with mock.patch.object(service_registry, 'get_service', return_value=svc_mock):
+            warning = cephadm_module._check_cert_source(new_spec)
+
+        # Both warnings should be present
+        assert "certificate_source" in warning
+        assert "cephadm-signed" in warning
+        assert "reference" in warning
+        assert "per-host certificates" in warning
+        assert "ceph orch certmgr cert set" in warning
+
+    def test_check_cert_source_all_transitions_warn(self, cephadm_module: CephadmOrchestrator):
+        """Every cross-source transition should produce a certificate_source warning."""
+        import datetime
+
+        sources = ['cephadm-signed', 'inline', 'reference']
+
+        for old_source in sources:
+            for new_source in sources:
+                if old_source == new_source:
+                    continue
+
+                old_spec = mock.MagicMock()
+                old_spec.certificate_source = old_source
+                old_spec.service_name.return_value = 'rgw.foo'
+                old_spec.service_type = 'rgw'
+
+                new_spec = mock.MagicMock()
+                new_spec.certificate_source = new_source
+                new_spec.service_name.return_value = 'rgw.foo'
+                new_spec.service_type = 'rgw'
+                new_spec.is_using_certificates_source.return_value = False
+
+                cephadm_module.spec_store._specs['rgw.foo'] = old_spec
+                cephadm_module.spec_store.spec_created['rgw.foo'] = datetime.datetime.utcnow()
+
+                warning = cephadm_module._check_cert_source(new_spec)
+                assert "certificate_source" in warning, \
+                    f"Expected warning for {old_source} -> {new_source}, got: {warning!r}"
+                assert old_source in warning
+                assert new_source in warning
+
+    # ------------------------------------------------------------------ #
+    #  Cleanup tests for certificate_source transitions                   #
+    #                                                                     #
+    #  These tests verify the reconciliation logic in                     #
+    #  get_certificates_generic: for each cross-source transition we      #
+    #  seed the "old" cert artifacts and then call the cleanup routines   #
+    #  exactly as get_certificates_generic does, asserting that:          #
+    #    - stale artifacts are removed                                    #
+    #    - user-provisioned reference certs (editable=True) are NEVER     #
+    #      removed                                                        #
+    # ------------------------------------------------------------------ #
+
+    def _seed_inline_certs(self, cm, svc, host):
+        """Seed inline-saved certs (user_made=True, editable=False) for a service-scoped service."""
+        cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+        cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+
+    def _seed_cephadm_signed_certs(self, cm, svc, host):
+        """Seed cephadm-signed certs for a service."""
+        cm.register_self_signed_cert_key_pair(svc)
+        cm.save_self_signed_cert_key_pair(
+            svc,
+            TLSCredentials(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+            host=host,
+        )
+
+    def _seed_reference_certs(self, cm, svc):
+        """Seed user-provisioned reference certs (editable=True) for a service."""
+        cm.save_cert('ingress_ssl_cert', 'ref-cert', service_name=svc, user_made=True, editable=True)
+        cm.save_key('ingress_ssl_key', 'ref-key', service_name=svc, user_made=True, editable=True)
+
+    def _run_reconciliation_cleanup(self, cm, cert_source, svc, host,
+                                    cert_name='ingress_ssl_cert', key_name='ingress_ssl_key'):
+        """Simulate the reconciliation cleanup from get_certificates_generic."""
+        if cert_source in ('reference', 'cephadm-signed'):
+            cm.rm_inline_saved_cert_key_pair(
+                cert_name, key_name,
+                service_name=svc, host=host,
+            )
+        if cert_source != 'cephadm-signed':
+            cm.try_rm_self_signed_cert_key_pair(svc, host)
+
+    def test_cleanup_inline_to_cephadm_signed(self, cephadm_module: CephadmOrchestrator):
+        """inline -> cephadm-signed: inline-saved certs removed, reference certs untouched."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        self._seed_inline_certs(cm, svc, host)
+        self._seed_reference_certs(cm, svc)  # should survive
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'  # editable overwrites non-editable
+
+        # Re-seed inline on top (simulating actual inline-saved state before switch)
+        cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+        cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+
+        self._run_reconciliation_cleanup(cm, 'cephadm-signed', svc, host)
+
+        # Inline-saved removed
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+        assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+
+    def test_cleanup_inline_to_reference(self, cephadm_module: CephadmOrchestrator):
+        """inline -> reference: inline-saved removed, cephadm-signed removed, reference survives."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        self._seed_inline_certs(cm, svc, host)
+        self._seed_cephadm_signed_certs(cm, svc, host)
+
+        cephadm_cert = cm.self_signed_cert(svc)
+        cephadm_key = cm.self_signed_key(svc)
+
+        self._run_reconciliation_cleanup(cm, 'reference', svc, host)
+
+        # Inline-saved removed
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+        assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+        # Cephadm-signed removed
+        assert cm.get_cert(cephadm_cert, host=host) is None
+        assert cm.get_key(cephadm_key, host=host) is None
+
+    def test_cleanup_cephadm_signed_to_inline(self, cephadm_module: CephadmOrchestrator):
+        """cephadm-signed -> inline: cephadm-signed removed."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        self._seed_cephadm_signed_certs(cm, svc, host)
+        cephadm_cert = cm.self_signed_cert(svc)
+        cephadm_key = cm.self_signed_key(svc)
+
+        self._run_reconciliation_cleanup(cm, 'inline', svc, host)
+
+        # Cephadm-signed removed
+        assert cm.get_cert(cephadm_cert, host=host) is None
+        assert cm.get_key(cephadm_key, host=host) is None
+
+    def test_cleanup_cephadm_signed_to_reference(self, cephadm_module: CephadmOrchestrator):
+        """cephadm-signed -> reference: cephadm-signed removed, reference untouched."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        self._seed_cephadm_signed_certs(cm, svc, host)
+        self._seed_reference_certs(cm, svc)
+
+        cephadm_cert = cm.self_signed_cert(svc)
+        cephadm_key = cm.self_signed_key(svc)
+
+        self._run_reconciliation_cleanup(cm, 'reference', svc, host)
+
+        # Cephadm-signed removed
+        assert cm.get_cert(cephadm_cert, host=host) is None
+        assert cm.get_key(cephadm_key, host=host) is None
+        # Reference survives
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+    def test_cleanup_reference_to_inline(self, cephadm_module: CephadmOrchestrator):
+        """reference -> inline: cephadm-signed removed, reference certs NOT removed."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        self._seed_reference_certs(cm, svc)
+        self._seed_cephadm_signed_certs(cm, svc, host)
+
+        cephadm_cert = cm.self_signed_cert(svc)
+        cephadm_key = cm.self_signed_key(svc)
+
+        self._run_reconciliation_cleanup(cm, 'inline', svc, host)
+
+        # Cephadm-signed removed
+        assert cm.get_cert(cephadm_cert, host=host) is None
+        assert cm.get_key(cephadm_key, host=host) is None
+        # Reference NOT removed (editable=True)
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+    def test_cleanup_reference_to_cephadm_signed(self, cephadm_module: CephadmOrchestrator):
+        """reference -> cephadm-signed: reference certs NOT removed (editable=True preserved)."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        self._seed_reference_certs(cm, svc)
+
+        self._run_reconciliation_cleanup(cm, 'cephadm-signed', svc, host)
+
+        # Reference NOT removed — rm_inline_saved skips editable=True
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+    def test_cleanup_reference_never_removed_in_any_transition(self, cephadm_module: CephadmOrchestrator):
+        """Reference certs (editable=True) must survive ALL transitions."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        for target_source in ['inline', 'cephadm-signed', 'reference']:
+            # Reset: seed fresh reference certs
+            cm.save_cert('ingress_ssl_cert', 'ref-cert', service_name=svc, user_made=True, editable=True)
+            cm.save_key('ingress_ssl_key', 'ref-key', service_name=svc, user_made=True, editable=True)
+
+            self._run_reconciliation_cleanup(cm, target_source, svc, host)
+
+            assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert', \
+                f"Reference cert removed during transition to {target_source}"
+            assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key', \
+                f"Reference key removed during transition to {target_source}"
+
+    # ------------------------------------------------------------------ #
+    #  post_remove cleanup tests                                          #
+    #                                                                     #
+    #  These tests verify that CephadmService.post_remove correctly       #
+    #  cleans up TLS artifacts when a daemon is removed, respecting       #
+    #  scope, remaining daemons, and reference cert preservation.         #
+    # ------------------------------------------------------------------ #
+
+    def test_post_remove_cleans_cephadm_signed_regardless_of_source(self, cephadm_module: CephadmOrchestrator):
+        """post_remove always calls try_rm_self_signed_cert_key_pair, even when
+        the current certificate_source is 'inline' or 'reference'."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        for current_source in ['inline', 'reference', 'cephadm-signed']:
+            # Seed cephadm-signed leftovers
+            self._seed_cephadm_signed_certs(cm, svc, host)
+            cephadm_cert = cm.self_signed_cert(svc)
+            cephadm_key = cm.self_signed_key(svc)
+            assert cm.get_cert(cephadm_cert, host=host) is not None
+
+            # post_remove always does try_rm_self_signed unconditionally
+            cm.try_rm_self_signed_cert_key_pair(svc, host)
+
+            assert cm.get_cert(cephadm_cert, host=host) is None, \
+                f"cephadm-signed cert not cleaned when source={current_source}"
+            assert cm.get_key(cephadm_key, host=host) is None, \
+                f"cephadm-signed key not cleaned when source={current_source}"
+
+    def test_post_remove_inline_saved_cleanup_service_scope(self, cephadm_module: CephadmOrchestrator):
+        """For SERVICE-scoped services, inline-saved certs are only removed when
+        it's the last daemon (other_daemons_in_service=False)."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc = 'ingress.foo'
+
+        # Seed inline-saved (service scope: service_name, host=None)
+        cm.save_cert('ingress_ssl_cert', 'inline-cert', service_name=svc, user_made=True, editable=False)
+        cm.save_key('ingress_ssl_key', 'inline-key', service_name=svc, user_made=True, editable=False)
+
+        # Simulate: other daemons still exist → cleanup should NOT happen
+        # (post_remove returns early if other_daemons_in_service=True for SERVICE scope)
+        # Just verify the certs are still there (no cleanup call made)
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'inline-cert'
+
+        # Simulate: last daemon removed → cleanup should happen (service_name scope, host=None)
+        cm.rm_inline_saved_cert_key_pair(
+            'ingress_ssl_cert', 'ingress_ssl_key',
+            service_name=svc, host=None,
+        )
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) is None
+        assert cm.get_key('ingress_ssl_key', service_name=svc) is None
+
+    def test_post_remove_inline_saved_cleanup_host_scope(self, cephadm_module: CephadmOrchestrator):
+        """For HOST-scoped services (like grafana), inline-saved certs are removed
+        per-host when the last daemon on that host is removed."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc = 'grafana'
+        host1, host2 = 'host1', 'host2'
+
+        # Seed inline-saved per host
+        cm.save_cert('grafana_ssl_cert', 'cert-host1', host=host1, user_made=True, editable=False)
+        cm.save_cert('grafana_ssl_cert', 'cert-host2', host=host2, user_made=True, editable=False)
+
+        # Remove from host1 only
+        cm.rm_inline_saved_cert_key_pair(
+            'grafana_ssl_cert', 'grafana_ssl_key',
+            service_name=svc, host=host1,
+        )
+        # host1 removed, host2 untouched
+        assert cm.get_cert('grafana_ssl_cert', host=host1) is None
+        assert cm.get_cert('grafana_ssl_cert', host=host2) == 'cert-host2'
+
+    def test_post_remove_preserves_reference_certs(self, cephadm_module: CephadmOrchestrator):
+        """post_remove must never remove user-provisioned reference certs (editable=True),
+        even when the service is fully removed."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        # Seed reference certs
+        self._seed_reference_certs(cm, svc)
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+
+        # Simulate full post_remove cleanup: both try_rm_self_signed + rm_inline_saved
+        cm.try_rm_self_signed_cert_key_pair(svc, host)
+        cm.rm_inline_saved_cert_key_pair(
+            'ingress_ssl_cert', 'ingress_ssl_key',
+            service_name=svc, host=None,
+        )
+
+        # Reference certs must survive
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
+    def test_post_remove_cleans_all_stale_artifacts_on_full_removal(self, cephadm_module: CephadmOrchestrator):
+        """When the last daemon is removed, both cephadm-signed and inline-saved
+        artifacts should be cleaned up, but reference certs should survive."""
+        cephadm_module._init_cert_mgr()
+        cm = cephadm_module.cert_mgr
+        svc, host = 'ingress.foo', 'host1'
+
+        # Seed all three types of artifacts
+        self._seed_cephadm_signed_certs(cm, svc, host)
+        self._seed_inline_certs(cm, svc, host)
+        self._seed_reference_certs(cm, svc)
+
+        cephadm_cert = cm.self_signed_cert(svc)
+        cephadm_key = cm.self_signed_key(svc)
+
+        # The reference save (editable=True) overwrites the inline save (editable=False)
+        # for the same key. Re-seed inline to have both in a testable state:
+        # First verify reference is in place
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+
+        # Simulate full post_remove (as the code does):
+        # 1) Always clean cephadm-signed
+        cm.try_rm_self_signed_cert_key_pair(svc, host)
+        # 2) Clean inline-saved (service scope, last daemon)
+        cm.rm_inline_saved_cert_key_pair(
+            'ingress_ssl_cert', 'ingress_ssl_key',
+            service_name=svc, host=None,
+        )
+
+        # Cephadm-signed removed
+        assert cm.get_cert(cephadm_cert, host=host) is None
+        assert cm.get_key(cephadm_key, host=host) is None
+
+        # Reference certs survive (editable=True → rm_inline_saved skips them)
+        assert cm.get_cert('ingress_ssl_cert', service_name=svc) == 'ref-cert'
+        assert cm.get_key('ingress_ssl_key', service_name=svc) == 'ref-key'
+
 
 class MockTLSObject(TLSObjectProtocol):
     STORAGE_PREFIX = "mocktls"
@@ -1188,6 +1903,21 @@ class TestTLSObjectStore(unittest.TestCase):
         self.assertEqual(scope, TLSObjectScope.GLOBAL)
         self.assertEqual(target, None)
 
+    def test_get_tlsobject_if_exists(self):
+        """get_tlsobject_if_exists should never raise and return None on missing/invalid access."""
+        # Unknown entity
+        assert self.store.get_tlsobject_if_exists("unknown_entity") is None
+
+        # Known but missing required target
+        assert self.store.get_tlsobject_if_exists("per_host1") is None
+        assert self.store.get_tlsobject_if_exists("per_service1") is None
+
+        # Existing object returns normally
+        self.store.save_tlsobject("per_host1", "cert_data", host="my_host")
+        obj = self.store.get_tlsobject_if_exists("per_host1", host="my_host")
+        assert obj is not None
+        assert obj.data == "cert_data"
+
     def test_list_tlsobjects(self):
         self.store.save_tlsobject("global_cert_1", "cert_data1")
         self.store.save_tlsobject("global_cert_2", "cert_data2")