-# flake8: noqa
+import os
+
+if 'UNITTEST' in os.environ:
+ import tests # noqa: F401
+
from .module import Module
+
+__all__ = ['Module']
--- /dev/null
+# -*- coding: utf-8 -*-
+import pytest
+from mgr_module import MgrModule
+
+
+@pytest.fixture
+def mgr() -> MgrModule:
+ return MgrModule.__new__(MgrModule)
+
+
+@pytest.fixture
+def store(mgr: MgrModule):
+ from ceph_secrets.secret_store import SecretStoreMon
+ return SecretStoreMon(mgr)
+
+
+@pytest.fixture
+def secret_mgr(store):
+ from ceph_secrets.secret_mgr import SecretMgr
+ return SecretMgr(store)
--- /dev/null
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.backends (registry of storage backends)."""
+from __future__ import annotations
+
+from ceph_secrets.backends import BACKENDS
+from ceph_secrets.secret_store import SecretStoreMon
+
+
+class TestBackendsRegistry:
+ def test_mon_backend_registered(self):
+ assert "mon" in BACKENDS
+
+ def test_mon_maps_to_correct_class(self):
+ assert BACKENDS["mon"] is SecretStoreMon
+
+ def test_only_known_backends(self):
+ # Update this test when new backends are added.
+ assert set(BACKENDS.keys()) == {"mon"}
+
+ def test_backend_is_instantiable(self, mgr):
+ cls = BACKENDS["mon"]
+ instance = cls(mgr)
+ assert isinstance(instance, SecretStoreMon)
--- /dev/null
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.module.Module — RPC surface and internal methods."""
+from __future__ import annotations
+
+import json
+import pytest
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+from ceph_secrets.module import Module
+from ceph_secrets.secret_store import SecretStoreMon
+from ceph_secrets.secret_mgr import SecretMgr
+from ceph_secrets_types import (
+ SecretScope,
+ SecretRef,
+ CephSecretException,
+ CephSecretDataError,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helper: build a Module without going through MgrModule.__init__
+# ---------------------------------------------------------------------------
+
+def _make_module(mgr_stub) -> Module:
+ """Bypass MgrModule.__init__ and wire up manually."""
+ mod = object.__new__(Module)
+ mod.secret_mgr = SecretMgr(SecretStoreMon(mgr_stub))
+ return mod
+
+
+# ---------------------------------------------------------------------------
+# ---------------------------------------------------------------------------
+# Module.__init__ — backend selection
+# ---------------------------------------------------------------------------
+
+class TestModuleInit:
+ def test_default_backend_initialises_secret_mgr(self, mgr):
+ """Module.__init__ with default 'mon' backend wires up SecretMgr."""
+ from mgr_module import MgrModule
+ mod = object.__new__(Module)
+ with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None):
+ with patch.object(MgrModule, "get_module_option", return_value="mon"):
+ Module.__init__(mod, mgr)
+ assert isinstance(mod.secret_mgr, SecretMgr)
+
+ def test_unsupported_backend_raises(self, mgr):
+ """Unknown backend name raises RuntimeError."""
+ from mgr_module import MgrModule
+ with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None):
+ with patch("ceph_secrets.module.BACKENDS", {}):
+ with patch.object(MgrModule, "get_module_option", return_value="vault"):
+ with pytest.raises(RuntimeError, match="Unsupported secrets backend"):
+ Module.__init__(object.__new__(Module), mgr)
+
+ def test_backend_init_failure_raises(self, mgr):
+ """Backend constructor failure raises RuntimeError with helpful message."""
+ from mgr_module import MgrModule
+ broken_cls = MagicMock(side_effect=Exception("connection refused"))
+ with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None):
+ with patch("ceph_secrets.module.BACKENDS", {"mon": broken_cls}):
+ with patch.object(MgrModule, "get_module_option", return_value="mon"):
+ with pytest.raises(RuntimeError, match="Failed to initialize secrets backend"):
+ Module.__init__(object.__new__(Module), mgr)
+
+
+# ---------------------------------------------------------------------------
+# Module RPC surface
+# ---------------------------------------------------------------------------
+
+class TestModuleRPC:
+ @pytest.fixture
+ def mod(self, mgr):
+ return _make_module(mgr)
+
+ def test_secret_get_epoch_zero(self, mod):
+ assert mod.secret_get_epoch("cephadm") == 0
+
+ def test_secret_get_epoch_after_set(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ assert mod.secret_get_epoch("ns") == 1
+
+ def test_secret_set_and_get(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ result = mod.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=True)
+ assert result is not None
+ assert result["data"] == "s3cr3t"
+
+ def test_secret_get_without_reveal_hides_data(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ result = mod.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=False)
+ assert result is not None
+ assert "data" not in result
+
+ def test_secret_get_missing_returns_none(self, mod):
+ assert mod.secret_get("ns", SecretScope.GLOBAL, "", "ghost") is None
+
+ def test_secret_get_version_existing(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ assert mod.secret_get_version("ns", SecretScope.GLOBAL, "", "k") == 1
+
+ def test_secret_get_version_missing_returns_none(self, mod):
+ assert mod.secret_get_version("ns", SecretScope.GLOBAL, "", "ghost") is None
+
+ def test_secret_get_value_existing(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "pw") == "s3cr3t"
+
+ def test_secret_get_value_missing_returns_none(self, mod):
+ assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "ghost") is None
+
+ def test_secret_get_value_opaque_string(self, mod):
+ # Any string is valid — including one that looks like JSON
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "a", "p": "b"}')
+ assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "creds") == '{"u": "a", "p": "b"}'
+
+ def test_secret_get_value_corrupt_raises_data_error(self, mgr, mod):
+ mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+ with pytest.raises(CephSecretDataError):
+ mod.secret_get_value("ns", SecretScope.GLOBAL, "", "bad")
+
+ def test_secret_get_versions_batch(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "a", "data-a")
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "b", "data-b")
+ result = mod.secret_get_versions([
+ "secret:/ns/global/a",
+ "secret:/ns/global/b",
+ "secret:/ns/global/ghost",
+ ])
+ assert result["secret:/ns/global/a"] == 1
+ assert result["secret:/ns/global/b"] == 1
+ assert result["secret:/ns/global/ghost"] is None
+
+ def test_secret_get_versions_skips_invalid_uris(self, mod):
+ result = mod.secret_get_versions(["not-a-uri", "secret:/ns/badscope/key"])
+ assert len(result) == 0
+
+ def test_secret_get_versions_empty_list(self, mod):
+ assert mod.secret_get_versions([]) == {}
+
+ def test_secret_get_versions_corrupt_raises_data_error(self, mgr, mod):
+ """Corruption must not be silently swallowed as a missing/invalid URI."""
+ mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+ with pytest.raises(CephSecretDataError):
+ mod.secret_get_versions(["secret:/ns/global/bad"])
+
+ def test_secret_set_returns_metadata(self, mod):
+ result = mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ assert "metadata" in result
+ assert result["metadata"]["version"] == 1
+
+ def test_secret_set_empty_data_raises(self, mod):
+ with pytest.raises(CephSecretException, match="must not be empty"):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "")
+
+ def test_secret_rm_existing(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ assert mod.secret_rm("ns", SecretScope.GLOBAL, "", "k") is True
+
+ def test_secret_rm_nonexistent(self, mod):
+ assert mod.secret_rm("ns", SecretScope.GLOBAL, "", "ghost") is False
+
+ def test_secret_ls_empty(self, mod):
+ assert mod.secret_ls(namespace="ns") == {}
+
+ def test_secret_ls_returns_records(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "a", "data-a")
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "b", "data-b")
+ out = mod.secret_ls(namespace="ns")
+ assert "ns/global/a" in out
+ assert "ns/global/b" in out
+
+ def test_secret_ls_scope_filter(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "g", "data-g")
+ mod.secret_set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+ out = mod.secret_ls(namespace="ns", scope="service")
+ assert all("service" in k for k in out.keys())
+
+ def test_secret_ls_with_target(self, mod):
+ mod.secret_set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+ out = mod.secret_ls(namespace="ns", scope="service", target="prom")
+ assert "ns/service/prom/auth" in out
+
+ def test_secret_ls_show_internals(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ out = mod.secret_ls(namespace="ns", show_internals=True)
+ rec = out["ns/global/k"]
+ assert "policy" in rec
+
+ def test_secret_ls_key_format_global(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ out = mod.secret_ls(namespace="ns")
+ assert "ns/global/k" in out
+ for k in out:
+ assert "//" not in k
+
+ def test_scan_refs(self, mod):
+ result = mod.scan_refs({"key": "secret:/ns/global/pw"}, namespace="ns")
+ assert "secret:/ns/global/pw" in result
+
+ def test_scan_unresolved_refs_all_missing(self, mod):
+ result = mod.scan_unresolved_refs("secret:/ns/global/ghost", namespace="ns")
+ assert "secret:/ns/global/ghost" in result
+
+ def test_scan_unresolved_refs_exists(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "x")
+ result = mod.scan_unresolved_refs("secret:/ns/global/pw", namespace="ns")
+ assert result == []
+
+ def test_resolve_object(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ result = mod.resolve_object("secret:/ns/global/pw")
+ assert result == "s3cr3t"
+
+
+# ---------------------------------------------------------------------------
+# Module internal ref-based methods
+# ---------------------------------------------------------------------------
+
+class TestModuleInternals:
+ @pytest.fixture
+ def mod(self, mgr):
+ return _make_module(mgr)
+
+ def _ref(self, ns="ns", scope=SecretScope.GLOBAL, target="", name="key"):
+ return SecretRef(ns, scope, target, name)
+
+ def test_secret_get_existing(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x")
+ result = mod._secret_get(self._ref())
+ assert result is not None
+
+ def test_secret_get_not_found(self, mod):
+ assert mod._secret_get(self._ref(name="ghost")) is None
+
+ def test_secret_get_corrupt_raises_data_error(self, mgr, mod):
+ mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+ with pytest.raises(CephSecretDataError):
+ mod._secret_get(self._ref(name="bad"))
+
+ def test_secret_get_version_existing(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x")
+ assert mod._secret_get_version(self._ref()) == 1
+
+ def test_secret_get_version_not_found(self, mod):
+ assert mod._secret_get_version(self._ref(name="ghost")) is None
+
+ def test_secret_get_version_corrupt_raises_data_error(self, mgr, mod):
+ mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+ with pytest.raises(CephSecretDataError):
+ mod._secret_get_version(self._ref(name="bad"))
+
+ def test_secret_set_returns_metadata_without_data(self, mod):
+ result = mod._secret_set(self._ref(), "data-x")
+ assert "metadata" in result
+ assert "data" not in result
+
+ def test_secret_rm_existing(self, mod):
+ mod._secret_set(self._ref(), "data-x")
+ assert mod._secret_rm(self._ref()) is True
+
+ def test_secret_rm_nonexistent(self, mod):
+ assert mod._secret_rm(self._ref(name="ghost")) is False
+
+
+# ---------------------------------------------------------------------------
+# CLI handler logic (path-based dispatch)
+# ---------------------------------------------------------------------------
+
+class TestModuleCLIHandlers:
+ @pytest.fixture
+ def mod(self, mgr):
+ return _make_module(mgr)
+
+ def _ok(self, result: Any) -> Any:
+ """Unwrap a Responder tuple (retcode, body, status) → parsed dict.
+ Falls through unchanged when Responder is a no-op stub."""
+ if isinstance(result, tuple):
+ retcode, body, _ = result
+ assert retcode == 0, f"unexpected error retcode {retcode}: {body}"
+ return json.loads(body)
+ return result
+
+ def _assert_error(self, result: Any, match: str = "") -> None:
+ """Assert a CLI call failed — either via a non-zero tuple or ErrorResponse.
+ Never accepts a raw CephSecretException (would mean _handle_secret_errors broke)."""
+ from object_format import ErrorResponse
+ if isinstance(result, tuple):
+ retcode, body, status = result
+ assert retcode != 0, "expected non-zero retcode"
+ if match:
+ assert match in (body + status).lower(), \
+ f"expected {match!r} in response, got body={body!r} status={status!r}"
+ elif isinstance(result, ErrorResponse):
+ if match:
+ assert match in str(result).lower()
+ else:
+ raise AssertionError(
+ f"expected error tuple or ErrorResponse, got {type(result).__name__}: {result!r}"
+ )
+
+ def test_cli_get_by_path_found(self, mod):
+ mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x")
+ result = self._ok(mod._cli_secret_get_by_path(path="ns/global/key"))
+ assert "metadata" in result
+
+ def test_cli_get_by_path_not_found(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_get_by_path(path="ns/global/ghost")
+ self._assert_error(result, match="not found")
+ except ErrorResponse as e:
+ assert "not found" in str(e).lower()
+
+ def test_cli_get_by_path_bad_path(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_get_by_path(path="ns/badscope/key")
+ self._assert_error(result)
+ except ErrorResponse:
+ pass
+
+ def test_cli_get_corrupt_raises_data_error(self, mgr, mod):
+ from object_format import ErrorResponse
+ mgr.set_store("secret_store/v1/ns/global/bad", "{not json")
+ # CephSecretDataError is a CephSecretException subclass so _handle_secret_errors
+ # wraps it into ErrorResponse; Responder formats it as a non-zero tuple.
+ try:
+ result = mod._cli_secret_get_by_path(path="ns/global/bad")
+ self._assert_error(result)
+ except ErrorResponse:
+ pass
+
+ def test_cli_set_by_path(self, mod):
+ result = self._ok(mod._cli_secret_set_by_path(
+ path="ns/global/key",
+ inbuf="s3cr3t"
+ ))
+ assert result["metadata"]["version"] == 1
+
+ def test_cli_set_by_path_no_inbuf(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_set_by_path(path="ns/global/key", inbuf=None)
+ self._assert_error(result, match="-i")
+ except ErrorResponse as e:
+ assert "-i" in str(e)
+
+ def test_cli_set_by_path_empty_data(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_set_by_path(path="ns/global/key", inbuf="")
+ self._assert_error(result, match="must not be empty")
+ except ErrorResponse as e:
+ assert "must not be empty" in str(e).lower()
+
+ def test_cli_rm_by_path_existing(self, mod):
+ mod._cli_secret_set_by_path(path="ns/global/key", inbuf='{"v": "x"}')
+ result = self._ok(mod._cli_secret_rm_by_path(path="ns/global/key"))
+ assert result["status"] == "removed"
+
+ def test_cli_rm_by_path_not_found(self, mod):
+ result = self._ok(mod._cli_secret_rm_by_path(path="ns/global/ghost"))
+ assert result["status"] == "not_found"
+
+ def test_cli_rm_by_path_bad_path(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_rm_by_path(path="ns/badscope/key")
+ self._assert_error(result)
+ except ErrorResponse:
+ pass
+
+ def test_cli_ls(self, mod):
+ mod._cli_secret_set_by_path(path="ns/global/a", inbuf='{"v": "1"}')
+ mod._cli_secret_set_by_path(path="ns/global/b", inbuf='{"v": "2"}')
+ result = self._ok(mod._cli_secret_ls(namespace="ns"))
+ assert "ns/global/a" in result
+ assert "ns/global/b" in result
+
+ def test_cli_get_with_reveal(self, mod):
+ mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t")
+ result = self._ok(mod._cli_secret_get_by_path(path="ns/global/pw", reveal=True))
+ assert result["data"] == "s3cr3t"
+
+ def test_cli_get_without_reveal_hides_data(self, mod):
+ mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t")
+ result = self._ok(mod._cli_secret_get_by_path(path="ns/global/pw", reveal=False))
+ assert "data" not in result
+
+ def _get_value(self, result: Any) -> str:
+ """Unwrap a get-value tuple (retcode, body, status) → raw string."""
+ if isinstance(result, tuple):
+ retcode, body, _ = result
+ assert retcode == 0, f"unexpected error retcode {retcode}: {body}"
+ return body
+ return result
+
+ def test_cli_get_value_returns_raw_string(self, mod):
+ mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t")
+ result = self._get_value(mod._cli_secret_get_value_by_path(path="ns/global/pw"))
+ assert result == "s3cr3t"
+
+ def test_cli_get_value_not_found_raises(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_get_value_by_path(path="ns/global/ghost")
+ self._assert_error(result)
+ except ErrorResponse:
+ pass
+
+ def test_cli_get_value_bad_path_raises(self, mod):
+ from object_format import ErrorResponse
+ try:
+ result = mod._cli_secret_get_value_by_path(path="ns/badscope/key")
+ self._assert_error(result)
+ except ErrorResponse:
+ pass
+
+ def test_cli_get_value_opaque_json_string(self, mod):
+ payload = '{"u": "a", "p": "b"}'
+ mod._cli_secret_set_by_path(path="ns/global/creds", inbuf=payload)
+ result = self._get_value(mod._cli_secret_get_value_by_path(path="ns/global/creds"))
+ assert result == payload
--- /dev/null
+# -*- coding: utf-8 -*-
+"""Tests for the SecretStorageBackend ABC and contract."""
+from __future__ import annotations
+
+import pytest
+from abc import ABC
+
+from ceph_secrets.secret_backend import SecretStorageBackend
+from ceph_secrets_types import SecretScope
+
+
+class TestSecretStorageBackend:
+ def test_is_abstract(self):
+ assert issubclass(SecretStorageBackend, ABC)
+
+ def test_cannot_instantiate_directly(self):
+ with pytest.raises(TypeError):
+ SecretStorageBackend() # type: ignore[abstract]
+
+ def test_concrete_subclass_instantiates(self, store):
+ """SecretStoreMon (the concrete impl) must satisfy the ABC."""
+ assert isinstance(store, SecretStorageBackend)
+
+ def test_abstract_methods_defined(self):
+ abstract = SecretStorageBackend.__abstractmethods__
+ assert 'get' in abstract
+ assert 'set' in abstract
+ assert 'rm' in abstract
+ assert 'ls' in abstract
+ assert 'get_epoch' in abstract
+ assert 'bump_epoch' in abstract
+
+ def test_partial_implementation_still_abstract(self):
+ """Subclass missing any abstract method stays abstract."""
+ class PartialImpl(SecretStorageBackend):
+ def get(self, ns, scope, target, name):
+ pass
+
+ def set(self, ns, scope, target, name, data, user_made=True, editable=True):
+ pass
+
+ def rm(self, ns, scope, target, name):
+ pass
+
+ def ls(self, namespace=None, scope=None, target=None):
+ pass
+
+ def get_epoch(self, namespace):
+ pass
+ # missing bump_epoch
+
+ with pytest.raises(TypeError):
+ PartialImpl() # type: ignore[abstract]
+
+ def test_full_implementation_is_instantiable(self):
+
+ class FullImpl(SecretStorageBackend):
+
+ def get(self, ns, scope, target, name):
+ return None
+
+ def set(self, ns, scope, target, name, data, user_made=True, editable=True):
+ pass
+
+ def rm(self, ns, scope, target, name):
+ return False
+
+ def ls(self, namespace=None, scope=None, target=None):
+ return []
+
+ def get_epoch(self, namespace):
+ return 0
+
+ def bump_epoch(self, namespace):
+ return 1
+
+ impl = FullImpl()
+ assert impl.get("ns", SecretScope.GLOBAL, "", "k") is None
+ assert impl.get_epoch("ns") == 0
+ assert impl.bump_epoch("ns") == 1
--- /dev/null
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.secret_mgr (SecretMgr)."""
+from __future__ import annotations
+
+import pytest
+
+from ceph_secrets_types import (
+ SecretScope, SecretRef,
+ CephSecretException, CephSecretNotFoundError,
+ BadSecretURI,
+)
+
+
+# ============================================================
+# make_ref
+# ============================================================
+
+class TestMakeRef:
+ def test_basic(self, secret_mgr):
+ ref = secret_mgr.make_ref("ns", SecretScope.GLOBAL, "", "key")
+ assert isinstance(ref, SecretRef)
+ assert ref.namespace == "ns"
+
+ def test_scope_as_string(self, secret_mgr):
+ ref = secret_mgr.make_ref("ns", "host", "node1", "ssh_key")
+ assert ref.scope == SecretScope.HOST
+
+ def test_bad_scope_raises(self, secret_mgr):
+ with pytest.raises(CephSecretException):
+ secret_mgr.make_ref("ns", "badscope", "", "key")
+
+ def test_bad_namespace_raises(self, secret_mgr):
+ with pytest.raises(CephSecretException):
+ secret_mgr.make_ref("bad ns!", SecretScope.GLOBAL, "", "key")
+
+
+# ============================================================
+# get / get_value
+# ============================================================
+
+class TestGet:
+ def test_get_existing(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw")
+ rec = secret_mgr.get(ref)
+ assert rec.data == "s3cr3t"
+
+ def test_get_missing_raises(self, secret_mgr):
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "ghost")
+ with pytest.raises(CephSecretNotFoundError):
+ secret_mgr.get(ref)
+
+ def test_get_value(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw")
+ assert secret_mgr.get_value(ref) == "s3cr3t"
+
+ def test_get_value_opaque_string(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "admin", "p": "pw"}')
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "creds")
+ val = secret_mgr.get_value(ref)
+ assert isinstance(val, str)
+ assert val == '{"u": "admin", "p": "pw"}'
+
+ def test_get_value_missing_raises(self, secret_mgr):
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "ghost")
+ with pytest.raises(CephSecretNotFoundError):
+ secret_mgr.get_value(ref)
+
+
+# ============================================================
+# set
+# ============================================================
+
+class TestSet:
+ def test_set_global(self, secret_mgr):
+ rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+ assert rec.metadata.version == 1
+ assert rec.data == "data-v1"
+
+ def test_set_service(self, secret_mgr):
+ rec = secret_mgr.set("ns", SecretScope.SERVICE, "prom", "auth", "user-a")
+ assert rec.target == "prom"
+
+ def test_set_custom(self, secret_mgr):
+ rec = secret_mgr.set("ns", SecretScope.CUSTOM, "", "a/b/c", "tok")
+ assert rec.name == "a/b/c"
+
+ def test_set_non_str_raises(self, secret_mgr):
+ with pytest.raises(CephSecretException, match="string"):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", {"not": "a-string"}) # type: ignore[arg-type]
+
+ def test_set_empty_string_raises(self, secret_mgr):
+ with pytest.raises(CephSecretException, match="must not be empty"):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "")
+
+ def test_set_preserves_whitespace_and_newlines(self, secret_mgr):
+ payload = " secret-value\n"
+ rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", payload)
+ assert rec.data == payload
+ assert secret_mgr.get_value(SecretRef("ns", SecretScope.GLOBAL, "", "k")) == payload
+
+ def test_set_increments_version(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+ rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+ assert rec.metadata.version == 2
+
+ def test_set_scope_string(self, secret_mgr):
+ rec = secret_mgr.set("ns", "global", "", "k", "data-x")
+ assert rec.scope == SecretScope.GLOBAL
+
+
+# ============================================================
+# rm
+# ============================================================
+
+class TestRm:
+ def test_rm_existing(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ assert secret_mgr.rm("ns", SecretScope.GLOBAL, "", "k") is True
+
+ def test_rm_nonexistent(self, secret_mgr):
+ assert secret_mgr.rm("ns", SecretScope.GLOBAL, "", "ghost") is False
+
+
+# ============================================================
+# ls
+# ============================================================
+
+class TestLs:
+ def test_ls_empty(self, secret_mgr):
+ assert secret_mgr.ls(namespace="ns") == []
+
+ def test_ls_returns_records(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "a", "data-a")
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "b", "data-b")
+ recs = secret_mgr.ls(namespace="ns")
+ assert len(recs) == 2
+
+ def test_ls_scope_filter(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "g", "data-g")
+ secret_mgr.set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+ recs = secret_mgr.ls(namespace="ns", scope="service")
+ assert len(recs) == 1
+ assert recs[0].scope == SecretScope.SERVICE
+
+
+# ============================================================
+# scan_refs / scan_unresolved_refs
+# ============================================================
+
+class TestScanRefs:
+ def test_scan_simple_string(self, secret_mgr):
+ obj = "secret:/ns/global/pw"
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ uris = {r.to_uri() for r in refs}
+ assert "secret:/ns/global/pw" in uris
+
+ def test_scan_in_dict(self, secret_mgr):
+ obj = {"key": "secret:/ns/global/pw"}
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ assert len(refs) == 1
+
+ def test_scan_in_list(self, secret_mgr):
+ obj = ["secret:/ns/global/pw", "secret:/ns/host/node1/ssh"]
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ assert len(refs) == 2
+
+ def test_scan_nested(self, secret_mgr):
+ obj = {"a": {"b": "secret:/ns/global/pw"}}
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ assert len(refs) == 1
+
+ def test_scan_no_refs(self, secret_mgr):
+ obj = {"plain": "value"}
+ assert secret_mgr.scan_refs(obj, namespace="ns") == set()
+
+ def test_scan_bad_uri_yields_bad_secret_uri(self, secret_mgr):
+ obj = "secret:/ns/badscope/key"
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ bad = [r for r in refs if isinstance(r, BadSecretURI)]
+ assert len(bad) == 1
+
+ def test_scan_unresolved_all_exist(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "x")
+ obj = "secret:/ns/global/pw"
+ unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+ assert len(unresolved) == 0
+
+ def test_scan_unresolved_missing(self, secret_mgr):
+ obj = "secret:/ns/global/ghost"
+ unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+ assert len(unresolved) == 1
+
+ def test_scan_unresolved_bad_uri_is_unresolved(self, secret_mgr):
+ obj = "secret:/ns/badscope/key"
+ unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+ assert len(unresolved) == 1
+
+ def test_scan_unresolved_embedded_uri_is_unresolved(self, secret_mgr):
+ # Even though the referenced secret exists, an embedded (non-whole-value)
+ # URI is never resolvable, so it must be reported as unresolved so that
+ # pre-deploy validation catches it.
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "x")
+ obj = {"auth": "Bearer secret:/ns/global/pw"}
+ unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns")
+ assert len(unresolved) == 1
+
+
+# ============================================================
+# resolve_object
+# ============================================================
+
+class TestResolveObject:
+ def test_resolve_secret(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ result = secret_mgr.resolve_object("secret:/ns/global/pw")
+ assert result == "s3cr3t"
+
+ def test_resolve_opaque_json_string(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "a", "p": "b"}')
+ result = secret_mgr.resolve_object("secret:/ns/global/creds")
+ assert isinstance(result, str)
+ assert result == '{"u": "a", "p": "b"}'
+
+ def test_resolve_in_dict(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ result = secret_mgr.resolve_object({"password": "secret:/ns/global/pw"})
+ assert result["password"] == "s3cr3t"
+
+ def test_resolve_in_list(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "a", "x")
+ result = secret_mgr.resolve_object(["secret:/ns/global/a", "plain"])
+ assert result[0] == "x"
+ assert result[1] == "plain"
+
+ def test_resolve_in_tuple(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "t", "y")
+ result = secret_mgr.resolve_object(("secret:/ns/global/t",))
+ assert result == ("y",)
+
+ def test_resolve_non_secret_string_unchanged(self, secret_mgr):
+ result = secret_mgr.resolve_object("just a normal string")
+ assert result == "just a normal string"
+
+ def test_resolve_non_string_unchanged(self, secret_mgr):
+ assert secret_mgr.resolve_object(42) == 42
+ assert secret_mgr.resolve_object(None) is None
+
+ def test_resolve_missing_secret_raises(self, secret_mgr):
+ with pytest.raises(CephSecretException):
+ secret_mgr.resolve_object("secret:/ns/global/ghost")
+
+ def test_resolve_invalid_uri_raises(self, secret_mgr):
+ with pytest.raises(CephSecretException):
+ secret_mgr.resolve_object("secret:/ns/badscope/key")
+
+ def test_resolve_edge_whitespace_tolerated(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ assert secret_mgr.resolve_object(" secret:/ns/global/pw ") == "s3cr3t"
+
+ def test_resolve_newline_whitespace_tolerated(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ assert secret_mgr.resolve_object("\nsecret:/ns/global/pw\n") == "s3cr3t"
+
+ def test_resolve_ref_with_suffix_raises(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ with pytest.raises(CephSecretException):
+ secret_mgr.resolve_object("secret:/ns/global/pw suffix")
+
+ def test_resolve_embedded_uri_raises(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ with pytest.raises(CephSecretException, match="embedded"):
+ secret_mgr.resolve_object("Bearer secret:/ns/global/pw")
+
+ def test_resolve_embedded_uri_in_dict_raises(self, secret_mgr):
+ secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t")
+ with pytest.raises(CephSecretException, match="embedded"):
+ secret_mgr.resolve_object({"auth": "Bearer secret:/ns/global/pw"})
+
+ def test_resolve_plain_string_whitespace_preserved(self, secret_mgr):
+ # non-secret strings pass through byte-for-byte, including whitespace
+ assert secret_mgr.resolve_object(" plain value ") == " plain value "
+
+
+# ============================================================
+# scan_refs — edge cases
+# ============================================================
+
+class TestScanRefsEdgeCases:
+ def test_embedded_refs_rejected_as_bad(self, secret_mgr):
+ # A string that embeds URIs inside other text is NOT a whole-value
+ # reference; it is surfaced as a single BadSecretURI, not parsed into
+ # multiple SecretRefs.
+ obj = "use secret:/ns/global/a and secret:/ns/global/b"
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ assert len(refs) == 1
+ bad = [r for r in refs if isinstance(r, BadSecretURI)]
+ assert len(bad) == 1
+ assert bad[0].raw == obj
+
+ def test_duplicate_refs_deduped(self, secret_mgr):
+ obj = ["secret:/ns/global/pw", "secret:/ns/global/pw"]
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ uris = [r.to_uri() for r in refs]
+ assert uris.count("secret:/ns/global/pw") == 1
+
+ def test_ref_followed_by_punctuation(self, secret_mgr):
+ # "secret:/ns/global/pw," is a whole-value string that starts with the
+ # prefix but is not a valid URI (trailing comma in name) → BadSecretURI.
+ obj = "secret:/ns/global/pw,"
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ bad = [r for r in refs if isinstance(r, BadSecretURI)]
+ assert len(bad) == 1
+
+ def test_whole_value_ref_with_edge_whitespace(self, secret_mgr):
+ # Surrounding whitespace around an otherwise-clean URI is tolerated.
+ obj = " secret:/ns/global/pw "
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ uris = {r.to_uri() for r in refs if isinstance(r, SecretRef)}
+ assert "secret:/ns/global/pw" in uris
+
+ def test_embedded_ref_in_dict_value_is_bad(self, secret_mgr):
+ obj = {"auth": "Bearer secret:/ns/global/pw"}
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ bad = [r for r in refs if isinstance(r, BadSecretURI)]
+ assert len(bad) == 1
+
+ def test_ref_inside_tuple(self, secret_mgr):
+ obj = ("secret:/ns/global/pw",)
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ assert len(refs) == 1
+
+ def test_cross_namespace_ref_is_scanned(self, secret_mgr):
+ """scan_refs finds refs regardless of namespace — namespace arg does not filter."""
+ obj = "secret:/other/global/pw"
+ refs = secret_mgr.scan_refs(obj, namespace="ns")
+ uris = {r.to_uri() for r in refs}
+ assert "secret:/other/global/pw" in uris
--- /dev/null
+# -*- coding: utf-8 -*-
+"""Tests for ceph_secrets.secret_store (SecretStoreMon + data model)."""
+from __future__ import annotations
+
+import pytest
+
+# conftest injects stubs; just import production code after.
+from ceph_secrets.secret_store import (
+ SecretRecord,
+ SecretMetadata,
+ SecretPolicy,
+ SECRET_STORE_PREFIX,
+ SECRET_STORE_FORMAT_VERSION,
+ _checked_ref,
+ _checked_namespace,
+ _epoch_key,
+ _now_iso,
+ _expect_bool,
+ _expect_str,
+ _expect_positive_int,
+ _expect_object,
+ _reject_unknown_keys,
+ _require_keys,
+)
+from ceph_secrets_types import (
+ SecretScope, SecretRef,
+ CephSecretException, CephSecretDataError,
+)
+
+
+# ============================================================
+# Helpers / primitive validators
+# ============================================================
+
+class TestPrimitiveHelpers:
+ def test_now_iso_format(self):
+ ts = _now_iso()
+ import re
+ assert re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', ts)
+
+ def test_expect_bool_ok(self):
+ assert _expect_bool("f", True) is True
+ assert _expect_bool("f", False) is False
+
+ def test_expect_bool_bad(self):
+ with pytest.raises(CephSecretDataError, match="must be a boolean"):
+ _expect_bool("f", 1)
+
+ def test_expect_str_ok(self):
+ assert _expect_str("f", "hello") == "hello"
+
+ def test_expect_str_bad(self):
+ with pytest.raises(CephSecretDataError, match="must be a string"):
+ _expect_str("f", 42)
+
+ def test_expect_positive_int_ok(self):
+ assert _expect_positive_int("f", 1) == 1
+ assert _expect_positive_int("f", 99) == 99
+
+ def test_expect_positive_int_zero(self):
+ with pytest.raises(CephSecretDataError):
+ _expect_positive_int("f", 0)
+
+ def test_expect_positive_int_bool_rejected(self):
+ with pytest.raises(CephSecretDataError):
+ _expect_positive_int("f", True)
+
+ def test_expect_object_ok(self):
+ d = {"a": 1}
+ assert _expect_object("o", d) is d
+
+ def test_expect_object_bad(self):
+ with pytest.raises(CephSecretDataError, match="must be a JSON object"):
+ _expect_object("o", [1, 2])
+
+ def test_reject_unknown_keys(self):
+ with pytest.raises(CephSecretDataError, match="unknown field"):
+ _reject_unknown_keys("L", {"a": 1, "z": 2}, {"a"})
+
+ def test_require_keys_missing(self):
+ with pytest.raises(CephSecretDataError, match="missing required"):
+ _require_keys("L", {"a": 1}, {"a", "b"})
+
+ def test_checked_namespace_valid(self):
+ assert _checked_namespace("cephadm") == "cephadm"
+
+ def test_checked_namespace_bad(self):
+ with pytest.raises(CephSecretDataError):
+ _checked_namespace("bad ns!")
+
+ def test_epoch_key_format(self):
+ assert _epoch_key("cephadm") == "secret_store/meta/cephadm/_epoch"
+
+ def test_checked_ref_valid(self):
+ ref = _checked_ref("ns", SecretScope.GLOBAL, "", "key")
+ assert isinstance(ref, SecretRef)
+
+ def test_checked_ref_bad_scope(self):
+ with pytest.raises(CephSecretDataError):
+ _checked_ref("ns", "bad", "", "key")
+
+
+# ============================================================
+# SecretMetadata
+# ============================================================
+
+class TestSecretMetadata:
+ def test_defaults(self):
+ m = SecretMetadata(version=1, created="2024-01-01T00:00:00Z", updated="2024-01-01T00:00:00Z")
+ assert m.version == 1
+
+ def test_to_json(self):
+ m = SecretMetadata(version=2, created="2024-01-01T00:00:00Z", updated="2024-01-02T00:00:00Z")
+ d = m.to_json()
+ assert d["version"] == 2
+ assert "created" in d
+ assert "updated" in d
+
+ def test_from_json_roundtrip(self):
+ m = SecretMetadata(version=3, created="2024-01-01T00:00:00Z", updated="2024-01-02T00:00:00Z")
+ m2 = SecretMetadata.from_json(m.to_json())
+ assert m2.version == 3
+
+ def test_from_json_missing_field(self):
+ with pytest.raises(CephSecretDataError, match="missing required"):
+ SecretMetadata.from_json({"version": 1, "created": "x"})
+
+ def test_from_json_unknown_field(self):
+ with pytest.raises(CephSecretDataError, match="unknown"):
+ SecretMetadata.from_json({"version": 1, "created": "x", "updated": "y", "extra": 1})
+
+ def test_version_zero_raises(self):
+ with pytest.raises(CephSecretDataError):
+ SecretMetadata(version=0, created="x", updated="y")
+
+ def test_non_dict_raises(self):
+ with pytest.raises(CephSecretDataError):
+ SecretMetadata.from_json("not a dict")
+
+
+# ============================================================
+# SecretPolicy
+# ============================================================
+
+class TestSecretPolicy:
+ def test_defaults(self):
+ p = SecretPolicy()
+ assert p.user_made is True
+ assert p.editable is True
+
+ def test_to_json(self):
+ p = SecretPolicy(user_made=False, editable=True)
+ d = p.to_json()
+ assert d["user_made"] is False
+
+ def test_from_json_roundtrip(self):
+ p = SecretPolicy(user_made=True, editable=False)
+ p2 = SecretPolicy.from_json(p.to_json())
+ assert p2.editable is False
+
+ def test_from_json_missing(self):
+ with pytest.raises(CephSecretDataError):
+ SecretPolicy.from_json({"user_made": True})
+
+ def test_from_json_bad_type(self):
+ with pytest.raises(CephSecretDataError, match="must be a boolean"):
+ SecretPolicy.from_json({"user_made": 1, "editable": True})
+
+ def test_non_bool_raises(self):
+ with pytest.raises(CephSecretDataError):
+ SecretPolicy(user_made="yes", editable=True) # type: ignore[arg-type]
+
+
+# ============================================================
+# SecretRecord
+# ============================================================
+
+class TestSecretRecord:
+ def _ref(self, scope=SecretScope.GLOBAL, target="", name="key"):
+ return SecretRef("ns", scope, target, name)
+
+ def _meta(self):
+ return SecretMetadata(version=1, created="2024-01-01T00:00:00Z", updated="2024-01-01T00:00:00Z")
+
+ def test_basic_construction(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t")
+ assert rec.namespace == "ns"
+ assert rec.scope == SecretScope.GLOBAL
+ assert rec.name == "key"
+
+ def test_to_public_json_no_data(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t")
+ out = rec.to_public_json(include_data=False)
+ assert "data" not in out
+ assert "metadata" in out
+
+ def test_to_public_json_with_data(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t")
+ out = rec.to_public_json(include_data=True)
+ assert out["data"] == "s3cr3t"
+
+ def test_to_public_json_with_ref(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x")
+ out = rec.to_public_json(include_ref=True)
+ assert out["ref"]["namespace"] == "ns"
+ assert out["ref"]["scope"] == "global"
+
+ def test_to_public_json_with_policy(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x")
+ out = rec.to_public_json(include_policy=True)
+ assert "policy" in out
+
+ def test_to_store_json(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x")
+ stored = rec.to_store_json()
+ assert stored["format_version"] == SECRET_STORE_FORMAT_VERSION
+ assert "data" in stored
+ assert "policy" in stored
+
+ def test_from_store_json_roundtrip(self):
+ ref = self._ref()
+ rec = SecretRecord(ref=ref, metadata=self._meta(), data="pw")
+ rec2 = SecretRecord.from_store_json(ref, rec.to_store_json())
+ assert rec2.data == "pw"
+ assert rec2.metadata.version == 1
+
+ def test_from_store_json_wrong_version(self):
+ ref = self._ref()
+ payload = {
+ "format_version": 99,
+ "metadata": {"version": 1, "created": "x", "updated": "y"},
+ "policy": {"user_made": True, "editable": True},
+ "data": "x",
+ }
+ with pytest.raises(CephSecretDataError, match="unsupported"):
+ SecretRecord.from_store_json(ref, payload)
+
+ def test_from_store_json_missing_key(self):
+ ref = self._ref()
+ with pytest.raises(CephSecretDataError, match="missing required"):
+ SecretRecord.from_store_json(ref, {"format_version": 1, "metadata": {}})
+
+ def test_bad_ref_type_raises(self):
+ with pytest.raises(CephSecretDataError, match="must be a SecretRef"):
+ SecretRecord(ref="not-a-ref", metadata=self._meta(), data="x") # type: ignore[arg-type]
+
+ def test_bad_metadata_type_raises(self):
+ with pytest.raises(CephSecretDataError):
+ SecretRecord(ref=self._ref(), metadata={"version": 1}, data="x") # type: ignore[arg-type]
+
+ def test_bad_data_type_raises(self):
+ with pytest.raises(CephSecretDataError):
+ SecretRecord(ref=self._ref(), metadata=self._meta(), data={"not": "a-string"}) # type: ignore[arg-type]
+
+ def test_empty_data_raises(self):
+ with pytest.raises(CephSecretDataError, match="must not be empty"):
+ SecretRecord(ref=self._ref(), metadata=self._meta(), data="")
+
+ def test_data_in_store_json(self):
+ rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="myvalue")
+ stored = rec.to_store_json()
+ assert stored["data"] == "myvalue"
+
+ def test_ident(self):
+ ref = self._ref()
+ rec = SecretRecord(ref=ref, metadata=self._meta(), data="x")
+ assert rec.ident() == ("ns", "global", "", "key")
+
+
+# ============================================================
+# SecretStoreMon – epoch
+# ============================================================
+
+class TestSecretStoreMon_Epoch:
+ def test_initial_epoch_is_zero(self, store):
+ assert store.get_epoch("cephadm") == 0
+
+ def test_bump_epoch(self, store):
+ assert store.bump_epoch("cephadm") == 1
+ assert store.bump_epoch("cephadm") == 2
+
+ def test_namespace_isolation(self, store):
+ store.bump_epoch("cephadm")
+ store.bump_epoch("cephadm")
+ assert store.get_epoch("rook") == 0
+
+ def test_epoch_key_stored_correctly(self, mgr, store):
+ store.bump_epoch("myns")
+ assert mgr.get_store("secret_store/meta/myns/_epoch") == "1"
+
+ def test_corrupted_epoch_returns_zero(self, mgr, store):
+ mgr.set_store("secret_store/meta/ns/_epoch", "notanumber")
+ assert store.get_epoch("ns") == 0
+
+
+# ============================================================
+# SecretStoreMon – CRUD
+# ============================================================
+
+class TestSecretStoreMon_Crud:
+ def test_set_and_get_global(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "pw", "secret")
+ rec = store.get("ns", SecretScope.GLOBAL, "", "pw")
+ assert rec is not None
+ assert rec.data == "secret"
+
+ def test_get_missing_returns_none(self, store):
+ assert store.get("ns", SecretScope.GLOBAL, "", "noexist") is None
+
+ def test_set_increments_version(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+ rec = store.get("ns", SecretScope.GLOBAL, "", "k")
+ assert rec.metadata.version == 2
+
+ def test_set_preserves_created_timestamp(self, store):
+ import time
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+ rec1 = store.get("ns", SecretScope.GLOBAL, "", "k")
+ time.sleep(1)
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+ rec2 = store.get("ns", SecretScope.GLOBAL, "", "k")
+ assert rec1.metadata.created == rec2.metadata.created
+ assert rec2.metadata.updated != rec2.metadata.created
+
+ def test_set_service_scope(self, store):
+ store.set("ns", SecretScope.SERVICE, "prometheus", "auth", "admin")
+ rec = store.get("ns", SecretScope.SERVICE, "prometheus", "auth")
+ assert rec.data == "admin"
+
+ def test_set_host_scope(self, store):
+ store.set("ns", SecretScope.HOST, "node1", "ssh", "abc")
+ rec = store.get("ns", SecretScope.HOST, "node1", "ssh")
+ assert rec.data == "abc"
+
+ def test_set_custom_scope(self, store):
+ store.set("ns", SecretScope.CUSTOM, "", "a/b/c", "tok")
+ rec = store.get("ns", SecretScope.CUSTOM, "", "a/b/c")
+ assert rec.data == "tok"
+
+ def test_set_bumps_epoch(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+ assert store.get_epoch("ns") == 1
+
+ def test_set_non_str_data_raises(self, store):
+ with pytest.raises(CephSecretException):
+ store.set("ns", SecretScope.GLOBAL, "", "k", {"not": "a-string"}) # type: ignore[arg-type]
+
+ def test_set_empty_data_raises(self, store):
+ with pytest.raises(CephSecretException, match="must not be empty"):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "")
+
+ def test_set_non_editable_raises(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False)
+ with pytest.raises(CephSecretException, match="not editable"):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
+
+ def test_rm_existing(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ assert store.rm("ns", SecretScope.GLOBAL, "", "k") is True
+ assert store.get("ns", SecretScope.GLOBAL, "", "k") is None
+
+ def test_rm_nonexistent(self, store):
+ assert store.rm("ns", SecretScope.GLOBAL, "", "ghost") is False
+
+ def test_rm_bumps_epoch(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-x")
+ epoch_after_set = store.get_epoch("ns")
+ store.rm("ns", SecretScope.GLOBAL, "", "k")
+ assert store.get_epoch("ns") == epoch_after_set + 1
+
+ def test_rm_nonexistent_no_epoch_bump(self, store):
+ assert store.get_epoch("ns") == 0
+ store.rm("ns", SecretScope.GLOBAL, "", "ghost")
+ assert store.get_epoch("ns") == 0
+
+ def test_get_corrupted_json_raises(self, mgr, store):
+ mgr.set_store(
+ f"{SECRET_STORE_PREFIX}ns/global/badkey", "NOT JSON"
+ )
+ with pytest.raises(CephSecretDataError):
+ store.get("ns", SecretScope.GLOBAL, "", "badkey")
+
+ def test_set_with_bad_scope_string(self, store):
+ with pytest.raises(CephSecretDataError):
+ store.set("ns", "badscope", "", "k", "data-v1")
+
+
+# ============================================================
+# SecretStoreMon – ls
+# ============================================================
+
+class TestSecretStoreMon_Ls:
+ def test_ls_empty(self, store):
+ assert store.ls(namespace="ns") == []
+
+ def test_ls_returns_all(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k1", "data-1")
+ store.set("ns", SecretScope.GLOBAL, "", "k2", "data-2")
+ recs = store.ls(namespace="ns")
+ assert len(recs) == 2
+
+ def test_ls_filter_scope(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "g", "data-g")
+ store.set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth")
+ recs = store.ls(namespace="ns", scope=SecretScope.GLOBAL)
+ assert len(recs) == 1
+ assert recs[0].scope == SecretScope.GLOBAL
+
+ def test_ls_filter_target(self, store):
+ store.set("ns", SecretScope.SERVICE, "prom", "a1", "data-a1")
+ store.set("ns", SecretScope.SERVICE, "grafana", "a2", "data-a2")
+ recs = store.ls(namespace="ns", scope=SecretScope.SERVICE, target="prom")
+ assert len(recs) == 1
+ assert recs[0].target == "prom"
+
+ def test_ls_namespace_isolation(self, store):
+ store.set("ns1", SecretScope.GLOBAL, "", "k", "data-ns1")
+ store.set("ns2", SecretScope.GLOBAL, "", "k", "data-ns2")
+ recs = store.ls(namespace="ns1")
+ assert len(recs) == 1
+
+ def test_ls_custom_scope(self, store):
+ store.set("ns", SecretScope.CUSTOM, "", "a/b/c", "data-abc")
+ store.set("ns", SecretScope.CUSTOM, "", "x/y", "data-xy")
+ recs = store.ls(namespace="ns", scope=SecretScope.CUSTOM)
+ assert len(recs) == 2
+
+ def test_ls_sorted(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "zz", "data-zz")
+ store.set("ns", SecretScope.GLOBAL, "", "aa", "data-aa")
+ recs = store.ls(namespace="ns")
+ names = [r.name for r in recs]
+ assert names == sorted(names)
+
+ def test_ls_invalid_namespace(self, store):
+ with pytest.raises(CephSecretDataError):
+ store.ls(namespace="bad ns!")
+
+ def test_ls_scope_as_string(self, store):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1")
+ recs = store.ls(namespace="ns", scope="global")
+ assert len(recs) == 1
+
+ def test_ls_corrupted_json_raises(self, mgr, store):
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/bad", "NOT-JSON")
+ with pytest.raises(CephSecretDataError):
+ store.ls(namespace="ns")
+
+ def test_ls_no_filter(self, store):
+ store.set("ns1", SecretScope.GLOBAL, "", "a", "data-a")
+ store.set("ns2", SecretScope.GLOBAL, "", "b", "data-b")
+ recs = store.ls()
+ assert len(recs) == 2
+
+ def test_ls_kv_key_structure_validation(self, mgr, store):
+ """A key with empty segments inside the prefix should raise CephSecretDataError."""
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns//global/k", "{}")
+ with pytest.raises(CephSecretDataError, match="empty path component"):
+ store.ls(namespace="ns")
+
+
+# ============================================================
+# TestSecretStoreMon_Ls — malformed persisted-record cases
+# ============================================================
+
+class TestSecretStoreMon_Ls_Malformed:
+ def test_ls_too_few_segments(self, mgr, store):
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global", "{}")
+ with pytest.raises(CephSecretDataError, match="unexpected key structure"):
+ store.ls(namespace="ns")
+
+ def test_ls_invalid_scope_in_key(self, mgr, store):
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/badscope/key", "{}")
+ with pytest.raises(CephSecretDataError, match="invalid scope"):
+ store.ls(namespace="ns")
+
+ def test_ls_global_with_extra_segment(self, mgr, store):
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/target/name", "{}")
+ with pytest.raises(CephSecretDataError, match="unexpected global key structure"):
+ store.ls(namespace="ns")
+
+ def test_ls_service_with_missing_segment(self, mgr, store):
+ # service needs 4 parts (ns/service/target/name); 3 is too few
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/service/onlytarget", "{}")
+ with pytest.raises(CephSecretDataError, match="unexpected targeted-scope key structure"):
+ store.ls(namespace="ns")
+
+ def test_ls_payload_not_object(self, mgr, store):
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/k", '["not", "object"]')
+ with pytest.raises(CephSecretDataError, match="not a JSON object"):
+ store.ls(namespace="ns")
+
+ def test_ls_payload_valid_json_missing_fields(self, mgr, store):
+ # Valid JSON object but missing required secret record fields
+ mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/k", '{"unexpected": 1}')
+ with pytest.raises(CephSecretDataError):
+ store.ls(namespace="ns")
+
+
+# ============================================================
+# TestSecretStoreMon_Crud — editable=False allows rm by design
+# ============================================================
+
+class TestSecretStoreMon_EditableRm:
+ def test_rm_non_editable_secret_is_allowed(self, store):
+ """rm ignores editable — deletion is always permitted by design."""
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False)
+ result = store.rm("ns", SecretScope.GLOBAL, "", "k")
+ assert result is True
+
+ def test_set_non_editable_blocks_update(self, store):
+ """Confirmed: set refuses to update a non-editable secret."""
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False)
+ with pytest.raises(CephSecretException):
+ store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2")
rather than automatically by a Ceph component. Defaults
to True; set to False for programmatically generated
secrets.
- editable: Whether the secret may be updated or removed by
- automated tooling. Defaults to True.
+ editable: Whether the secret may be updated by automated
+ tooling. Deletion is always permitted regardless of
+ this flag. Defaults to True.
Returns:
A dict containing the written record's ``metadata`` object. The
if not isinstance(uri, str):
raise CephSecretException('secret uri must be a string')
+ if uri != uri.strip():
+ raise CephSecretException(
+ f'Invalid secret uri {uri!r}: leading/trailing whitespace is not allowed'
+ )
+
parsed = urlparse(uri)
if parsed.scheme != SECRET_SCHEME:
raise CephSecretException(f'Not a secret uri: {uri!r}')
if not isinstance(path, str):
raise CephSecretException('secret path must be a string')
+ if path != path.strip():
+ raise CephSecretException(
+ f'Invalid secret path {path!r}: leading/trailing whitespace is not allowed'
+ )
+
p = path.strip()
if not p:
raise CephSecretException('Invalid secret path: empty')
--- /dev/null
+# -*- coding: utf-8 -*-
+"""
+Unit tests for ceph_secrets_client.py.
+Placed at the same level as the module under test (src/pybind/mgr/).
+"""
+from __future__ import annotations
+
+import sys, os
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+import pytest
+from typing import Any
+from unittest.mock import MagicMock, call
+
+from ceph_secrets_client import CephSecretsClient, MgrRemote
+from ceph_secrets_types import SecretScope
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+def _make_mgr(return_value: Any = None, raise_exc: Exception = None) -> MagicMock:
+ """Return a mock that satisfies the MgrRemote Protocol."""
+ mgr = MagicMock()
+ if raise_exc is not None:
+ mgr.remote.side_effect = raise_exc
+ else:
+ mgr.remote.return_value = return_value
+ return mgr
+
+
+def _client(return_value: Any = None, raise_exc: Exception = None) -> CephSecretsClient:
+ return CephSecretsClient(_make_mgr(return_value, raise_exc))
+
+
+# ============================================================
+# CephSecretsClient construction
+# ============================================================
+
+class TestConstruction:
+ def test_default_module_name(self):
+ mgr = MagicMock()
+ client = CephSecretsClient(mgr)
+ assert client.module == "ceph_secrets"
+
+ def test_custom_module_name(self):
+ mgr = MagicMock()
+ client = CephSecretsClient(mgr, module="my_secrets")
+ assert client.module == "my_secrets"
+
+ def test_stores_mgr(self):
+ mgr = MagicMock()
+ client = CephSecretsClient(mgr)
+ assert client.mgr is mgr
+
+
+# ============================================================
+# _remote error handling
+# ============================================================
+
+class TestRemoteErrorHandling:
+ def test_passes_through_return_value(self):
+ client = _client(return_value=42)
+ assert client._remote("some_method", foo="bar") == 42
+
+ def test_wraps_exception_as_runtime_error(self):
+ client = _client(raise_exc=RuntimeError("module down"))
+ with pytest.raises(RuntimeError, match="Cannot call secrets mgr-module"):
+ client._remote("any_method")
+
+ def test_exception_includes_module_name(self):
+ mgr = _make_mgr(raise_exc=Exception("boom"))
+ client = CephSecretsClient(mgr, module="my_secrets")
+ with pytest.raises(RuntimeError, match="my_secrets"):
+ client._remote("any_method")
+
+ def test_exception_is_enabled_hint(self):
+ client = _client(raise_exc=Exception("unavailable"))
+ with pytest.raises(RuntimeError, match="enabled"):
+ client._remote("any_method")
+
+
+# ============================================================
+# secret_get_epoch
+# ============================================================
+
+class TestSecretGetEpoch:
+ def test_calls_correct_method(self):
+ mgr = _make_mgr(return_value=5)
+ client = CephSecretsClient(mgr)
+ result = client.secret_get_epoch("cephadm")
+ mgr.remote.assert_called_once_with("ceph_secrets", "secret_get_epoch", namespace="cephadm")
+ assert result == 5
+
+ def test_returns_zero_epoch(self):
+ assert _client(return_value=0).secret_get_epoch("ns") == 0
+
+
+# ============================================================
+# secret_get
+# ============================================================
+
+class TestSecretGet:
+ def test_calls_remote_correctly(self):
+ mgr = _make_mgr(return_value={"metadata": {"version": 1}})
+ client = CephSecretsClient(mgr)
+ result = client.secret_get("ns", SecretScope.GLOBAL, "", "pw")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "secret_get",
+ namespace="ns", scope=SecretScope.GLOBAL,
+ target="", name="pw", reveal=False,
+ )
+ assert result["metadata"]["version"] == 1
+
+ def test_reveal_kwarg_forwarded(self):
+ mgr = _make_mgr(return_value={"metadata": {}, "data": "s3cr3t"})
+ client = CephSecretsClient(mgr)
+ client.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=True)
+ _, kwargs = mgr.remote.call_args
+ assert kwargs["reveal"] is True
+
+ def test_returns_none_when_not_found(self):
+ assert _client(return_value=None).secret_get("ns", "global", "", "ghost") is None
+
+ def test_scope_as_string(self):
+ mgr = _make_mgr(return_value=None)
+ client = CephSecretsClient(mgr)
+ client.secret_get("ns", "host", "node1", "ssh")
+ _, kwargs = mgr.remote.call_args
+ assert kwargs["scope"] == "host"
+
+
+# ============================================================
+# secret_get_value
+# ============================================================
+
+class TestSecretGetValue:
+ def test_calls_remote(self):
+ mgr = _make_mgr(return_value="s3cr3t")
+ client = CephSecretsClient(mgr)
+ result = client.secret_get_value("ns", SecretScope.GLOBAL, "", "pw")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "secret_get_value",
+ namespace="ns", scope=SecretScope.GLOBAL, target="", name="pw",
+ )
+ assert result == "s3cr3t"
+
+ def test_returns_none_if_not_found(self):
+ assert _client(return_value=None).secret_get_value("ns", "global", "", "ghost") is None
+
+ def test_returns_opaque_string(self):
+ payload = '{"u": "a", "p": "b"}'
+ result = _client(return_value=payload).secret_get_value("ns", "global", "", "creds")
+ assert result == payload
+
+ def test_module_unreachable_raises(self):
+ with pytest.raises(RuntimeError, match="Cannot call"):
+ _client(raise_exc=Exception("down")).secret_get_value("ns", "global", "", "pw")
+
+
+# secret_get_version
+# ============================================================
+
+class TestSecretGetVersion:
+ def test_calls_remote(self):
+ mgr = _make_mgr(return_value=3)
+ client = CephSecretsClient(mgr)
+ result = client.secret_get_version("ns", SecretScope.GLOBAL, "", "k")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "secret_get_version",
+ namespace="ns", scope=SecretScope.GLOBAL, target="", name="k",
+ )
+ assert result == 3
+
+ def test_returns_none_if_not_found(self):
+ assert _client(return_value=None).secret_get_version("ns", "global", "", "ghost") is None
+
+
+# ============================================================
+# secret_get_versions
+# ============================================================
+
+class TestSecretGetVersions:
+ def test_calls_remote_with_list(self):
+ uris = ["secret:/ns/global/a", "secret:/ns/global/b"]
+ expected = {"secret:/ns/global/a": 1, "secret:/ns/global/b": None}
+ mgr = _make_mgr(return_value=expected)
+ client = CephSecretsClient(mgr)
+ result = client.secret_get_versions(uris)
+ mgr.remote.assert_called_once_with("ceph_secrets", "secret_get_versions", uris=uris)
+ assert result == expected
+
+ def test_returns_empty_dict_for_empty_list(self):
+ result = _client(return_value={}).secret_get_versions([])
+ assert result == {}
+
+
+# ============================================================
+# secret_set
+# ============================================================
+
+class TestSecretSet:
+ def test_calls_remote_with_defaults(self):
+ expected = {"metadata": {"version": 1}}
+ mgr = _make_mgr(return_value=expected)
+ client = CephSecretsClient(mgr)
+ result = client.secret_set("ns", SecretScope.GLOBAL, "", "pw", "x")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "secret_set",
+ namespace="ns", scope=SecretScope.GLOBAL,
+ target="", name="pw", data="x",
+ user_made=True, editable=True,
+ )
+ assert result == expected
+
+ def test_user_made_editable_overrides(self):
+ mgr = _make_mgr(return_value={})
+ client = CephSecretsClient(mgr)
+ client.secret_set("ns", "global", "", "k", "x", user_made=False, editable=False)
+ _, kwargs = mgr.remote.call_args
+ assert kwargs["user_made"] is False
+ assert kwargs["editable"] is False
+
+ def test_raises_on_module_error(self):
+ with pytest.raises(RuntimeError):
+ _client(raise_exc=Exception("down")).secret_set("ns", "global", "", "k", "x")
+
+
+# ============================================================
+# secret_rm
+# ============================================================
+
+class TestSecretRm:
+ def test_calls_remote(self):
+ mgr = _make_mgr(return_value=True)
+ client = CephSecretsClient(mgr)
+ result = client.secret_rm("ns", SecretScope.GLOBAL, "", "k")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "secret_rm",
+ namespace="ns", scope=SecretScope.GLOBAL, target="", name="k",
+ )
+ assert result is True
+
+ def test_returns_false_when_not_found(self):
+ assert _client(return_value=False).secret_rm("ns", "global", "", "ghost") is False
+
+ def test_truthy_value_coerced_to_bool(self):
+ # remote might return 1 instead of True
+ assert _client(return_value=1).secret_rm("ns", "global", "", "k") is True
+
+ def test_falsy_value_coerced_to_bool(self):
+ assert _client(return_value=0).secret_rm("ns", "global", "", "k") is False
+
+
+# ============================================================
+# scan_unresolved_refs
+# ============================================================
+
+class TestScanUnresolvedRefs:
+ def test_calls_remote(self):
+ expected = ["secret:/ns/global/missing"]
+ mgr = _make_mgr(return_value=expected)
+ client = CephSecretsClient(mgr)
+ result = client.scan_unresolved_refs({"k": "secret:/ns/global/missing"}, "ns")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "scan_unresolved_refs",
+ obj={"k": "secret:/ns/global/missing"}, namespace="ns",
+ )
+ assert result == expected
+
+ def test_returns_empty_when_all_resolved(self):
+ assert _client(return_value=[]).scan_unresolved_refs({}, "ns") == []
+
+
+# ============================================================
+# scan_refs
+# ============================================================
+
+class TestScanRefs:
+ def test_calls_remote(self):
+ expected = ["secret:/ns/global/pw"]
+ mgr = _make_mgr(return_value=expected)
+ client = CephSecretsClient(mgr)
+ result = client.scan_refs("secret:/ns/global/pw", "ns")
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "scan_refs",
+ obj="secret:/ns/global/pw", namespace="ns",
+ )
+ assert result == expected
+
+ def test_returns_empty_for_no_refs(self):
+ assert _client(return_value=[]).scan_refs("no refs here", "ns") == []
+
+
+# ============================================================
+# resolve_object
+# ============================================================
+
+class TestResolveObject:
+ def test_calls_remote(self):
+ mgr = _make_mgr(return_value={"password": "s3cr3t"})
+ client = CephSecretsClient(mgr)
+ result = client.resolve_object({"password": "secret:/ns/global/pw"})
+ mgr.remote.assert_called_once_with(
+ "ceph_secrets", "resolve_object",
+ obj={"password": "secret:/ns/global/pw"},
+ )
+ assert result["password"] == "s3cr3t"
+
+ def test_raises_on_unresolvable(self):
+ with pytest.raises(RuntimeError):
+ _client(raise_exc=Exception("missing")).resolve_object("secret:/ns/global/ghost")
+
+ def test_returns_plain_value_unchanged(self):
+ result = _client(return_value="plain").resolve_object("plain")
+ assert result == "plain"
--- /dev/null
+# -*- coding: utf-8 -*-
+"""
+Unit tests for ceph_secrets_types.py.
+Placed at the same level as the module under test (src/pybind/mgr/).
+"""
+from __future__ import annotations
+
+import sys, os
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+import pytest
+
+from ceph_secrets_types import (
+ CephSecretException,
+ CephSecretDataError,
+ CephSecretNotFoundError,
+ SecretScope,
+ SecretRef,
+ BadSecretURI,
+ parse_secret_uri,
+ parse_secret_path,
+ validate_secret_namespace,
+ SECRET_SCHEME,
+ _validate_segment,
+ _validate_custom_path,
+ _quote_segment,
+ _quote_custom_path,
+)
+
+
+# ============================================================
+# Module constants
+# ============================================================
+
+class TestConstants:
+ def test_secret_scheme(self):
+ assert SECRET_SCHEME == "secret"
+
+
+# ============================================================
+# Exception hierarchy
+# ============================================================
+
+class TestExceptions:
+ def test_base_is_exception(self):
+ assert issubclass(CephSecretException, Exception)
+
+ def test_data_error_inherits_base(self):
+ assert issubclass(CephSecretDataError, CephSecretException)
+
+ def test_not_found_inherits_base(self):
+ assert issubclass(CephSecretNotFoundError, CephSecretException)
+
+ def test_can_raise_and_catch_base(self):
+ with pytest.raises(CephSecretException):
+ raise CephSecretException("test")
+
+ def test_can_raise_and_catch_data_error_as_base(self):
+ with pytest.raises(CephSecretException):
+ raise CephSecretDataError("data")
+
+ def test_can_raise_and_catch_not_found_as_base(self):
+ with pytest.raises(CephSecretException):
+ raise CephSecretNotFoundError("gone")
+
+
+# ============================================================
+# _validate_segment
+# ============================================================
+
+class TestValidateSegment:
+ @pytest.mark.parametrize("v", [
+ "simple", "with-dashes", "under_score", "dot.dot", "a1b2",
+ "UPPER", "Mixed123",
+ ])
+ def test_valid(self, v):
+ _validate_segment("f", v)
+
+ @pytest.mark.parametrize("bad", [
+ "", " ", "has space", "a/b", "a:b", "star*",
+ ])
+ def test_invalid(self, bad):
+ with pytest.raises(ValueError):
+ _validate_segment("f", bad)
+
+ def test_ends_with_dot_invalid(self):
+ with pytest.raises(ValueError, match="must not end"):
+ _validate_segment("f", "end.")
+
+ def test_non_string_raises(self):
+ with pytest.raises(ValueError, match="must be a string"):
+ _validate_segment("f", None)
+
+
+# ============================================================
+# _validate_custom_path
+# ============================================================
+
+class TestValidateCustomPath:
+ @pytest.mark.parametrize("p", [
+ "single", "a/b", "a/b/c", "deep/path/here",
+ ])
+ def test_valid(self, p):
+ _validate_custom_path(p)
+
+ def test_empty_string(self):
+ with pytest.raises(ValueError, match="must not be empty"):
+ _validate_custom_path("")
+
+ def test_empty_segment_in_middle(self):
+ with pytest.raises(ValueError, match="empty segments"):
+ _validate_custom_path("a//b")
+
+ def test_trailing_slash(self):
+ with pytest.raises(ValueError, match="empty segments"):
+ _validate_custom_path("a/b/")
+
+ def test_leading_slash_empty_first_segment(self):
+ with pytest.raises(ValueError, match="empty segments"):
+ _validate_custom_path("/a/b")
+
+ def test_segment_with_bad_chars(self):
+ with pytest.raises(ValueError):
+ _validate_custom_path("a/b c/d")
+
+
+# ============================================================
+# _quote helpers
+# ============================================================
+
+class TestQuoteHelpers:
+ def test_quote_segment_no_slash(self):
+ # clean identifiers are unchanged
+ assert _quote_segment("cephadm") == "cephadm"
+
+ def test_quote_custom_path_preserves_slash(self):
+ assert _quote_custom_path("a/b/c") == "a/b/c"
+
+
+# ============================================================
+# validate_secret_namespace
+# ============================================================
+
+class TestValidateSecretNamespace:
+ def test_valid_namespace(self):
+ validate_secret_namespace("cephadm")
+
+ def test_empty_namespace(self):
+ with pytest.raises(ValueError):
+ validate_secret_namespace("")
+
+ def test_namespace_with_space(self):
+ with pytest.raises(ValueError):
+ validate_secret_namespace("bad name")
+
+
+# ============================================================
+# SecretScope
+# ============================================================
+
+class TestSecretScope:
+ def test_all_values(self):
+ assert SecretScope.GLOBAL.value == "global"
+ assert SecretScope.SERVICE.value == "service"
+ assert SecretScope.HOST.value == "host"
+ assert SecretScope.CUSTOM.value == "custom"
+
+ def test_from_str_valid(self):
+ for v in ("global", "service", "host", "custom"):
+ scope = SecretScope.from_str(v)
+ assert scope.value == v
+
+ def test_from_str_invalid(self):
+ with pytest.raises(CephSecretException, match="Invalid secret scope"):
+ SecretScope.from_str("vault")
+
+ def test_is_str_subclass(self):
+ assert isinstance(SecretScope.GLOBAL, str)
+
+ # validate_fields – global
+ def test_global_valid(self):
+ SecretScope.GLOBAL.validate_fields("", "mykey")
+
+ def test_global_nonempty_target_fails(self):
+ with pytest.raises(ValueError, match="target must be empty"):
+ SecretScope.GLOBAL.validate_fields("target", "key")
+
+ def test_global_empty_name_fails(self):
+ with pytest.raises(ValueError):
+ SecretScope.GLOBAL.validate_fields("", "")
+
+ # validate_fields – service / host
+ def test_service_valid(self):
+ SecretScope.SERVICE.validate_fields("prometheus", "auth")
+
+ def test_service_empty_target_fails(self):
+ with pytest.raises(ValueError):
+ SecretScope.SERVICE.validate_fields("", "auth")
+
+ def test_host_valid(self):
+ SecretScope.HOST.validate_fields("node1", "ssh_key")
+
+ def test_host_empty_name_fails(self):
+ with pytest.raises(ValueError):
+ SecretScope.HOST.validate_fields("node1", "")
+
+ # validate_fields – custom
+ def test_custom_valid_flat(self):
+ SecretScope.CUSTOM.validate_fields("", "some-key")
+
+ def test_custom_valid_nested(self):
+ SecretScope.CUSTOM.validate_fields("", "a/b/c")
+
+ def test_custom_nonempty_target_fails(self):
+ with pytest.raises(ValueError, match="target must be empty"):
+ SecretScope.CUSTOM.validate_fields("badtarget", "key")
+
+ def test_custom_empty_name_fails(self):
+ with pytest.raises(ValueError):
+ SecretScope.CUSTOM.validate_fields("", "")
+
+
+# ============================================================
+# SecretRef
+# ============================================================
+
+class TestSecretRef:
+ def test_global_ref_construction(self):
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw")
+ assert ref.namespace == "ns"
+ assert ref.scope == SecretScope.GLOBAL
+ assert ref.target == ""
+ assert ref.name == "pw"
+
+ def test_service_ref(self):
+ ref = SecretRef("ns", SecretScope.SERVICE, "prom", "auth")
+ assert ref.target == "prom"
+
+ def test_host_ref(self):
+ ref = SecretRef("ns", SecretScope.HOST, "n1", "k")
+ assert ref.name == "k"
+
+ def test_custom_ref(self):
+ ref = SecretRef("ns", SecretScope.CUSTOM, "", "a/b/c")
+ assert ref.name == "a/b/c"
+
+ def test_is_frozen(self):
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+ with pytest.raises(Exception):
+ ref.name = "other" # type: ignore[misc]
+
+ def test_scope_coerced_from_string(self):
+ ref = SecretRef("ns", "host", "node1", "k")
+ assert ref.scope == SecretScope.HOST
+
+ def test_invalid_scope_raises(self):
+ with pytest.raises(ValueError):
+ SecretRef("ns", "badscope", "", "k")
+
+ def test_invalid_namespace_raises(self):
+ with pytest.raises(ValueError):
+ SecretRef("bad ns", SecretScope.GLOBAL, "", "k")
+
+ def test_ident_global(self):
+ ref = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+ assert ref.ident() == ("ns", "global", "", "k")
+
+ def test_ident_service(self):
+ ref = SecretRef("ns", SecretScope.SERVICE, "t", "n")
+ assert ref.ident() == ("ns", "service", "t", "n")
+
+ def test_to_uri_global(self):
+ ref = SecretRef("cephadm", SecretScope.GLOBAL, "", "pw")
+ assert ref.to_uri() == "secret:/cephadm/global/pw"
+
+ def test_to_uri_service(self):
+ ref = SecretRef("ns", SecretScope.SERVICE, "prom", "auth")
+ assert ref.to_uri() == "secret:/ns/service/prom/auth"
+
+ def test_to_uri_host(self):
+ ref = SecretRef("ns", SecretScope.HOST, "node1", "ssh")
+ assert ref.to_uri() == "secret:/ns/host/node1/ssh"
+
+ def test_to_uri_custom(self):
+ ref = SecretRef("ns", SecretScope.CUSTOM, "", "a/b/c")
+ assert ref.to_uri() == "secret:/ns/custom/a/b/c"
+
+ def test_equality(self):
+ r1 = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+ r2 = SecretRef("ns", SecretScope.GLOBAL, "", "k")
+ assert r1 == r2
+
+ def test_inequality_different_name(self):
+ r1 = SecretRef("ns", SecretScope.GLOBAL, "", "k1")
+ r2 = SecretRef("ns", SecretScope.GLOBAL, "", "k2")
+ assert r1 != r2
+
+
+# ============================================================
+# BadSecretURI
+# ============================================================
+
+class TestBadSecretURI:
+ def test_construction(self):
+ b = BadSecretURI(raw="secret:/bad/scope/key", error="bad scope", namespace="ns")
+ assert b.raw == "secret:/bad/scope/key"
+ assert b.error == "bad scope"
+
+ def test_to_uri_returns_raw(self):
+ b = BadSecretURI(raw="secret:/bad", error="err", namespace="ns")
+ assert b.to_uri() == "secret:/bad"
+
+ def test_is_frozen(self):
+ b = BadSecretURI(raw="x", error="e", namespace="n")
+ with pytest.raises(Exception):
+ b.raw = "y" # type: ignore[misc]
+
+
+# ============================================================
+# parse_secret_uri
+# ============================================================
+
+class TestParseSecretUri:
+ # --- valid inputs ---
+ def test_global(self):
+ ref = parse_secret_uri("secret:/ns/global/pw")
+ assert ref.scope == SecretScope.GLOBAL
+ assert ref.name == "pw"
+
+ def test_service(self):
+ ref = parse_secret_uri("secret:/ns/service/prom/auth")
+ assert ref.scope == SecretScope.SERVICE
+ assert ref.target == "prom"
+ assert ref.name == "auth"
+
+ def test_host(self):
+ ref = parse_secret_uri("secret:/ns/host/node1/ssh")
+ assert ref.target == "node1"
+ assert ref.name == "ssh"
+
+ def test_custom_flat(self):
+ ref = parse_secret_uri("secret:/ns/custom/single")
+ assert ref.scope == SecretScope.CUSTOM
+ assert ref.name == "single"
+
+ def test_custom_nested(self):
+ ref = parse_secret_uri("secret:/ns/custom/a/b/c")
+ assert ref.name == "a/b/c"
+
+ # --- invalid inputs ---
+ def test_wrong_scheme(self):
+ with pytest.raises(CephSecretException, match="Not a secret uri"):
+ parse_secret_uri("http://example.com/foo")
+
+ def test_authority_rejected(self):
+ with pytest.raises(CephSecretException, match="authority"):
+ parse_secret_uri("secret://ns/global/key")
+
+ def test_query_string_rejected(self):
+ with pytest.raises(CephSecretException, match="query"):
+ parse_secret_uri("secret:/ns/global/key?x=y")
+
+ def test_fragment_rejected(self):
+ with pytest.raises(CephSecretException, match="query"):
+ parse_secret_uri("secret:/ns/global/key#frag")
+
+ def test_percent_encoding_rejected(self):
+ with pytest.raises(CephSecretException, match="percent-encoding"):
+ parse_secret_uri("secret:/ns/global/my%2Dkey")
+
+ def test_too_short(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_uri("secret:/ns/global")
+
+ def test_bad_scope(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_uri("secret:/ns/vault/key")
+
+ def test_non_string_input(self):
+ with pytest.raises(CephSecretException, match="must be a string"):
+ parse_secret_uri(42) # type: ignore[arg-type]
+
+ def test_global_with_extra_segment_rejected(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_uri("secret:/ns/global/target/name")
+
+ def test_service_missing_name(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_uri("secret:/ns/service/prom")
+
+
+# ============================================================
+# parse_secret_path
+# ============================================================
+
+class TestParseSecretPath:
+ # --- valid inputs ---
+ def test_global(self):
+ ref = parse_secret_path("cephadm/global/pw")
+ assert ref.namespace == "cephadm"
+ assert ref.scope == SecretScope.GLOBAL
+ assert ref.name == "pw"
+
+ def test_service(self):
+ ref = parse_secret_path("ns/service/prom/auth")
+ assert ref.target == "prom"
+ assert ref.name == "auth"
+
+ def test_host(self):
+ ref = parse_secret_path("ns/host/node1/ssh_key")
+ assert ref.name == "ssh_key"
+
+ def test_custom_flat(self):
+ ref = parse_secret_path("ns/custom/flat")
+ assert ref.scope == SecretScope.CUSTOM
+ assert ref.name == "flat"
+
+ def test_custom_nested(self):
+ ref = parse_secret_path("ns/custom/a/b/c")
+ assert ref.name == "a/b/c"
+
+ def test_leading_slash_stripped(self):
+ ref = parse_secret_path("/ns/global/k")
+ assert ref.namespace == "ns"
+
+ # --- invalid inputs ---
+ def test_empty_string(self):
+ with pytest.raises(CephSecretException, match="empty"):
+ parse_secret_path("")
+
+ def test_non_string(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_path(None) # type: ignore[arg-type]
+
+ def test_too_few_segments(self):
+ with pytest.raises(CephSecretException, match="Use"):
+ parse_secret_path("ns/global")
+
+ def test_double_slash(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_path("ns//global/key")
+
+ def test_trailing_slash(self):
+ with pytest.raises(CephSecretException, match="empty segment"):
+ parse_secret_path("ns/global/key/")
+
+ def test_bad_scope(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_path("ns/badscope/key")
+
+ def test_global_extra_segment(self):
+ with pytest.raises(CephSecretException, match="global scope"):
+ parse_secret_path("ns/global/target/name")
+
+ def test_service_missing_name(self):
+ with pytest.raises(CephSecretException, match="service"):
+ parse_secret_path("ns/service/target")
+
+ def test_host_missing_name(self):
+ with pytest.raises(CephSecretException, match="host"):
+ parse_secret_path("ns/host/target")
+
+
+# ============================================================
+# Whitespace / canonicality
+# ============================================================
+
+class TestParseSecretPathWhitespace:
+ @pytest.mark.parametrize("path", [
+ " ns/global/key",
+ "ns/global/key ",
+ "\tns/global/key",
+ "ns/global/key\n",
+ ])
+ def test_rejects_outer_whitespace(self, path):
+ with pytest.raises(CephSecretException):
+ parse_secret_path(path)
+
+
+class TestParseSecretUriWhitespace:
+ def test_rejects_leading_whitespace(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_uri(" secret:/ns/global/key")
+
+ def test_rejects_trailing_whitespace(self):
+ with pytest.raises(CephSecretException):
+ parse_secret_uri("secret:/ns/global/key ")