return ret
- def __repr__(self):
+ def __repr__(self) -> str:
keys = [
key for key in self._supported_filters + ['limit'] if getattr(self, key) is not None
]
', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return repr(self) == repr(other)
if it was raised in a different mgr module.
"""
- def __init__(self, msg):
+ def __init__(self, msg: str):
super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
def __init__(self,
placement=None, # type: Optional[PlacementSpec]
- service_id=None, # type: str
+ service_id=None, # type: Optional[str]
data_devices=None, # type: Optional[DeviceSelection]
db_devices=None, # type: Optional[DeviceSelection]
wal_devices=None, # type: Optional[DeviceSelection]
:param json_drive_group: A valid json string with a Drive Group
specification
"""
- args = {}
+ args: Dict[str, Any] = {}
# legacy json (pre Octopus)
if 'host_pattern' in json_drive_group and 'placement' not in json_drive_group:
json_drive_group['placement'] = {'host_pattern': json_drive_group['host_pattern']}
if self.filter_logic not in ['AND', 'OR']:
raise DriveGroupValidationError('filter_logic must be either <AND> or <OR>')
- def __repr__(self):
+ def __repr__(self) -> str:
keys = [
key for key in self._supported_features if getattr(self, key) is not None
]
', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return repr(self) == repr(other)
import errno
import fnmatch
import re
-from collections import namedtuple, OrderedDict
+from collections import OrderedDict
from functools import wraps
-from typing import Optional, Dict, Any, List, Union, Callable, Iterable, Type, TypeVar, cast
+from typing import Optional, Dict, Any, List, Union, Callable, Iterable, Type, TypeVar, cast, \
+ NamedTuple
import yaml
self.errno = errno
-def assert_valid_host(name):
+def assert_valid_host(name: str) -> None:
p = re.compile('^[a-zA-Z0-9-]+$')
try:
assert len(name) <= 250, 'name is too long (max 250 chars)'
assert len(part) <= 63, '.-delimited name component must not be more than 63 chars'
assert p.match(part), 'name component must include only a-z, 0-9, and -'
except AssertionError as e:
- raise ServiceSpecValidationError(e)
+ raise ServiceSpecValidationError(str(e))
def handle_type_error(method: FuncT) -> FuncT:
@wraps(method)
- def inner(cls, *args, **kwargs):
+ def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
try:
return method(cls, *args, **kwargs)
except (TypeError, AttributeError) as e:
return cast(FuncT, inner)
-class HostPlacementSpec(namedtuple('HostPlacementSpec', ['hostname', 'network', 'name'])):
- def __str__(self):
+class HostPlacementSpec(NamedTuple):
+ hostname: str
+ network: str
+ name: str
+
+ def __str__(self) -> str:
res = ''
res += self.hostname
if self.network:
@classmethod
@handle_type_error
- def from_json(cls, data):
+ def from_json(cls, data: Union[dict, str]) -> 'HostPlacementSpec':
if isinstance(data, str):
return cls.parse(data)
return cls(**data)
host_spec.validate()
return host_spec
- def validate(self):
+ def validate(self) -> None:
assert_valid_host(self.hostname)
def __init__(self,
label=None, # type: Optional[str]
- hosts=None, # type: Union[List[str],List[HostPlacementSpec]]
+ hosts=None, # type: Union[List[str],List[HostPlacementSpec], None]
count=None, # type: Optional[int]
host_pattern=None # type: Optional[str]
):
self.validate()
- def is_empty(self):
+ def is_empty(self) -> bool:
return self.label is None and \
not self.hosts and \
not self.host_pattern and \
self.count is None
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if isinstance(other, PlacementSpec):
return self.label == other.label \
and self.hosts == other.hosts \
and self.host_pattern == other.host_pattern
return NotImplemented
- def set_hosts(self, hosts):
+ def set_hosts(self, hosts: Union[List[str], List[HostPlacementSpec]]) -> None:
# To backpopulate the .hosts attribute when using labels or count
# in the orchestrator backend.
if all([isinstance(host, HostPlacementSpec) for host in hosts]):
# get_host_selection_size
return []
- def get_host_selection_size(self, hostspecs: Iterable[HostSpec]):
+ def get_host_selection_size(self, hostspecs: Iterable[HostSpec]) -> int:
if self.count:
return self.count
return len(self.filter_matching_hostspecs(hostspecs))
- def pretty_str(self):
+ def pretty_str(self) -> str:
"""
>>> #doctest: +SKIP
... ps = PlacementSpec(...) # For all placement specs:
kv.append(self.host_pattern)
return ';'.join(kv)
- def __repr__(self):
+ def __repr__(self) -> str:
kv = []
if self.count:
kv.append('count=%d' % self.count)
@classmethod
@handle_type_error
- def from_json(cls, data):
+ def from_json(cls, data: dict) -> 'PlacementSpec':
c = data.copy()
hosts = c.get('hosts', [])
if hosts:
_cls.validate()
return _cls
- def to_json(self):
- r = {}
+ def to_json(self) -> dict:
+ r: Dict[str, Any] = {}
if self.label:
r['label'] = self.label
if self.hosts:
r['host_pattern'] = self.host_pattern
return r
- def validate(self):
+ def validate(self) -> None:
if self.hosts and self.label:
# TODO: a less generic Exception
raise ServiceSpecValidationError('Host and label are mutually exclusive')
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container ha-rgw '.split()
@classmethod
- def _cls(cls, service_type):
+ def _cls(cls: Type[ServiceSpecT], service_type: str) -> Type[ServiceSpecT]:
from ceph.deployment.drive_group import DriveGroupSpec
ret = {
raise ServiceSpecValidationError('Spec needs a "service_type" key.')
return ret
- def __new__(cls, *args, **kwargs):
+ def __new__(cls: Type[ServiceSpecT], *args: Any, **kwargs: Any) -> ServiceSpecT:
"""
Some Python foo to make sure, we don't have an object
like `ServiceSpec('rgw')` of type `ServiceSpec`. Now we have:
if cls != ServiceSpec:
return object.__new__(cls)
service_type = kwargs.get('service_type', args[0] if args else None)
- sub_cls = cls._cls(service_type)
+ sub_cls: Any = cls._cls(service_type)
return object.__new__(sub_cls)
def __init__(self,
return _cls._from_json_impl(c) # type: ignore
@classmethod
- def _from_json_impl(cls, json_spec):
- args = {} # type: Dict[str, Dict[Any, Any]]
+ def _from_json_impl(cls: Type[ServiceSpecT], json_spec: dict) -> ServiceSpecT:
+ args = {} # type: Dict[str, Any]
for k, v in json_spec.items():
if k == 'placement':
v = PlacementSpec.from_json(v)
_cls.validate()
return _cls
- def service_name(self):
+ def service_name(self) -> str:
n = self.service_type
if self.service_id:
n += '.' + self.service_id
ret['spec'] = c
return ret
- def validate(self):
+ def validate(self) -> None:
if not self.service_type:
raise ServiceSpecValidationError('Cannot add Service: type required')
if self.placement is not None:
self.placement.validate()
- def __repr__(self):
+ def __repr__(self) -> str:
return "{}({!r})".format(self.__class__.__name__, self.__dict__)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return (self.__class__ == other.__class__
and
self.__dict__ == other.__dict__)
- def one_line_str(self):
+ def one_line_str(self) -> str:
return '<{} for service_name={}>'.format(self.__class__.__name__, self.service_name())
@staticmethod
- def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceSpec'):
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceSpec') -> Any:
return dumper.represent_dict(data.to_json().items())
#: RADOS namespace where NFS client recovery data is stored in the pool.
self.namespace = namespace
- def validate(self):
+ def validate(self) -> None:
super(NFSServiceSpec, self).validate()
if not self.pool:
self.rgw_frontend_ssl_key = rgw_frontend_ssl_key
self.ssl = ssl
- def get_port(self):
+ def get_port(self) -> int:
if self.rgw_frontend_port:
return self.rgw_frontend_port
if self.ssl:
else:
return 80
- def rgw_frontends_config_value(self):
+ def rgw_frontends_config_value(self) -> str:
ports = []
if self.ssl:
ports.append(f"ssl_port={self.get_port()}")
ports.append(f"port={self.get_port()}")
return f'beast {" ".join(ports)}'
- def validate(self):
+ def validate(self) -> None:
super(RGWSpec, self).validate()
if not self.rgw_realm:
if not self.api_secure and self.ssl_cert and self.ssl_key:
self.api_secure = True
- def validate(self):
+ def validate(self) -> None:
super(IscsiServiceSpec, self).validate()
if not self.pool:
# when applying spec
self.definitive_host_list = [] # type: List[HostPlacementSpec]
- def validate(self):
+ def validate(self) -> None:
super(HA_RGWSpec, self).validate()
if not self.virtual_ip_interface:
class CustomContainerSpec(ServiceSpec):
def __init__(self,
service_type: str = 'container',
- service_id: str = None,
+ service_id: Optional[str] = None,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
- image: str = None,
+ image: Optional[str] = None,
entrypoint: Optional[str] = None,
uid: Optional[int] = None,
gid: Optional[int] = None,