From: Redouane Kachach Date: Thu, 11 Jun 2026 08:55:43 +0000 (+0200) Subject: mgr/ceph_secrets: add unit tests for all modules X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=431fac7aded8abb60f77fd272b11afc85137419f;p=ceph.git mgr/ceph_secrets: add unit tests for all modules Add pytest coverage for the full stack: secret types and URI/path parsing, storage backend contract, Mon KV store (CRUD, epoch, serialization, corruption handling), SecretMgr (scan/resolve), module RPC surface and CLI handlers, and the CephSecretsClient wrapper. Gate test imports on the UNITTEST env var following the SMB module pattern. Fixes: https://tracker.ceph.com/issues/74562 Assisted-by: Claude Assisted-by: ChatGPT Signed-off-by: Redouane Kachach --- diff --git a/src/pybind/mgr/ceph_secrets/__init__.py b/src/pybind/mgr/ceph_secrets/__init__.py index ee85dc9d376..b02a13abff1 100644 --- a/src/pybind/mgr/ceph_secrets/__init__.py +++ b/src/pybind/mgr/ceph_secrets/__init__.py @@ -1,2 +1,8 @@ -# flake8: noqa +import os + +if 'UNITTEST' in os.environ: + import tests # noqa: F401 + from .module import Module + +__all__ = ['Module'] diff --git a/src/pybind/mgr/ceph_secrets/tests/__init__.py b/src/pybind/mgr/ceph_secrets/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/pybind/mgr/ceph_secrets/tests/conftest.py b/src/pybind/mgr/ceph_secrets/tests/conftest.py new file mode 100644 index 00000000000..3625d3edc9b --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/tests/conftest.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +import pytest +from mgr_module import MgrModule + + +@pytest.fixture +def mgr() -> MgrModule: + return MgrModule.__new__(MgrModule) + + +@pytest.fixture +def store(mgr: MgrModule): + from ceph_secrets.secret_store import SecretStoreMon + return SecretStoreMon(mgr) + + +@pytest.fixture +def secret_mgr(store): + from ceph_secrets.secret_mgr import SecretMgr + return SecretMgr(store) diff --git a/src/pybind/mgr/ceph_secrets/tests/test_backends.py b/src/pybind/mgr/ceph_secrets/tests/test_backends.py new file mode 100644 index 00000000000..8f40f475066 --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/tests/test_backends.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +"""Tests for ceph_secrets.backends (registry of storage backends).""" +from __future__ import annotations + +from ceph_secrets.backends import BACKENDS +from ceph_secrets.secret_store import SecretStoreMon + + +class TestBackendsRegistry: + def test_mon_backend_registered(self): + assert "mon" in BACKENDS + + def test_mon_maps_to_correct_class(self): + assert BACKENDS["mon"] is SecretStoreMon + + def test_only_known_backends(self): + # Update this test when new backends are added. + assert set(BACKENDS.keys()) == {"mon"} + + def test_backend_is_instantiable(self, mgr): + cls = BACKENDS["mon"] + instance = cls(mgr) + assert isinstance(instance, SecretStoreMon) diff --git a/src/pybind/mgr/ceph_secrets/tests/test_module.py b/src/pybind/mgr/ceph_secrets/tests/test_module.py new file mode 100644 index 00000000000..8728ec8bb07 --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/tests/test_module.py @@ -0,0 +1,424 @@ +# -*- coding: utf-8 -*- +"""Tests for ceph_secrets.module.Module — RPC surface and internal methods.""" +from __future__ import annotations + +import json +import pytest +from typing import Any +from unittest.mock import MagicMock, patch + +from ceph_secrets.module import Module +from ceph_secrets.secret_store import SecretStoreMon +from ceph_secrets.secret_mgr import SecretMgr +from ceph_secrets_types import ( + SecretScope, + SecretRef, + CephSecretException, + CephSecretDataError, +) + + +# --------------------------------------------------------------------------- +# Helper: build a Module without going through MgrModule.__init__ +# --------------------------------------------------------------------------- + +def _make_module(mgr_stub) -> Module: + """Bypass MgrModule.__init__ and wire up manually.""" + mod = object.__new__(Module) + mod.secret_mgr = SecretMgr(SecretStoreMon(mgr_stub)) + return mod + + +# --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- +# Module.__init__ — backend selection +# --------------------------------------------------------------------------- + +class TestModuleInit: + def test_default_backend_initialises_secret_mgr(self, mgr): + """Module.__init__ with default 'mon' backend wires up SecretMgr.""" + from mgr_module import MgrModule + mod = object.__new__(Module) + with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None): + with patch.object(MgrModule, "get_module_option", return_value="mon"): + Module.__init__(mod, mgr) + assert isinstance(mod.secret_mgr, SecretMgr) + + def test_unsupported_backend_raises(self, mgr): + """Unknown backend name raises RuntimeError.""" + from mgr_module import MgrModule + with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None): + with patch("ceph_secrets.module.BACKENDS", {}): + with patch.object(MgrModule, "get_module_option", return_value="vault"): + with pytest.raises(RuntimeError, match="Unsupported secrets backend"): + Module.__init__(object.__new__(Module), mgr) + + def test_backend_init_failure_raises(self, mgr): + """Backend constructor failure raises RuntimeError with helpful message.""" + from mgr_module import MgrModule + broken_cls = MagicMock(side_effect=Exception("connection refused")) + with patch.object(MgrModule, "__init__", lambda self, *a, **kw: None): + with patch("ceph_secrets.module.BACKENDS", {"mon": broken_cls}): + with patch.object(MgrModule, "get_module_option", return_value="mon"): + with pytest.raises(RuntimeError, match="Failed to initialize secrets backend"): + Module.__init__(object.__new__(Module), mgr) + + +# --------------------------------------------------------------------------- +# Module RPC surface +# --------------------------------------------------------------------------- + +class TestModuleRPC: + @pytest.fixture + def mod(self, mgr): + return _make_module(mgr) + + def test_secret_get_epoch_zero(self, mod): + assert mod.secret_get_epoch("cephadm") == 0 + + def test_secret_get_epoch_after_set(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x") + assert mod.secret_get_epoch("ns") == 1 + + def test_secret_set_and_get(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + result = mod.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=True) + assert result is not None + assert result["data"] == "s3cr3t" + + def test_secret_get_without_reveal_hides_data(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + result = mod.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=False) + assert result is not None + assert "data" not in result + + def test_secret_get_missing_returns_none(self, mod): + assert mod.secret_get("ns", SecretScope.GLOBAL, "", "ghost") is None + + def test_secret_get_version_existing(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x") + assert mod.secret_get_version("ns", SecretScope.GLOBAL, "", "k") == 1 + + def test_secret_get_version_missing_returns_none(self, mod): + assert mod.secret_get_version("ns", SecretScope.GLOBAL, "", "ghost") is None + + def test_secret_get_value_existing(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "pw") == "s3cr3t" + + def test_secret_get_value_missing_returns_none(self, mod): + assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "ghost") is None + + def test_secret_get_value_opaque_string(self, mod): + # Any string is valid — including one that looks like JSON + mod.secret_set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "a", "p": "b"}') + assert mod.secret_get_value("ns", SecretScope.GLOBAL, "", "creds") == '{"u": "a", "p": "b"}' + + def test_secret_get_value_corrupt_raises_data_error(self, mgr, mod): + mgr.set_store("secret_store/v1/ns/global/bad", "{not json") + with pytest.raises(CephSecretDataError): + mod.secret_get_value("ns", SecretScope.GLOBAL, "", "bad") + + def test_secret_get_versions_batch(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "a", "data-a") + mod.secret_set("ns", SecretScope.GLOBAL, "", "b", "data-b") + result = mod.secret_get_versions([ + "secret:/ns/global/a", + "secret:/ns/global/b", + "secret:/ns/global/ghost", + ]) + assert result["secret:/ns/global/a"] == 1 + assert result["secret:/ns/global/b"] == 1 + assert result["secret:/ns/global/ghost"] is None + + def test_secret_get_versions_skips_invalid_uris(self, mod): + result = mod.secret_get_versions(["not-a-uri", "secret:/ns/badscope/key"]) + assert len(result) == 0 + + def test_secret_get_versions_empty_list(self, mod): + assert mod.secret_get_versions([]) == {} + + def test_secret_get_versions_corrupt_raises_data_error(self, mgr, mod): + """Corruption must not be silently swallowed as a missing/invalid URI.""" + mgr.set_store("secret_store/v1/ns/global/bad", "{not json") + with pytest.raises(CephSecretDataError): + mod.secret_get_versions(["secret:/ns/global/bad"]) + + def test_secret_set_returns_metadata(self, mod): + result = mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x") + assert "metadata" in result + assert result["metadata"]["version"] == 1 + + def test_secret_set_empty_data_raises(self, mod): + with pytest.raises(CephSecretException, match="must not be empty"): + mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "") + + def test_secret_rm_existing(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x") + assert mod.secret_rm("ns", SecretScope.GLOBAL, "", "k") is True + + def test_secret_rm_nonexistent(self, mod): + assert mod.secret_rm("ns", SecretScope.GLOBAL, "", "ghost") is False + + def test_secret_ls_empty(self, mod): + assert mod.secret_ls(namespace="ns") == {} + + def test_secret_ls_returns_records(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "a", "data-a") + mod.secret_set("ns", SecretScope.GLOBAL, "", "b", "data-b") + out = mod.secret_ls(namespace="ns") + assert "ns/global/a" in out + assert "ns/global/b" in out + + def test_secret_ls_scope_filter(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "g", "data-g") + mod.secret_set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth") + out = mod.secret_ls(namespace="ns", scope="service") + assert all("service" in k for k in out.keys()) + + def test_secret_ls_with_target(self, mod): + mod.secret_set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth") + out = mod.secret_ls(namespace="ns", scope="service", target="prom") + assert "ns/service/prom/auth" in out + + def test_secret_ls_show_internals(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x") + out = mod.secret_ls(namespace="ns", show_internals=True) + rec = out["ns/global/k"] + assert "policy" in rec + + def test_secret_ls_key_format_global(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "k", "data-x") + out = mod.secret_ls(namespace="ns") + assert "ns/global/k" in out + for k in out: + assert "//" not in k + + def test_scan_refs(self, mod): + result = mod.scan_refs({"key": "secret:/ns/global/pw"}, namespace="ns") + assert "secret:/ns/global/pw" in result + + def test_scan_unresolved_refs_all_missing(self, mod): + result = mod.scan_unresolved_refs("secret:/ns/global/ghost", namespace="ns") + assert "secret:/ns/global/ghost" in result + + def test_scan_unresolved_refs_exists(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "x") + result = mod.scan_unresolved_refs("secret:/ns/global/pw", namespace="ns") + assert result == [] + + def test_resolve_object(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + result = mod.resolve_object("secret:/ns/global/pw") + assert result == "s3cr3t" + + +# --------------------------------------------------------------------------- +# Module internal ref-based methods +# --------------------------------------------------------------------------- + +class TestModuleInternals: + @pytest.fixture + def mod(self, mgr): + return _make_module(mgr) + + def _ref(self, ns="ns", scope=SecretScope.GLOBAL, target="", name="key"): + return SecretRef(ns, scope, target, name) + + def test_secret_get_existing(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x") + result = mod._secret_get(self._ref()) + assert result is not None + + def test_secret_get_not_found(self, mod): + assert mod._secret_get(self._ref(name="ghost")) is None + + def test_secret_get_corrupt_raises_data_error(self, mgr, mod): + mgr.set_store("secret_store/v1/ns/global/bad", "{not json") + with pytest.raises(CephSecretDataError): + mod._secret_get(self._ref(name="bad")) + + def test_secret_get_version_existing(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x") + assert mod._secret_get_version(self._ref()) == 1 + + def test_secret_get_version_not_found(self, mod): + assert mod._secret_get_version(self._ref(name="ghost")) is None + + def test_secret_get_version_corrupt_raises_data_error(self, mgr, mod): + mgr.set_store("secret_store/v1/ns/global/bad", "{not json") + with pytest.raises(CephSecretDataError): + mod._secret_get_version(self._ref(name="bad")) + + def test_secret_set_returns_metadata_without_data(self, mod): + result = mod._secret_set(self._ref(), "data-x") + assert "metadata" in result + assert "data" not in result + + def test_secret_rm_existing(self, mod): + mod._secret_set(self._ref(), "data-x") + assert mod._secret_rm(self._ref()) is True + + def test_secret_rm_nonexistent(self, mod): + assert mod._secret_rm(self._ref(name="ghost")) is False + + +# --------------------------------------------------------------------------- +# CLI handler logic (path-based dispatch) +# --------------------------------------------------------------------------- + +class TestModuleCLIHandlers: + @pytest.fixture + def mod(self, mgr): + return _make_module(mgr) + + def _ok(self, result: Any) -> Any: + """Unwrap a Responder tuple (retcode, body, status) → parsed dict. + Falls through unchanged when Responder is a no-op stub.""" + if isinstance(result, tuple): + retcode, body, _ = result + assert retcode == 0, f"unexpected error retcode {retcode}: {body}" + return json.loads(body) + return result + + def _assert_error(self, result: Any, match: str = "") -> None: + """Assert a CLI call failed — either via a non-zero tuple or ErrorResponse. + Never accepts a raw CephSecretException (would mean _handle_secret_errors broke).""" + from object_format import ErrorResponse + if isinstance(result, tuple): + retcode, body, status = result + assert retcode != 0, "expected non-zero retcode" + if match: + assert match in (body + status).lower(), \ + f"expected {match!r} in response, got body={body!r} status={status!r}" + elif isinstance(result, ErrorResponse): + if match: + assert match in str(result).lower() + else: + raise AssertionError( + f"expected error tuple or ErrorResponse, got {type(result).__name__}: {result!r}" + ) + + def test_cli_get_by_path_found(self, mod): + mod.secret_set("ns", SecretScope.GLOBAL, "", "key", "data-x") + result = self._ok(mod._cli_secret_get_by_path(path="ns/global/key")) + assert "metadata" in result + + def test_cli_get_by_path_not_found(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_get_by_path(path="ns/global/ghost") + self._assert_error(result, match="not found") + except ErrorResponse as e: + assert "not found" in str(e).lower() + + def test_cli_get_by_path_bad_path(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_get_by_path(path="ns/badscope/key") + self._assert_error(result) + except ErrorResponse: + pass + + def test_cli_get_corrupt_raises_data_error(self, mgr, mod): + from object_format import ErrorResponse + mgr.set_store("secret_store/v1/ns/global/bad", "{not json") + # CephSecretDataError is a CephSecretException subclass so _handle_secret_errors + # wraps it into ErrorResponse; Responder formats it as a non-zero tuple. + try: + result = mod._cli_secret_get_by_path(path="ns/global/bad") + self._assert_error(result) + except ErrorResponse: + pass + + def test_cli_set_by_path(self, mod): + result = self._ok(mod._cli_secret_set_by_path( + path="ns/global/key", + inbuf="s3cr3t" + )) + assert result["metadata"]["version"] == 1 + + def test_cli_set_by_path_no_inbuf(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_set_by_path(path="ns/global/key", inbuf=None) + self._assert_error(result, match="-i") + except ErrorResponse as e: + assert "-i" in str(e) + + def test_cli_set_by_path_empty_data(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_set_by_path(path="ns/global/key", inbuf="") + self._assert_error(result, match="must not be empty") + except ErrorResponse as e: + assert "must not be empty" in str(e).lower() + + def test_cli_rm_by_path_existing(self, mod): + mod._cli_secret_set_by_path(path="ns/global/key", inbuf='{"v": "x"}') + result = self._ok(mod._cli_secret_rm_by_path(path="ns/global/key")) + assert result["status"] == "removed" + + def test_cli_rm_by_path_not_found(self, mod): + result = self._ok(mod._cli_secret_rm_by_path(path="ns/global/ghost")) + assert result["status"] == "not_found" + + def test_cli_rm_by_path_bad_path(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_rm_by_path(path="ns/badscope/key") + self._assert_error(result) + except ErrorResponse: + pass + + def test_cli_ls(self, mod): + mod._cli_secret_set_by_path(path="ns/global/a", inbuf='{"v": "1"}') + mod._cli_secret_set_by_path(path="ns/global/b", inbuf='{"v": "2"}') + result = self._ok(mod._cli_secret_ls(namespace="ns")) + assert "ns/global/a" in result + assert "ns/global/b" in result + + def test_cli_get_with_reveal(self, mod): + mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t") + result = self._ok(mod._cli_secret_get_by_path(path="ns/global/pw", reveal=True)) + assert result["data"] == "s3cr3t" + + def test_cli_get_without_reveal_hides_data(self, mod): + mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t") + result = self._ok(mod._cli_secret_get_by_path(path="ns/global/pw", reveal=False)) + assert "data" not in result + + def _get_value(self, result: Any) -> str: + """Unwrap a get-value tuple (retcode, body, status) → raw string.""" + if isinstance(result, tuple): + retcode, body, _ = result + assert retcode == 0, f"unexpected error retcode {retcode}: {body}" + return body + return result + + def test_cli_get_value_returns_raw_string(self, mod): + mod._cli_secret_set_by_path(path="ns/global/pw", inbuf="s3cr3t") + result = self._get_value(mod._cli_secret_get_value_by_path(path="ns/global/pw")) + assert result == "s3cr3t" + + def test_cli_get_value_not_found_raises(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_get_value_by_path(path="ns/global/ghost") + self._assert_error(result) + except ErrorResponse: + pass + + def test_cli_get_value_bad_path_raises(self, mod): + from object_format import ErrorResponse + try: + result = mod._cli_secret_get_value_by_path(path="ns/badscope/key") + self._assert_error(result) + except ErrorResponse: + pass + + def test_cli_get_value_opaque_json_string(self, mod): + payload = '{"u": "a", "p": "b"}' + mod._cli_secret_set_by_path(path="ns/global/creds", inbuf=payload) + result = self._get_value(mod._cli_secret_get_value_by_path(path="ns/global/creds")) + assert result == payload diff --git a/src/pybind/mgr/ceph_secrets/tests/test_secret_backend.py b/src/pybind/mgr/ceph_secrets/tests/test_secret_backend.py new file mode 100644 index 00000000000..1b98dd67355 --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/tests/test_secret_backend.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +"""Tests for the SecretStorageBackend ABC and contract.""" +from __future__ import annotations + +import pytest +from abc import ABC + +from ceph_secrets.secret_backend import SecretStorageBackend +from ceph_secrets_types import SecretScope + + +class TestSecretStorageBackend: + def test_is_abstract(self): + assert issubclass(SecretStorageBackend, ABC) + + def test_cannot_instantiate_directly(self): + with pytest.raises(TypeError): + SecretStorageBackend() # type: ignore[abstract] + + def test_concrete_subclass_instantiates(self, store): + """SecretStoreMon (the concrete impl) must satisfy the ABC.""" + assert isinstance(store, SecretStorageBackend) + + def test_abstract_methods_defined(self): + abstract = SecretStorageBackend.__abstractmethods__ + assert 'get' in abstract + assert 'set' in abstract + assert 'rm' in abstract + assert 'ls' in abstract + assert 'get_epoch' in abstract + assert 'bump_epoch' in abstract + + def test_partial_implementation_still_abstract(self): + """Subclass missing any abstract method stays abstract.""" + class PartialImpl(SecretStorageBackend): + def get(self, ns, scope, target, name): + pass + + def set(self, ns, scope, target, name, data, user_made=True, editable=True): + pass + + def rm(self, ns, scope, target, name): + pass + + def ls(self, namespace=None, scope=None, target=None): + pass + + def get_epoch(self, namespace): + pass + # missing bump_epoch + + with pytest.raises(TypeError): + PartialImpl() # type: ignore[abstract] + + def test_full_implementation_is_instantiable(self): + + class FullImpl(SecretStorageBackend): + + def get(self, ns, scope, target, name): + return None + + def set(self, ns, scope, target, name, data, user_made=True, editable=True): + pass + + def rm(self, ns, scope, target, name): + return False + + def ls(self, namespace=None, scope=None, target=None): + return [] + + def get_epoch(self, namespace): + return 0 + + def bump_epoch(self, namespace): + return 1 + + impl = FullImpl() + assert impl.get("ns", SecretScope.GLOBAL, "", "k") is None + assert impl.get_epoch("ns") == 0 + assert impl.bump_epoch("ns") == 1 diff --git a/src/pybind/mgr/ceph_secrets/tests/test_secret_mgr.py b/src/pybind/mgr/ceph_secrets/tests/test_secret_mgr.py new file mode 100644 index 00000000000..a4be58a4432 --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/tests/test_secret_mgr.py @@ -0,0 +1,339 @@ +# -*- coding: utf-8 -*- +"""Tests for ceph_secrets.secret_mgr (SecretMgr).""" +from __future__ import annotations + +import pytest + +from ceph_secrets_types import ( + SecretScope, SecretRef, + CephSecretException, CephSecretNotFoundError, + BadSecretURI, +) + + +# ============================================================ +# make_ref +# ============================================================ + +class TestMakeRef: + def test_basic(self, secret_mgr): + ref = secret_mgr.make_ref("ns", SecretScope.GLOBAL, "", "key") + assert isinstance(ref, SecretRef) + assert ref.namespace == "ns" + + def test_scope_as_string(self, secret_mgr): + ref = secret_mgr.make_ref("ns", "host", "node1", "ssh_key") + assert ref.scope == SecretScope.HOST + + def test_bad_scope_raises(self, secret_mgr): + with pytest.raises(CephSecretException): + secret_mgr.make_ref("ns", "badscope", "", "key") + + def test_bad_namespace_raises(self, secret_mgr): + with pytest.raises(CephSecretException): + secret_mgr.make_ref("bad ns!", SecretScope.GLOBAL, "", "key") + + +# ============================================================ +# get / get_value +# ============================================================ + +class TestGet: + def test_get_existing(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw") + rec = secret_mgr.get(ref) + assert rec.data == "s3cr3t" + + def test_get_missing_raises(self, secret_mgr): + ref = SecretRef("ns", SecretScope.GLOBAL, "", "ghost") + with pytest.raises(CephSecretNotFoundError): + secret_mgr.get(ref) + + def test_get_value(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw") + assert secret_mgr.get_value(ref) == "s3cr3t" + + def test_get_value_opaque_string(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "admin", "p": "pw"}') + ref = SecretRef("ns", SecretScope.GLOBAL, "", "creds") + val = secret_mgr.get_value(ref) + assert isinstance(val, str) + assert val == '{"u": "admin", "p": "pw"}' + + def test_get_value_missing_raises(self, secret_mgr): + ref = SecretRef("ns", SecretScope.GLOBAL, "", "ghost") + with pytest.raises(CephSecretNotFoundError): + secret_mgr.get_value(ref) + + +# ============================================================ +# set +# ============================================================ + +class TestSet: + def test_set_global(self, secret_mgr): + rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v1") + assert rec.metadata.version == 1 + assert rec.data == "data-v1" + + def test_set_service(self, secret_mgr): + rec = secret_mgr.set("ns", SecretScope.SERVICE, "prom", "auth", "user-a") + assert rec.target == "prom" + + def test_set_custom(self, secret_mgr): + rec = secret_mgr.set("ns", SecretScope.CUSTOM, "", "a/b/c", "tok") + assert rec.name == "a/b/c" + + def test_set_non_str_raises(self, secret_mgr): + with pytest.raises(CephSecretException, match="string"): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", {"not": "a-string"}) # type: ignore[arg-type] + + def test_set_empty_string_raises(self, secret_mgr): + with pytest.raises(CephSecretException, match="must not be empty"): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "") + + def test_set_preserves_whitespace_and_newlines(self, secret_mgr): + payload = " secret-value\n" + rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", payload) + assert rec.data == payload + assert secret_mgr.get_value(SecretRef("ns", SecretScope.GLOBAL, "", "k")) == payload + + def test_set_increments_version(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v1") + rec = secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-v2") + assert rec.metadata.version == 2 + + def test_set_scope_string(self, secret_mgr): + rec = secret_mgr.set("ns", "global", "", "k", "data-x") + assert rec.scope == SecretScope.GLOBAL + + +# ============================================================ +# rm +# ============================================================ + +class TestRm: + def test_rm_existing(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "k", "data-x") + assert secret_mgr.rm("ns", SecretScope.GLOBAL, "", "k") is True + + def test_rm_nonexistent(self, secret_mgr): + assert secret_mgr.rm("ns", SecretScope.GLOBAL, "", "ghost") is False + + +# ============================================================ +# ls +# ============================================================ + +class TestLs: + def test_ls_empty(self, secret_mgr): + assert secret_mgr.ls(namespace="ns") == [] + + def test_ls_returns_records(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "a", "data-a") + secret_mgr.set("ns", SecretScope.GLOBAL, "", "b", "data-b") + recs = secret_mgr.ls(namespace="ns") + assert len(recs) == 2 + + def test_ls_scope_filter(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "g", "data-g") + secret_mgr.set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth") + recs = secret_mgr.ls(namespace="ns", scope="service") + assert len(recs) == 1 + assert recs[0].scope == SecretScope.SERVICE + + +# ============================================================ +# scan_refs / scan_unresolved_refs +# ============================================================ + +class TestScanRefs: + def test_scan_simple_string(self, secret_mgr): + obj = "secret:/ns/global/pw" + refs = secret_mgr.scan_refs(obj, namespace="ns") + uris = {r.to_uri() for r in refs} + assert "secret:/ns/global/pw" in uris + + def test_scan_in_dict(self, secret_mgr): + obj = {"key": "secret:/ns/global/pw"} + refs = secret_mgr.scan_refs(obj, namespace="ns") + assert len(refs) == 1 + + def test_scan_in_list(self, secret_mgr): + obj = ["secret:/ns/global/pw", "secret:/ns/host/node1/ssh"] + refs = secret_mgr.scan_refs(obj, namespace="ns") + assert len(refs) == 2 + + def test_scan_nested(self, secret_mgr): + obj = {"a": {"b": "secret:/ns/global/pw"}} + refs = secret_mgr.scan_refs(obj, namespace="ns") + assert len(refs) == 1 + + def test_scan_no_refs(self, secret_mgr): + obj = {"plain": "value"} + assert secret_mgr.scan_refs(obj, namespace="ns") == set() + + def test_scan_bad_uri_yields_bad_secret_uri(self, secret_mgr): + obj = "secret:/ns/badscope/key" + refs = secret_mgr.scan_refs(obj, namespace="ns") + bad = [r for r in refs if isinstance(r, BadSecretURI)] + assert len(bad) == 1 + + def test_scan_unresolved_all_exist(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "x") + obj = "secret:/ns/global/pw" + unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns") + assert len(unresolved) == 0 + + def test_scan_unresolved_missing(self, secret_mgr): + obj = "secret:/ns/global/ghost" + unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns") + assert len(unresolved) == 1 + + def test_scan_unresolved_bad_uri_is_unresolved(self, secret_mgr): + obj = "secret:/ns/badscope/key" + unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns") + assert len(unresolved) == 1 + + def test_scan_unresolved_embedded_uri_is_unresolved(self, secret_mgr): + # Even though the referenced secret exists, an embedded (non-whole-value) + # URI is never resolvable, so it must be reported as unresolved so that + # pre-deploy validation catches it. + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "x") + obj = {"auth": "Bearer secret:/ns/global/pw"} + unresolved = secret_mgr.scan_unresolved_refs(obj, namespace="ns") + assert len(unresolved) == 1 + + +# ============================================================ +# resolve_object +# ============================================================ + +class TestResolveObject: + def test_resolve_secret(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + result = secret_mgr.resolve_object("secret:/ns/global/pw") + assert result == "s3cr3t" + + def test_resolve_opaque_json_string(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "creds", '{"u": "a", "p": "b"}') + result = secret_mgr.resolve_object("secret:/ns/global/creds") + assert isinstance(result, str) + assert result == '{"u": "a", "p": "b"}' + + def test_resolve_in_dict(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + result = secret_mgr.resolve_object({"password": "secret:/ns/global/pw"}) + assert result["password"] == "s3cr3t" + + def test_resolve_in_list(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "a", "x") + result = secret_mgr.resolve_object(["secret:/ns/global/a", "plain"]) + assert result[0] == "x" + assert result[1] == "plain" + + def test_resolve_in_tuple(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "t", "y") + result = secret_mgr.resolve_object(("secret:/ns/global/t",)) + assert result == ("y",) + + def test_resolve_non_secret_string_unchanged(self, secret_mgr): + result = secret_mgr.resolve_object("just a normal string") + assert result == "just a normal string" + + def test_resolve_non_string_unchanged(self, secret_mgr): + assert secret_mgr.resolve_object(42) == 42 + assert secret_mgr.resolve_object(None) is None + + def test_resolve_missing_secret_raises(self, secret_mgr): + with pytest.raises(CephSecretException): + secret_mgr.resolve_object("secret:/ns/global/ghost") + + def test_resolve_invalid_uri_raises(self, secret_mgr): + with pytest.raises(CephSecretException): + secret_mgr.resolve_object("secret:/ns/badscope/key") + + def test_resolve_edge_whitespace_tolerated(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + assert secret_mgr.resolve_object(" secret:/ns/global/pw ") == "s3cr3t" + + def test_resolve_newline_whitespace_tolerated(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + assert secret_mgr.resolve_object("\nsecret:/ns/global/pw\n") == "s3cr3t" + + def test_resolve_ref_with_suffix_raises(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + with pytest.raises(CephSecretException): + secret_mgr.resolve_object("secret:/ns/global/pw suffix") + + def test_resolve_embedded_uri_raises(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + with pytest.raises(CephSecretException, match="embedded"): + secret_mgr.resolve_object("Bearer secret:/ns/global/pw") + + def test_resolve_embedded_uri_in_dict_raises(self, secret_mgr): + secret_mgr.set("ns", SecretScope.GLOBAL, "", "pw", "s3cr3t") + with pytest.raises(CephSecretException, match="embedded"): + secret_mgr.resolve_object({"auth": "Bearer secret:/ns/global/pw"}) + + def test_resolve_plain_string_whitespace_preserved(self, secret_mgr): + # non-secret strings pass through byte-for-byte, including whitespace + assert secret_mgr.resolve_object(" plain value ") == " plain value " + + +# ============================================================ +# scan_refs — edge cases +# ============================================================ + +class TestScanRefsEdgeCases: + def test_embedded_refs_rejected_as_bad(self, secret_mgr): + # A string that embeds URIs inside other text is NOT a whole-value + # reference; it is surfaced as a single BadSecretURI, not parsed into + # multiple SecretRefs. + obj = "use secret:/ns/global/a and secret:/ns/global/b" + refs = secret_mgr.scan_refs(obj, namespace="ns") + assert len(refs) == 1 + bad = [r for r in refs if isinstance(r, BadSecretURI)] + assert len(bad) == 1 + assert bad[0].raw == obj + + def test_duplicate_refs_deduped(self, secret_mgr): + obj = ["secret:/ns/global/pw", "secret:/ns/global/pw"] + refs = secret_mgr.scan_refs(obj, namespace="ns") + uris = [r.to_uri() for r in refs] + assert uris.count("secret:/ns/global/pw") == 1 + + def test_ref_followed_by_punctuation(self, secret_mgr): + # "secret:/ns/global/pw," is a whole-value string that starts with the + # prefix but is not a valid URI (trailing comma in name) → BadSecretURI. + obj = "secret:/ns/global/pw," + refs = secret_mgr.scan_refs(obj, namespace="ns") + bad = [r for r in refs if isinstance(r, BadSecretURI)] + assert len(bad) == 1 + + def test_whole_value_ref_with_edge_whitespace(self, secret_mgr): + # Surrounding whitespace around an otherwise-clean URI is tolerated. + obj = " secret:/ns/global/pw " + refs = secret_mgr.scan_refs(obj, namespace="ns") + uris = {r.to_uri() for r in refs if isinstance(r, SecretRef)} + assert "secret:/ns/global/pw" in uris + + def test_embedded_ref_in_dict_value_is_bad(self, secret_mgr): + obj = {"auth": "Bearer secret:/ns/global/pw"} + refs = secret_mgr.scan_refs(obj, namespace="ns") + bad = [r for r in refs if isinstance(r, BadSecretURI)] + assert len(bad) == 1 + + def test_ref_inside_tuple(self, secret_mgr): + obj = ("secret:/ns/global/pw",) + refs = secret_mgr.scan_refs(obj, namespace="ns") + assert len(refs) == 1 + + def test_cross_namespace_ref_is_scanned(self, secret_mgr): + """scan_refs finds refs regardless of namespace — namespace arg does not filter.""" + obj = "secret:/other/global/pw" + refs = secret_mgr.scan_refs(obj, namespace="ns") + uris = {r.to_uri() for r in refs} + assert "secret:/other/global/pw" in uris diff --git a/src/pybind/mgr/ceph_secrets/tests/test_secret_store.py b/src/pybind/mgr/ceph_secrets/tests/test_secret_store.py new file mode 100644 index 00000000000..5473944d695 --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/tests/test_secret_store.py @@ -0,0 +1,516 @@ +# -*- coding: utf-8 -*- +"""Tests for ceph_secrets.secret_store (SecretStoreMon + data model).""" +from __future__ import annotations + +import pytest + +# conftest injects stubs; just import production code after. +from ceph_secrets.secret_store import ( + SecretRecord, + SecretMetadata, + SecretPolicy, + SECRET_STORE_PREFIX, + SECRET_STORE_FORMAT_VERSION, + _checked_ref, + _checked_namespace, + _epoch_key, + _now_iso, + _expect_bool, + _expect_str, + _expect_positive_int, + _expect_object, + _reject_unknown_keys, + _require_keys, +) +from ceph_secrets_types import ( + SecretScope, SecretRef, + CephSecretException, CephSecretDataError, +) + + +# ============================================================ +# Helpers / primitive validators +# ============================================================ + +class TestPrimitiveHelpers: + def test_now_iso_format(self): + ts = _now_iso() + import re + assert re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', ts) + + def test_expect_bool_ok(self): + assert _expect_bool("f", True) is True + assert _expect_bool("f", False) is False + + def test_expect_bool_bad(self): + with pytest.raises(CephSecretDataError, match="must be a boolean"): + _expect_bool("f", 1) + + def test_expect_str_ok(self): + assert _expect_str("f", "hello") == "hello" + + def test_expect_str_bad(self): + with pytest.raises(CephSecretDataError, match="must be a string"): + _expect_str("f", 42) + + def test_expect_positive_int_ok(self): + assert _expect_positive_int("f", 1) == 1 + assert _expect_positive_int("f", 99) == 99 + + def test_expect_positive_int_zero(self): + with pytest.raises(CephSecretDataError): + _expect_positive_int("f", 0) + + def test_expect_positive_int_bool_rejected(self): + with pytest.raises(CephSecretDataError): + _expect_positive_int("f", True) + + def test_expect_object_ok(self): + d = {"a": 1} + assert _expect_object("o", d) is d + + def test_expect_object_bad(self): + with pytest.raises(CephSecretDataError, match="must be a JSON object"): + _expect_object("o", [1, 2]) + + def test_reject_unknown_keys(self): + with pytest.raises(CephSecretDataError, match="unknown field"): + _reject_unknown_keys("L", {"a": 1, "z": 2}, {"a"}) + + def test_require_keys_missing(self): + with pytest.raises(CephSecretDataError, match="missing required"): + _require_keys("L", {"a": 1}, {"a", "b"}) + + def test_checked_namespace_valid(self): + assert _checked_namespace("cephadm") == "cephadm" + + def test_checked_namespace_bad(self): + with pytest.raises(CephSecretDataError): + _checked_namespace("bad ns!") + + def test_epoch_key_format(self): + assert _epoch_key("cephadm") == "secret_store/meta/cephadm/_epoch" + + def test_checked_ref_valid(self): + ref = _checked_ref("ns", SecretScope.GLOBAL, "", "key") + assert isinstance(ref, SecretRef) + + def test_checked_ref_bad_scope(self): + with pytest.raises(CephSecretDataError): + _checked_ref("ns", "bad", "", "key") + + +# ============================================================ +# SecretMetadata +# ============================================================ + +class TestSecretMetadata: + def test_defaults(self): + m = SecretMetadata(version=1, created="2024-01-01T00:00:00Z", updated="2024-01-01T00:00:00Z") + assert m.version == 1 + + def test_to_json(self): + m = SecretMetadata(version=2, created="2024-01-01T00:00:00Z", updated="2024-01-02T00:00:00Z") + d = m.to_json() + assert d["version"] == 2 + assert "created" in d + assert "updated" in d + + def test_from_json_roundtrip(self): + m = SecretMetadata(version=3, created="2024-01-01T00:00:00Z", updated="2024-01-02T00:00:00Z") + m2 = SecretMetadata.from_json(m.to_json()) + assert m2.version == 3 + + def test_from_json_missing_field(self): + with pytest.raises(CephSecretDataError, match="missing required"): + SecretMetadata.from_json({"version": 1, "created": "x"}) + + def test_from_json_unknown_field(self): + with pytest.raises(CephSecretDataError, match="unknown"): + SecretMetadata.from_json({"version": 1, "created": "x", "updated": "y", "extra": 1}) + + def test_version_zero_raises(self): + with pytest.raises(CephSecretDataError): + SecretMetadata(version=0, created="x", updated="y") + + def test_non_dict_raises(self): + with pytest.raises(CephSecretDataError): + SecretMetadata.from_json("not a dict") + + +# ============================================================ +# SecretPolicy +# ============================================================ + +class TestSecretPolicy: + def test_defaults(self): + p = SecretPolicy() + assert p.user_made is True + assert p.editable is True + + def test_to_json(self): + p = SecretPolicy(user_made=False, editable=True) + d = p.to_json() + assert d["user_made"] is False + + def test_from_json_roundtrip(self): + p = SecretPolicy(user_made=True, editable=False) + p2 = SecretPolicy.from_json(p.to_json()) + assert p2.editable is False + + def test_from_json_missing(self): + with pytest.raises(CephSecretDataError): + SecretPolicy.from_json({"user_made": True}) + + def test_from_json_bad_type(self): + with pytest.raises(CephSecretDataError, match="must be a boolean"): + SecretPolicy.from_json({"user_made": 1, "editable": True}) + + def test_non_bool_raises(self): + with pytest.raises(CephSecretDataError): + SecretPolicy(user_made="yes", editable=True) # type: ignore[arg-type] + + +# ============================================================ +# SecretRecord +# ============================================================ + +class TestSecretRecord: + def _ref(self, scope=SecretScope.GLOBAL, target="", name="key"): + return SecretRef("ns", scope, target, name) + + def _meta(self): + return SecretMetadata(version=1, created="2024-01-01T00:00:00Z", updated="2024-01-01T00:00:00Z") + + def test_basic_construction(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t") + assert rec.namespace == "ns" + assert rec.scope == SecretScope.GLOBAL + assert rec.name == "key" + + def test_to_public_json_no_data(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t") + out = rec.to_public_json(include_data=False) + assert "data" not in out + assert "metadata" in out + + def test_to_public_json_with_data(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="s3cr3t") + out = rec.to_public_json(include_data=True) + assert out["data"] == "s3cr3t" + + def test_to_public_json_with_ref(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x") + out = rec.to_public_json(include_ref=True) + assert out["ref"]["namespace"] == "ns" + assert out["ref"]["scope"] == "global" + + def test_to_public_json_with_policy(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x") + out = rec.to_public_json(include_policy=True) + assert "policy" in out + + def test_to_store_json(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="x") + stored = rec.to_store_json() + assert stored["format_version"] == SECRET_STORE_FORMAT_VERSION + assert "data" in stored + assert "policy" in stored + + def test_from_store_json_roundtrip(self): + ref = self._ref() + rec = SecretRecord(ref=ref, metadata=self._meta(), data="pw") + rec2 = SecretRecord.from_store_json(ref, rec.to_store_json()) + assert rec2.data == "pw" + assert rec2.metadata.version == 1 + + def test_from_store_json_wrong_version(self): + ref = self._ref() + payload = { + "format_version": 99, + "metadata": {"version": 1, "created": "x", "updated": "y"}, + "policy": {"user_made": True, "editable": True}, + "data": "x", + } + with pytest.raises(CephSecretDataError, match="unsupported"): + SecretRecord.from_store_json(ref, payload) + + def test_from_store_json_missing_key(self): + ref = self._ref() + with pytest.raises(CephSecretDataError, match="missing required"): + SecretRecord.from_store_json(ref, {"format_version": 1, "metadata": {}}) + + def test_bad_ref_type_raises(self): + with pytest.raises(CephSecretDataError, match="must be a SecretRef"): + SecretRecord(ref="not-a-ref", metadata=self._meta(), data="x") # type: ignore[arg-type] + + def test_bad_metadata_type_raises(self): + with pytest.raises(CephSecretDataError): + SecretRecord(ref=self._ref(), metadata={"version": 1}, data="x") # type: ignore[arg-type] + + def test_bad_data_type_raises(self): + with pytest.raises(CephSecretDataError): + SecretRecord(ref=self._ref(), metadata=self._meta(), data={"not": "a-string"}) # type: ignore[arg-type] + + def test_empty_data_raises(self): + with pytest.raises(CephSecretDataError, match="must not be empty"): + SecretRecord(ref=self._ref(), metadata=self._meta(), data="") + + def test_data_in_store_json(self): + rec = SecretRecord(ref=self._ref(), metadata=self._meta(), data="myvalue") + stored = rec.to_store_json() + assert stored["data"] == "myvalue" + + def test_ident(self): + ref = self._ref() + rec = SecretRecord(ref=ref, metadata=self._meta(), data="x") + assert rec.ident() == ("ns", "global", "", "key") + + +# ============================================================ +# SecretStoreMon – epoch +# ============================================================ + +class TestSecretStoreMon_Epoch: + def test_initial_epoch_is_zero(self, store): + assert store.get_epoch("cephadm") == 0 + + def test_bump_epoch(self, store): + assert store.bump_epoch("cephadm") == 1 + assert store.bump_epoch("cephadm") == 2 + + def test_namespace_isolation(self, store): + store.bump_epoch("cephadm") + store.bump_epoch("cephadm") + assert store.get_epoch("rook") == 0 + + def test_epoch_key_stored_correctly(self, mgr, store): + store.bump_epoch("myns") + assert mgr.get_store("secret_store/meta/myns/_epoch") == "1" + + def test_corrupted_epoch_returns_zero(self, mgr, store): + mgr.set_store("secret_store/meta/ns/_epoch", "notanumber") + assert store.get_epoch("ns") == 0 + + +# ============================================================ +# SecretStoreMon – CRUD +# ============================================================ + +class TestSecretStoreMon_Crud: + def test_set_and_get_global(self, store): + store.set("ns", SecretScope.GLOBAL, "", "pw", "secret") + rec = store.get("ns", SecretScope.GLOBAL, "", "pw") + assert rec is not None + assert rec.data == "secret" + + def test_get_missing_returns_none(self, store): + assert store.get("ns", SecretScope.GLOBAL, "", "noexist") is None + + def test_set_increments_version(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1") + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2") + rec = store.get("ns", SecretScope.GLOBAL, "", "k") + assert rec.metadata.version == 2 + + def test_set_preserves_created_timestamp(self, store): + import time + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1") + rec1 = store.get("ns", SecretScope.GLOBAL, "", "k") + time.sleep(1) + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2") + rec2 = store.get("ns", SecretScope.GLOBAL, "", "k") + assert rec1.metadata.created == rec2.metadata.created + assert rec2.metadata.updated != rec2.metadata.created + + def test_set_service_scope(self, store): + store.set("ns", SecretScope.SERVICE, "prometheus", "auth", "admin") + rec = store.get("ns", SecretScope.SERVICE, "prometheus", "auth") + assert rec.data == "admin" + + def test_set_host_scope(self, store): + store.set("ns", SecretScope.HOST, "node1", "ssh", "abc") + rec = store.get("ns", SecretScope.HOST, "node1", "ssh") + assert rec.data == "abc" + + def test_set_custom_scope(self, store): + store.set("ns", SecretScope.CUSTOM, "", "a/b/c", "tok") + rec = store.get("ns", SecretScope.CUSTOM, "", "a/b/c") + assert rec.data == "tok" + + def test_set_bumps_epoch(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1") + assert store.get_epoch("ns") == 1 + + def test_set_non_str_data_raises(self, store): + with pytest.raises(CephSecretException): + store.set("ns", SecretScope.GLOBAL, "", "k", {"not": "a-string"}) # type: ignore[arg-type] + + def test_set_empty_data_raises(self, store): + with pytest.raises(CephSecretException, match="must not be empty"): + store.set("ns", SecretScope.GLOBAL, "", "k", "") + + def test_set_non_editable_raises(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False) + with pytest.raises(CephSecretException, match="not editable"): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2") + + def test_rm_existing(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-x") + assert store.rm("ns", SecretScope.GLOBAL, "", "k") is True + assert store.get("ns", SecretScope.GLOBAL, "", "k") is None + + def test_rm_nonexistent(self, store): + assert store.rm("ns", SecretScope.GLOBAL, "", "ghost") is False + + def test_rm_bumps_epoch(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-x") + epoch_after_set = store.get_epoch("ns") + store.rm("ns", SecretScope.GLOBAL, "", "k") + assert store.get_epoch("ns") == epoch_after_set + 1 + + def test_rm_nonexistent_no_epoch_bump(self, store): + assert store.get_epoch("ns") == 0 + store.rm("ns", SecretScope.GLOBAL, "", "ghost") + assert store.get_epoch("ns") == 0 + + def test_get_corrupted_json_raises(self, mgr, store): + mgr.set_store( + f"{SECRET_STORE_PREFIX}ns/global/badkey", "NOT JSON" + ) + with pytest.raises(CephSecretDataError): + store.get("ns", SecretScope.GLOBAL, "", "badkey") + + def test_set_with_bad_scope_string(self, store): + with pytest.raises(CephSecretDataError): + store.set("ns", "badscope", "", "k", "data-v1") + + +# ============================================================ +# SecretStoreMon – ls +# ============================================================ + +class TestSecretStoreMon_Ls: + def test_ls_empty(self, store): + assert store.ls(namespace="ns") == [] + + def test_ls_returns_all(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k1", "data-1") + store.set("ns", SecretScope.GLOBAL, "", "k2", "data-2") + recs = store.ls(namespace="ns") + assert len(recs) == 2 + + def test_ls_filter_scope(self, store): + store.set("ns", SecretScope.GLOBAL, "", "g", "data-g") + store.set("ns", SecretScope.SERVICE, "prom", "auth", "data-auth") + recs = store.ls(namespace="ns", scope=SecretScope.GLOBAL) + assert len(recs) == 1 + assert recs[0].scope == SecretScope.GLOBAL + + def test_ls_filter_target(self, store): + store.set("ns", SecretScope.SERVICE, "prom", "a1", "data-a1") + store.set("ns", SecretScope.SERVICE, "grafana", "a2", "data-a2") + recs = store.ls(namespace="ns", scope=SecretScope.SERVICE, target="prom") + assert len(recs) == 1 + assert recs[0].target == "prom" + + def test_ls_namespace_isolation(self, store): + store.set("ns1", SecretScope.GLOBAL, "", "k", "data-ns1") + store.set("ns2", SecretScope.GLOBAL, "", "k", "data-ns2") + recs = store.ls(namespace="ns1") + assert len(recs) == 1 + + def test_ls_custom_scope(self, store): + store.set("ns", SecretScope.CUSTOM, "", "a/b/c", "data-abc") + store.set("ns", SecretScope.CUSTOM, "", "x/y", "data-xy") + recs = store.ls(namespace="ns", scope=SecretScope.CUSTOM) + assert len(recs) == 2 + + def test_ls_sorted(self, store): + store.set("ns", SecretScope.GLOBAL, "", "zz", "data-zz") + store.set("ns", SecretScope.GLOBAL, "", "aa", "data-aa") + recs = store.ls(namespace="ns") + names = [r.name for r in recs] + assert names == sorted(names) + + def test_ls_invalid_namespace(self, store): + with pytest.raises(CephSecretDataError): + store.ls(namespace="bad ns!") + + def test_ls_scope_as_string(self, store): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1") + recs = store.ls(namespace="ns", scope="global") + assert len(recs) == 1 + + def test_ls_corrupted_json_raises(self, mgr, store): + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/bad", "NOT-JSON") + with pytest.raises(CephSecretDataError): + store.ls(namespace="ns") + + def test_ls_no_filter(self, store): + store.set("ns1", SecretScope.GLOBAL, "", "a", "data-a") + store.set("ns2", SecretScope.GLOBAL, "", "b", "data-b") + recs = store.ls() + assert len(recs) == 2 + + def test_ls_kv_key_structure_validation(self, mgr, store): + """A key with empty segments inside the prefix should raise CephSecretDataError.""" + mgr.set_store(f"{SECRET_STORE_PREFIX}ns//global/k", "{}") + with pytest.raises(CephSecretDataError, match="empty path component"): + store.ls(namespace="ns") + + +# ============================================================ +# TestSecretStoreMon_Ls — malformed persisted-record cases +# ============================================================ + +class TestSecretStoreMon_Ls_Malformed: + def test_ls_too_few_segments(self, mgr, store): + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global", "{}") + with pytest.raises(CephSecretDataError, match="unexpected key structure"): + store.ls(namespace="ns") + + def test_ls_invalid_scope_in_key(self, mgr, store): + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/badscope/key", "{}") + with pytest.raises(CephSecretDataError, match="invalid scope"): + store.ls(namespace="ns") + + def test_ls_global_with_extra_segment(self, mgr, store): + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/target/name", "{}") + with pytest.raises(CephSecretDataError, match="unexpected global key structure"): + store.ls(namespace="ns") + + def test_ls_service_with_missing_segment(self, mgr, store): + # service needs 4 parts (ns/service/target/name); 3 is too few + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/service/onlytarget", "{}") + with pytest.raises(CephSecretDataError, match="unexpected targeted-scope key structure"): + store.ls(namespace="ns") + + def test_ls_payload_not_object(self, mgr, store): + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/k", '["not", "object"]') + with pytest.raises(CephSecretDataError, match="not a JSON object"): + store.ls(namespace="ns") + + def test_ls_payload_valid_json_missing_fields(self, mgr, store): + # Valid JSON object but missing required secret record fields + mgr.set_store(f"{SECRET_STORE_PREFIX}ns/global/k", '{"unexpected": 1}') + with pytest.raises(CephSecretDataError): + store.ls(namespace="ns") + + +# ============================================================ +# TestSecretStoreMon_Crud — editable=False allows rm by design +# ============================================================ + +class TestSecretStoreMon_EditableRm: + def test_rm_non_editable_secret_is_allowed(self, store): + """rm ignores editable — deletion is always permitted by design.""" + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False) + result = store.rm("ns", SecretScope.GLOBAL, "", "k") + assert result is True + + def test_set_non_editable_blocks_update(self, store): + """Confirmed: set refuses to update a non-editable secret.""" + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v1", editable=False) + with pytest.raises(CephSecretException): + store.set("ns", SecretScope.GLOBAL, "", "k", "data-v2") diff --git a/src/pybind/mgr/ceph_secrets_client.py b/src/pybind/mgr/ceph_secrets_client.py index 2386de71043..3240641dbd8 100644 --- a/src/pybind/mgr/ceph_secrets_client.py +++ b/src/pybind/mgr/ceph_secrets_client.py @@ -249,8 +249,9 @@ class CephSecretsClient: rather than automatically by a Ceph component. Defaults to True; set to False for programmatically generated secrets. - editable: Whether the secret may be updated or removed by - automated tooling. Defaults to True. + editable: Whether the secret may be updated by automated + tooling. Deletion is always permitted regardless of + this flag. Defaults to True. Returns: A dict containing the written record's ``metadata`` object. The diff --git a/src/pybind/mgr/ceph_secrets_types.py b/src/pybind/mgr/ceph_secrets_types.py index ae2ccd58672..88cdce45422 100644 --- a/src/pybind/mgr/ceph_secrets_types.py +++ b/src/pybind/mgr/ceph_secrets_types.py @@ -196,6 +196,11 @@ def parse_secret_uri(uri: str) -> SecretRef: if not isinstance(uri, str): raise CephSecretException('secret uri must be a string') + if uri != uri.strip(): + raise CephSecretException( + f'Invalid secret uri {uri!r}: leading/trailing whitespace is not allowed' + ) + parsed = urlparse(uri) if parsed.scheme != SECRET_SCHEME: raise CephSecretException(f'Not a secret uri: {uri!r}') @@ -289,6 +294,11 @@ def parse_secret_path(path: str) -> SecretRef: if not isinstance(path, str): raise CephSecretException('secret path must be a string') + if path != path.strip(): + raise CephSecretException( + f'Invalid secret path {path!r}: leading/trailing whitespace is not allowed' + ) + p = path.strip() if not p: raise CephSecretException('Invalid secret path: empty') diff --git a/src/pybind/mgr/tests/test_ceph_secrets_client.py b/src/pybind/mgr/tests/test_ceph_secrets_client.py new file mode 100644 index 00000000000..ab52077cd27 --- /dev/null +++ b/src/pybind/mgr/tests/test_ceph_secrets_client.py @@ -0,0 +1,317 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for ceph_secrets_client.py. +Placed at the same level as the module under test (src/pybind/mgr/). +""" +from __future__ import annotations + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import pytest +from typing import Any +from unittest.mock import MagicMock, call + +from ceph_secrets_client import CephSecretsClient, MgrRemote +from ceph_secrets_types import SecretScope + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +def _make_mgr(return_value: Any = None, raise_exc: Exception = None) -> MagicMock: + """Return a mock that satisfies the MgrRemote Protocol.""" + mgr = MagicMock() + if raise_exc is not None: + mgr.remote.side_effect = raise_exc + else: + mgr.remote.return_value = return_value + return mgr + + +def _client(return_value: Any = None, raise_exc: Exception = None) -> CephSecretsClient: + return CephSecretsClient(_make_mgr(return_value, raise_exc)) + + +# ============================================================ +# CephSecretsClient construction +# ============================================================ + +class TestConstruction: + def test_default_module_name(self): + mgr = MagicMock() + client = CephSecretsClient(mgr) + assert client.module == "ceph_secrets" + + def test_custom_module_name(self): + mgr = MagicMock() + client = CephSecretsClient(mgr, module="my_secrets") + assert client.module == "my_secrets" + + def test_stores_mgr(self): + mgr = MagicMock() + client = CephSecretsClient(mgr) + assert client.mgr is mgr + + +# ============================================================ +# _remote error handling +# ============================================================ + +class TestRemoteErrorHandling: + def test_passes_through_return_value(self): + client = _client(return_value=42) + assert client._remote("some_method", foo="bar") == 42 + + def test_wraps_exception_as_runtime_error(self): + client = _client(raise_exc=RuntimeError("module down")) + with pytest.raises(RuntimeError, match="Cannot call secrets mgr-module"): + client._remote("any_method") + + def test_exception_includes_module_name(self): + mgr = _make_mgr(raise_exc=Exception("boom")) + client = CephSecretsClient(mgr, module="my_secrets") + with pytest.raises(RuntimeError, match="my_secrets"): + client._remote("any_method") + + def test_exception_is_enabled_hint(self): + client = _client(raise_exc=Exception("unavailable")) + with pytest.raises(RuntimeError, match="enabled"): + client._remote("any_method") + + +# ============================================================ +# secret_get_epoch +# ============================================================ + +class TestSecretGetEpoch: + def test_calls_correct_method(self): + mgr = _make_mgr(return_value=5) + client = CephSecretsClient(mgr) + result = client.secret_get_epoch("cephadm") + mgr.remote.assert_called_once_with("ceph_secrets", "secret_get_epoch", namespace="cephadm") + assert result == 5 + + def test_returns_zero_epoch(self): + assert _client(return_value=0).secret_get_epoch("ns") == 0 + + +# ============================================================ +# secret_get +# ============================================================ + +class TestSecretGet: + def test_calls_remote_correctly(self): + mgr = _make_mgr(return_value={"metadata": {"version": 1}}) + client = CephSecretsClient(mgr) + result = client.secret_get("ns", SecretScope.GLOBAL, "", "pw") + mgr.remote.assert_called_once_with( + "ceph_secrets", "secret_get", + namespace="ns", scope=SecretScope.GLOBAL, + target="", name="pw", reveal=False, + ) + assert result["metadata"]["version"] == 1 + + def test_reveal_kwarg_forwarded(self): + mgr = _make_mgr(return_value={"metadata": {}, "data": "s3cr3t"}) + client = CephSecretsClient(mgr) + client.secret_get("ns", SecretScope.GLOBAL, "", "pw", reveal=True) + _, kwargs = mgr.remote.call_args + assert kwargs["reveal"] is True + + def test_returns_none_when_not_found(self): + assert _client(return_value=None).secret_get("ns", "global", "", "ghost") is None + + def test_scope_as_string(self): + mgr = _make_mgr(return_value=None) + client = CephSecretsClient(mgr) + client.secret_get("ns", "host", "node1", "ssh") + _, kwargs = mgr.remote.call_args + assert kwargs["scope"] == "host" + + +# ============================================================ +# secret_get_value +# ============================================================ + +class TestSecretGetValue: + def test_calls_remote(self): + mgr = _make_mgr(return_value="s3cr3t") + client = CephSecretsClient(mgr) + result = client.secret_get_value("ns", SecretScope.GLOBAL, "", "pw") + mgr.remote.assert_called_once_with( + "ceph_secrets", "secret_get_value", + namespace="ns", scope=SecretScope.GLOBAL, target="", name="pw", + ) + assert result == "s3cr3t" + + def test_returns_none_if_not_found(self): + assert _client(return_value=None).secret_get_value("ns", "global", "", "ghost") is None + + def test_returns_opaque_string(self): + payload = '{"u": "a", "p": "b"}' + result = _client(return_value=payload).secret_get_value("ns", "global", "", "creds") + assert result == payload + + def test_module_unreachable_raises(self): + with pytest.raises(RuntimeError, match="Cannot call"): + _client(raise_exc=Exception("down")).secret_get_value("ns", "global", "", "pw") + + +# secret_get_version +# ============================================================ + +class TestSecretGetVersion: + def test_calls_remote(self): + mgr = _make_mgr(return_value=3) + client = CephSecretsClient(mgr) + result = client.secret_get_version("ns", SecretScope.GLOBAL, "", "k") + mgr.remote.assert_called_once_with( + "ceph_secrets", "secret_get_version", + namespace="ns", scope=SecretScope.GLOBAL, target="", name="k", + ) + assert result == 3 + + def test_returns_none_if_not_found(self): + assert _client(return_value=None).secret_get_version("ns", "global", "", "ghost") is None + + +# ============================================================ +# secret_get_versions +# ============================================================ + +class TestSecretGetVersions: + def test_calls_remote_with_list(self): + uris = ["secret:/ns/global/a", "secret:/ns/global/b"] + expected = {"secret:/ns/global/a": 1, "secret:/ns/global/b": None} + mgr = _make_mgr(return_value=expected) + client = CephSecretsClient(mgr) + result = client.secret_get_versions(uris) + mgr.remote.assert_called_once_with("ceph_secrets", "secret_get_versions", uris=uris) + assert result == expected + + def test_returns_empty_dict_for_empty_list(self): + result = _client(return_value={}).secret_get_versions([]) + assert result == {} + + +# ============================================================ +# secret_set +# ============================================================ + +class TestSecretSet: + def test_calls_remote_with_defaults(self): + expected = {"metadata": {"version": 1}} + mgr = _make_mgr(return_value=expected) + client = CephSecretsClient(mgr) + result = client.secret_set("ns", SecretScope.GLOBAL, "", "pw", "x") + mgr.remote.assert_called_once_with( + "ceph_secrets", "secret_set", + namespace="ns", scope=SecretScope.GLOBAL, + target="", name="pw", data="x", + user_made=True, editable=True, + ) + assert result == expected + + def test_user_made_editable_overrides(self): + mgr = _make_mgr(return_value={}) + client = CephSecretsClient(mgr) + client.secret_set("ns", "global", "", "k", "x", user_made=False, editable=False) + _, kwargs = mgr.remote.call_args + assert kwargs["user_made"] is False + assert kwargs["editable"] is False + + def test_raises_on_module_error(self): + with pytest.raises(RuntimeError): + _client(raise_exc=Exception("down")).secret_set("ns", "global", "", "k", "x") + + +# ============================================================ +# secret_rm +# ============================================================ + +class TestSecretRm: + def test_calls_remote(self): + mgr = _make_mgr(return_value=True) + client = CephSecretsClient(mgr) + result = client.secret_rm("ns", SecretScope.GLOBAL, "", "k") + mgr.remote.assert_called_once_with( + "ceph_secrets", "secret_rm", + namespace="ns", scope=SecretScope.GLOBAL, target="", name="k", + ) + assert result is True + + def test_returns_false_when_not_found(self): + assert _client(return_value=False).secret_rm("ns", "global", "", "ghost") is False + + def test_truthy_value_coerced_to_bool(self): + # remote might return 1 instead of True + assert _client(return_value=1).secret_rm("ns", "global", "", "k") is True + + def test_falsy_value_coerced_to_bool(self): + assert _client(return_value=0).secret_rm("ns", "global", "", "k") is False + + +# ============================================================ +# scan_unresolved_refs +# ============================================================ + +class TestScanUnresolvedRefs: + def test_calls_remote(self): + expected = ["secret:/ns/global/missing"] + mgr = _make_mgr(return_value=expected) + client = CephSecretsClient(mgr) + result = client.scan_unresolved_refs({"k": "secret:/ns/global/missing"}, "ns") + mgr.remote.assert_called_once_with( + "ceph_secrets", "scan_unresolved_refs", + obj={"k": "secret:/ns/global/missing"}, namespace="ns", + ) + assert result == expected + + def test_returns_empty_when_all_resolved(self): + assert _client(return_value=[]).scan_unresolved_refs({}, "ns") == [] + + +# ============================================================ +# scan_refs +# ============================================================ + +class TestScanRefs: + def test_calls_remote(self): + expected = ["secret:/ns/global/pw"] + mgr = _make_mgr(return_value=expected) + client = CephSecretsClient(mgr) + result = client.scan_refs("secret:/ns/global/pw", "ns") + mgr.remote.assert_called_once_with( + "ceph_secrets", "scan_refs", + obj="secret:/ns/global/pw", namespace="ns", + ) + assert result == expected + + def test_returns_empty_for_no_refs(self): + assert _client(return_value=[]).scan_refs("no refs here", "ns") == [] + + +# ============================================================ +# resolve_object +# ============================================================ + +class TestResolveObject: + def test_calls_remote(self): + mgr = _make_mgr(return_value={"password": "s3cr3t"}) + client = CephSecretsClient(mgr) + result = client.resolve_object({"password": "secret:/ns/global/pw"}) + mgr.remote.assert_called_once_with( + "ceph_secrets", "resolve_object", + obj={"password": "secret:/ns/global/pw"}, + ) + assert result["password"] == "s3cr3t" + + def test_raises_on_unresolvable(self): + with pytest.raises(RuntimeError): + _client(raise_exc=Exception("missing")).resolve_object("secret:/ns/global/ghost") + + def test_returns_plain_value_unchanged(self): + result = _client(return_value="plain").resolve_object("plain") + assert result == "plain" diff --git a/src/pybind/mgr/tests/test_ceph_secrets_types.py b/src/pybind/mgr/tests/test_ceph_secrets_types.py new file mode 100644 index 00000000000..062af9b6fd9 --- /dev/null +++ b/src/pybind/mgr/tests/test_ceph_secrets_types.py @@ -0,0 +1,487 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for ceph_secrets_types.py. +Placed at the same level as the module under test (src/pybind/mgr/). +""" +from __future__ import annotations + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import pytest + +from ceph_secrets_types import ( + CephSecretException, + CephSecretDataError, + CephSecretNotFoundError, + SecretScope, + SecretRef, + BadSecretURI, + parse_secret_uri, + parse_secret_path, + validate_secret_namespace, + SECRET_SCHEME, + _validate_segment, + _validate_custom_path, + _quote_segment, + _quote_custom_path, +) + + +# ============================================================ +# Module constants +# ============================================================ + +class TestConstants: + def test_secret_scheme(self): + assert SECRET_SCHEME == "secret" + + +# ============================================================ +# Exception hierarchy +# ============================================================ + +class TestExceptions: + def test_base_is_exception(self): + assert issubclass(CephSecretException, Exception) + + def test_data_error_inherits_base(self): + assert issubclass(CephSecretDataError, CephSecretException) + + def test_not_found_inherits_base(self): + assert issubclass(CephSecretNotFoundError, CephSecretException) + + def test_can_raise_and_catch_base(self): + with pytest.raises(CephSecretException): + raise CephSecretException("test") + + def test_can_raise_and_catch_data_error_as_base(self): + with pytest.raises(CephSecretException): + raise CephSecretDataError("data") + + def test_can_raise_and_catch_not_found_as_base(self): + with pytest.raises(CephSecretException): + raise CephSecretNotFoundError("gone") + + +# ============================================================ +# _validate_segment +# ============================================================ + +class TestValidateSegment: + @pytest.mark.parametrize("v", [ + "simple", "with-dashes", "under_score", "dot.dot", "a1b2", + "UPPER", "Mixed123", + ]) + def test_valid(self, v): + _validate_segment("f", v) + + @pytest.mark.parametrize("bad", [ + "", " ", "has space", "a/b", "a:b", "star*", + ]) + def test_invalid(self, bad): + with pytest.raises(ValueError): + _validate_segment("f", bad) + + def test_ends_with_dot_invalid(self): + with pytest.raises(ValueError, match="must not end"): + _validate_segment("f", "end.") + + def test_non_string_raises(self): + with pytest.raises(ValueError, match="must be a string"): + _validate_segment("f", None) + + +# ============================================================ +# _validate_custom_path +# ============================================================ + +class TestValidateCustomPath: + @pytest.mark.parametrize("p", [ + "single", "a/b", "a/b/c", "deep/path/here", + ]) + def test_valid(self, p): + _validate_custom_path(p) + + def test_empty_string(self): + with pytest.raises(ValueError, match="must not be empty"): + _validate_custom_path("") + + def test_empty_segment_in_middle(self): + with pytest.raises(ValueError, match="empty segments"): + _validate_custom_path("a//b") + + def test_trailing_slash(self): + with pytest.raises(ValueError, match="empty segments"): + _validate_custom_path("a/b/") + + def test_leading_slash_empty_first_segment(self): + with pytest.raises(ValueError, match="empty segments"): + _validate_custom_path("/a/b") + + def test_segment_with_bad_chars(self): + with pytest.raises(ValueError): + _validate_custom_path("a/b c/d") + + +# ============================================================ +# _quote helpers +# ============================================================ + +class TestQuoteHelpers: + def test_quote_segment_no_slash(self): + # clean identifiers are unchanged + assert _quote_segment("cephadm") == "cephadm" + + def test_quote_custom_path_preserves_slash(self): + assert _quote_custom_path("a/b/c") == "a/b/c" + + +# ============================================================ +# validate_secret_namespace +# ============================================================ + +class TestValidateSecretNamespace: + def test_valid_namespace(self): + validate_secret_namespace("cephadm") + + def test_empty_namespace(self): + with pytest.raises(ValueError): + validate_secret_namespace("") + + def test_namespace_with_space(self): + with pytest.raises(ValueError): + validate_secret_namespace("bad name") + + +# ============================================================ +# SecretScope +# ============================================================ + +class TestSecretScope: + def test_all_values(self): + assert SecretScope.GLOBAL.value == "global" + assert SecretScope.SERVICE.value == "service" + assert SecretScope.HOST.value == "host" + assert SecretScope.CUSTOM.value == "custom" + + def test_from_str_valid(self): + for v in ("global", "service", "host", "custom"): + scope = SecretScope.from_str(v) + assert scope.value == v + + def test_from_str_invalid(self): + with pytest.raises(CephSecretException, match="Invalid secret scope"): + SecretScope.from_str("vault") + + def test_is_str_subclass(self): + assert isinstance(SecretScope.GLOBAL, str) + + # validate_fields – global + def test_global_valid(self): + SecretScope.GLOBAL.validate_fields("", "mykey") + + def test_global_nonempty_target_fails(self): + with pytest.raises(ValueError, match="target must be empty"): + SecretScope.GLOBAL.validate_fields("target", "key") + + def test_global_empty_name_fails(self): + with pytest.raises(ValueError): + SecretScope.GLOBAL.validate_fields("", "") + + # validate_fields – service / host + def test_service_valid(self): + SecretScope.SERVICE.validate_fields("prometheus", "auth") + + def test_service_empty_target_fails(self): + with pytest.raises(ValueError): + SecretScope.SERVICE.validate_fields("", "auth") + + def test_host_valid(self): + SecretScope.HOST.validate_fields("node1", "ssh_key") + + def test_host_empty_name_fails(self): + with pytest.raises(ValueError): + SecretScope.HOST.validate_fields("node1", "") + + # validate_fields – custom + def test_custom_valid_flat(self): + SecretScope.CUSTOM.validate_fields("", "some-key") + + def test_custom_valid_nested(self): + SecretScope.CUSTOM.validate_fields("", "a/b/c") + + def test_custom_nonempty_target_fails(self): + with pytest.raises(ValueError, match="target must be empty"): + SecretScope.CUSTOM.validate_fields("badtarget", "key") + + def test_custom_empty_name_fails(self): + with pytest.raises(ValueError): + SecretScope.CUSTOM.validate_fields("", "") + + +# ============================================================ +# SecretRef +# ============================================================ + +class TestSecretRef: + def test_global_ref_construction(self): + ref = SecretRef("ns", SecretScope.GLOBAL, "", "pw") + assert ref.namespace == "ns" + assert ref.scope == SecretScope.GLOBAL + assert ref.target == "" + assert ref.name == "pw" + + def test_service_ref(self): + ref = SecretRef("ns", SecretScope.SERVICE, "prom", "auth") + assert ref.target == "prom" + + def test_host_ref(self): + ref = SecretRef("ns", SecretScope.HOST, "n1", "k") + assert ref.name == "k" + + def test_custom_ref(self): + ref = SecretRef("ns", SecretScope.CUSTOM, "", "a/b/c") + assert ref.name == "a/b/c" + + def test_is_frozen(self): + ref = SecretRef("ns", SecretScope.GLOBAL, "", "k") + with pytest.raises(Exception): + ref.name = "other" # type: ignore[misc] + + def test_scope_coerced_from_string(self): + ref = SecretRef("ns", "host", "node1", "k") + assert ref.scope == SecretScope.HOST + + def test_invalid_scope_raises(self): + with pytest.raises(ValueError): + SecretRef("ns", "badscope", "", "k") + + def test_invalid_namespace_raises(self): + with pytest.raises(ValueError): + SecretRef("bad ns", SecretScope.GLOBAL, "", "k") + + def test_ident_global(self): + ref = SecretRef("ns", SecretScope.GLOBAL, "", "k") + assert ref.ident() == ("ns", "global", "", "k") + + def test_ident_service(self): + ref = SecretRef("ns", SecretScope.SERVICE, "t", "n") + assert ref.ident() == ("ns", "service", "t", "n") + + def test_to_uri_global(self): + ref = SecretRef("cephadm", SecretScope.GLOBAL, "", "pw") + assert ref.to_uri() == "secret:/cephadm/global/pw" + + def test_to_uri_service(self): + ref = SecretRef("ns", SecretScope.SERVICE, "prom", "auth") + assert ref.to_uri() == "secret:/ns/service/prom/auth" + + def test_to_uri_host(self): + ref = SecretRef("ns", SecretScope.HOST, "node1", "ssh") + assert ref.to_uri() == "secret:/ns/host/node1/ssh" + + def test_to_uri_custom(self): + ref = SecretRef("ns", SecretScope.CUSTOM, "", "a/b/c") + assert ref.to_uri() == "secret:/ns/custom/a/b/c" + + def test_equality(self): + r1 = SecretRef("ns", SecretScope.GLOBAL, "", "k") + r2 = SecretRef("ns", SecretScope.GLOBAL, "", "k") + assert r1 == r2 + + def test_inequality_different_name(self): + r1 = SecretRef("ns", SecretScope.GLOBAL, "", "k1") + r2 = SecretRef("ns", SecretScope.GLOBAL, "", "k2") + assert r1 != r2 + + +# ============================================================ +# BadSecretURI +# ============================================================ + +class TestBadSecretURI: + def test_construction(self): + b = BadSecretURI(raw="secret:/bad/scope/key", error="bad scope", namespace="ns") + assert b.raw == "secret:/bad/scope/key" + assert b.error == "bad scope" + + def test_to_uri_returns_raw(self): + b = BadSecretURI(raw="secret:/bad", error="err", namespace="ns") + assert b.to_uri() == "secret:/bad" + + def test_is_frozen(self): + b = BadSecretURI(raw="x", error="e", namespace="n") + with pytest.raises(Exception): + b.raw = "y" # type: ignore[misc] + + +# ============================================================ +# parse_secret_uri +# ============================================================ + +class TestParseSecretUri: + # --- valid inputs --- + def test_global(self): + ref = parse_secret_uri("secret:/ns/global/pw") + assert ref.scope == SecretScope.GLOBAL + assert ref.name == "pw" + + def test_service(self): + ref = parse_secret_uri("secret:/ns/service/prom/auth") + assert ref.scope == SecretScope.SERVICE + assert ref.target == "prom" + assert ref.name == "auth" + + def test_host(self): + ref = parse_secret_uri("secret:/ns/host/node1/ssh") + assert ref.target == "node1" + assert ref.name == "ssh" + + def test_custom_flat(self): + ref = parse_secret_uri("secret:/ns/custom/single") + assert ref.scope == SecretScope.CUSTOM + assert ref.name == "single" + + def test_custom_nested(self): + ref = parse_secret_uri("secret:/ns/custom/a/b/c") + assert ref.name == "a/b/c" + + # --- invalid inputs --- + def test_wrong_scheme(self): + with pytest.raises(CephSecretException, match="Not a secret uri"): + parse_secret_uri("http://example.com/foo") + + def test_authority_rejected(self): + with pytest.raises(CephSecretException, match="authority"): + parse_secret_uri("secret://ns/global/key") + + def test_query_string_rejected(self): + with pytest.raises(CephSecretException, match="query"): + parse_secret_uri("secret:/ns/global/key?x=y") + + def test_fragment_rejected(self): + with pytest.raises(CephSecretException, match="query"): + parse_secret_uri("secret:/ns/global/key#frag") + + def test_percent_encoding_rejected(self): + with pytest.raises(CephSecretException, match="percent-encoding"): + parse_secret_uri("secret:/ns/global/my%2Dkey") + + def test_too_short(self): + with pytest.raises(CephSecretException): + parse_secret_uri("secret:/ns/global") + + def test_bad_scope(self): + with pytest.raises(CephSecretException): + parse_secret_uri("secret:/ns/vault/key") + + def test_non_string_input(self): + with pytest.raises(CephSecretException, match="must be a string"): + parse_secret_uri(42) # type: ignore[arg-type] + + def test_global_with_extra_segment_rejected(self): + with pytest.raises(CephSecretException): + parse_secret_uri("secret:/ns/global/target/name") + + def test_service_missing_name(self): + with pytest.raises(CephSecretException): + parse_secret_uri("secret:/ns/service/prom") + + +# ============================================================ +# parse_secret_path +# ============================================================ + +class TestParseSecretPath: + # --- valid inputs --- + def test_global(self): + ref = parse_secret_path("cephadm/global/pw") + assert ref.namespace == "cephadm" + assert ref.scope == SecretScope.GLOBAL + assert ref.name == "pw" + + def test_service(self): + ref = parse_secret_path("ns/service/prom/auth") + assert ref.target == "prom" + assert ref.name == "auth" + + def test_host(self): + ref = parse_secret_path("ns/host/node1/ssh_key") + assert ref.name == "ssh_key" + + def test_custom_flat(self): + ref = parse_secret_path("ns/custom/flat") + assert ref.scope == SecretScope.CUSTOM + assert ref.name == "flat" + + def test_custom_nested(self): + ref = parse_secret_path("ns/custom/a/b/c") + assert ref.name == "a/b/c" + + def test_leading_slash_stripped(self): + ref = parse_secret_path("/ns/global/k") + assert ref.namespace == "ns" + + # --- invalid inputs --- + def test_empty_string(self): + with pytest.raises(CephSecretException, match="empty"): + parse_secret_path("") + + def test_non_string(self): + with pytest.raises(CephSecretException): + parse_secret_path(None) # type: ignore[arg-type] + + def test_too_few_segments(self): + with pytest.raises(CephSecretException, match="Use"): + parse_secret_path("ns/global") + + def test_double_slash(self): + with pytest.raises(CephSecretException): + parse_secret_path("ns//global/key") + + def test_trailing_slash(self): + with pytest.raises(CephSecretException, match="empty segment"): + parse_secret_path("ns/global/key/") + + def test_bad_scope(self): + with pytest.raises(CephSecretException): + parse_secret_path("ns/badscope/key") + + def test_global_extra_segment(self): + with pytest.raises(CephSecretException, match="global scope"): + parse_secret_path("ns/global/target/name") + + def test_service_missing_name(self): + with pytest.raises(CephSecretException, match="service"): + parse_secret_path("ns/service/target") + + def test_host_missing_name(self): + with pytest.raises(CephSecretException, match="host"): + parse_secret_path("ns/host/target") + + +# ============================================================ +# Whitespace / canonicality +# ============================================================ + +class TestParseSecretPathWhitespace: + @pytest.mark.parametrize("path", [ + " ns/global/key", + "ns/global/key ", + "\tns/global/key", + "ns/global/key\n", + ]) + def test_rejects_outer_whitespace(self, path): + with pytest.raises(CephSecretException): + parse_secret_path(path) + + +class TestParseSecretUriWhitespace: + def test_rejects_leading_whitespace(self): + with pytest.raises(CephSecretException): + parse_secret_uri(" secret:/ns/global/key") + + def test_rejects_trailing_whitespace(self): + with pytest.raises(CephSecretException): + parse_secret_uri("secret:/ns/global/key ")