from collections import OrderedDict
from contextlib import contextmanager
from functools import wraps
-from ipaddress import ip_network, ip_address
+from ipaddress import ip_network, ip_address, ip_interface
from typing import (
Any,
Callable,
yaml.add_representer(CephExporterSpec, ServiceSpec.yaml_representer)
+class SMBClusterPublicIPSpec:
+ # The SMBClusterIPSpec must be able to translate between what cephadm
+ # knows about the system, networks using network addresses, and what
+ # ctdb wants, an IP combined with a prefixlen and device names.
+ def __init__(
+ self,
+ address: str,
+ destination: Union[str, List[str], None] = None,
+ ) -> None:
+ self.address = address
+ self.destination = destination
+ self.validate()
+
+ def validate(self) -> None:
+ if not self.address:
+ raise SpecValidationError('address value missing')
+ if '/' not in self.address:
+ raise SpecValidationError(
+ 'a combined address and prefix length is required'
+ )
+ # in the future we may want to enhance this to take IPs only and figure
+ # out the prefixlen automatically. However, we going to start simple and
+ # require prefix lengths just like ctdb itself does.
+ try:
+ # cache the parsed interface address internally
+ self._addr_iface = ip_interface(self.address)
+ except ValueError as err:
+ raise SpecValidationError(
+ f'Cannot parse interface address {self.address}'
+ ) from err
+ # we strongly prefer /{prefixlen} form, even if the user supplied
+ # a netmask
+ self.address = self._addr_iface.with_prefixlen
+
+ self._destinations = []
+ if not self.destination:
+ return
+ if isinstance(self.destination, str):
+ _dests = [self.destination]
+ elif isinstance(self.destination, list) and all(
+ isinstance(v, str) for v in self.destination
+ ):
+ _dests = self.destination
+ else:
+ raise ValueError(
+ 'destination field must be a string or list of strings'
+ )
+ for dest in _dests:
+ try:
+ dnet = ip_network(dest)
+ except ValueError as err:
+ raise SpecValidationError(
+ f'Cannot parse network value {self.address}'
+ ) from err
+ self._destinations.append(dnet)
+
+ def __eq__(self, other: Any) -> bool:
+ try:
+ return (
+ other.address == self.address
+ and other.destination == self.destination
+ )
+ except AttributeError:
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return (
+ f'SMBClusterPublicIPSpec({self.address!r}, {self.destination!r})'
+ )
+
+ def to_json(self) -> Dict[str, Any]:
+ """Return a JSON-compatible representation of the SMBClusterPublicIPSpec."""
+ out: Dict[str, Any] = {'address': self.address}
+ if self.destination:
+ out['destination'] = self.destination
+ return out
+
+ def to_strict(self) -> Dict[str, Any]:
+ """Return a strictly formed expanded JSON-compatible representation of
+ the spec. This is not round-trip-able.
+ """
+ # The strict form always contains destination as a list of strings.
+ dests = [n.with_prefixlen for n in self._destinations]
+ if not dests:
+ dests = [self._addr_iface.network.with_prefixlen]
+ return {
+ 'address': self.address,
+ 'destinations': dests,
+ }
+
+ @classmethod
+ def from_json(cls, spec: Dict[str, Any]) -> 'SMBClusterPublicIPSpec':
+ if 'address' not in spec:
+ raise SpecValidationError(
+ 'SMB cluster public IP spec missing required field: address'
+ )
+ return cls(spec['address'], spec.get('destination'))
+
+ @classmethod
+ def convert_list(
+ cls, arg: Optional[List[Any]]
+ ) -> Optional[List['SMBClusterPublicIPSpec']]:
+ if arg is None:
+ return None
+ assert isinstance(arg, list)
+ out = []
+ for value in arg:
+ if isinstance(value, cls):
+ out.append(value)
+ elif hasattr(value, 'to_json'):
+ out.append(cls.from_json(value.to_json()))
+ elif isinstance(value, dict):
+ out.append(cls.from_json(value))
+ else:
+ raise SpecValidationError(
+ f"Unknown type for SMBClusterPublicIPSpec: {type(value)}"
+ )
+ return out
+
+
class SMBSpec(ServiceSpec):
service_type = 'smb'
_valid_features = {'domain', 'clustered'}
# cluster_lock_uri - a pseudo-uri that resolves to a (rados) object
# that will be used by CTDB for a cluster leader / recovery lock.
cluster_lock_uri: Optional[str] = None,
+ # cluster_public_addrs - A list of SMB cluster public IP specs.
+ # If supplied, these will be used to esatablish floating virtual ips
+ # managed by Samba CTDB cluster subsystem.
+ cluster_public_addrs: Optional[List[SMBClusterPublicIPSpec]] = None,
# --- genearal tweaks ---
extra_container_args: Optional[GeneralArgList] = None,
extra_entrypoint_args: Optional[GeneralArgList] = None,
self.include_ceph_users = include_ceph_users or []
self.cluster_meta_uri = cluster_meta_uri
self.cluster_lock_uri = cluster_lock_uri
+ self.cluster_public_addrs = SMBClusterPublicIPSpec.convert_list(
+ cluster_public_addrs
+ )
self.validate()
def validate(self) -> None:
raise ValueError(
'cluster lock uri unsupported when "clustered" feature not set'
)
+ for spec in self.cluster_public_addrs or []:
+ spec.validate()
def _derive_cluster_uri(self, uri: str, objname: str) -> str:
if not uri.startswith('rados://'):
uri = 'rados://' + '/'.join(parts)
return uri
+ def strict_cluster_ip_specs(self) -> List[Dict[str, Any]]:
+ return [s.to_strict() for s in (self.cluster_public_addrs or [])]
+
+ def to_json(self) -> "OrderedDict[str, Any]":
+ obj = super().to_json()
+ spec = obj.get('spec')
+ if spec and spec.get('cluster_public_addrs'):
+ spec['cluster_public_addrs'] = [
+ a.to_json() for a in spec['cluster_public_addrs']
+ ]
+ return obj
+
yaml.add_representer(SMBSpec, ServiceSpec.yaml_representer)