From: Redouane Kachach Date: Mon, 26 Jan 2026 13:46:39 +0000 (+0100) Subject: mgr/ceph_secrets: add SecretMgr for secrets handling and resolution X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9c220eec0eac3bec950116b1ee0d8ce8d2b3a80a;p=ceph.git mgr/ceph_secrets: add SecretMgr for secrets handling and resolution Introduce SecretMgr to encapsulate higher-level behavior on top of SecretStoreMon: listing helpers, scan_refs, scan_unresolved_refs, and resolve_object (walk nested dict/list structures). This keeps parsing/substitution logic out of the mgr module entrypoint and makes consumer behavior consistent. The module can now resolve secret://… references deterministically and provide structured scan output. Fixes: https://tracker.ceph.com/issues/74562 Assisted-by: Claude Assisted-by: ChatGPT Signed-off-by: Redouane Kachach --- diff --git a/src/pybind/mgr/ceph_secrets/secret_mgr.py b/src/pybind/mgr/ceph_secrets/secret_mgr.py new file mode 100644 index 00000000000..d957be59275 --- /dev/null +++ b/src/pybind/mgr/ceph_secrets/secret_mgr.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +import logging +from typing import Any, List, Optional, Set, Union, Hashable, Protocol + +from .secret_store import SecretRecord, SecretData +from ceph_secrets_types import ( + CephSecretException, + CephSecretNotFoundError, + SecretRef, + BadSecretURI, + SecretScope, + parse_secret_uri, + SECRET_SCHEME +) + + +_SECRET_URI_PREFIX = f'{SECRET_SCHEME}:/' + + +logger = logging.getLogger(__name__) + + +class SecretURI(Protocol, Hashable): + def to_uri(self) -> str: + ... + + +def _coerce_scope(scope: Union[SecretScope, str]) -> SecretScope: + if isinstance(scope, SecretScope): + return scope + return SecretScope.from_str(str(scope)) + + +class SecretMgr: + """ + Phase 1: Mon-store backend only. + + Secret data is an opaque string. Callers are responsible for any + structure within it (e.g. JSON-encoding a dict before storing and + decoding after retrieval). resolve_object() substitutes a secret URI + with the stored string directly. + """ + + def __init__(self, store: Any) -> None: + self.store = store + + def make_ref( + self, + namespace: str, + scope: Union[SecretScope, str], + target: str = '', + name: str = '', + ) -> SecretRef: + try: + return SecretRef( + namespace=namespace, + scope=_coerce_scope(scope), + target=target or '', + name=name, + ) + except ValueError as e: + raise CephSecretException(str(e)) from e + + def get(self, ref: SecretRef) -> SecretRecord: + rec = self.store.get(ref.namespace, ref.scope, ref.target, ref.name) + if rec is None: + raise CephSecretNotFoundError(f"Secret not found: {ref.to_uri()}") + return rec + + def get_value(self, ref: SecretRef) -> str: + """Return the opaque data string for a secret.""" + rec = self.get(ref) + return rec.data + + def set( + self, + namespace: str, + scope: Union[SecretScope, str], + target: str, + name: str, + data: SecretData, + user_made: bool = True, + editable: bool = True, + ) -> SecretRecord: + if not isinstance(data, str): + raise CephSecretException('Secret data must be a string') + if data == '': + raise CephSecretException('Secret data must not be empty') + + ref = self.make_ref(namespace, scope, target, name) + return self.store.set( + ref.namespace, + ref.scope, + ref.target, + ref.name, + data, + user_made, + editable, + ) + + def rm( + self, + namespace: str, + scope: Union[SecretScope, str], + target: str, + name: str, + ) -> bool: + ref = self.make_ref(namespace, scope, target, name) + return self.store.rm(ref.namespace, ref.scope, ref.target, ref.name) + + def ls( + self, + namespace: Optional[str] = None, + scope: Optional[Union[SecretScope, str]] = None, + target: Optional[str] = None, + ) -> List[SecretRecord]: + sc = _coerce_scope(scope) if scope else None + return self.store.ls(namespace=namespace, scope=sc, target=target) + + def scan_unresolved_refs(self, obj: Any, namespace: str) -> Set[SecretURI]: + """ + Return secret refs found in `obj` that cannot be fetched. + """ + unresolved: Set[SecretURI] = set() + for ref in self.scan_refs(obj, namespace): + if isinstance(ref, SecretRef): + try: + self.get_value(ref) + except CephSecretException: + unresolved.add(ref) + else: + unresolved.add(ref) + return unresolved + + def scan_refs(self, obj: Any, namespace: str) -> Set[SecretURI]: + """Collect secret references from *obj*. + + A reference must be the *entire* (stripped) value of a string field — + e.g. ``"secret:/ns/global/pw"``. Embedded URIs inside a larger string + (e.g. ``"Bearer secret:/..."``) are not supported and are surfaced as + a BadSecretURI rather than silently ignored, so validation can flag + them before deploy. + """ + refs: Set[SecretURI] = set() + + def _scan(v: Any) -> None: + if isinstance(v, dict): + for vv in v.values(): + _scan(vv) + elif isinstance(v, (list, tuple)): + for vv in v: + _scan(vv) + elif isinstance(v, str): + s = v.strip() + if s.startswith(_SECRET_URI_PREFIX): + try: + refs.add(parse_secret_uri(s)) + except Exception as e: + logger.warning("Failed to parse secret uri %r: %s", s, e) + refs.add(BadSecretURI(raw=s, namespace=namespace, error=str(e))) + elif _SECRET_URI_PREFIX in s: + # Contains a secret URI but is not a whole-value reference. + err = ('embedded secret URIs are not supported; the entire ' + 'field value must be the secret URI') + logger.warning("Rejecting embedded secret uri in %r", v) + refs.add(BadSecretURI(raw=v, namespace=namespace, error=err)) + # otherwise: plain data, not a reference + + _scan(obj) + return refs + + def _resolve_secret_uri(self, uri: str) -> str: + """Resolve a single secret URI to its opaque data string.""" + try: + parsed_secret = parse_secret_uri(uri) + except CephSecretException as e: + raise CephSecretException(f"Invalid secret URI {uri!r}: {e}") from e + if not isinstance(parsed_secret, SecretRef): + raise CephSecretException(f"Invalid secret URI {uri!r}") + return self.get_value(parsed_secret) + + def _resolve(self, v: Any) -> Any: + """Recursively resolve secret URIs within a nested structure.""" + if isinstance(v, dict): + return {k: self._resolve(vv) for k, vv in v.items()} + if isinstance(v, list): + return [self._resolve(vv) for vv in v] + if isinstance(v, tuple): + return tuple(self._resolve(vv) for vv in v) + if isinstance(v, str): + s = v.strip() + if s.startswith(_SECRET_URI_PREFIX): + return self._resolve_secret_uri(s) + if _SECRET_URI_PREFIX in s: + # A field that embeds a secret URI inside other text would be + # deployed verbatim (leaking the unresolved placeholder). Fail + # loudly rather than silently passing it through. + raise CephSecretException( + f"Invalid secret reference {v!r}: embedded secret URIs are " + f"not supported; the entire field value must be the secret URI" + ) + return v + + def resolve_object(self, obj: Any) -> Any: + """Resolve secret references within nested dict/list/tuple structures. + + A string whose entire (stripped) value is a secret URI is replaced by + the stored opaque data string for that secret. Surrounding whitespace + around an otherwise-clean URI is tolerated; non-secret strings are + returned unchanged, preserving their original whitespace. + + A string that embeds a secret URI inside other text (e.g. + ``"Bearer secret:/..."``) is rejected with CephSecretException, since + partial substitution is not supported and silently emitting the literal + URI would leak an unresolved placeholder into deployed configuration. + """ + return self._resolve(obj)