import ceph_module # noqa
try:
- from typing import Set, Tuple, Iterator, Any
+ from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
except ImportError:
# just for type checking
pass
return osd_list
def device_class_counts(self):
- result = defaultdict(int)
+ result = defaultdict(int) # type: Dict[str, int]
# TODO don't abuse dump like this
d = self.dump()
for device in d['devices']:
class CLICommand(object):
- COMMANDS = {}
+ COMMANDS = {} # type: Dict[str, CLICommand]
def __init__(self, prefix, args="", desc="", perm="rw"):
self.prefix = prefix
self.args_dict = {}
self.desc = desc
self.perm = perm
- self.func = None
+ self.func = None # type: Optional[Callable]
self._parse_args()
def _parse_args(self):
kwargs[a.replace("-", "_")] = cmd_dict[a]
if inbuf:
kwargs['inbuf'] = inbuf
+ assert self.func
return self.func(mgr, **kwargs)
@classmethod
from their active peer), and to configuration settings (read only).
"""
- MODULE_OPTIONS = []
- MODULE_OPTION_DEFAULTS = {}
+ MODULE_OPTIONS = [] # type: List[Dict[str, Any]]
+ MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
def __init__(self, module_name, capsule):
super(MgrStandbyModule, self).__init__(capsule)
class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
- COMMANDS = []
- MODULE_OPTIONS = []
- MODULE_OPTION_DEFAULTS = {}
+ COMMANDS = [] # type: List[Any]
+ MODULE_OPTIONS = [] # type: List[dict]
+ MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
# Priority definitions for perf counters
PRIO_CRITICAL = 10
return ''
def _perfpath_to_path_labels(self, daemon, path):
- label_names = ("ceph_daemon",)
- labels = (daemon,)
+ # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
+ label_names = ("ceph_daemon",) # type: Tuple[str, ...]
+ labels = (daemon,) # type: Tuple[str, ...]
if daemon.startswith('rbd-mirror.'):
match = re.match(
value.
"""
- result = defaultdict(dict)
+ result = defaultdict(dict) # type: Dict[str, dict]
for server in self.list_servers():
for service in server['services']:
return host_spec
from ipaddress import ip_network, ip_address
- networks = list()
+ networks = list() # type: List[str]
network = host_spec.network
# in case we have [v2:1.2.3.4:3000,v1:1.2.3.4:6478]
if ',' in network:
def __init__(self,
_first_promise=None, # type: Optional["_Promise"]
- value=NO_RESULT, # type: Optional
+ value=NO_RESULT, # type: Optional[Any]
on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
):
# _Promise is not a continuation monad, as `_result` is of type
# T instead of (T -> r) -> r. Therefore we need to store the first promise here.
- self._first_promise = _first_promise or self # type: 'Completion'
+ self._first_promise = _first_promise or self # type: '_Promise'
def __repr__(self):
name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
else:
name = self._on_complete.__class__.__name__
val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
- if hasattr(val, 'debug_str'):
- val = val.debug_str()
prefix = {
self.INITIALIZED: ' ',
self.RUNNING: ' >>>',
assert self not in next_result
next_result._append_promise(self._next_promise)
self._set_next_promise(next_result)
+ assert self._next_promise
if self._next_promise._value is self.NO_RESULT:
self._next_promise._value = self._value
self.propagate_to_next()
def __init__(self,
_first_promise=None, # type: Optional["Completion"]
value=_Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable],
+ on_complete=None, # type: Optional[Callable]
name=None, # type: Optional[str]
):
super(Completion, self).__init__(_first_promise, value, on_complete, name)
def _progress_reference(self):
# type: () -> Optional[ProgressReference]
if hasattr(self._on_complete, 'progress_id'):
- return self._on_complete
+ return self._on_complete # type: ignore
return None
@property
def with_progress(cls, # type: Any
message, # type: str
mgr,
- _first_promise=None, # type: Optional["Completions"]
+ _first_promise=None, # type: Optional["Completion"]
value=_Promise.NO_RESULT, # type: Any
on_complete=None, # type: Optional[Callable]
calc_percent=None # type: Optional[Callable[[], Any]]
return self.get_inventory()
def add_host_label(self, host, label):
- # type: (str) -> WriteCompletion
+ # type: (str, str) -> Completion
"""
Add a host label
"""
- return NotImplementedError()
+ raise NotImplementedError()
def remove_host_label(self, host, label):
- # type: (str) -> WriteCompletion
+ # type: (str, str) -> Completion
"""
Remove a host label
"""
- return NotImplementedError()
+ raise NotImplementedError()
def get_inventory(self, node_filter=None, refresh=False):
- # type: (InventoryFilter, bool) -> Completion
+ # type: (Optional[InventoryFilter], bool) -> Completion
"""
Returns something that was created by `ceph-volume inventory`.
raise NotImplementedError()
def service_action(self, action, service_type, service_name=None, service_id=None):
- # type: (str, str, str, str) -> Completion
+ # type: (str, str, Optional[str], Optional[str]) -> Completion
"""
Perform an action (start/stop/reload) on a service.
raise NotImplementedError()
def blink_device_light(self, ident_fault, on, locations):
- # type: (str, bool, List[DeviceLightLoc]) -> WriteCompletion
+ # type: (str, bool, List[DeviceLightLoc]) -> Completion
"""
Instructs the orchestrator to enable or disable either the ident or the fault LED.
@property
def rgw_multisite_endpoint_addr(self):
"""Returns the first host. Not supported for Rook."""
- return self.hosts[0]
+ return self.placement.hosts[0]
@property
def rgw_multisite_endpoints_list(self):
return ",".join(["{}://{}:{}".format(self.rgw_multisite_proto,
host,
- self.rgw_frontend_port) for host in self.hosts])
+ self.rgw_frontend_port) for host in self.placement.hosts])
def genkey(self, nchars):
""" Returns a random string of nchars
"""
def __init__(self, labels=None, nodes=None):
- # type: (List[str], List[str]) -> None
- self.labels = labels # Optional: get info about nodes matching labels
- self.nodes = nodes # Optional: get info about certain named nodes only
+ # type: (Optional[List[str]], Optional[List[str]]) -> None
+ #: Optional: get info about nodes matching labels
+ self.labels = labels
+ #: Optional: get info about certain named nodes only
+ self.nodes = nodes
class InventoryNode(object):
InventoryNode.
"""
def __init__(self, name, devices=None, labels=None):
- # type: (str, inventory.Devices, List[str]) -> None
+ # type: (str, Optional[inventory.Devices], Optional[List[str]]) -> None
if devices is None:
devices = inventory.Devices([])
if labels is None:
# type: (Optional[dict], Optional[datetime.datetime]) -> None
self._data = data
if data is not None and last_refresh is None:
- self.last_refresh = datetime.datetime.utcnow()
+ self.last_refresh = datetime.datetime.utcnow() # type: Optional[datetime.datetime]
else:
self.last_refresh = last_refresh
def json(self):
if self.last_refresh is not None:
- timestr = self.last_refresh.strftime(self.DATEFMT)
+ timestr = self.last_refresh.strftime(self.DATEFMT) # type: Optional[str]
else:
timestr = None
def __getitem__(self, item):
# type: (str) -> OutdatableData
- return OutdatableData.from_json(super(OutdatableDictMixin, self).__getitem__(item))
+ return OutdatableData.from_json(super(OutdatableDictMixin, self).__getitem__(item)) # type: ignore
def __setitem__(self, key, value):
# type: (str, OutdatableData) -> None
val = None if value is None else value.json()
- super(OutdatableDictMixin, self).__setitem__(key, val)
+ super(OutdatableDictMixin, self).__setitem__(key, val) # type: ignore
def items(self):
- # type: () -> Iterator[Tuple[str, OutdatableData]]
- for item in super(OutdatableDictMixin, self).items():
+ ## type: () -> Iterator[Tuple[str, OutdatableData]]
+ for item in super(OutdatableDictMixin, self).items(): # type: ignore
k, v = item
yield k, OutdatableData.from_json(v)
def remove_outdated(self):
outdated = [item[0] for item in self.items() if item[1].outdated()]
for o in outdated:
- del self[o]
+ del self[o] # type: ignore
def invalidate(self, key):
self[key] = OutdatableData(self[key].data,