import functools
from subprocess import check_output, CalledProcessError
try:
- from typing import Callable, List
+ from typing import Callable, List, Tuple
except ImportError:
pass # type checking
return self._services
out = map(str, check_output(['ps', 'aux']).splitlines())
- types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
+ types = (service_type, ) if service_type else ("mds", "osd", "mon", "rgw", "mgr")
+ assert isinstance(types, tuple)
processes = [p for p in out if any([('ceph-' + t in p) for t in types])]
result = []
for p in processes:
sd = orchestrator.ServiceDescription()
sd.nodename = 'localhost'
- sd.service_instance = re.search('ceph-[^ ]+', p).group()
+ res = re.search('ceph-[^ ]+', p)
+ assert res
+ sd.service_instance = res.group()
result.append(sd)
return result
def create_osds(self, drive_group):
+ # type: (orchestrator.DriveGroupSpec) -> TestCompletion
def run(all_hosts):
drive_group.validate(orchestrator.InventoryNode.get_host_names(all_hosts))
return self.get_hosts().then(run).then(
@deferred_write("Adding NFS service")
def add_nfs(self, spec):
+ # type: (orchestrator.NFSServiceSpec) -> None
assert isinstance(spec.pool, str)
@deferred_write("remove_nfs")
@deferred_write("update_mgrs")
def update_mgrs(self, spec):
+ # type: (orchestrator.StatefulServiceSpec) -> None
+
assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
assert all([isinstance(h, str) for h in spec.placement.hosts])
@deferred_write("update_mons")
def update_mons(self, spec):
+ # type: (orchestrator.StatefulServiceSpec) -> None
+
assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
assert all([isinstance(h[0], str) for h in spec.placement.hosts])
assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])