CephFSStorageProvider,
Intent,
JoinSourceType,
+ KeyBridgePeerPolicy,
+ KeyBridgeScopeType,
LoginAccess,
LoginCategory,
PasswordFilter,
yaml.SafeDumper.add_representer(BigString, BigString.yaml_representer)
+class KeyBridgeScopeIdentity:
+ """Represent a KeyBridge scope's name in a structured manner.
+ Helps parse and validate the name of a keybridge scope without encoding a
+ more complex type in the JSON/YAML.
+
+ NOTE: Does not need to be serialized by resourcelib.
+ """
+
+ _AUTO_SUB = '00'
+
+ def __init__(
+ self,
+ scope_type: KeyBridgeScopeType,
+ subname: str = '',
+ *,
+ autosub: bool = False,
+ ):
+ if scope_type.unique() and subname:
+ raise ValueError(
+ f'invalid scope name {scope_type}.{subname},'
+ f' must be {scope_type}'
+ )
+ if subname:
+ # is the subname valid?
+ try:
+ validation.check_id(subname)
+ except ValueError as err:
+ raise ValueError(f'invalid scope name: {err}')
+ if autosub and not scope_type.unique():
+ # used to transform unqualified non-unique to qualified
+ subname = self._AUTO_SUB
+ elif subname and subname.startswith(self._AUTO_SUB):
+ # reserved for auto-naming and other future uses
+ raise ValueError(f'invalid scope name: reserved id: {subname}')
+ self._scope_type = scope_type
+ self._subname = subname
+
+ @property
+ def scope_type(self) -> KeyBridgeScopeType:
+ return self._scope_type
+
+ def __str__(self) -> str:
+ if self._subname:
+ return f'{self._scope_type}.{self._subname}'
+ return str(self._scope_type)
+
+ def qualified(self) -> Self:
+ """Return a qualified version of this scope identity if the scope is
+ not unique.
+ """
+ if self._scope_type.unique() or self._subname:
+ return self
+ return self.__class__(self._scope_type, autosub=True)
+
+ @classmethod
+ def from_name(cls, name: str) -> Self:
+ """Parse a scope name string into a scope identity.
+
+ A scope name can be unqalified, consisting only of the scope type, like
+ "mem" or "kmip" or qualified where a sub-name follows a dot (.)
+ following the type, like "kmip.foo". This allows the common case of
+ just one "kmip" scope but allow for >1 if needed (eg. "kmip.1" &
+ "kmip.2".
+
+ Subnames starting with "00" are resrved for automatic naming and/or
+ future uses.
+ """
+ typename, subname = name, ''
+ if '.' in name:
+ typename, subname = name.split('.', 1)
+ if not subname:
+ raise ValueError(
+ 'invalid scope name: no value after delimiter'
+ )
+ try:
+ _type = KeyBridgeScopeType(typename)
+ except ValueError:
+ scopes = sorted(st.value for st in KeyBridgeScopeType)
+ raise ValueError(f'invalid scope type: must be one of {scopes}')
+ return cls(_type, subname)
+
+
class _RBase:
# mypy doesn't currently (well?) support class decorators adding methods
# so we use a base class to add this method to all our resource classes.
return bool(self.cert and self.key)
+@resourcelib.component()
+class KeyBridgeScope(_RBase):
+ """Define and configure scopes for they keybridge service.
+ Each each scope is to be named via <type>[.<subname>] and specifies zero or
+ configuration parameters depending on the scope type.
+ """
+
+ # name of the scope (can be unique, like "mem" or "kmip" or qualified
+ # like "kmip.1")
+ name: str
+ # KMIP fields
+ kmip_hosts: Optional[List[str]] = None
+ kmip_port: Optional[int] = None
+ kmip_cert: Optional[TLSSource] = None
+ kmip_key: Optional[TLSSource] = None
+ kmip_ca_cert: Optional[TLSSource] = None
+
+ def scope_identity(self) -> KeyBridgeScopeIdentity:
+ return KeyBridgeScopeIdentity.from_name(self.name)
+
+ def validate(self) -> None:
+ kbsi = self.scope_identity() # raises value error if scope invalid
+ vfn = {
+ KeyBridgeScopeType.KMIP: self.validate_kmip,
+ KeyBridgeScopeType.MEM: self.validate_mem,
+ }
+ vfn[kbsi.scope_type]()
+
+ def validate_kmip(self) -> None:
+ if not self.kmip_hosts:
+ raise ValueError('at least one kmip hostname is required')
+ if not (self.kmip_port or all(':' in h for h in self.kmip_hosts)):
+ raise ValueError(
+ 'a kmip default port is required unless all'
+ ' hosts include a port'
+ )
+ # TODO: should tls credentials be always required?
+ if not (self.kmip_cert and self.kmip_key and self.kmip_ca_cert):
+ raise ValueError('kmip requires a cert, a key, and a ca cert')
+
+ def validate_mem(self) -> None:
+ if (
+ self.kmip_hosts
+ or self.kmip_port
+ or self.kmip_cert
+ or self.kmip_key
+ or self.kmip_ca_cert
+ ):
+ raise ValueError('mem scope does not support kmip parameters')
+
+
+@resourcelib.component()
+class KeyBridge(_RBase):
+ """Configure and enable/disable the keybridge service for this cluster.
+
+ The keybridge can be explicitly enabled or disabled. It will automatically
+ be enabled if scopes are defined and is not explictly enabled (or
+ disabled). The peer_policy parameter can be used by devs/testers to relax
+ some of the normal access restrictions.
+ """
+
+ # enabled can be set to explicitly toggle the keybridge server
+ enabled: Optional[bool] = None
+ scopes: Optional[List[KeyBridgeScope]] = None
+ # peer_policy allows one to change/relax the keybridge server's peer
+ # verification policy. generally this is only something a developer
+ # should change
+ peer_policy: Optional[KeyBridgePeerPolicy] = None
+
+ @property
+ def is_enabled(self) -> bool:
+ if self.enabled is not None:
+ return self.enabled
+ return bool(self.scopes)
+
+ @property
+ def use_peer_policy(self) -> KeyBridgePeerPolicy:
+ if self.peer_policy is None:
+ return KeyBridgePeerPolicy.RESTRICTED
+ return self.peer_policy
+
+ def validate(self) -> None:
+ if self.enabled and not self.scopes:
+ raise ValueError(
+ 'an enabled KeyBridge requires at least one scope'
+ )
+ for scope in self.scopes or []:
+ scope.validate()
+
+
@resourcelib.resource('ceph.smb.cluster')
class Cluster(_RBase):
"""Represents a cluster (instance) that is / should be present."""
bind_addrs: Optional[List[ClusterBindIP]] = None
# configure a remote control sidecar server.
remote_control: Optional[RemoteControl] = None
+ keybridge: Optional[KeyBridge] = None
def validate(self) -> None:
if not self.cluster_id:
return False
return self.remote_control.is_enabled
+ @property
+ def keybridge_is_enabled(self) -> bool:
+ """Return true is a keybridge service should be enabled for this
+ cluster.
+ """
+ if not self.keybridge:
+ return False
+ return self.keybridge.is_enabled
+
def is_clustered(self) -> bool:
"""Return true if smbd instance should use (CTDB) clustering."""
if self.clustering_mode == SMBClustering.ALWAYS: