From 7bbc77cd48bf48da2c120220ad31678f650c05b1 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 20 Nov 2020 13:04:50 +0100 Subject: [PATCH] mgr/cephadm: make type annotations for module.py mandatory Fixing bogus json representation for registry-login Signed-off-by: Sebastian Wagner --- src/mypy.ini | 3 + src/pybind/mgr/cephadm/module.py | 219 ++++++++++--------- src/pybind/mgr/cephadm/services/iscsi.py | 1 + src/pybind/mgr/cephadm/services/nfs.py | 1 + src/pybind/mgr/cephadm/services/osd.py | 2 + src/pybind/mgr/cephadm/tests/test_cephadm.py | 6 +- 6 files changed, 126 insertions(+), 106 deletions(-) diff --git a/src/mypy.ini b/src/mypy.ini index 0a0737659bb37..cf8a8b5f4e893 100755 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -34,6 +34,9 @@ disallow_untyped_defs = True [mypy-cephadm.schedule] disallow_untyped_defs = True +[mypy-cephadm.module] +disallow_untyped_defs = True + # Make cephadm and rook happy [mypy-OpenSSL] ignore_missing_imports = True diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 7bd854a09cdba..223612e542332 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -25,7 +25,7 @@ from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.service_spec import \ NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \ - CustomContainerSpec + CustomContainerSpec, HostPlacementSpec from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonSpec @@ -61,7 +61,7 @@ try: # (https://github.com/alfredodeza/remoto/pull/56) lands from distutils.version import StrictVersion if StrictVersion(remoto.__version__) <= StrictVersion('1.2'): - def remoto_has_connection(self): + def remoto_has_connection(self: Any) -> bool: return self.gateway.hasreceiver() from remoto.backends import BaseConnection @@ -95,7 +95,7 @@ CEPH_TYPES = set(CEPH_UPGRADE_ORDER) class CephadmCompletion(orchestrator.Completion[T]): - def evaluate(self): + def evaluate(self) -> None: self.finalize(None) @@ -106,16 +106,16 @@ def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T """ @wraps(f) - def wrapper(*args, **kwargs): + def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion: return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs)) return wrapper -def service_inactive(spec_name: str): - def inner(func): +def service_inactive(spec_name: str) -> Callable: + def inner(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + def wrapper(*args: Any, **kwargs: Any) -> Any: obj = args[0] if obj.get_store(f"spec.{spec_name}") is not None: return 1, "", f"Unable to change configuration of an active service {spec_name}" @@ -294,7 +294,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, } ] - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any): super(CephadmOrchestrator, self).__init__(*args, **kwargs) self._cluster_fsid = self.get('mon_map')['fsid'] self.last_monmap: Optional[datetime.datetime] = None @@ -357,7 +357,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.upgrade = CephadmUpgrade(self) - self.health_checks = {} + self.health_checks: Dict[str, dict] = {} self.all_progress_references = list() # type: List[orchestrator.ProgressReference] @@ -423,9 +423,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.template = TemplateMgr(self) - self.requires_post_actions = set() + self.requires_post_actions: 
Set[str] = set() - def shutdown(self): + def shutdown(self) -> None: self.log.debug('shutdown') self._worker_pool.close() self._worker_pool.join() @@ -436,22 +436,25 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES return self.cephadm_services[service_type] - def _kick_serve_loop(self): + def _kick_serve_loop(self) -> None: self.log.debug('_kick_serve_loop') self.event.set() # function responsible for logging single host into custom registry - def _registry_login(self, host, url, username, password): + def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]: self.log.debug(f"Attempting to log host {host} into custom registry @ {url}") # want to pass info over stdin rather than through normal list of args - args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", " - " \"password\": \"" + password + "\"}") + args_str = json.dumps({ + 'url': url, + 'username': username, + 'password': password, + }) out, err, code = self._run_cephadm( host, 'mon', 'registry-login', ['--registry-json', '-'], stdin=args_str, error_ok=True) if code: return f"Host {host} failed to login to {url} as {username} with given password" - return + return None def serve(self) -> None: """ @@ -463,7 +466,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, serve = CephadmServe(self) serve.serve() - def set_container_image(self, entity: str, image): + def set_container_image(self, entity: str, image: str) -> None: self.check_mon_command({ 'prefix': 'config set', 'name': 'container_image', @@ -471,7 +474,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'who': entity, }) - def config_notify(self): + def config_notify(self) -> None: """ This method is called whenever one of our config options is changed. 
@@ -491,7 +494,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.event.set() - def notify(self, notify_type, notify_id): + def notify(self, notify_type: str, notify_id: Optional[str]) -> None: if notify_type == "mon_map": # get monmap mtime so we can refresh configs when mons change monmap = self.get('mon_map') @@ -505,7 +508,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, if notify_type == "pg_summary": self._trigger_osd_removal() - def _trigger_osd_removal(self): + def _trigger_osd_removal(self) -> None: data = self.get("osd_stats") for osd in data.get('osd_stats', []): if osd.get('num_pgs') == 0: @@ -517,7 +520,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, # start the process self.rm_util.process_removal_queue() - def pause(self): + def pause(self) -> None: if not self.paused: self.log.info('Paused') self.set_store('pause', 'true') @@ -525,7 +528,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, # wake loop so we update the health status self._kick_serve_loop() - def resume(self): + def resume(self) -> None: if self.paused: self.log.info('Resumed') self.paused = False @@ -570,7 +573,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, continue return name - def _reconfig_ssh(self): + def _reconfig_ssh(self) -> None: temp_files = [] # type: list ssh_options = [] # type: List[str] @@ -621,7 +624,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._reset_cons() - def validate_ssh_config_content(self, ssh_config): + def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None: if ssh_config is None or len(ssh_config.strip()) == 0: raise OrchestratorValidationError('ssh_config cannot be empty') # StrictHostKeyChecking is [yes|no] ? @@ -632,38 +635,38 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, if 'ask' in s.lower(): raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'') - def validate_ssh_config_fname(self, ssh_config_fname): + def validate_ssh_config_fname(self, ssh_config_fname: str) -> None: if not os.path.isfile(ssh_config_fname): raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format( ssh_config_fname)) - def _reset_con(self, host): + def _reset_con(self, host: str) -> None: conn, r = self._cons.get(host, (None, None)) if conn: self.log.debug('_reset_con close %s' % host) conn.exit() del self._cons[host] - def _reset_cons(self): + def _reset_cons(self) -> None: for host, conn_and_r in self._cons.items(): self.log.debug('_reset_cons close %s' % host) conn, r = conn_and_r conn.exit() self._cons = {} - def offline_hosts_remove(self, host): + def offline_hosts_remove(self, host: str) -> None: if host in self.offline_hosts: self.offline_hosts.remove(host) @staticmethod - def can_run(): + def can_run() -> Tuple[bool, str]: if remoto is not None: return True, "" else: return False, "loading remoto library:{}".format( remoto_import_error) - def available(self): + def available(self) -> Tuple[bool, str]: """ The cephadm orchestrator is always available. """ @@ -674,7 +677,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`' return True, '' - def process(self, completions): + def process(self, completions: List[CephadmCompletion]) -> None: """ Does nothing, as completions are processed in another thread. 
""" @@ -688,7 +691,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_write_command( prefix='cephadm set-ssh-config', desc='Set the ssh_config file (use -i )') - def _set_ssh_config(self, inbuf=None): + def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """ Set an ssh_config file provided from stdin """ @@ -703,7 +706,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_write_command( prefix='cephadm clear-ssh-config', desc='Clear the ssh_config file') - def _clear_ssh_config(self): + def _clear_ssh_config(self) -> Tuple[int, str, str]: """ Clear the ssh_config file provided from stdin """ @@ -717,7 +720,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, prefix='cephadm get-ssh-config', desc='Returns the ssh config as used by cephadm' ) - def _get_ssh_config(self): + def _get_ssh_config(self) -> HandleCommandResult: if self.ssh_config_file: self.validate_ssh_config_fname(self.ssh_config_file) with open(self.ssh_config_file) as f: @@ -730,7 +733,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_write_command( 'cephadm generate-key', desc='Generate a cluster SSH key (if not present)') - def _generate_key(self): + def _generate_key(self) -> Tuple[int, str, str]: if not self.ssh_pub or not self.ssh_key: self.log.info('Generating ssh key...') tmp_dir = TemporaryDirectory() @@ -758,7 +761,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_write_command( 'cephadm set-priv-key', desc='Set cluster SSH private key (use -i )') - def _set_priv_key(self, inbuf=None): + def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: if inbuf is None or len(inbuf) == 0: return -errno.EINVAL, "", "empty private ssh key provided" if inbuf == self.ssh_key: @@ -771,7 +774,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_write_command( 'cephadm set-pub-key', desc='Set cluster SSH public key (use -i )') - def _set_pub_key(self, inbuf=None): + def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: if inbuf is None or len(inbuf) == 0: return -errno.EINVAL, "", "empty public ssh key provided" if inbuf == self.ssh_pub: @@ -784,7 +787,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_write_command( 'cephadm clear-key', desc='Clear cluster SSH key') - def _clear_key(self): + def _clear_key(self) -> Tuple[int, str, str]: self.set_store('ssh_identity_key', None) self.set_store('ssh_identity_pub', None) self._reconfig_ssh() @@ -794,7 +797,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_read_command( 'cephadm get-pub-key', desc='Show SSH public key for connecting to cluster hosts') - def _get_pub_key(self): + def _get_pub_key(self) -> Tuple[int, str, str]: if self.ssh_pub: return 0, self.ssh_pub, '' else: @@ -803,14 +806,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_read_command( 'cephadm get-user', desc='Show user for SSHing to cluster hosts') - def _get_user(self): + def _get_user(self) -> Tuple[int, str, str]: return 0, self.ssh_user, '' @orchestrator._cli_read_command( 'cephadm set-user', 'name=user,type=CephString', 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users') - def set_ssh_user(self, user): + def set_ssh_user(self, user: str) -> Tuple[int, str, str]: current_user = self.ssh_user if 
user == current_user: return 0, "value unchanged", "" @@ -838,12 +841,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, "name=username,type=CephString,req=false " "name=password,type=CephString,req=false", 'Set custom registry login info by providing url, username and password or json file with login info (-i )') - def registry_login(self, url=None, username=None, password=None, inbuf=None): + def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]: # if password not given in command line, get it through file input if not (url and username and password) and (inbuf is None or len(inbuf) == 0): return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments " "or -i ") elif not (url and username and password): + assert isinstance(inbuf, str) login_info = json.loads(inbuf) if "url" in login_info and "username" in login_info and "password" in login_info: url = login_info["url"] @@ -881,7 +885,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'name=host,type=CephString ' 'name=addr,type=CephString,req=false', 'Check whether we can access and manage a remote host') - def check_host(self, host, addr=None): + def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: try: out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host', ['--expect-hostname', host], @@ -899,14 +903,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: if item.startswith('host %s ' % host): self.event.set() - return 0, '%s (%s) ok' % (host, addr), err + return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) @orchestrator._cli_read_command( 'cephadm prepare-host', 'name=host,type=CephString ' 'name=addr,type=CephString,req=false', 'Prepare a remote host for use with cephadm') - def _prepare_host(self, host, addr=None): + def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host], addr=addr, @@ -919,7 +923,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: if item.startswith('host %s ' % host): self.event.set() - return 0, '%s (%s) ok' % (host, addr), err + return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) @orchestrator._cli_write_command( prefix='cephadm set-extra-ceph-conf', @@ -927,7 +931,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, "Mainly a workaround, till `config generate-minimal-conf` generates\n" "a complete ceph.conf.\n\n" "Warning: this is a dangerous operation.") - def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult: + def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult: if inbuf: # sanity check. 
cp = ConfigParser() @@ -966,12 +970,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, prefix='cephadm generate-exporter-config', desc='Generate default SSL crt/key and token for cephadm exporter daemons') @service_inactive('cephadm-exporter') - def _generate_exporter_config(self): + def _generate_exporter_config(self) -> Tuple[int, str, str]: self._set_exporter_defaults() self.log.info('Default settings created for cephadm exporter(s)') return 0, "", "" - def _set_exporter_defaults(self): + def _set_exporter_defaults(self) -> None: crt, key = self._generate_exporter_ssl() token = self._generate_exporter_token() self._set_exporter_config({ @@ -982,17 +986,17 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, }) self._set_exporter_option('enabled', 'true') - def _generate_exporter_ssl(self): + def _generate_exporter_ssl(self) -> Tuple[str, str]: return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"}) - def _generate_exporter_token(self): + def _generate_exporter_token(self) -> str: return secrets.token_hex(32) @orchestrator._cli_write_command( prefix='cephadm clear-exporter-config', desc='Clear the SSL configuration used by cephadm exporter daemons') @service_inactive('cephadm-exporter') - def _clear_exporter_config(self): + def _clear_exporter_config(self) -> Tuple[int, str, str]: self._clear_exporter_config_settings() self.log.info('Cleared cephadm exporter configuration') return 0, "", "" @@ -1005,7 +1009,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, prefix='cephadm set-exporter-config', desc='Set custom cephadm-exporter configuration from a json file (-i ). JSON must contain crt, key, token and port') @service_inactive('cephadm-exporter') - def _store_exporter_config(self, inbuf=None): + def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: if not inbuf: return 1, "", "JSON configuration has not been provided (-i )" @@ -1031,7 +1035,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @orchestrator._cli_read_command( 'cephadm get-exporter-config', desc='Show the current cephadm-exporter configuraion (JSON)') - def _show_exporter_config(self): + def _show_exporter_config(self) -> Tuple[int, str, str]: cfg = self._get_exporter_config() return 0, json.dumps(cfg, indent=2), "" @@ -1057,7 +1061,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return False return conf.last_modified > dt - def _get_connection(self, host: str): + def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection', + 'remoto.backends.LegacyModuleExecute']: """ Setup a connection for running commands on remote host. """ @@ -1084,7 +1089,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return conn, r - def _executable_path(self, conn, executable): + def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str: """ Remote validator that accepts a connection object to ensure that a certain executable is available returning its full path if so. 
@@ -1331,7 +1336,7 @@ To check that the host is reachable: return "Removed host '{}'".format(host) @trivial_completion - def update_host_addr(self, host, addr) -> str: + def update_host_addr(self, host: str, addr: str) -> str: self.inventory.set_addr(host, addr) self._reset_con(host) self.event.set() # refresh stray health check @@ -1350,19 +1355,19 @@ To check that the host is reachable: return list(self.inventory.all_specs()) @trivial_completion - def add_host_label(self, host, label) -> str: + def add_host_label(self, host: str, label: str) -> str: self.inventory.add_label(host, label) self.log.info('Added label %s to host %s' % (label, host)) return 'Added label %s to host %s' % (label, host) @trivial_completion - def remove_host_label(self, host, label) -> str: + def remove_host_label(self, host: str, label: str) -> str: self.inventory.rm_label(host, label) self.log.info('Removed label %s to host %s' % (label, host)) return 'Removed label %s from host %s' % (label, host) @trivial_completion - def host_ok_to_stop(self, hostname: str): + def host_ok_to_stop(self, hostname: str) -> str: if hostname not in self.cache.get_hosts(): raise OrchestratorError(f'Cannot find host "{hostname}"') @@ -1393,7 +1398,7 @@ To check that the host is reachable: config += '\n\n' + extra.strip() + '\n' return config - def _invalidate_daemons_and_kick_serve(self, filter_host=None): + def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None: if filter_host: self.cache.invalidate_host_daemons(filter_host) else: @@ -1518,7 +1523,7 @@ To check that the host is reachable: return result @trivial_completion - def service_action(self, action, service_name) -> List[str]: + def service_action(self, action: str, service_name: str) -> List[str]: args = [] for host, dm in self.cache.daemons.items(): for name, d in dm.items(): @@ -1529,14 +1534,14 @@ To check that the host is reachable: return self._daemon_actions(args) @forall_hosts - def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str: + def _daemon_actions(self, daemon_type: str, daemon_id: str, host: str, action: str) -> str: with set_exception_subject('daemon', DaemonDescription( daemon_type=daemon_type, daemon_id=daemon_id ).name()): return self._daemon_action(daemon_type, daemon_id, host, action) - def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str: + def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str: daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec( host=host, daemon_id=daemon_id, @@ -1572,7 +1577,7 @@ To check that the host is reachable: self.events.for_daemon(name, 'INFO', msg) return msg - def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str): + def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None: if image is not None: if action != 'redeploy': raise OrchestratorError( @@ -1606,7 +1611,7 @@ To check that the host is reachable: def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() - def _schedule_daemon_action(self, daemon_name: str, action: str): + def _schedule_daemon_action(self, daemon_name: str, action: str) -> str: dd = self.cache.get_daemon(daemon_name) if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \ and not self.mgr_service.mgr_map_has_standby(): @@ -1631,7 +1636,7 @@ To check that the host 
is reachable: return self._remove_daemons(args) @trivial_completion - def remove_service(self, service_name) -> str: + def remove_service(self, service_name: str) -> str: self.log.info('Remove service %s' % service_name) self._trigger_preview_refresh(service_name=service_name) found = self.spec_store.rm(service_name) @@ -1646,7 +1651,7 @@ To check that the host is reachable: return f'Failed to remove service. <{service_name}> was not found.' @trivial_completion - def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]: + def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: """ Return the storage inventory of hosts matching the given filter. @@ -1675,7 +1680,7 @@ To check that the host is reachable: return result @trivial_completion - def zap_device(self, host, path) -> str: + def zap_device(self, host: str, path: str) -> str: self.log.info('Zap device %s:%s' % (host, path)) out, err, code = self._run_cephadm( host, 'osd', 'ceph-volume', @@ -1701,7 +1706,7 @@ To check that the host is reachable: See templates/blink_device_light_cmd.j2 """ @forall_hosts - def blink(host, dev, path): + def blink(host: str, dev: str, path: str) -> str: cmd_line = self.template.render('blink_device_light_cmd.j2', { 'on': on, @@ -1780,7 +1785,7 @@ To check that the host is reachable: def _preview_osdspecs(self, osdspecs: Optional[List[DriveGroupSpec]] = None - ): + ) -> dict: if not osdspecs: return {'n/a': [{'error': True, 'message': 'No OSDSpec or matching hosts found.'}]} @@ -1807,7 +1812,7 @@ To check that the host is reachable: previews_for_specs.update({host: osd_reports}) return previews_for_specs - def _calc_daemon_deps(self, daemon_type, daemon_id): + def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]: need = { 'prometheus': ['mgr', 'alertmanager', 'node-exporter'], 'grafana': ['prometheus'], @@ -1821,7 +1826,7 @@ To check that the host is reachable: def _create_daemon(self, daemon_spec: CephadmDaemonSpec, - reconfig=False, + reconfig: bool = False, osd_uuid_map: Optional[Dict[str, Any]] = None, ) -> str: @@ -1939,10 +1944,10 @@ To check that the host is reachable: return code == 0 @forall_hosts - def _remove_daemons(self, name, host) -> str: + def _remove_daemons(self, name: str, host: str) -> str: return self._remove_daemon(name, host) - def _remove_daemon(self, name, host) -> str: + def _remove_daemon(self, name: str, host: str) -> str: """ Remove a daemon """ @@ -1969,14 +1974,17 @@ To check that the host is reachable: return "Removed {} from host '{}'".format(name, host) - def _check_pool_exists(self, pool, service_name): + def _check_pool_exists(self, pool: str, service_name: str) -> None: logger.info(f'Checking pool "{pool}" exists for service {service_name}') if not self.rados.pool_exists(pool): raise OrchestratorError(f'Cannot find pool "{pool}" for ' f'service {service_name}') - def _add_daemon(self, daemon_type, spec, - create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]: + def _add_daemon(self, + daemon_type: str, + spec: ServiceSpec, + create_func: Callable[..., CephadmDaemonSpec], + config_func: Optional[Callable] = None) -> List[str]: """ Add (and place) a daemon. Require explicit host placement. Do not schedule, and do not apply the related scheduling limitations. 
@@ -1990,9 +1998,14 @@ To check that the host is reachable: spec.placement.hosts, count, create_func, config_func) - def _create_daemons(self, daemon_type, spec, daemons, - hosts, count, - create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]: + def _create_daemons(self, + daemon_type: str, + spec: ServiceSpec, + daemons: List[DaemonDescription], + hosts: List[HostPlacementSpec], + count: int, + create_func: Callable[..., CephadmDaemonSpec], + config_func: Optional[Callable] = None) -> List[str]: if count > len(hosts): raise OrchestratorError('too few hosts: want %d, have %s' % ( count, hosts)) @@ -2027,14 +2040,14 @@ To check that the host is reachable: daemons.append(sd) @forall_hosts - def create_func_map(*args): + def create_func_map(*args: Any) -> str: daemon_spec = create_func(*args) return self._create_daemon(daemon_spec) return create_func_map(args) @trivial_completion - def apply_mon(self, spec) -> str: + def apply_mon(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion @@ -2058,7 +2071,7 @@ To check that the host is reachable: return self._apply_service_spec(cast(ServiceSpec, spec)) - def _plan(self, spec: ServiceSpec): + def _plan(self, spec: ServiceSpec) -> dict: if spec.service_type == 'osd': return {'service_name': spec.service_name(), 'service_type': spec.service_type, @@ -2140,7 +2153,7 @@ To check that the host is reachable: return results @trivial_completion - def apply_mgr(self, spec) -> str: + def apply_mgr(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion @@ -2152,11 +2165,11 @@ To check that the host is reachable: return self._apply(spec) @trivial_completion - def add_rgw(self, spec) -> List[str]: + def add_rgw(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config) @trivial_completion - def apply_rgw(self, spec) -> str: + def apply_rgw(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion @@ -2165,23 +2178,23 @@ To check that the host is reachable: return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config) @trivial_completion - def apply_iscsi(self, spec) -> str: + def apply_iscsi(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion - def add_rbd_mirror(self, spec) -> List[str]: + def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create) @trivial_completion - def apply_rbd_mirror(self, spec) -> str: + def apply_rbd_mirror(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion - def add_nfs(self, spec) -> List[str]: + def add_nfs(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config) @trivial_completion - def apply_nfs(self, spec) -> str: + def apply_nfs(self, spec: ServiceSpec) -> str: return self._apply(spec) def _get_dashboard_url(self): @@ -2189,11 +2202,11 @@ To check that the host is reachable: return self.get('mgr_map').get('services', {}).get('dashboard', '') @trivial_completion - def add_prometheus(self, spec) -> List[str]: + def add_prometheus(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create) @trivial_completion - def apply_prometheus(self, spec) -> str: + def apply_prometheus(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion @@ 
-2203,7 +2216,7 @@ To check that the host is reachable: self.node_exporter_service.prepare_create) @trivial_completion - def apply_node_exporter(self, spec) -> str: + def apply_node_exporter(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion @@ -2213,7 +2226,7 @@ To check that the host is reachable: self.crash_service.prepare_create) @trivial_completion - def apply_crash(self, spec) -> str: + def apply_crash(self, spec: ServiceSpec) -> str: return self._apply(spec) @trivial_completion @@ -2251,10 +2264,10 @@ To check that the host is reachable: self.cephadm_exporter_service.prepare_create) @trivial_completion - def apply_cephadm_exporter(self, spec) -> str: + def apply_cephadm_exporter(self, spec: ServiceSpec) -> str: return self._apply(spec) - def _get_container_image_info(self, image_name) -> ContainerInspectInfo: + def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: # pick a random host... host = None for host_name in self.inventory.keys(): @@ -2288,7 +2301,7 @@ To check that the host is reachable: raise OrchestratorError(msg) @trivial_completion - def upgrade_check(self, image, version) -> str: + def upgrade_check(self, image: str, version: str) -> str: if version: target_name = self.container_image_base + ':v' + version elif image: @@ -2298,7 +2311,7 @@ To check that the host is reachable: image_info = self._get_container_image_info(target_name) self.log.debug(f'image info {image} -> {image_info}') - r = { + r: dict = { 'target_name': target_name, 'target_id': image_info.image_id, 'target_version': image_info.ceph_version, @@ -2325,7 +2338,7 @@ To check that the host is reachable: return self.upgrade.upgrade_status() @trivial_completion - def upgrade_start(self, image, version) -> str: + def upgrade_start(self, image: str, version: str) -> str: return self.upgrade.upgrade_start(image, version) @trivial_completion @@ -2375,7 +2388,7 @@ To check that the host is reachable: return "Scheduled OSD(s) for removal" @trivial_completion - def stop_remove_osds(self, osd_ids: List[str]): + def stop_remove_osds(self, osd_ids: List[str]) -> str: """ Stops a `removal` process for a List of OSDs. 
This will revert their weight and remove it from the osds_to_remove queue @@ -2392,7 +2405,7 @@ To check that the host is reachable: return "Stopped OSD(s) removal" @trivial_completion - def remove_osds_status(self): + def remove_osds_status(self) -> List[OSD]: """ The CLI call to retrieve an osd removal report """ diff --git a/src/pybind/mgr/cephadm/services/iscsi.py b/src/pybind/mgr/cephadm/services/iscsi.py index a6e8f03cc0499..6c3514ce3482d 100644 --- a/src/pybind/mgr/cephadm/services/iscsi.py +++ b/src/pybind/mgr/cephadm/services/iscsi.py @@ -17,6 +17,7 @@ class IscsiService(CephService): def config(self, spec: IscsiServiceSpec) -> None: assert self.TYPE == spec.service_type + assert spec.pool self.mgr._check_pool_exists(spec.pool, spec.service_name()) logger.info('Saving service %s spec with placement %s' % ( diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py index 3eaf50cac6892..0323b4110d5a4 100644 --- a/src/pybind/mgr/cephadm/services/nfs.py +++ b/src/pybind/mgr/cephadm/services/nfs.py @@ -20,6 +20,7 @@ class NFSService(CephService): def config(self, spec: NFSServiceSpec) -> None: assert self.TYPE == spec.service_type + assert spec.pool self.mgr._check_pool_exists(spec.pool, spec.service_name()) logger.info('Saving service %s spec with placement %s' % ( diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 23193fbb7074a..8dd49c1e414cb 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -356,6 +356,8 @@ class RemoveUtil(object): if not osd.exists: continue + assert osd.fullname is not None + assert osd.hostname is not None self.mgr._remove_daemon(osd.fullname, osd.hostname) logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}") logger.debug(f"Removing {osd.osd_id} from the queue.") diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index b67853c86f692..7cc61269860b0 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -827,19 +827,19 @@ class TestCephadm(object): raise Exception("boom: connection is dead") else: conn.fuse = True - return '{}', None, 0 + return '{}', [], 0 with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]): with mock.patch("remoto.process.check", _check): with with_host(cephadm_module, 'test', refresh_hosts=False): code, out, err = cephadm_module.check_host('test') # First should succeed. - assert err is None + assert err == '' # On second it should attempt to reuse the connection, where the # connection is "down" so will recreate the connection. The old # code will blow up here triggering the BOOM! code, out, err = cephadm_module.check_host('test') - assert err is None + assert err == '' @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") @mock.patch("remoto.process.check") -- 2.39.5
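
Editor's note (not part of the patch): the headline fix replaces hand-concatenated JSON in _registry_login with json.dumps. A minimal standalone sketch of why the old construction was bogus, using hypothetical credential values -- string concatenation raises TypeError when a field is None and produces invalid JSON when a value contains a quote, while json.dumps handles both:

import json

# Hypothetical credentials; the password contains a double quote, which
# user-supplied passwords legitimately may.
url, username, password = 'registry.example.com', 'admin', 'p"ssw0rd'

# Old construction (removed by the patch): plain string concatenation.
args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
            " \"password\": \"" + password + "\"}")
try:
    json.loads(args_str)
except json.JSONDecodeError as e:
    print(f'old form is invalid JSON: {e}')  # the unescaped quote breaks parsing

# New construction (added by the patch): json.dumps escapes the quote, and
# would serialize a None field as null instead of raising TypeError.
print(json.dumps({'url': url, 'username': username, 'password': password}))
# -> {"url": "registry.example.com", "username": "admin", "password": "p\"ssw0rd"}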
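For context on the mypy.ini hunk at the top of the patch: disallow_untyped_defs = True makes mypy report every function in the matched module that lacks full annotations, which is what forces the parameter and return-type annotations added throughout module.py. A minimal sketch of the behavior, with a hypothetical file name:

# demo.py -- check with: mypy --disallow-untyped-defs demo.py

def kick_serve_loop():  # error: Function is missing a return type annotation
    print('kick')

def kick_serve_loop_typed() -> None:  # accepted: fully annotated
    print('kick')

The new [mypy-cephadm.module] section applies this per module, so the stricter check is scoped to cephadm/module.py without affecting the rest of the tree.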