]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/ceph_secrets: add unit tests for all modules
authorRedouane Kachach <rkachach@ibm.com>
Thu, 11 Jun 2026 08:55:43 +0000 (10:55 +0200)
committerRedouane Kachach <rkachach@ibm.com>
Thu, 11 Jun 2026 08:56:42 +0000 (10:56 +0200)
Add pytest coverage for the full stack: secret types and URI/path
parsing, storage backend contract, Mon KV store (CRUD, epoch,
serialization, corruption handling), SecretMgr (scan/resolve),
module RPC surface and CLI handlers, and the CephSecretsClient
wrapper. Gate test imports on the UNITTEST env var following the
SMB module pattern.

Fixes: https://tracker.ceph.com/issues/74562
Assisted-by: Claude <claude.ai>
Assisted-by: ChatGPT <chatgpt.com>
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
12 files changed:
src/pybind/mgr/ceph_secrets/__init__.py
src/pybind/mgr/ceph_secrets/tests/__init__.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets/tests/conftest.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets/tests/test_backends.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets/tests/test_module.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets/tests/test_secret_backend.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets/tests/test_secret_mgr.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets/tests/test_secret_store.py [new file with mode: 0644]
src/pybind/mgr/ceph_secrets_client.py
src/pybind/mgr/ceph_secrets_types.py
src/pybind/mgr/tests/test_ceph_secrets_client.py [new file with mode: 0644]
src/pybind/mgr/tests/test_ceph_secrets_types.py [new file with mode: 0644]

index ee85dc9d376e61235d719cf25585769bd07299a6..b02a13abff15d1e255010c9b36dc8d8e85fb1c49 100644 (file)
@@ -1,2 +1,8 @@
-# flake8: noqa
+import os
+
+if 'UNITTEST' in os.environ:
+    import tests  # noqa: F401
+
 from .module import Module
+
+__all__ = ['Module']
diff --git a/src/pybind/mgr/ceph_secrets/tests/__init__.py b/src/pybind/mgr/ceph_secrets/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/ceph_secrets/tests/conftest.py b/src/pybind/mgr/ceph_secrets/tests/conftest.py
new file mode 100644 (file)
index 0000000..3625d3e
--- /dev/null
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+import pytest
+from mgr_module import MgrModule
+
+
+@pytest.fixture
+def mgr() -> MgrModule:
+    return MgrModule.__new__(MgrModule)
+
+
+@pytest.fixture
+def store(mgr: MgrModule):
+    from ceph_secrets.secret_store import SecretStoreMon
+    return SecretStoreMon(mgr)
+
+
+@pytest.fixture
+def secret_mgr(store):
+    from ceph_secrets.secret_mgr import SecretMgr
+    return SecretMgr(store)
diff --git a/src/pybind/mgr/ceph_secrets/tests/test_backends.py b/src/pybind/mgr/ceph_secrets/tests/test_backends.py
new file mode 100644 (file)
index 0000000..8f40f47
--- /dev/null
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.backends (registry of storage backends)."""
+from __future__ import annotations
+
+from ceph_secrets.backends import BACKENDS
+from ceph_secrets.secret_store import SecretStoreMon
+
+
+class TestBackendsRegistry:
+    def test_mon_backend_registered(self):
+        assert "mon" in BACKENDS
+
+    def test_mon_maps_to_correct_class(self):
+        assert BACKENDS["mon"] is SecretStoreMon
+
+    def test_only_known_backends(self):
+        # Update this test when new backends are added.
+        assert set(BACKENDS.keys()) == {"mon"}
+
+    def test_backend_is_instantiable(self, mgr):
+        cls = BACKENDS["mon"]
+        instance = cls(mgr)
+        assert isinstance(instance, SecretStoreMon)
diff --git a/src/pybind/mgr/ceph_secrets/tests/test_module.py b/src/pybind/mgr/ceph_secrets/tests/test_module.py
new file mode 100644 (file)
index 0000000..8728ec8
--- /dev/null
@@ -0,0 +1,424 @@
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.module.Module — RPC surface and internal methods."""
+from __future__ import annotations
+
+import json
+import pytest
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+from ceph_secrets.module import Module
+from ceph_secrets.secret_store import SecretStoreMon
+from ceph_secrets.secret_mgr import SecretMgr
+from ceph_secrets_types import (
+    SecretScope,
+    SecretRef,
+    CephSecretException,
+    CephSecretDataError,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helper: build a Module without going through MgrModule.__init__
+# ---------------------------------------------------------------------------
+
+def _make_module(mgr_stub) -> Module:
+    """Bypass MgrModule.__init__ and wire up manually."""
+    mod = object.__new__(Module)
+    mod.secret_mgr = SecretMgr(SecretStoreMon(mgr_stub))
+    return mod
+
+
+# ---------------------------------------------------------------------------
+# ---------------------------------------------------------------------------
+# Module.__init__ — backend selection
+# ---------------------------------------------------------------------------
+
+class TestModuleInit:
+    def test_default_backend_initialises_secret_mgr(self, mgr):
+        """Module.__init__ with default 'mon' backend wires up SecretMgr."""
+        from mgr_module import MgrModule
+        mod = object.__new__(Module)
+        with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None):
+            with patch.object(MgrModule, "get_module_option", return_value="mon"):
+                Module.__init__(mod, mgr)
+        assert isinstance(mod.secret_mgr, SecretMgr)
+
+    def test_unsupported_backend_raises(self, mgr):
+        """Unknown backend name raises RuntimeError."""
+        from mgr_module import MgrModule
+        with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None):
+            with patch("ceph_secrets.module.BACKENDS", {}):
+                with patch.object(MgrModule, "get_module_option", return_value="vault"):
+                    with pytest.raises(RuntimeError, match="Unsupported secrets backend"):
+                        Module.__init__(object.__new__(Module), mgr)
+
+    def test_backend_init_failure_raises(self, mgr):
+        """Backend constructor failure raises RuntimeError with helpful message."""
+        from mgr_module import MgrModule
+        broken_cls = MagicMock(side_effect=Exception("connection refused"))
+        with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None):
+            with patch("ceph_secrets.module.BACKENDS", {"mon": broken_cls}):
+                with patch.object(MgrModule, "get_module_option", return_value="mon"):
+                    with pytest.raises(RuntimeError, match="Failed to initialize secrets backend"):
+                        Module.__init__(object.__new__(Module), mgr)
+
+
+# ---------------------------------------------------------------------------
+# Module RPC surface
+# ---------------------------------------------------------------------------
+
+class TestModuleRPC:
+    @pytest.fixture
+    def mod(self, mgr):
+        return _make_module(mgr)
+
+    def test_secret_get_epoch_zero(self, mod):
+        assert mod.secret_get_epoch("cephadm") == 0
+
+    def test_secret_get_epoch_after_set(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        assert mod.secret_get_epoch("ns") == 1
+
+    def test_secret_set_and_get(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        result = mod.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=True)
+        assert result is not None
+        assert result["data"] == "s3cr3t"
+
+    def test_secret_get_without_reveal_hides_data(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        result = mod.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=False)
+        assert result is not None
+        assert "data" not in result
+
+    def test_secret_get_missing_returns_none(self, mod):
+        assert mod.secret_get("ns", SecretScope.GLOBAL, "", "ghost") is None
+
+    def test_secret_get_version_existing(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        assert mod.secret_get_version("ns", SecretScope.GLOBAL, "", "k") == 1
+
+    def test_secret_get_version_missing_returns_none(self, mod):
+        assert mod.secret_get_version("ns", SecretScope.GLOBAL, "", "ghost") is None
+
+    def test_secret_get_value_existing(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "pw") == "s3cr3t"
+
+    def test_secret_get_value_missing_returns_none(self, mod):
+        assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "ghost") is None
+
+    def test_secret_get_value_opaque_string(self, mod):
+        # Any string is valid — including one that looks like JSON
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "a", "p": "b"}')
+        assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "creds") == '{"u": "a", "p": "b"}'
+
+    def test_secret_get_value_corrupt_raises_data_error(self, mgr, mod):
+        mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+        with pytest.raises(CephSecretDataError):
+            mod.secret_get_value("ns", SecretScope.GLOBAL, "", "bad")
+
+    def test_secret_get_versions_batch(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "a", "data-a")
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "b", "data-b")
+        result = mod.secret_get_versions([
+            "secret:/ns/global/a",
+            "secret:/ns/global/b",
+            "secret:/ns/global/ghost",
+        ])
+        assert result["secret:/ns/global/a"] == 1
+        assert result["secret:/ns/global/b"] == 1
+        assert result["secret:/ns/global/ghost"] is None
+
+    def test_secret_get_versions_skips_invalid_uris(self, mod):
+        result = mod.secret_get_versions(["not-a-uri", "secret:/ns/badscope/key"])
+        assert len(result) == 0
+
+    def test_secret_get_versions_empty_list(self, mod):
+        assert mod.secret_get_versions([]) == {}
+
+    def test_secret_get_versions_corrupt_raises_data_error(self, mgr, mod):
+        """Corruption must not be silently swallowed as a missing/invalid URI."""
+        mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+        with pytest.raises(CephSecretDataError):
+            mod.secret_get_versions(["secret:/ns/global/bad"])
+
+    def test_secret_set_returns_metadata(self, mod):
+        result = mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        assert "metadata" in result
+        assert result["metadata"]["version"] == 1
+
+    def test_secret_set_empty_data_raises(self, mod):
+        with pytest.raises(CephSecretException, match="must not be empty"):
+            mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "")
+
+    def test_secret_rm_existing(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        assert mod.secret_rm("ns", SecretScope.GLOBAL, "", "k") is True
+
+    def test_secret_rm_nonexistent(self, mod):
+        assert mod.secret_rm("ns", SecretScope.GLOBAL, "", "ghost") is False
+
+    def test_secret_ls_empty(self, mod):
+        assert mod.secret_ls(namespace="ns") == {}
+
+    def test_secret_ls_returns_records(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "a", "data-a")
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "b", "data-b")
+        out = mod.secret_ls(namespace="ns")
+        assert "ns/global/a" in out
+        assert "ns/global/b" in out
+
+    def test_secret_ls_scope_filter(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "g", "data-g")
+        mod.secret_set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+        out = mod.secret_ls(namespace="ns", scope="service")
+        assert all("service" in k for k in out.keys())
+
+    def test_secret_ls_with_target(self, mod):
+        mod.secret_set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+        out = mod.secret_ls(namespace="ns", scope="service", target="prom")
+        assert "ns/service/prom/auth" in out
+
+    def test_secret_ls_show_internals(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        out = mod.secret_ls(namespace="ns", show_internals=True)
+        rec = out["ns/global/k"]
+        assert "policy" in rec
+
+    def test_secret_ls_key_format_global(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        out = mod.secret_ls(namespace="ns")
+        assert "ns/global/k" in out
+        for k in out:
+            assert "//" not in k
+
+    def test_scan_refs(self, mod):
+        result = mod.scan_refs({"key": "secret:/ns/global/pw"}, namespace="ns")
+        assert "secret:/ns/global/pw" in result
+
+    def test_scan_unresolved_refs_all_missing(self, mod):
+        result = mod.scan_unresolved_refs("secret:/ns/global/ghost", namespace="ns")
+        assert "secret:/ns/global/ghost" in result
+
+    def test_scan_unresolved_refs_exists(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "x")
+        result = mod.scan_unresolved_refs("secret:/ns/global/pw", namespace="ns")
+        assert result == []
+
+    def test_resolve_object(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        result = mod.resolve_object("secret:/ns/global/pw")
+        assert result == "s3cr3t"
+
+
+# ---------------------------------------------------------------------------
+# Module internal ref-based methods
+# ---------------------------------------------------------------------------
+
+class TestModuleInternals:
+    @pytest.fixture
+    def mod(self, mgr):
+        return _make_module(mgr)
+
+    def _ref(self, ns="ns", scope=SecretScope.GLOBAL, target="", name="key"):
+        return SecretRef(ns, scope, target, name)
+
+    def test_secret_get_existing(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x")
+        result = mod._secret_get(self._ref())
+        assert result is not None
+
+    def test_secret_get_not_found(self, mod):
+        assert mod._secret_get(self._ref(name="ghost")) is None
+
+    def test_secret_get_corrupt_raises_data_error(self, mgr, mod):
+        mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+        with pytest.raises(CephSecretDataError):
+            mod._secret_get(self._ref(name="bad"))
+
+    def test_secret_get_version_existing(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x")
+        assert mod._secret_get_version(self._ref()) == 1
+
+    def test_secret_get_version_not_found(self, mod):
+        assert mod._secret_get_version(self._ref(name="ghost")) is None
+
+    def test_secret_get_version_corrupt_raises_data_error(self, mgr, mod):
+        mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+        with pytest.raises(CephSecretDataError):
+            mod._secret_get_version(self._ref(name="bad"))
+
+    def test_secret_set_returns_metadata_without_data(self, mod):
+        result = mod._secret_set(self._ref(), "data-x")
+        assert "metadata" in result
+        assert "data" not in result
+
+    def test_secret_rm_existing(self, mod):
+        mod._secret_set(self._ref(), "data-x")
+        assert mod._secret_rm(self._ref()) is True
+
+    def test_secret_rm_nonexistent(self, mod):
+        assert mod._secret_rm(self._ref(name="ghost")) is False
+
+
+# ---------------------------------------------------------------------------
+# CLI handler logic (path-based dispatch)
+# ---------------------------------------------------------------------------
+
+class TestModuleCLIHandlers:
+    @pytest.fixture
+    def mod(self, mgr):
+        return _make_module(mgr)
+
+    def _ok(self, result: Any) -> Any:
+        """Unwrap a Responder tuple (retcode, body, status) → parsed dict.
+        Falls through unchanged when Responder is a no-op stub."""
+        if isinstance(result, tuple):
+            retcode, body, _ = result
+            assert retcode == 0, f"unexpected error retcode {retcode}: {body}"
+            return json.loads(body)
+        return result
+
+    def _assert_error(self, result: Any, match: str = "") -> None:
+        """Assert a CLI call failed — either via a non-zero tuple or ErrorResponse.
+        Never accepts a raw CephSecretException (would mean _handle_secret_errors broke)."""
+        from object_format import ErrorResponse
+        if isinstance(result, tuple):
+            retcode, body, status = result
+            assert retcode != 0, "expected non-zero retcode"
+            if match:
+                assert match in (body + status).lower(), \
+                    f"expected {match!r} in response, got body={body!r} status={status!r}"
+        elif isinstance(result, ErrorResponse):
+            if match:
+                assert match in str(result).lower()
+        else:
+            raise AssertionError(
+                f"expected error tuple or ErrorResponse, got {type(result).__name__}: {result!r}"
+            )
+
+    def test_cli_get_by_path_found(self, mod):
+        mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x")
+        result = self._ok(mod._cli_secret_get_by_path(path="ns/global/key"))
+        assert "metadata" in result
+
+    def test_cli_get_by_path_not_found(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_get_by_path(path="ns/global/ghost")
+            self._assert_error(result, match="not found")
+        except ErrorResponse as e:
+            assert "not found" in str(e).lower()
+
+    def test_cli_get_by_path_bad_path(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_get_by_path(path="ns/badscope/key")
+            self._assert_error(result)
+        except ErrorResponse:
+            pass
+
+    def test_cli_get_corrupt_raises_data_error(self, mgr, mod):
+        from object_format import ErrorResponse
+        mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+        # CephSecretDataError is a CephSecretException subclass so _handle_secret_errors
+        # wraps it into ErrorResponse; Responder formats it as a non-zero tuple.
+        try:
+            result = mod._cli_secret_get_by_path(path="ns/global/bad")
+            self._assert_error(result)
+        except ErrorResponse:
+            pass
+
+    def test_cli_set_by_path(self, mod):
+        result = self._ok(mod._cli_secret_set_by_path(
+            path="ns/global/key",
+            inbuf="s3cr3t"
+        ))
+        assert result["metadata"]["version"] == 1
+
+    def test_cli_set_by_path_no_inbuf(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_set_by_path(path="ns/global/key", inbuf=None)
+            self._assert_error(result, match="-i")
+        except ErrorResponse as e:
+            assert "-i" in str(e)
+
+    def test_cli_set_by_path_empty_data(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_set_by_path(path="ns/global/key", inbuf="")
+            self._assert_error(result, match="must not be empty")
+        except ErrorResponse as e:
+            assert "must not be empty" in str(e).lower()
+
+    def test_cli_rm_by_path_existing(self, mod):
+        mod._cli_secret_set_by_path(path="ns/global/key", inbuf='{"v": "x"}')
+        result = self._ok(mod._cli_secret_rm_by_path(path="ns/global/key"))
+        assert result["status"] == "removed"
+
+    def test_cli_rm_by_path_not_found(self, mod):
+        result = self._ok(mod._cli_secret_rm_by_path(path="ns/global/ghost"))
+        assert result["status"] == "not_found"
+
+    def test_cli_rm_by_path_bad_path(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_rm_by_path(path="ns/badscope/key")
+            self._assert_error(result)
+        except ErrorResponse:
+            pass
+
+    def test_cli_ls(self, mod):
+        mod._cli_secret_set_by_path(path="ns/global/a", inbuf='{"v": "1"}')
+        mod._cli_secret_set_by_path(path="ns/global/b", inbuf='{"v": "2"}')
+        result = self._ok(mod._cli_secret_ls(namespace="ns"))
+        assert "ns/global/a" in result
+        assert "ns/global/b" in result
+
+    def test_cli_get_with_reveal(self, mod):
+        mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t")
+        result = self._ok(mod._cli_secret_get_by_path(path="ns/global/pw", reveal=True))
+        assert result["data"] == "s3cr3t"
+
+    def test_cli_get_without_reveal_hides_data(self, mod):
+        mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t")
+        result = self._ok(mod._cli_secret_get_by_path(path="ns/global/pw", reveal=False))
+        assert "data" not in result
+
+    def _get_value(self, result: Any) -> str:
+        """Unwrap a get-value tuple (retcode, body, status) → raw string."""
+        if isinstance(result, tuple):
+            retcode, body, _ = result
+            assert retcode == 0, f"unexpected error retcode {retcode}: {body}"
+            return body
+        return result
+
+    def test_cli_get_value_returns_raw_string(self, mod):
+        mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t")
+        result = self._get_value(mod._cli_secret_get_value_by_path(path="ns/global/pw"))
+        assert result == "s3cr3t"
+
+    def test_cli_get_value_not_found_raises(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_get_value_by_path(path="ns/global/ghost")
+            self._assert_error(result)
+        except ErrorResponse:
+            pass
+
+    def test_cli_get_value_bad_path_raises(self, mod):
+        from object_format import ErrorResponse
+        try:
+            result = mod._cli_secret_get_value_by_path(path="ns/badscope/key")
+            self._assert_error(result)
+        except ErrorResponse:
+            pass
+
+    def test_cli_get_value_opaque_json_string(self, mod):
+        payload = '{"u": "a", "p": "b"}'
+        mod._cli_secret_set_by_path(path="ns/global/creds", inbuf=payload)
+        result = self._get_value(mod._cli_secret_get_value_by_path(path="ns/global/creds"))
+        assert result == payload
diff --git a/src/pybind/mgr/ceph_secrets/tests/test_secret_backend.py b/src/pybind/mgr/ceph_secrets/tests/test_secret_backend.py
new file mode 100644 (file)
index 0000000..1b98dd6
--- /dev/null
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+"""Tests for the SecretStorageBackend ABC and contract."""
+from __future__ import annotations
+
+import pytest
+from abc import ABC
+
+from ceph_secrets.secret_backend import SecretStorageBackend
+from ceph_secrets_types import SecretScope
+
+
+class TestSecretStorageBackend:
+    def test_is_abstract(self):
+        assert issubclass(SecretStorageBackend, ABC)
+
+    def test_cannot_instantiate_directly(self):
+        with pytest.raises(TypeError):
+            SecretStorageBackend()  # type: ignore[abstract]
+
+    def test_concrete_subclass_instantiates(self, store):
+        """SecretStoreMon (the concrete impl) must satisfy the ABC."""
+        assert isinstance(store, SecretStorageBackend)
+
+    def test_abstract_methods_defined(self):
+        abstract = SecretStorageBackend.__abstractmethods__
+        assert 'get' in abstract
+        assert 'set' in abstract
+        assert 'rm' in abstract
+        assert 'ls' in abstract
+        assert 'get_epoch' in abstract
+        assert 'bump_epoch' in abstract
+
+    def test_partial_implementation_still_abstract(self):
+        """Subclass missing any abstract method stays abstract."""
+        class PartialImpl(SecretStorageBackend):
+            def get(self, ns, scope, target, name):
+                pass
+
+            def set(self, ns, scope, target, name, data, user_made=True, editable=True):
+                pass
+
+            def rm(self, ns, scope, target, name):
+                pass
+
+            def ls(self, namespace=None, scope=None, target=None):
+                pass
+
+            def get_epoch(self, namespace):
+                pass
+            # missing bump_epoch
+
+        with pytest.raises(TypeError):
+            PartialImpl()  # type: ignore[abstract]
+
+    def test_full_implementation_is_instantiable(self):
+
+        class FullImpl(SecretStorageBackend):
+
+            def get(self, ns, scope, target, name):
+                return None
+
+            def set(self, ns, scope, target, name, data, user_made=True, editable=True):
+                pass
+
+            def rm(self, ns, scope, target, name):
+                return False
+
+            def ls(self, namespace=None, scope=None, target=None):
+                return []
+
+            def get_epoch(self, namespace):
+                return 0
+
+            def bump_epoch(self, namespace):
+                return 1
+
+        impl = FullImpl()
+        assert impl.get("ns", SecretScope.GLOBAL, "", "k") is None
+        assert impl.get_epoch("ns") == 0
+        assert impl.bump_epoch("ns") == 1
diff --git a/src/pybind/mgr/ceph_secrets/tests/test_secret_mgr.py b/src/pybind/mgr/ceph_secrets/tests/test_secret_mgr.py
new file mode 100644 (file)
index 0000000..a4be58a
--- /dev/null
@@ -0,0 +1,339 @@
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.secret_mgr (SecretMgr)."""
+from __future__ import annotations
+
+import pytest
+
+from ceph_secrets_types import (
+    SecretScope, SecretRef,
+    CephSecretException, CephSecretNotFoundError,
+    BadSecretURI,
+)
+
+
+# ============================================================
+# make_ref
+# ============================================================
+
+class TestMakeRef:
+    def test_basic(self, secret_mgr):
+        ref = secret_mgr.make_ref("ns", SecretScope.GLOBAL, "", "key")
+        assert isinstance(ref, SecretRef)
+        assert ref.namespace == "ns"
+
+    def test_scope_as_string(self, secret_mgr):
+        ref = secret_mgr.make_ref("ns", "host", "node1", "ssh_key")
+        assert ref.scope == SecretScope.HOST
+
+    def test_bad_scope_raises(self, secret_mgr):
+        with pytest.raises(CephSecretException):
+            secret_mgr.make_ref("ns", "badscope", "", "key")
+
+    def test_bad_namespace_raises(self, secret_mgr):
+        with pytest.raises(CephSecretException):
+            secret_mgr.make_ref("bad ns!", SecretScope.GLOBAL, "", "key")
+
+
+# ============================================================
+# get / get_value
+# ============================================================
+
+class TestGet:
+    def test_get_existing(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw")
+        rec = secret_mgr.get(ref)
+        assert rec.data == "s3cr3t"
+
+    def test_get_missing_raises(self, secret_mgr):
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "ghost")
+        with pytest.raises(CephSecretNotFoundError):
+            secret_mgr.get(ref)
+
+    def test_get_value(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw")
+        assert secret_mgr.get_value(ref) == "s3cr3t"
+
+    def test_get_value_opaque_string(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "admin", "p": "pw"}')
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "creds")
+        val = secret_mgr.get_value(ref)
+        assert isinstance(val, str)
+        assert val == '{"u": "admin", "p": "pw"}'
+
+    def test_get_value_missing_raises(self, secret_mgr):
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "ghost")
+        with pytest.raises(CephSecretNotFoundError):
+            secret_mgr.get_value(ref)
+
+
+# ============================================================
+# set
+# ============================================================
+
+class TestSet:
+    def test_set_global(self, secret_mgr):
+        rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+        assert rec.metadata.version == 1
+        assert rec.data == "data-v1"
+
+    def test_set_service(self, secret_mgr):
+        rec = secret_mgr.set("ns", SecretScope.SERVICE, "prom", "auth", "user-a")
+        assert rec.target == "prom"
+
+    def test_set_custom(self, secret_mgr):
+        rec = secret_mgr.set("ns", SecretScope.CUSTOM, "", "a/b/c", "tok")
+        assert rec.name == "a/b/c"
+
+    def test_set_non_str_raises(self, secret_mgr):
+        with pytest.raises(CephSecretException, match="string"):
+            secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", {"not": "a-string"})  # type: ignore[arg-type]
+
+    def test_set_empty_string_raises(self, secret_mgr):
+        with pytest.raises(CephSecretException, match="must not be empty"):
+            secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "")
+
+    def test_set_preserves_whitespace_and_newlines(self, secret_mgr):
+        payload = "  secret-value\n"
+        rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", payload)
+        assert rec.data == payload
+        assert secret_mgr.get_value(SecretRef("ns", SecretScope.GLOBAL, "", "k")) == payload
+
+    def test_set_increments_version(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+        rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+        assert rec.metadata.version == 2
+
+    def test_set_scope_string(self, secret_mgr):
+        rec = secret_mgr.set("ns", "global", "", "k", "data-x")
+        assert rec.scope == SecretScope.GLOBAL
+
+
+# ============================================================
+# rm
+# ============================================================
+
+class TestRm:
+    def test_rm_existing(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        assert secret_mgr.rm("ns", SecretScope.GLOBAL, "", "k") is True
+
+    def test_rm_nonexistent(self, secret_mgr):
+        assert secret_mgr.rm("ns", SecretScope.GLOBAL, "", "ghost") is False
+
+
+# ============================================================
+# ls
+# ============================================================
+
+class TestLs:
+    def test_ls_empty(self, secret_mgr):
+        assert secret_mgr.ls(namespace="ns") == []
+
+    def test_ls_returns_records(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "a", "data-a")
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "b", "data-b")
+        recs = secret_mgr.ls(namespace="ns")
+        assert len(recs) == 2
+
+    def test_ls_scope_filter(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "g", "data-g")
+        secret_mgr.set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+        recs = secret_mgr.ls(namespace="ns", scope="service")
+        assert len(recs) == 1
+        assert recs[0].scope == SecretScope.SERVICE
+
+
+# ============================================================
+# scan_refs / scan_unresolved_refs
+# ============================================================
+
+class TestScanRefs:
+    def test_scan_simple_string(self, secret_mgr):
+        obj = "secret:/ns/global/pw"
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        uris = {r.to_uri() for r in refs}
+        assert "secret:/ns/global/pw" in uris
+
+    def test_scan_in_dict(self, secret_mgr):
+        obj = {"key": "secret:/ns/global/pw"}
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        assert len(refs) == 1
+
+    def test_scan_in_list(self, secret_mgr):
+        obj = ["secret:/ns/global/pw", "secret:/ns/host/node1/ssh"]
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        assert len(refs) == 2
+
+    def test_scan_nested(self, secret_mgr):
+        obj = {"a": {"b": "secret:/ns/global/pw"}}
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        assert len(refs) == 1
+
+    def test_scan_no_refs(self, secret_mgr):
+        obj = {"plain": "value"}
+        assert secret_mgr.scan_refs(obj, namespace="ns") == set()
+
+    def test_scan_bad_uri_yields_bad_secret_uri(self, secret_mgr):
+        obj = "secret:/ns/badscope/key"
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        bad = [r for r in refs if isinstance(r, BadSecretURI)]
+        assert len(bad) == 1
+
+    def test_scan_unresolved_all_exist(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "x")
+        obj = "secret:/ns/global/pw"
+        unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+        assert len(unresolved) == 0
+
+    def test_scan_unresolved_missing(self, secret_mgr):
+        obj = "secret:/ns/global/ghost"
+        unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+        assert len(unresolved) == 1
+
+    def test_scan_unresolved_bad_uri_is_unresolved(self, secret_mgr):
+        obj = "secret:/ns/badscope/key"
+        unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+        assert len(unresolved) == 1
+
+    def test_scan_unresolved_embedded_uri_is_unresolved(self, secret_mgr):
+        # Even though the referenced secret exists, an embedded (non-whole-value)
+        # URI is never resolvable, so it must be reported as unresolved so that
+        # pre-deploy validation catches it.
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "x")
+        obj = {"auth": "Bearer secret:/ns/global/pw"}
+        unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+        assert len(unresolved) == 1
+
+
+# ============================================================
+# resolve_object
+# ============================================================
+
+class TestResolveObject:
+    def test_resolve_secret(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        result = secret_mgr.resolve_object("secret:/ns/global/pw")
+        assert result == "s3cr3t"
+
+    def test_resolve_opaque_json_string(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "a", "p": "b"}')
+        result = secret_mgr.resolve_object("secret:/ns/global/creds")
+        assert isinstance(result, str)
+        assert result == '{"u": "a", "p": "b"}'
+
+    def test_resolve_in_dict(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        result = secret_mgr.resolve_object({"password": "secret:/ns/global/pw"})
+        assert result["password"] == "s3cr3t"
+
+    def test_resolve_in_list(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "a", "x")
+        result = secret_mgr.resolve_object(["secret:/ns/global/a", "plain"])
+        assert result[0] == "x"
+        assert result[1] == "plain"
+
+    def test_resolve_in_tuple(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "t", "y")
+        result = secret_mgr.resolve_object(("secret:/ns/global/t",))
+        assert result == ("y",)
+
+    def test_resolve_non_secret_string_unchanged(self, secret_mgr):
+        result = secret_mgr.resolve_object("just a normal string")
+        assert result == "just a normal string"
+
+    def test_resolve_non_string_unchanged(self, secret_mgr):
+        assert secret_mgr.resolve_object(42) == 42
+        assert secret_mgr.resolve_object(None) is None
+
+    def test_resolve_missing_secret_raises(self, secret_mgr):
+        with pytest.raises(CephSecretException):
+            secret_mgr.resolve_object("secret:/ns/global/ghost")
+
+    def test_resolve_invalid_uri_raises(self, secret_mgr):
+        with pytest.raises(CephSecretException):
+            secret_mgr.resolve_object("secret:/ns/badscope/key")
+
+    def test_resolve_edge_whitespace_tolerated(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        assert secret_mgr.resolve_object("  secret:/ns/global/pw  ") == "s3cr3t"
+
+    def test_resolve_newline_whitespace_tolerated(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        assert secret_mgr.resolve_object("\nsecret:/ns/global/pw\n") == "s3cr3t"
+
+    def test_resolve_ref_with_suffix_raises(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        with pytest.raises(CephSecretException):
+            secret_mgr.resolve_object("secret:/ns/global/pw suffix")
+
+    def test_resolve_embedded_uri_raises(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        with pytest.raises(CephSecretException, match="embedded"):
+            secret_mgr.resolve_object("Bearer secret:/ns/global/pw")
+
+    def test_resolve_embedded_uri_in_dict_raises(self, secret_mgr):
+        secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+        with pytest.raises(CephSecretException, match="embedded"):
+            secret_mgr.resolve_object({"auth": "Bearer secret:/ns/global/pw"})
+
+    def test_resolve_plain_string_whitespace_preserved(self, secret_mgr):
+        # non-secret strings pass through byte-for-byte, including whitespace
+        assert secret_mgr.resolve_object("  plain value  ") == "  plain value  "
+
+
+# ============================================================
+# scan_refs — edge cases
+# ============================================================
+
+class TestScanRefsEdgeCases:
+    def test_embedded_refs_rejected_as_bad(self, secret_mgr):
+        # A string that embeds URIs inside other text is NOT a whole-value
+        # reference; it is surfaced as a single BadSecretURI, not parsed into
+        # multiple SecretRefs.
+        obj = "use secret:/ns/global/a and secret:/ns/global/b"
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        assert len(refs) == 1
+        bad = [r for r in refs if isinstance(r, BadSecretURI)]
+        assert len(bad) == 1
+        assert bad[0].raw == obj
+
+    def test_duplicate_refs_deduped(self, secret_mgr):
+        obj = ["secret:/ns/global/pw", "secret:/ns/global/pw"]
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        uris = [r.to_uri() for r in refs]
+        assert uris.count("secret:/ns/global/pw") == 1
+
+    def test_ref_followed_by_punctuation(self, secret_mgr):
+        # "secret:/ns/global/pw," is a whole-value string that starts with the
+        # prefix but is not a valid URI (trailing comma in name) → BadSecretURI.
+        obj = "secret:/ns/global/pw,"
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        bad = [r for r in refs if isinstance(r, BadSecretURI)]
+        assert len(bad) == 1
+
+    def test_whole_value_ref_with_edge_whitespace(self, secret_mgr):
+        # Surrounding whitespace around an otherwise-clean URI is tolerated.
+        obj = "  secret:/ns/global/pw  "
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        uris = {r.to_uri() for r in refs if isinstance(r, SecretRef)}
+        assert "secret:/ns/global/pw" in uris
+
+    def test_embedded_ref_in_dict_value_is_bad(self, secret_mgr):
+        obj = {"auth": "Bearer secret:/ns/global/pw"}
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        bad = [r for r in refs if isinstance(r, BadSecretURI)]
+        assert len(bad) == 1
+
+    def test_ref_inside_tuple(self, secret_mgr):
+        obj = ("secret:/ns/global/pw",)
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        assert len(refs) == 1
+
+    def test_cross_namespace_ref_is_scanned(self, secret_mgr):
+        """scan_refs finds refs regardless of namespace — namespace arg does not filter."""
+        obj = "secret:/other/global/pw"
+        refs = secret_mgr.scan_refs(obj, namespace="ns")
+        uris = {r.to_uri() for r in refs}
+        assert "secret:/other/global/pw" in uris
diff --git a/src/pybind/mgr/ceph_secrets/tests/test_secret_store.py b/src/pybind/mgr/ceph_secrets/tests/test_secret_store.py
new file mode 100644 (file)
index 0000000..5473944
--- /dev/null
@@ -0,0 +1,516 @@
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.secret_store (SecretStoreMon + data model)."""
+from __future__ import annotations
+
+import pytest
+
+# conftest injects stubs; just import production code after.
+from ceph_secrets.secret_store import (
+    SecretRecord,
+    SecretMetadata,
+    SecretPolicy,
+    SECRET_STORE_PREFIX,
+    SECRET_STORE_FORMAT_VERSION,
+    _checked_ref,
+    _checked_namespace,
+    _epoch_key,
+    _now_iso,
+    _expect_bool,
+    _expect_str,
+    _expect_positive_int,
+    _expect_object,
+    _reject_unknown_keys,
+    _require_keys,
+)
+from ceph_secrets_types import (
+    SecretScope, SecretRef,
+    CephSecretException, CephSecretDataError,
+)
+
+
+# ============================================================
+# Helpers / primitive validators
+# ============================================================
+
+class TestPrimitiveHelpers:
+    def test_now_iso_format(self):
+        ts = _now_iso()
+        import re
+        assert re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', ts)
+
+    def test_expect_bool_ok(self):
+        assert _expect_bool("f", True) is True
+        assert _expect_bool("f", False) is False
+
+    def test_expect_bool_bad(self):
+        with pytest.raises(CephSecretDataError, match="must be a boolean"):
+            _expect_bool("f", 1)
+
+    def test_expect_str_ok(self):
+        assert _expect_str("f", "hello") == "hello"
+
+    def test_expect_str_bad(self):
+        with pytest.raises(CephSecretDataError, match="must be a string"):
+            _expect_str("f", 42)
+
+    def test_expect_positive_int_ok(self):
+        assert _expect_positive_int("f", 1) == 1
+        assert _expect_positive_int("f", 99) == 99
+
+    def test_expect_positive_int_zero(self):
+        with pytest.raises(CephSecretDataError):
+            _expect_positive_int("f", 0)
+
+    def test_expect_positive_int_bool_rejected(self):
+        with pytest.raises(CephSecretDataError):
+            _expect_positive_int("f", True)
+
+    def test_expect_object_ok(self):
+        d = {"a": 1}
+        assert _expect_object("o", d) is d
+
+    def test_expect_object_bad(self):
+        with pytest.raises(CephSecretDataError, match="must be a JSON object"):
+            _expect_object("o", [1, 2])
+
+    def test_reject_unknown_keys(self):
+        with pytest.raises(CephSecretDataError, match="unknown field"):
+            _reject_unknown_keys("L", {"a": 1, "z": 2}, {"a"})
+
+    def test_require_keys_missing(self):
+        with pytest.raises(CephSecretDataError, match="missing required"):
+            _require_keys("L", {"a": 1}, {"a", "b"})
+
+    def test_checked_namespace_valid(self):
+        assert _checked_namespace("cephadm") == "cephadm"
+
+    def test_checked_namespace_bad(self):
+        with pytest.raises(CephSecretDataError):
+            _checked_namespace("bad ns!")
+
+    def test_epoch_key_format(self):
+        assert _epoch_key("cephadm") == "secret_store/meta/cephadm/_epoch"
+
+    def test_checked_ref_valid(self):
+        ref = _checked_ref("ns", SecretScope.GLOBAL, "", "key")
+        assert isinstance(ref, SecretRef)
+
+    def test_checked_ref_bad_scope(self):
+        with pytest.raises(CephSecretDataError):
+            _checked_ref("ns", "bad", "", "key")
+
+
+# ============================================================
+# SecretMetadata
+# ============================================================
+
+class TestSecretMetadata:
+    def test_defaults(self):
+        m = SecretMetadata(version=1, created="2024-01-01T00:00:00Z", updated="2024-01-01T00:00:00Z")
+        assert m.version == 1
+
+    def test_to_json(self):
+        m = SecretMetadata(version=2, created="2024-01-01T00:00:00Z", updated="2024-01-02T00:00:00Z")
+        d = m.to_json()
+        assert d["version"] == 2
+        assert "created" in d
+        assert "updated" in d
+
+    def test_from_json_roundtrip(self):
+        m = SecretMetadata(version=3, created="2024-01-01T00:00:00Z", updated="2024-01-02T00:00:00Z")
+        m2 = SecretMetadata.from_json(m.to_json())
+        assert m2.version == 3
+
+    def test_from_json_missing_field(self):
+        with pytest.raises(CephSecretDataError, match="missing required"):
+            SecretMetadata.from_json({"version": 1, "created": "x"})
+
+    def test_from_json_unknown_field(self):
+        with pytest.raises(CephSecretDataError, match="unknown"):
+            SecretMetadata.from_json({"version": 1, "created": "x", "updated": "y", "extra": 1})
+
+    def test_version_zero_raises(self):
+        with pytest.raises(CephSecretDataError):
+            SecretMetadata(version=0, created="x", updated="y")
+
+    def test_non_dict_raises(self):
+        with pytest.raises(CephSecretDataError):
+            SecretMetadata.from_json("not a dict")
+
+
+# ============================================================
+# SecretPolicy
+# ============================================================
+
+class TestSecretPolicy:
+    def test_defaults(self):
+        p = SecretPolicy()
+        assert p.user_made is True
+        assert p.editable is True
+
+    def test_to_json(self):
+        p = SecretPolicy(user_made=False, editable=True)
+        d = p.to_json()
+        assert d["user_made"] is False
+
+    def test_from_json_roundtrip(self):
+        p = SecretPolicy(user_made=True, editable=False)
+        p2 = SecretPolicy.from_json(p.to_json())
+        assert p2.editable is False
+
+    def test_from_json_missing(self):
+        with pytest.raises(CephSecretDataError):
+            SecretPolicy.from_json({"user_made": True})
+
+    def test_from_json_bad_type(self):
+        with pytest.raises(CephSecretDataError, match="must be a boolean"):
+            SecretPolicy.from_json({"user_made": 1, "editable": True})
+
+    def test_non_bool_raises(self):
+        with pytest.raises(CephSecretDataError):
+            SecretPolicy(user_made="yes", editable=True)  # type: ignore[arg-type]
+
+
+# ============================================================
+# SecretRecord
+# ============================================================
+
+class TestSecretRecord:
+    def _ref(self, scope=SecretScope.GLOBAL, target="", name="key"):
+        return SecretRef("ns", scope, target, name)
+
+    def _meta(self):
+        return SecretMetadata(version=1, created="2024-01-01T00:00:00Z", updated="2024-01-01T00:00:00Z")
+
+    def test_basic_construction(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t")
+        assert rec.namespace == "ns"
+        assert rec.scope == SecretScope.GLOBAL
+        assert rec.name == "key"
+
+    def test_to_public_json_no_data(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t")
+        out = rec.to_public_json(include_data=False)
+        assert "data" not in out
+        assert "metadata" in out
+
+    def test_to_public_json_with_data(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t")
+        out = rec.to_public_json(include_data=True)
+        assert out["data"] == "s3cr3t"
+
+    def test_to_public_json_with_ref(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x")
+        out = rec.to_public_json(include_ref=True)
+        assert out["ref"]["namespace"] == "ns"
+        assert out["ref"]["scope"] == "global"
+
+    def test_to_public_json_with_policy(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x")
+        out = rec.to_public_json(include_policy=True)
+        assert "policy" in out
+
+    def test_to_store_json(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x")
+        stored = rec.to_store_json()
+        assert stored["format_version"] == SECRET_STORE_FORMAT_VERSION
+        assert "data" in stored
+        assert "policy" in stored
+
+    def test_from_store_json_roundtrip(self):
+        ref = self._ref()
+        rec = SecretRecord(ref=ref, metadata=self._meta(), data="pw")
+        rec2 = SecretRecord.from_store_json(ref, rec.to_store_json())
+        assert rec2.data == "pw"
+        assert rec2.metadata.version == 1
+
+    def test_from_store_json_wrong_version(self):
+        ref = self._ref()
+        payload = {
+            "format_version": 99,
+            "metadata": {"version": 1, "created": "x", "updated": "y"},
+            "policy": {"user_made": True, "editable": True},
+            "data": "x",
+        }
+        with pytest.raises(CephSecretDataError, match="unsupported"):
+            SecretRecord.from_store_json(ref, payload)
+
+    def test_from_store_json_missing_key(self):
+        ref = self._ref()
+        with pytest.raises(CephSecretDataError, match="missing required"):
+            SecretRecord.from_store_json(ref, {"format_version": 1, "metadata": {}})
+
+    def test_bad_ref_type_raises(self):
+        with pytest.raises(CephSecretDataError, match="must be a SecretRef"):
+            SecretRecord(ref="not-a-ref", metadata=self._meta(), data="x")  # type: ignore[arg-type]
+
+    def test_bad_metadata_type_raises(self):
+        with pytest.raises(CephSecretDataError):
+            SecretRecord(ref=self._ref(), metadata={"version": 1}, data="x")  # type: ignore[arg-type]
+
+    def test_bad_data_type_raises(self):
+        with pytest.raises(CephSecretDataError):
+            SecretRecord(ref=self._ref(), metadata=self._meta(), data={"not": "a-string"})  # type: ignore[arg-type]
+
+    def test_empty_data_raises(self):
+        with pytest.raises(CephSecretDataError, match="must not be empty"):
+            SecretRecord(ref=self._ref(), metadata=self._meta(), data="")
+
+    def test_data_in_store_json(self):
+        rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="myvalue")
+        stored = rec.to_store_json()
+        assert stored["data"] == "myvalue"
+
+    def test_ident(self):
+        ref = self._ref()
+        rec = SecretRecord(ref=ref, metadata=self._meta(), data="x")
+        assert rec.ident() == ("ns", "global", "", "key")
+
+
+# ============================================================
+# SecretStoreMon – epoch
+# ============================================================
+
+class TestSecretStoreMon_Epoch:
+    def test_initial_epoch_is_zero(self, store):
+        assert store.get_epoch("cephadm") == 0
+
+    def test_bump_epoch(self, store):
+        assert store.bump_epoch("cephadm") == 1
+        assert store.bump_epoch("cephadm") == 2
+
+    def test_namespace_isolation(self, store):
+        store.bump_epoch("cephadm")
+        store.bump_epoch("cephadm")
+        assert store.get_epoch("rook") == 0
+
+    def test_epoch_key_stored_correctly(self, mgr, store):
+        store.bump_epoch("myns")
+        assert mgr.get_store("secret_store/meta/myns/_epoch") == "1"
+
+    def test_corrupted_epoch_returns_zero(self, mgr, store):
+        mgr.set_store("secret_store/meta/ns/_epoch", "notanumber")
+        assert store.get_epoch("ns") == 0
+
+
+# ============================================================
+# SecretStoreMon – CRUD
+# ============================================================
+
+class TestSecretStoreMon_Crud:
+    def test_set_and_get_global(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "pw", "secret")
+        rec = store.get("ns", SecretScope.GLOBAL, "", "pw")
+        assert rec is not None
+        assert rec.data == "secret"
+
+    def test_get_missing_returns_none(self, store):
+        assert store.get("ns", SecretScope.GLOBAL, "", "noexist") is None
+
+    def test_set_increments_version(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+        rec = store.get("ns", SecretScope.GLOBAL, "", "k")
+        assert rec.metadata.version == 2
+
+    def test_set_preserves_created_timestamp(self, store):
+        import time
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+        rec1 = store.get("ns", SecretScope.GLOBAL, "", "k")
+        time.sleep(1)
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+        rec2 = store.get("ns", SecretScope.GLOBAL, "", "k")
+        assert rec1.metadata.created == rec2.metadata.created
+        assert rec2.metadata.updated != rec2.metadata.created
+
+    def test_set_service_scope(self, store):
+        store.set("ns", SecretScope.SERVICE, "prometheus", "auth", "admin")
+        rec = store.get("ns", SecretScope.SERVICE, "prometheus", "auth")
+        assert rec.data == "admin"
+
+    def test_set_host_scope(self, store):
+        store.set("ns", SecretScope.HOST, "node1", "ssh", "abc")
+        rec = store.get("ns", SecretScope.HOST, "node1", "ssh")
+        assert rec.data == "abc"
+
+    def test_set_custom_scope(self, store):
+        store.set("ns", SecretScope.CUSTOM, "", "a/b/c", "tok")
+        rec = store.get("ns", SecretScope.CUSTOM, "", "a/b/c")
+        assert rec.data == "tok"
+
+    def test_set_bumps_epoch(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+        assert store.get_epoch("ns") == 1
+
+    def test_set_non_str_data_raises(self, store):
+        with pytest.raises(CephSecretException):
+            store.set("ns", SecretScope.GLOBAL, "", "k", {"not": "a-string"})  # type: ignore[arg-type]
+
+    def test_set_empty_data_raises(self, store):
+        with pytest.raises(CephSecretException, match="must not be empty"):
+            store.set("ns", SecretScope.GLOBAL, "", "k", "")
+
+    def test_set_non_editable_raises(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False)
+        with pytest.raises(CephSecretException, match="not editable"):
+            store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+
+    def test_rm_existing(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        assert store.rm("ns", SecretScope.GLOBAL, "", "k") is True
+        assert store.get("ns", SecretScope.GLOBAL, "", "k") is None
+
+    def test_rm_nonexistent(self, store):
+        assert store.rm("ns", SecretScope.GLOBAL, "", "ghost") is False
+
+    def test_rm_bumps_epoch(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+        epoch_after_set = store.get_epoch("ns")
+        store.rm("ns", SecretScope.GLOBAL, "", "k")
+        assert store.get_epoch("ns") == epoch_after_set + 1
+
+    def test_rm_nonexistent_no_epoch_bump(self, store):
+        assert store.get_epoch("ns") == 0
+        store.rm("ns", SecretScope.GLOBAL, "", "ghost")
+        assert store.get_epoch("ns") == 0
+
+    def test_get_corrupted_json_raises(self, mgr, store):
+        mgr.set_store(
+            f"{SECRET_STORE_PREFIX}ns/global/badkey", "NOT JSON"
+        )
+        with pytest.raises(CephSecretDataError):
+            store.get("ns", SecretScope.GLOBAL, "", "badkey")
+
+    def test_set_with_bad_scope_string(self, store):
+        with pytest.raises(CephSecretDataError):
+            store.set("ns", "badscope", "", "k", "data-v1")
+
+
+# ============================================================
+# SecretStoreMon – ls
+# ============================================================
+
+class TestSecretStoreMon_Ls:
+    def test_ls_empty(self, store):
+        assert store.ls(namespace="ns") == []
+
+    def test_ls_returns_all(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k1", "data-1")
+        store.set("ns", SecretScope.GLOBAL, "", "k2", "data-2")
+        recs = store.ls(namespace="ns")
+        assert len(recs) == 2
+
+    def test_ls_filter_scope(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "g", "data-g")
+        store.set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+        recs = store.ls(namespace="ns", scope=SecretScope.GLOBAL)
+        assert len(recs) == 1
+        assert recs[0].scope == SecretScope.GLOBAL
+
+    def test_ls_filter_target(self, store):
+        store.set("ns", SecretScope.SERVICE, "prom", "a1", "data-a1")
+        store.set("ns", SecretScope.SERVICE, "grafana", "a2", "data-a2")
+        recs = store.ls(namespace="ns", scope=SecretScope.SERVICE, target="prom")
+        assert len(recs) == 1
+        assert recs[0].target == "prom"
+
+    def test_ls_namespace_isolation(self, store):
+        store.set("ns1", SecretScope.GLOBAL, "", "k", "data-ns1")
+        store.set("ns2", SecretScope.GLOBAL, "", "k", "data-ns2")
+        recs = store.ls(namespace="ns1")
+        assert len(recs) == 1
+
+    def test_ls_custom_scope(self, store):
+        store.set("ns", SecretScope.CUSTOM, "", "a/b/c", "data-abc")
+        store.set("ns", SecretScope.CUSTOM, "", "x/y", "data-xy")
+        recs = store.ls(namespace="ns", scope=SecretScope.CUSTOM)
+        assert len(recs) == 2
+
+    def test_ls_sorted(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "zz", "data-zz")
+        store.set("ns", SecretScope.GLOBAL, "", "aa", "data-aa")
+        recs = store.ls(namespace="ns")
+        names = [r.name for r in recs]
+        assert names == sorted(names)
+
+    def test_ls_invalid_namespace(self, store):
+        with pytest.raises(CephSecretDataError):
+            store.ls(namespace="bad ns!")
+
+    def test_ls_scope_as_string(self, store):
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+        recs = store.ls(namespace="ns", scope="global")
+        assert len(recs) == 1
+
+    def test_ls_corrupted_json_raises(self, mgr, store):
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/bad", "NOT-JSON")
+        with pytest.raises(CephSecretDataError):
+            store.ls(namespace="ns")
+
+    def test_ls_no_filter(self, store):
+        store.set("ns1", SecretScope.GLOBAL, "", "a", "data-a")
+        store.set("ns2", SecretScope.GLOBAL, "", "b", "data-b")
+        recs = store.ls()
+        assert len(recs) == 2
+
+    def test_ls_kv_key_structure_validation(self, mgr, store):
+        """A key with empty segments inside the prefix should raise CephSecretDataError."""
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns//global/k", "{}")
+        with pytest.raises(CephSecretDataError, match="empty path component"):
+            store.ls(namespace="ns")
+
+
+# ============================================================
+# TestSecretStoreMon_Ls — malformed persisted-record cases
+# ============================================================
+
+class TestSecretStoreMon_Ls_Malformed:
+    def test_ls_too_few_segments(self, mgr, store):
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global", "{}")
+        with pytest.raises(CephSecretDataError, match="unexpected key structure"):
+            store.ls(namespace="ns")
+
+    def test_ls_invalid_scope_in_key(self, mgr, store):
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/badscope/key", "{}")
+        with pytest.raises(CephSecretDataError, match="invalid scope"):
+            store.ls(namespace="ns")
+
+    def test_ls_global_with_extra_segment(self, mgr, store):
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/target/name", "{}")
+        with pytest.raises(CephSecretDataError, match="unexpected global key structure"):
+            store.ls(namespace="ns")
+
+    def test_ls_service_with_missing_segment(self, mgr, store):
+        # service needs 4 parts (ns/service/target/name); 3 is too few
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/service/onlytarget", "{}")
+        with pytest.raises(CephSecretDataError, match="unexpected targeted-scope key structure"):
+            store.ls(namespace="ns")
+
+    def test_ls_payload_not_object(self, mgr, store):
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/k", '["not", "object"]')
+        with pytest.raises(CephSecretDataError, match="not a JSON object"):
+            store.ls(namespace="ns")
+
+    def test_ls_payload_valid_json_missing_fields(self, mgr, store):
+        # Valid JSON object but missing required secret record fields
+        mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/k", '{"unexpected": 1}')
+        with pytest.raises(CephSecretDataError):
+            store.ls(namespace="ns")
+
+
+# ============================================================
+# TestSecretStoreMon_Crud — editable=False allows rm by design
+# ============================================================
+
+class TestSecretStoreMon_EditableRm:
+    def test_rm_non_editable_secret_is_allowed(self, store):
+        """rm ignores editable — deletion is always permitted by design."""
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False)
+        result = store.rm("ns", SecretScope.GLOBAL, "", "k")
+        assert result is True
+
+    def test_set_non_editable_blocks_update(self, store):
+        """Confirmed: set refuses to update a non-editable secret."""
+        store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False)
+        with pytest.raises(CephSecretException):
+            store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
index 2386de710438b97d5e8f9e5806cc6e6894049cce..3240641dbd81f1f9c62bcab009595c0b3d98fe4b 100644 (file)
@@ -249,8 +249,9 @@ class CephSecretsClient:
                        rather than automatically by a Ceph component.  Defaults
                        to True; set to False for programmatically generated
                        secrets.
-            editable:  Whether the secret may be updated or removed by
-                       automated tooling.  Defaults to True.
+            editable:  Whether the secret may be updated by automated
+                       tooling.  Deletion is always permitted regardless of
+                       this flag.  Defaults to True.
 
         Returns:
             A dict containing the written record's ``metadata`` object.  The
index ae2ccd586723158265e39efa48cd3650682fa710..88cdce454224a6e4ff578906dad72149b467662b 100644 (file)
@@ -196,6 +196,11 @@ def parse_secret_uri(uri: str) -> SecretRef:
         if not isinstance(uri, str):
             raise CephSecretException('secret uri must be a string')
 
+        if uri != uri.strip():
+            raise CephSecretException(
+                f'Invalid secret uri {uri!r}: leading/trailing whitespace is not allowed'
+            )
+
         parsed = urlparse(uri)
         if parsed.scheme != SECRET_SCHEME:
             raise CephSecretException(f'Not a secret uri: {uri!r}')
@@ -289,6 +294,11 @@ def parse_secret_path(path: str) -> SecretRef:
     if not isinstance(path, str):
         raise CephSecretException('secret path must be a string')
 
+    if path != path.strip():
+        raise CephSecretException(
+            f'Invalid secret path {path!r}: leading/trailing whitespace is not allowed'
+        )
+
     p = path.strip()
     if not p:
         raise CephSecretException('Invalid secret path: empty')
diff --git a/src/pybind/mgr/tests/test_ceph_secrets_client.py b/src/pybind/mgr/tests/test_ceph_secrets_client.py
new file mode 100644 (file)
index 0000000..ab52077
--- /dev/null
@@ -0,0 +1,317 @@
+# -*- coding: utf-8 -*-
+"""
+Unit tests for ceph_secrets_client.py.
+Placed at the same level as the module under test (src/pybind/mgr/).
+"""
+from __future__ import annotations
+
+import sys, os
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+import pytest
+from typing import Any
+from unittest.mock import MagicMock, call
+
+from ceph_secrets_client import CephSecretsClient, MgrRemote
+from ceph_secrets_types import SecretScope
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+def _make_mgr(return_value: Any = None, raise_exc: Exception = None) -> MagicMock:
+    """Return a mock that satisfies the MgrRemote Protocol."""
+    mgr = MagicMock()
+    if raise_exc is not None:
+        mgr.remote.side_effect = raise_exc
+    else:
+        mgr.remote.return_value = return_value
+    return mgr
+
+
+def _client(return_value: Any = None, raise_exc: Exception = None) -> CephSecretsClient:
+    return CephSecretsClient(_make_mgr(return_value, raise_exc))
+
+
+# ============================================================
+# CephSecretsClient construction
+# ============================================================
+
+class TestConstruction:
+    def test_default_module_name(self):
+        mgr = MagicMock()
+        client = CephSecretsClient(mgr)
+        assert client.module == "ceph_secrets"
+
+    def test_custom_module_name(self):
+        mgr = MagicMock()
+        client = CephSecretsClient(mgr, module="my_secrets")
+        assert client.module == "my_secrets"
+
+    def test_stores_mgr(self):
+        mgr = MagicMock()
+        client = CephSecretsClient(mgr)
+        assert client.mgr is mgr
+
+
+# ============================================================
+# _remote error handling
+# ============================================================
+
+class TestRemoteErrorHandling:
+    def test_passes_through_return_value(self):
+        client = _client(return_value=42)
+        assert client._remote("some_method", foo="bar") == 42
+
+    def test_wraps_exception_as_runtime_error(self):
+        client = _client(raise_exc=RuntimeError("module down"))
+        with pytest.raises(RuntimeError, match="Cannot call secrets mgr-module"):
+            client._remote("any_method")
+
+    def test_exception_includes_module_name(self):
+        mgr = _make_mgr(raise_exc=Exception("boom"))
+        client = CephSecretsClient(mgr, module="my_secrets")
+        with pytest.raises(RuntimeError, match="my_secrets"):
+            client._remote("any_method")
+
+    def test_exception_is_enabled_hint(self):
+        client = _client(raise_exc=Exception("unavailable"))
+        with pytest.raises(RuntimeError, match="enabled"):
+            client._remote("any_method")
+
+
+# ============================================================
+# secret_get_epoch
+# ============================================================
+
+class TestSecretGetEpoch:
+    def test_calls_correct_method(self):
+        mgr = _make_mgr(return_value=5)
+        client = CephSecretsClient(mgr)
+        result = client.secret_get_epoch("cephadm")
+        mgr.remote.assert_called_once_with("ceph_secrets", "secret_get_epoch", namespace="cephadm")
+        assert result == 5
+
+    def test_returns_zero_epoch(self):
+        assert _client(return_value=0).secret_get_epoch("ns") == 0
+
+
+# ============================================================
+# secret_get
+# ============================================================
+
+class TestSecretGet:
+    def test_calls_remote_correctly(self):
+        mgr = _make_mgr(return_value={"metadata": {"version": 1}})
+        client = CephSecretsClient(mgr)
+        result = client.secret_get("ns", SecretScope.GLOBAL, "", "pw")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "secret_get",
+            namespace="ns", scope=SecretScope.GLOBAL,
+            target="", name="pw", reveal=False,
+        )
+        assert result["metadata"]["version"] == 1
+
+    def test_reveal_kwarg_forwarded(self):
+        mgr = _make_mgr(return_value={"metadata": {}, "data": "s3cr3t"})
+        client = CephSecretsClient(mgr)
+        client.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=True)
+        _, kwargs = mgr.remote.call_args
+        assert kwargs["reveal"] is True
+
+    def test_returns_none_when_not_found(self):
+        assert _client(return_value=None).secret_get("ns", "global", "", "ghost") is None
+
+    def test_scope_as_string(self):
+        mgr = _make_mgr(return_value=None)
+        client = CephSecretsClient(mgr)
+        client.secret_get("ns", "host", "node1", "ssh")
+        _, kwargs = mgr.remote.call_args
+        assert kwargs["scope"] == "host"
+
+
+# ============================================================
+# secret_get_value
+# ============================================================
+
+class TestSecretGetValue:
+    def test_calls_remote(self):
+        mgr = _make_mgr(return_value="s3cr3t")
+        client = CephSecretsClient(mgr)
+        result = client.secret_get_value("ns", SecretScope.GLOBAL, "", "pw")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "secret_get_value",
+            namespace="ns", scope=SecretScope.GLOBAL, target="", name="pw",
+        )
+        assert result == "s3cr3t"
+
+    def test_returns_none_if_not_found(self):
+        assert _client(return_value=None).secret_get_value("ns", "global", "", "ghost") is None
+
+    def test_returns_opaque_string(self):
+        payload = '{"u": "a", "p": "b"}'
+        result = _client(return_value=payload).secret_get_value("ns", "global", "", "creds")
+        assert result == payload
+
+    def test_module_unreachable_raises(self):
+        with pytest.raises(RuntimeError, match="Cannot call"):
+            _client(raise_exc=Exception("down")).secret_get_value("ns", "global", "", "pw")
+
+
+# secret_get_version
+# ============================================================
+
+class TestSecretGetVersion:
+    def test_calls_remote(self):
+        mgr = _make_mgr(return_value=3)
+        client = CephSecretsClient(mgr)
+        result = client.secret_get_version("ns", SecretScope.GLOBAL, "", "k")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "secret_get_version",
+            namespace="ns", scope=SecretScope.GLOBAL, target="", name="k",
+        )
+        assert result == 3
+
+    def test_returns_none_if_not_found(self):
+        assert _client(return_value=None).secret_get_version("ns", "global", "", "ghost") is None
+
+
+# ============================================================
+# secret_get_versions
+# ============================================================
+
+class TestSecretGetVersions:
+    def test_calls_remote_with_list(self):
+        uris = ["secret:/ns/global/a", "secret:/ns/global/b"]
+        expected = {"secret:/ns/global/a": 1, "secret:/ns/global/b": None}
+        mgr = _make_mgr(return_value=expected)
+        client = CephSecretsClient(mgr)
+        result = client.secret_get_versions(uris)
+        mgr.remote.assert_called_once_with("ceph_secrets", "secret_get_versions", uris=uris)
+        assert result == expected
+
+    def test_returns_empty_dict_for_empty_list(self):
+        result = _client(return_value={}).secret_get_versions([])
+        assert result == {}
+
+
+# ============================================================
+# secret_set
+# ============================================================
+
+class TestSecretSet:
+    def test_calls_remote_with_defaults(self):
+        expected = {"metadata": {"version": 1}}
+        mgr = _make_mgr(return_value=expected)
+        client = CephSecretsClient(mgr)
+        result = client.secret_set("ns", SecretScope.GLOBAL, "", "pw", "x")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "secret_set",
+            namespace="ns", scope=SecretScope.GLOBAL,
+            target="", name="pw", data="x",
+            user_made=True, editable=True,
+        )
+        assert result == expected
+
+    def test_user_made_editable_overrides(self):
+        mgr = _make_mgr(return_value={})
+        client = CephSecretsClient(mgr)
+        client.secret_set("ns", "global", "", "k", "x", user_made=False, editable=False)
+        _, kwargs = mgr.remote.call_args
+        assert kwargs["user_made"] is False
+        assert kwargs["editable"] is False
+
+    def test_raises_on_module_error(self):
+        with pytest.raises(RuntimeError):
+            _client(raise_exc=Exception("down")).secret_set("ns", "global", "", "k", "x")
+
+
+# ============================================================
+# secret_rm
+# ============================================================
+
+class TestSecretRm:
+    def test_calls_remote(self):
+        mgr = _make_mgr(return_value=True)
+        client = CephSecretsClient(mgr)
+        result = client.secret_rm("ns", SecretScope.GLOBAL, "", "k")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "secret_rm",
+            namespace="ns", scope=SecretScope.GLOBAL, target="", name="k",
+        )
+        assert result is True
+
+    def test_returns_false_when_not_found(self):
+        assert _client(return_value=False).secret_rm("ns", "global", "", "ghost") is False
+
+    def test_truthy_value_coerced_to_bool(self):
+        # remote might return 1 instead of True
+        assert _client(return_value=1).secret_rm("ns", "global", "", "k") is True
+
+    def test_falsy_value_coerced_to_bool(self):
+        assert _client(return_value=0).secret_rm("ns", "global", "", "k") is False
+
+
+# ============================================================
+# scan_unresolved_refs
+# ============================================================
+
+class TestScanUnresolvedRefs:
+    def test_calls_remote(self):
+        expected = ["secret:/ns/global/missing"]
+        mgr = _make_mgr(return_value=expected)
+        client = CephSecretsClient(mgr)
+        result = client.scan_unresolved_refs({"k": "secret:/ns/global/missing"}, "ns")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "scan_unresolved_refs",
+            obj={"k": "secret:/ns/global/missing"}, namespace="ns",
+        )
+        assert result == expected
+
+    def test_returns_empty_when_all_resolved(self):
+        assert _client(return_value=[]).scan_unresolved_refs({}, "ns") == []
+
+
+# ============================================================
+# scan_refs
+# ============================================================
+
+class TestScanRefs:
+    def test_calls_remote(self):
+        expected = ["secret:/ns/global/pw"]
+        mgr = _make_mgr(return_value=expected)
+        client = CephSecretsClient(mgr)
+        result = client.scan_refs("secret:/ns/global/pw", "ns")
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "scan_refs",
+            obj="secret:/ns/global/pw", namespace="ns",
+        )
+        assert result == expected
+
+    def test_returns_empty_for_no_refs(self):
+        assert _client(return_value=[]).scan_refs("no refs here", "ns") == []
+
+
+# ============================================================
+# resolve_object
+# ============================================================
+
+class TestResolveObject:
+    def test_calls_remote(self):
+        mgr = _make_mgr(return_value={"password": "s3cr3t"})
+        client = CephSecretsClient(mgr)
+        result = client.resolve_object({"password": "secret:/ns/global/pw"})
+        mgr.remote.assert_called_once_with(
+            "ceph_secrets", "resolve_object",
+            obj={"password": "secret:/ns/global/pw"},
+        )
+        assert result["password"] == "s3cr3t"
+
+    def test_raises_on_unresolvable(self):
+        with pytest.raises(RuntimeError):
+            _client(raise_exc=Exception("missing")).resolve_object("secret:/ns/global/ghost")
+
+    def test_returns_plain_value_unchanged(self):
+        result = _client(return_value="plain").resolve_object("plain")
+        assert result == "plain"
diff --git a/src/pybind/mgr/tests/test_ceph_secrets_types.py b/src/pybind/mgr/tests/test_ceph_secrets_types.py
new file mode 100644 (file)
index 0000000..062af9b
--- /dev/null
@@ -0,0 +1,487 @@
+# -*- coding: utf-8 -*-
+"""
+Unit tests for ceph_secrets_types.py.
+Placed at the same level as the module under test (src/pybind/mgr/).
+"""
+from __future__ import annotations
+
+import sys, os
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+import pytest
+
+from ceph_secrets_types import (
+    CephSecretException,
+    CephSecretDataError,
+    CephSecretNotFoundError,
+    SecretScope,
+    SecretRef,
+    BadSecretURI,
+    parse_secret_uri,
+    parse_secret_path,
+    validate_secret_namespace,
+    SECRET_SCHEME,
+    _validate_segment,
+    _validate_custom_path,
+    _quote_segment,
+    _quote_custom_path,
+)
+
+
+# ============================================================
+# Module constants
+# ============================================================
+
+class TestConstants:
+    def test_secret_scheme(self):
+        assert SECRET_SCHEME == "secret"
+
+
+# ============================================================
+# Exception hierarchy
+# ============================================================
+
+class TestExceptions:
+    def test_base_is_exception(self):
+        assert issubclass(CephSecretException, Exception)
+
+    def test_data_error_inherits_base(self):
+        assert issubclass(CephSecretDataError, CephSecretException)
+
+    def test_not_found_inherits_base(self):
+        assert issubclass(CephSecretNotFoundError, CephSecretException)
+
+    def test_can_raise_and_catch_base(self):
+        with pytest.raises(CephSecretException):
+            raise CephSecretException("test")
+
+    def test_can_raise_and_catch_data_error_as_base(self):
+        with pytest.raises(CephSecretException):
+            raise CephSecretDataError("data")
+
+    def test_can_raise_and_catch_not_found_as_base(self):
+        with pytest.raises(CephSecretException):
+            raise CephSecretNotFoundError("gone")
+
+
+# ============================================================
+# _validate_segment
+# ============================================================
+
+class TestValidateSegment:
+    @pytest.mark.parametrize("v", [
+        "simple", "with-dashes", "under_score", "dot.dot", "a1b2",
+        "UPPER", "Mixed123",
+    ])
+    def test_valid(self, v):
+        _validate_segment("f", v)
+
+    @pytest.mark.parametrize("bad", [
+        "", " ", "has space", "a/b", "a:b", "star*",
+    ])
+    def test_invalid(self, bad):
+        with pytest.raises(ValueError):
+            _validate_segment("f", bad)
+
+    def test_ends_with_dot_invalid(self):
+        with pytest.raises(ValueError, match="must not end"):
+            _validate_segment("f", "end.")
+
+    def test_non_string_raises(self):
+        with pytest.raises(ValueError, match="must be a string"):
+            _validate_segment("f", None)
+
+
+# ============================================================
+# _validate_custom_path
+# ============================================================
+
+class TestValidateCustomPath:
+    @pytest.mark.parametrize("p", [
+        "single", "a/b", "a/b/c", "deep/path/here",
+    ])
+    def test_valid(self, p):
+        _validate_custom_path(p)
+
+    def test_empty_string(self):
+        with pytest.raises(ValueError, match="must not be empty"):
+            _validate_custom_path("")
+
+    def test_empty_segment_in_middle(self):
+        with pytest.raises(ValueError, match="empty segments"):
+            _validate_custom_path("a//b")
+
+    def test_trailing_slash(self):
+        with pytest.raises(ValueError, match="empty segments"):
+            _validate_custom_path("a/b/")
+
+    def test_leading_slash_empty_first_segment(self):
+        with pytest.raises(ValueError, match="empty segments"):
+            _validate_custom_path("/a/b")
+
+    def test_segment_with_bad_chars(self):
+        with pytest.raises(ValueError):
+            _validate_custom_path("a/b c/d")
+
+
+# ============================================================
+# _quote helpers
+# ============================================================
+
+class TestQuoteHelpers:
+    def test_quote_segment_no_slash(self):
+        # clean identifiers are unchanged
+        assert _quote_segment("cephadm") == "cephadm"
+
+    def test_quote_custom_path_preserves_slash(self):
+        assert _quote_custom_path("a/b/c") == "a/b/c"
+
+
+# ============================================================
+# validate_secret_namespace
+# ============================================================
+
+class TestValidateSecretNamespace:
+    def test_valid_namespace(self):
+        validate_secret_namespace("cephadm")
+
+    def test_empty_namespace(self):
+        with pytest.raises(ValueError):
+            validate_secret_namespace("")
+
+    def test_namespace_with_space(self):
+        with pytest.raises(ValueError):
+            validate_secret_namespace("bad name")
+
+
+# ============================================================
+# SecretScope
+# ============================================================
+
+class TestSecretScope:
+    def test_all_values(self):
+        assert SecretScope.GLOBAL.value == "global"
+        assert SecretScope.SERVICE.value == "service"
+        assert SecretScope.HOST.value == "host"
+        assert SecretScope.CUSTOM.value == "custom"
+
+    def test_from_str_valid(self):
+        for v in ("global", "service", "host", "custom"):
+            scope = SecretScope.from_str(v)
+            assert scope.value == v
+
+    def test_from_str_invalid(self):
+        with pytest.raises(CephSecretException, match="Invalid secret scope"):
+            SecretScope.from_str("vault")
+
+    def test_is_str_subclass(self):
+        assert isinstance(SecretScope.GLOBAL, str)
+
+    # validate_fields – global
+    def test_global_valid(self):
+        SecretScope.GLOBAL.validate_fields("", "mykey")
+
+    def test_global_nonempty_target_fails(self):
+        with pytest.raises(ValueError, match="target must be empty"):
+            SecretScope.GLOBAL.validate_fields("target", "key")
+
+    def test_global_empty_name_fails(self):
+        with pytest.raises(ValueError):
+            SecretScope.GLOBAL.validate_fields("", "")
+
+    # validate_fields – service / host
+    def test_service_valid(self):
+        SecretScope.SERVICE.validate_fields("prometheus", "auth")
+
+    def test_service_empty_target_fails(self):
+        with pytest.raises(ValueError):
+            SecretScope.SERVICE.validate_fields("", "auth")
+
+    def test_host_valid(self):
+        SecretScope.HOST.validate_fields("node1", "ssh_key")
+
+    def test_host_empty_name_fails(self):
+        with pytest.raises(ValueError):
+            SecretScope.HOST.validate_fields("node1", "")
+
+    # validate_fields – custom
+    def test_custom_valid_flat(self):
+        SecretScope.CUSTOM.validate_fields("", "some-key")
+
+    def test_custom_valid_nested(self):
+        SecretScope.CUSTOM.validate_fields("", "a/b/c")
+
+    def test_custom_nonempty_target_fails(self):
+        with pytest.raises(ValueError, match="target must be empty"):
+            SecretScope.CUSTOM.validate_fields("badtarget", "key")
+
+    def test_custom_empty_name_fails(self):
+        with pytest.raises(ValueError):
+            SecretScope.CUSTOM.validate_fields("", "")
+
+
+# ============================================================
+# SecretRef
+# ============================================================
+
+class TestSecretRef:
+    def test_global_ref_construction(self):
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw")
+        assert ref.namespace == "ns"
+        assert ref.scope == SecretScope.GLOBAL
+        assert ref.target == ""
+        assert ref.name == "pw"
+
+    def test_service_ref(self):
+        ref = SecretRef("ns", SecretScope.SERVICE, "prom", "auth")
+        assert ref.target == "prom"
+
+    def test_host_ref(self):
+        ref = SecretRef("ns", SecretScope.HOST, "n1", "k")
+        assert ref.name == "k"
+
+    def test_custom_ref(self):
+        ref = SecretRef("ns", SecretScope.CUSTOM, "", "a/b/c")
+        assert ref.name == "a/b/c"
+
+    def test_is_frozen(self):
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+        with pytest.raises(Exception):
+            ref.name = "other"  # type: ignore[misc]
+
+    def test_scope_coerced_from_string(self):
+        ref = SecretRef("ns", "host", "node1", "k")
+        assert ref.scope == SecretScope.HOST
+
+    def test_invalid_scope_raises(self):
+        with pytest.raises(ValueError):
+            SecretRef("ns", "badscope", "", "k")
+
+    def test_invalid_namespace_raises(self):
+        with pytest.raises(ValueError):
+            SecretRef("bad ns", SecretScope.GLOBAL, "", "k")
+
+    def test_ident_global(self):
+        ref = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+        assert ref.ident() == ("ns", "global", "", "k")
+
+    def test_ident_service(self):
+        ref = SecretRef("ns", SecretScope.SERVICE, "t", "n")
+        assert ref.ident() == ("ns", "service", "t", "n")
+
+    def test_to_uri_global(self):
+        ref = SecretRef("cephadm", SecretScope.GLOBAL, "", "pw")
+        assert ref.to_uri() == "secret:/cephadm/global/pw"
+
+    def test_to_uri_service(self):
+        ref = SecretRef("ns", SecretScope.SERVICE, "prom", "auth")
+        assert ref.to_uri() == "secret:/ns/service/prom/auth"
+
+    def test_to_uri_host(self):
+        ref = SecretRef("ns", SecretScope.HOST, "node1", "ssh")
+        assert ref.to_uri() == "secret:/ns/host/node1/ssh"
+
+    def test_to_uri_custom(self):
+        ref = SecretRef("ns", SecretScope.CUSTOM, "", "a/b/c")
+        assert ref.to_uri() == "secret:/ns/custom/a/b/c"
+
+    def test_equality(self):
+        r1 = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+        r2 = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+        assert r1 == r2
+
+    def test_inequality_different_name(self):
+        r1 = SecretRef("ns", SecretScope.GLOBAL, "", "k1")
+        r2 = SecretRef("ns", SecretScope.GLOBAL, "", "k2")
+        assert r1 != r2
+
+
+# ============================================================
+# BadSecretURI
+# ============================================================
+
+class TestBadSecretURI:
+    def test_construction(self):
+        b = BadSecretURI(raw="secret:/bad/scope/key", error="bad scope", namespace="ns")
+        assert b.raw == "secret:/bad/scope/key"
+        assert b.error == "bad scope"
+
+    def test_to_uri_returns_raw(self):
+        b = BadSecretURI(raw="secret:/bad", error="err", namespace="ns")
+        assert b.to_uri() == "secret:/bad"
+
+    def test_is_frozen(self):
+        b = BadSecretURI(raw="x", error="e", namespace="n")
+        with pytest.raises(Exception):
+            b.raw = "y"  # type: ignore[misc]
+
+
+# ============================================================
+# parse_secret_uri
+# ============================================================
+
+class TestParseSecretUri:
+    # --- valid inputs ---
+    def test_global(self):
+        ref = parse_secret_uri("secret:/ns/global/pw")
+        assert ref.scope == SecretScope.GLOBAL
+        assert ref.name == "pw"
+
+    def test_service(self):
+        ref = parse_secret_uri("secret:/ns/service/prom/auth")
+        assert ref.scope == SecretScope.SERVICE
+        assert ref.target == "prom"
+        assert ref.name == "auth"
+
+    def test_host(self):
+        ref = parse_secret_uri("secret:/ns/host/node1/ssh")
+        assert ref.target == "node1"
+        assert ref.name == "ssh"
+
+    def test_custom_flat(self):
+        ref = parse_secret_uri("secret:/ns/custom/single")
+        assert ref.scope == SecretScope.CUSTOM
+        assert ref.name == "single"
+
+    def test_custom_nested(self):
+        ref = parse_secret_uri("secret:/ns/custom/a/b/c")
+        assert ref.name == "a/b/c"
+
+    # --- invalid inputs ---
+    def test_wrong_scheme(self):
+        with pytest.raises(CephSecretException, match="Not a secret uri"):
+            parse_secret_uri("http://example.com/foo")
+
+    def test_authority_rejected(self):
+        with pytest.raises(CephSecretException, match="authority"):
+            parse_secret_uri("secret://ns/global/key")
+
+    def test_query_string_rejected(self):
+        with pytest.raises(CephSecretException, match="query"):
+            parse_secret_uri("secret:/ns/global/key?x=y")
+
+    def test_fragment_rejected(self):
+        with pytest.raises(CephSecretException, match="query"):
+            parse_secret_uri("secret:/ns/global/key#frag")
+
+    def test_percent_encoding_rejected(self):
+        with pytest.raises(CephSecretException, match="percent-encoding"):
+            parse_secret_uri("secret:/ns/global/my%2Dkey")
+
+    def test_too_short(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_uri("secret:/ns/global")
+
+    def test_bad_scope(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_uri("secret:/ns/vault/key")
+
+    def test_non_string_input(self):
+        with pytest.raises(CephSecretException, match="must be a string"):
+            parse_secret_uri(42)  # type: ignore[arg-type]
+
+    def test_global_with_extra_segment_rejected(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_uri("secret:/ns/global/target/name")
+
+    def test_service_missing_name(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_uri("secret:/ns/service/prom")
+
+
+# ============================================================
+# parse_secret_path
+# ============================================================
+
+class TestParseSecretPath:
+    # --- valid inputs ---
+    def test_global(self):
+        ref = parse_secret_path("cephadm/global/pw")
+        assert ref.namespace == "cephadm"
+        assert ref.scope == SecretScope.GLOBAL
+        assert ref.name == "pw"
+
+    def test_service(self):
+        ref = parse_secret_path("ns/service/prom/auth")
+        assert ref.target == "prom"
+        assert ref.name == "auth"
+
+    def test_host(self):
+        ref = parse_secret_path("ns/host/node1/ssh_key")
+        assert ref.name == "ssh_key"
+
+    def test_custom_flat(self):
+        ref = parse_secret_path("ns/custom/flat")
+        assert ref.scope == SecretScope.CUSTOM
+        assert ref.name == "flat"
+
+    def test_custom_nested(self):
+        ref = parse_secret_path("ns/custom/a/b/c")
+        assert ref.name == "a/b/c"
+
+    def test_leading_slash_stripped(self):
+        ref = parse_secret_path("/ns/global/k")
+        assert ref.namespace == "ns"
+
+    # --- invalid inputs ---
+    def test_empty_string(self):
+        with pytest.raises(CephSecretException, match="empty"):
+            parse_secret_path("")
+
+    def test_non_string(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_path(None)  # type: ignore[arg-type]
+
+    def test_too_few_segments(self):
+        with pytest.raises(CephSecretException, match="Use"):
+            parse_secret_path("ns/global")
+
+    def test_double_slash(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_path("ns//global/key")
+
+    def test_trailing_slash(self):
+        with pytest.raises(CephSecretException, match="empty segment"):
+            parse_secret_path("ns/global/key/")
+
+    def test_bad_scope(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_path("ns/badscope/key")
+
+    def test_global_extra_segment(self):
+        with pytest.raises(CephSecretException, match="global scope"):
+            parse_secret_path("ns/global/target/name")
+
+    def test_service_missing_name(self):
+        with pytest.raises(CephSecretException, match="service"):
+            parse_secret_path("ns/service/target")
+
+    def test_host_missing_name(self):
+        with pytest.raises(CephSecretException, match="host"):
+            parse_secret_path("ns/host/target")
+
+
+# ============================================================
+# Whitespace / canonicality
+# ============================================================
+
+class TestParseSecretPathWhitespace:
+    @pytest.mark.parametrize("path", [
+        " ns/global/key",
+        "ns/global/key ",
+        "\tns/global/key",
+        "ns/global/key\n",
+    ])
+    def test_rejects_outer_whitespace(self, path):
+        with pytest.raises(CephSecretException):
+            parse_secret_path(path)
+
+
+class TestParseSecretUriWhitespace:
+    def test_rejects_leading_whitespace(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_uri(" secret:/ns/global/key")
+
+    def test_rejects_trailing_whitespace(self):
+        with pytest.raises(CephSecretException):
+            parse_secret_uri("secret:/ns/global/key ")