]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/smb: add keybridge configuration to cluster resource
authorJohn Mulligan <jmulligan@redhat.com>
Tue, 22 Jul 2025 23:22:15 +0000 (19:22 -0400)
committerAvan Thakkar <athakkar@redhat.com>
Thu, 27 Nov 2025 15:07:27 +0000 (20:37 +0530)
Add keybridge service configuration classes and parameters to the
resources module. This supports enabling the keybridge, setting up
scopes for the keybridge and it's access control.

A helper class is added that parses and helps validate the scope names.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
(cherry picked from commit 75676e5ab716874baa102bf5fb278aaf598a6944)

src/pybind/mgr/smb/resources.py

index 4fe7ad6b4d6888e8b77f6dd55218c5c96aee7c4d..a8b3a462dccc5fcc95674b3729ed44795089c6c0 100644 (file)
@@ -21,6 +21,8 @@ from .enums import (
     CephFSStorageProvider,
     Intent,
     JoinSourceType,
+    KeyBridgePeerPolicy,
+    KeyBridgeScopeType,
     LoginAccess,
     LoginCategory,
     PasswordFilter,
@@ -120,6 +122,88 @@ class BigString(str):
 yaml.SafeDumper.add_representer(BigString, BigString.yaml_representer)
 
 
+class KeyBridgeScopeIdentity:
+    """Represent a KeyBridge scope's name in a structured manner.
+    Helps parse and validate the name of a keybridge scope without encoding a
+    more complex type in the JSON/YAML.
+
+    NOTE: Does not need to be serialized by resourcelib.
+    """
+
+    _AUTO_SUB = '00'
+
+    def __init__(
+        self,
+        scope_type: KeyBridgeScopeType,
+        subname: str = '',
+        *,
+        autosub: bool = False,
+    ):
+        if scope_type.unique() and subname:
+            raise ValueError(
+                f'invalid scope name {scope_type}.{subname},'
+                f' must be {scope_type}'
+            )
+        if subname:
+            # is the subname valid?
+            try:
+                validation.check_id(subname)
+            except ValueError as err:
+                raise ValueError(f'invalid scope name: {err}')
+        if autosub and not scope_type.unique():
+            # used to transform unqualified non-unique to qualified
+            subname = self._AUTO_SUB
+        elif subname and subname.startswith(self._AUTO_SUB):
+            # reserved for auto-naming and other future uses
+            raise ValueError(f'invalid scope name: reserved id: {subname}')
+        self._scope_type = scope_type
+        self._subname = subname
+
+    @property
+    def scope_type(self) -> KeyBridgeScopeType:
+        return self._scope_type
+
+    def __str__(self) -> str:
+        if self._subname:
+            return f'{self._scope_type}.{self._subname}'
+        return str(self._scope_type)
+
+    def qualified(self) -> Self:
+        """Return a qualified version of this scope identity if the scope is
+        not unique.
+        """
+        if self._scope_type.unique() or self._subname:
+            return self
+        return self.__class__(self._scope_type, autosub=True)
+
+    @classmethod
+    def from_name(cls, name: str) -> Self:
+        """Parse a scope name string into a scope identity.
+
+        A scope name can be unqalified, consisting only of the scope type, like
+        "mem" or "kmip" or qualified where a sub-name follows a dot (.)
+        following the type, like "kmip.foo". This allows the common case of
+        just one "kmip" scope but allow for >1 if needed (eg. "kmip.1" &
+        "kmip.2".
+
+        Subnames starting with "00" are resrved for automatic naming and/or
+        future uses.
+        """
+        typename, subname = name, ''
+        if '.' in name:
+            typename, subname = name.split('.', 1)
+            if not subname:
+                raise ValueError(
+                    'invalid scope name: no value after delimiter'
+                )
+        try:
+            _type = KeyBridgeScopeType(typename)
+        except ValueError:
+            scopes = sorted(st.value for st in KeyBridgeScopeType)
+            raise ValueError(f'invalid scope type: must be one of {scopes}')
+        return cls(_type, subname)
+
+
 class _RBase:
     # mypy doesn't currently (well?) support class decorators adding methods
     # so we use a base class to add this method to all our resource classes.
@@ -510,6 +594,96 @@ class RemoteControl(_RBase):
         return bool(self.cert and self.key)
 
 
+@resourcelib.component()
+class KeyBridgeScope(_RBase):
+    """Define and configure scopes for they keybridge service.
+    Each each scope is to be named via <type>[.<subname>] and specifies zero or
+    configuration parameters depending on the scope type.
+    """
+
+    # name of the scope (can be unique, like "mem" or "kmip" or qualified
+    # like "kmip.1")
+    name: str
+    # KMIP fields
+    kmip_hosts: Optional[List[str]] = None
+    kmip_port: Optional[int] = None
+    kmip_cert: Optional[TLSSource] = None
+    kmip_key: Optional[TLSSource] = None
+    kmip_ca_cert: Optional[TLSSource] = None
+
+    def scope_identity(self) -> KeyBridgeScopeIdentity:
+        return KeyBridgeScopeIdentity.from_name(self.name)
+
+    def validate(self) -> None:
+        kbsi = self.scope_identity()  # raises value error if scope invalid
+        vfn = {
+            KeyBridgeScopeType.KMIP: self.validate_kmip,
+            KeyBridgeScopeType.MEM: self.validate_mem,
+        }
+        vfn[kbsi.scope_type]()
+
+    def validate_kmip(self) -> None:
+        if not self.kmip_hosts:
+            raise ValueError('at least one kmip hostname is required')
+        if not (self.kmip_port or all(':' in h for h in self.kmip_hosts)):
+            raise ValueError(
+                'a kmip default port is required unless all'
+                ' hosts include a port'
+            )
+        # TODO: should tls credentials be always required?
+        if not (self.kmip_cert and self.kmip_key and self.kmip_ca_cert):
+            raise ValueError('kmip requires a cert, a key, and a ca cert')
+
+    def validate_mem(self) -> None:
+        if (
+            self.kmip_hosts
+            or self.kmip_port
+            or self.kmip_cert
+            or self.kmip_key
+            or self.kmip_ca_cert
+        ):
+            raise ValueError('mem scope does not support kmip parameters')
+
+
+@resourcelib.component()
+class KeyBridge(_RBase):
+    """Configure and enable/disable the keybridge service for this cluster.
+
+    The keybridge can be explicitly enabled or disabled. It will automatically
+    be enabled if scopes are defined and is not explictly enabled (or
+    disabled).  The peer_policy parameter can be used by devs/testers to relax
+    some of the normal access restrictions.
+    """
+
+    # enabled can be set to explicitly toggle the keybridge server
+    enabled: Optional[bool] = None
+    scopes: Optional[List[KeyBridgeScope]] = None
+    # peer_policy allows one to change/relax the keybridge server's peer
+    # verification policy. generally this is only something a developer
+    # should change
+    peer_policy: Optional[KeyBridgePeerPolicy] = None
+
+    @property
+    def is_enabled(self) -> bool:
+        if self.enabled is not None:
+            return self.enabled
+        return bool(self.scopes)
+
+    @property
+    def use_peer_policy(self) -> KeyBridgePeerPolicy:
+        if self.peer_policy is None:
+            return KeyBridgePeerPolicy.RESTRICTED
+        return self.peer_policy
+
+    def validate(self) -> None:
+        if self.enabled and not self.scopes:
+            raise ValueError(
+                'an enabled KeyBridge requires at least one scope'
+            )
+        for scope in self.scopes or []:
+            scope.validate()
+
+
 @resourcelib.resource('ceph.smb.cluster')
 class Cluster(_RBase):
     """Represents a cluster (instance) that is / should be present."""
@@ -532,6 +706,7 @@ class Cluster(_RBase):
     bind_addrs: Optional[List[ClusterBindIP]] = None
     # configure a remote control sidecar server.
     remote_control: Optional[RemoteControl] = None
+    keybridge: Optional[KeyBridge] = None
 
     def validate(self) -> None:
         if not self.cluster_id:
@@ -587,6 +762,15 @@ class Cluster(_RBase):
             return False
         return self.remote_control.is_enabled
 
+    @property
+    def keybridge_is_enabled(self) -> bool:
+        """Return true is a keybridge service should be enabled for this
+        cluster.
+        """
+        if not self.keybridge:
+            return False
+        return self.keybridge.is_enabled
+
     def is_clustered(self) -> bool:
         """Return true if smbd instance should use (CTDB) clustering."""
         if self.clustering_mode == SMBClustering.ALWAYS: