ignore_missing_imports = True
+[mypy-cephadm.services.*]
+
+disallow_untyped_defs = True
+
# Make cephadm and rook happy
[mypy-OpenSSL]
ignore_missing_imports = True
class CephadmDaemonSpec(Generic[ServiceSpecs]):
# typing.NamedTuple + Generic is broken in py36
- def __init__(self, host: str, daemon_id,
+ def __init__(self, host: str, daemon_id: str,
spec: Optional[ServiceSpecs] = None,
network: Optional[str] = None,
keyring: Optional[str] = None,
def name(self) -> str:
return '%s.%s' % (self.daemon_type, self.daemon_id)
- def config_get_files(self):
+ def config_get_files(self) -> Dict[str, Any]:
files = self.extra_files
if self.ceph_conf:
files['config'] = self.ceph_conf
@property
@abstractmethod
- def TYPE(self):
+ def TYPE(self) -> str:
pass
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
- def make_daemon_spec(self, host, daemon_id, netowrk, spec: ServiceSpecs) -> CephadmDaemonSpec:
+ def make_daemon_spec(self, host: str,
+ daemon_id: str,
+ netowrk: str,
+ spec: ServiceSpecs) -> CephadmDaemonSpec:
return CephadmDaemonSpec(
host=host,
daemon_id=daemon_id,
def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
raise NotImplementedError()
- def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
+ def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
"""The post actions needed to be done after daemons are checked"""
if self.mgr.config_dashboard:
if 'dashboard' in self.mgr.get('mgr_map')['modules']:
else:
logger.debug('Dashboard is not enabled. Skip configuration.')
- def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
"""Config dashboard settings."""
raise NotImplementedError()
service_name: str,
get_mon_cmd: str,
set_mon_cmd: str,
- service_url: str):
+ service_url: str) -> None:
"""A helper to get and set service_url via Dashboard's MON command.
If result of get_mon_cmd differs from service_url, set_mon_cmd will
def _check_and_set_dashboard(self,
service_name: str,
get_cmd: str,
- get_set_cmd_dicts: Callable[[str], List[dict]]):
+ get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
"""A helper to set configs in the Dashboard.
The method is useful for the pattern:
'keyring': keyring,
}
- def remove_keyring(self, daemon: DaemonDescription):
+ def remove_keyring(self, daemon: DaemonDescription) -> None:
daemon_id: str = daemon.daemon_id
host: str = daemon.hostname
# if no active mgr found, return empty Daemon Desc
return DaemonDescription()
- def fail_over(self):
+ def fail_over(self) -> None:
if not self.mgr_map_has_standby():
raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
'daemon', 'mgr' + self.mgr.get_mgr_id()))
class RgwService(CephService):
TYPE = 'rgw'
- def config(self, spec: RGWSpec, rgw_id: str):
+ def config(self, spec: RGWSpec, rgw_id: str) -> None:
assert self.TYPE == spec.service_type
# create realm, zonegroup, and zone if needed
return daemon_spec
- def get_keyring(self, rgw_id: str):
+ def get_keyring(self, rgw_id: str) -> str:
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': self.get_auth_entity(rgw_id),
})
return keyring
- def create_realm_zonegroup_zone(self, spec: RGWSpec, rgw_id: str):
+ def create_realm_zonegroup_zone(self, spec: RGWSpec, rgw_id: str) -> None:
if utils.get_cluster_health(self.mgr) != 'HEALTH_OK':
raise OrchestratorError('Health not ok, will try agin when health ok')
except Exception as e:
raise OrchestratorError('failed to parse realm info')
- def create_realm():
+ def create_realm() -> None:
cmd = ['radosgw-admin',
'--key=%s' % keyring,
'--user', 'rgw.%s' % rgw_id,
except Exception as e:
raise OrchestratorError('failed to parse zonegroup info')
- def create_zonegroup():
+ def create_zonegroup() -> None:
cmd = ['radosgw-admin',
'--key=%s' % keyring,
'--user', 'rgw.%s' % rgw_id,
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.mgr.log.info('created zonegroup: default')
- def create_zonegroup_if_required():
+ def create_zonegroup_if_required() -> None:
zonegroups = get_zonegroups()
if 'default' not in zonegroups:
create_zonegroup()
except Exception as e:
raise OrchestratorError('failed to parse zone info')
- def create_zone():
+ def create_zone() -> None:
cmd = ['radosgw-admin',
'--key=%s' % keyring,
'--user', 'rgw.%s' % rgw_id,
return daemon_spec
- def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
def get_set_cmd_dicts(out: str) -> List[dict]:
gateways = json.loads(out)['gateways']
cmd_dicts = []
# if empty list provided, return empty Daemon Desc
return DaemonDescription()
- def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
# TODO: signed cert
dd = self.get_active_daemon(daemon_descrs)
service_url = 'https://{}:{}'.format(
# if empty list provided, return empty Daemon Desc
return DaemonDescription()
- def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
dd = self.get_active_daemon(daemon_descrs)
service_url = 'http://{}:{}'.format(self._inventory_get_addr(dd.hostname),
self.DEFAULT_SERVICE_PORT)
# if empty list provided, return empty Daemon Desc
return DaemonDescription()
- def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
dd = self.get_active_daemon(daemon_descrs)
service_url = 'http://{}:{}'.format(
self._inventory_get_addr(dd.hostname), self.DEFAULT_SERVICE_PORT)
return get_cephadm_config(), deps
- def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
def get_set_cmd_dicts(out: str) -> List[dict]:
locations: Set[str] = set()
import json
import logging
-from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
+from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
+from ceph.deployment.inventory import Device
from datetime import datetime
import orchestrator
from cephadm.services.cephadmservice import CephadmDaemonSpec, CephService
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
logger = logging.getLogger(__name__)
ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
return ", ".join(filter(None, ret))
- def create_single_host(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
+ def create_single_host(self, host: str, cmd: str, replace_osd_ids: List[str],
+ env_vars: Optional[List[str]] = None) -> str:
out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
if code == 1 and ', it is already prepared' in '\n'.join(err):
# set osd_id_claims
- def _find_inv_for_host(hostname: str, inventory_dict: dict):
+ def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
# This is stupid and needs to be loaded with the host
for _host, _inventory in inventory_dict.items():
if _host == hostname:
logger.debug(f"Resulting ceph-volume cmd: {cmd}")
return cmd
- def get_previews(self, host) -> List[Dict[str, Any]]:
+ def get_previews(self, host: str) -> List[Dict[str, Any]]:
# Find OSDSpecs that match host.
osdspecs = self.resolve_osdspecs_for_host(host)
return self.generate_previews(osdspecs, host)
return []
return sum([spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()) for spec in osdspecs], [])
- def resolve_osdspecs_for_host(self, host: str, specs: Optional[List[DriveGroupSpec]] = None):
+ def resolve_osdspecs_for_host(self, host: str,
+ specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
matching_specs = []
self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
if not specs:
class RemoveUtil(object):
- def __init__(self, mgr):
- self.mgr = mgr
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
def process_removal_queue(self) -> None:
"""
self.mgr.to_remove_osds.intersection_update(new_queue)
self.save_to_store()
- def cleanup(self):
+ def cleanup(self) -> None:
# OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
- [self.mgr.to_remove_osds.remove(osd) for osd in not_in_cluster_osds]
+ for osd in not_in_cluster_osds:
+ self.mgr.to_remove_osds.remove(osd)
def get_osds_in_cluster(self) -> List[str]:
osd_map = self.mgr.get_osdmap()
self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
return True
- def save_to_store(self):
+ def save_to_store(self) -> None:
osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
logger.debug(f"Saving {osd_queue} to store")
self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
- def load_from_store(self):
+ def load_from_store(self) -> None:
for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
for osd in json.loads(v):
logger.debug(f"Loading osd ->{osd} from store")
def exists(self) -> bool:
return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
- def drain_status_human(self):
+ def drain_status_human(self) -> str:
default_status = 'not started'
status = 'started' if self.started and not self.draining else default_status
status = 'draining' if self.draining else status
status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
return status
- def pg_count_str(self):
+ def pg_count_str(self) -> str:
return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
def to_json(self) -> dict:
inp['hostname'] = hostname
return cls(**inp)
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.osd_id)
def __eq__(self, other: object) -> bool:
class OSDQueue(Set):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__()
def as_osd_ids(self) -> List[int]: