]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: make type annotations for module.py mandatory
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 20 Nov 2020 12:04:50 +0000 (13:04 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 9 Dec 2020 08:22:04 +0000 (09:22 +0100)
Fixing bogus json representation for registry-login

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/mypy.ini
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/services/iscsi.py
src/pybind/mgr/cephadm/services/nfs.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

index 0a0737659bb3761bacce7d9bd1f5d5dcb4f72938..cf8a8b5f4e8931a7ede9f711ae49ccbd4e34c4b4 100755 (executable)
@@ -34,6 +34,9 @@ disallow_untyped_defs = True
 [mypy-cephadm.schedule]
 disallow_untyped_defs = True
 
+[mypy-cephadm.module]
+disallow_untyped_defs = True
+
 # Make cephadm and rook happy
 [mypy-OpenSSL]
 ignore_missing_imports = True
index 7bd854a09cdbacc04c9d166058d8c1d9c9bc3c42..223612e542332e6304b40112b3fd15e283c1faa7 100644 (file)
@@ -25,7 +25,7 @@ from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import \
     NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
-    CustomContainerSpec
+    CustomContainerSpec, HostPlacementSpec
 from cephadm.serve import CephadmServe
 from cephadm.services.cephadmservice import CephadmDaemonSpec
 
@@ -61,7 +61,7 @@ try:
     # (https://github.com/alfredodeza/remoto/pull/56) lands
     from distutils.version import StrictVersion
     if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
-        def remoto_has_connection(self):
+        def remoto_has_connection(self: Any) -> bool:
             return self.gateway.hasreceiver()
 
         from remoto.backends import BaseConnection
@@ -95,7 +95,7 @@ CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
 
 
 class CephadmCompletion(orchestrator.Completion[T]):
-    def evaluate(self):
+    def evaluate(self) -> None:
         self.finalize(None)
 
 
@@ -106,16 +106,16 @@ def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T
     """
 
     @wraps(f)
-    def wrapper(*args, **kwargs):
+    def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
         return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
-def service_inactive(spec_name: str):
-    def inner(func):
+def service_inactive(spec_name: str) -> Callable:
+    def inner(func: Callable) -> Callable:
         @wraps(func)
-        def wrapper(*args, **kwargs):
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
             obj = args[0]
             if obj.get_store(f"spec.{spec_name}") is not None:
                 return 1, "", f"Unable to change configuration of an active service {spec_name}"
@@ -294,7 +294,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         }
     ]
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any):
         super(CephadmOrchestrator, self).__init__(*args, **kwargs)
         self._cluster_fsid = self.get('mon_map')['fsid']
         self.last_monmap: Optional[datetime.datetime] = None
@@ -357,7 +357,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.upgrade = CephadmUpgrade(self)
 
-        self.health_checks = {}
+        self.health_checks: Dict[str, dict] = {}
 
         self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
@@ -423,9 +423,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.template = TemplateMgr(self)
 
-        self.requires_post_actions = set()
+        self.requires_post_actions: Set[str] = set()
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.log.debug('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
@@ -436,22 +436,25 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
         return self.cephadm_services[service_type]
 
-    def _kick_serve_loop(self):
+    def _kick_serve_loop(self) -> None:
         self.log.debug('_kick_serve_loop')
         self.event.set()
 
     # function responsible for logging single host into custom registry
-    def _registry_login(self, host, url, username, password):
+    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
         self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
         # want to pass info over stdin rather than through normal list of args
-        args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
-                    " \"password\": \"" + password + "\"}")
+        args_str = json.dumps({
+            'url': url,
+            'username': username,
+            'password': password,
+        })
         out, err, code = self._run_cephadm(
             host, 'mon', 'registry-login',
             ['--registry-json', '-'], stdin=args_str, error_ok=True)
         if code:
             return f"Host {host} failed to login to {url} as {username} with given password"
-        return
+        return None
 
     def serve(self) -> None:
         """
@@ -463,7 +466,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         serve = CephadmServe(self)
         serve.serve()
 
-    def set_container_image(self, entity: str, image):
+    def set_container_image(self, entity: str, image: str) -> None:
         self.check_mon_command({
             'prefix': 'config set',
             'name': 'container_image',
@@ -471,7 +474,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             'who': entity,
         })
 
-    def config_notify(self):
+    def config_notify(self) -> None:
         """
         This method is called whenever one of our config options is changed.
 
@@ -491,7 +494,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.event.set()
 
-    def notify(self, notify_type, notify_id):
+    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
         if notify_type == "mon_map":
             # get monmap mtime so we can refresh configs when mons change
             monmap = self.get('mon_map')
@@ -505,7 +508,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         if notify_type == "pg_summary":
             self._trigger_osd_removal()
 
-    def _trigger_osd_removal(self):
+    def _trigger_osd_removal(self) -> None:
         data = self.get("osd_stats")
         for osd in data.get('osd_stats', []):
             if osd.get('num_pgs') == 0:
@@ -517,7 +520,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                     # start the process
                     self.rm_util.process_removal_queue()
 
-    def pause(self):
+    def pause(self) -> None:
         if not self.paused:
             self.log.info('Paused')
             self.set_store('pause', 'true')
@@ -525,7 +528,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             # wake loop so we update the health status
             self._kick_serve_loop()
 
-    def resume(self):
+    def resume(self) -> None:
         if self.paused:
             self.log.info('Resumed')
             self.paused = False
@@ -570,7 +573,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 continue
             return name
 
-    def _reconfig_ssh(self):
+    def _reconfig_ssh(self) -> None:
         temp_files = []  # type: list
         ssh_options = []  # type: List[str]
 
@@ -621,7 +624,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self._reset_cons()
 
-    def validate_ssh_config_content(self, ssh_config):
+    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
         if ssh_config is None or len(ssh_config.strip()) == 0:
             raise OrchestratorValidationError('ssh_config cannot be empty')
         # StrictHostKeyChecking is [yes|no] ?
@@ -632,38 +635,38 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             if 'ask' in s.lower():
                 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
 
-    def validate_ssh_config_fname(self, ssh_config_fname):
+    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
         if not os.path.isfile(ssh_config_fname):
             raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                 ssh_config_fname))
 
-    def _reset_con(self, host):
+    def _reset_con(self, host: str) -> None:
         conn, r = self._cons.get(host, (None, None))
         if conn:
             self.log.debug('_reset_con close %s' % host)
             conn.exit()
             del self._cons[host]
 
-    def _reset_cons(self):
+    def _reset_cons(self) -> None:
         for host, conn_and_r in self._cons.items():
             self.log.debug('_reset_cons close %s' % host)
             conn, r = conn_and_r
             conn.exit()
         self._cons = {}
 
-    def offline_hosts_remove(self, host):
+    def offline_hosts_remove(self, host: str) -> None:
         if host in self.offline_hosts:
             self.offline_hosts.remove(host)
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         if remoto is not None:
             return True, ""
         else:
             return False, "loading remoto library:{}".format(
                 remoto_import_error)
 
-    def available(self):
+    def available(self) -> Tuple[bool, str]:
         """
         The cephadm orchestrator is always available.
         """
@@ -674,7 +677,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
         return True, ''
 
-    def process(self, completions):
+    def process(self, completions: List[CephadmCompletion]) -> None:
         """
         Does nothing, as completions are processed in another thread.
         """
@@ -688,7 +691,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_write_command(
         prefix='cephadm set-ssh-config',
         desc='Set the ssh_config file (use -i <ssh_config>)')
-    def _set_ssh_config(self, inbuf=None):
+    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
         """
         Set an ssh_config file provided from stdin
         """
@@ -703,7 +706,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_write_command(
         prefix='cephadm clear-ssh-config',
         desc='Clear the ssh_config file')
-    def _clear_ssh_config(self):
+    def _clear_ssh_config(self) -> Tuple[int, str, str]:
         """
         Clear the ssh_config file provided from stdin
         """
@@ -717,7 +720,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         prefix='cephadm get-ssh-config',
         desc='Returns the ssh config as used by cephadm'
     )
-    def _get_ssh_config(self):
+    def _get_ssh_config(self) -> HandleCommandResult:
         if self.ssh_config_file:
             self.validate_ssh_config_fname(self.ssh_config_file)
             with open(self.ssh_config_file) as f:
@@ -730,7 +733,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_write_command(
         'cephadm generate-key',
         desc='Generate a cluster SSH key (if not present)')
-    def _generate_key(self):
+    def _generate_key(self) -> Tuple[int, str, str]:
         if not self.ssh_pub or not self.ssh_key:
             self.log.info('Generating ssh key...')
             tmp_dir = TemporaryDirectory()
@@ -758,7 +761,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_write_command(
         'cephadm set-priv-key',
         desc='Set cluster SSH private key (use -i <private_key>)')
-    def _set_priv_key(self, inbuf=None):
+    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
         if inbuf is None or len(inbuf) == 0:
             return -errno.EINVAL, "", "empty private ssh key provided"
         if inbuf == self.ssh_key:
@@ -771,7 +774,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_write_command(
         'cephadm set-pub-key',
         desc='Set cluster SSH public key (use -i <public_key>)')
-    def _set_pub_key(self, inbuf=None):
+    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
         if inbuf is None or len(inbuf) == 0:
             return -errno.EINVAL, "", "empty public ssh key provided"
         if inbuf == self.ssh_pub:
@@ -784,7 +787,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_write_command(
         'cephadm clear-key',
         desc='Clear cluster SSH key')
-    def _clear_key(self):
+    def _clear_key(self) -> Tuple[int, str, str]:
         self.set_store('ssh_identity_key', None)
         self.set_store('ssh_identity_pub', None)
         self._reconfig_ssh()
@@ -794,7 +797,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_read_command(
         'cephadm get-pub-key',
         desc='Show SSH public key for connecting to cluster hosts')
-    def _get_pub_key(self):
+    def _get_pub_key(self) -> Tuple[int, str, str]:
         if self.ssh_pub:
             return 0, self.ssh_pub, ''
         else:
@@ -803,14 +806,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_read_command(
         'cephadm get-user',
         desc='Show user for SSHing to cluster hosts')
-    def _get_user(self):
+    def _get_user(self) -> Tuple[int, str, str]:
         return 0, self.ssh_user, ''
 
     @orchestrator._cli_read_command(
         'cephadm set-user',
         'name=user,type=CephString',
         'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
-    def set_ssh_user(self, user):
+    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
         current_user = self.ssh_user
         if user == current_user:
             return 0, "value unchanged", ""
@@ -838,12 +841,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         "name=username,type=CephString,req=false "
         "name=password,type=CephString,req=false",
         'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
-    def registry_login(self, url=None, username=None, password=None, inbuf=None):
+    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
         # if password not given in command line, get it through file input
         if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
             return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                        "or -i <login credentials json file>")
         elif not (url and username and password):
+            assert isinstance(inbuf, str)
             login_info = json.loads(inbuf)
             if "url" in login_info and "username" in login_info and "password" in login_info:
                 url = login_info["url"]
@@ -881,7 +885,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         'name=host,type=CephString '
         'name=addr,type=CephString,req=false',
         'Check whether we can access and manage a remote host')
-    def check_host(self, host, addr=None):
+    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         try:
             out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                                ['--expect-hostname', host],
@@ -899,14 +903,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                 if item.startswith('host %s ' % host):
                     self.event.set()
-        return 0, '%s (%s) ok' % (host, addr), err
+        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
 
     @orchestrator._cli_read_command(
         'cephadm prepare-host',
         'name=host,type=CephString '
         'name=addr,type=CephString,req=false',
         'Prepare a remote host for use with cephadm')
-    def _prepare_host(self, host, addr=None):
+    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                            ['--expect-hostname', host],
                                            addr=addr,
@@ -919,7 +923,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                 if item.startswith('host %s ' % host):
                     self.event.set()
-        return 0, '%s (%s) ok' % (host, addr), err
+        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
 
     @orchestrator._cli_write_command(
         prefix='cephadm set-extra-ceph-conf',
@@ -927,7 +931,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
              "Mainly a workaround, till `config generate-minimal-conf` generates\n"
              "a complete ceph.conf.\n\n"
              "Warning: this is a dangerous operation.")
-    def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult:
+    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
         if inbuf:
             # sanity check.
             cp = ConfigParser()
@@ -966,12 +970,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         prefix='cephadm generate-exporter-config',
         desc='Generate default SSL crt/key and token for cephadm exporter daemons')
     @service_inactive('cephadm-exporter')
-    def _generate_exporter_config(self):
+    def _generate_exporter_config(self) -> Tuple[int, str, str]:
         self._set_exporter_defaults()
         self.log.info('Default settings created for cephadm exporter(s)')
         return 0, "", ""
 
-    def _set_exporter_defaults(self):
+    def _set_exporter_defaults(self) -> None:
         crt, key = self._generate_exporter_ssl()
         token = self._generate_exporter_token()
         self._set_exporter_config({
@@ -982,17 +986,17 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         })
         self._set_exporter_option('enabled', 'true')
 
-    def _generate_exporter_ssl(self):
+    def _generate_exporter_ssl(self) -> Tuple[str, str]:
         return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"})
 
-    def _generate_exporter_token(self):
+    def _generate_exporter_token(self) -> str:
         return secrets.token_hex(32)
 
     @orchestrator._cli_write_command(
         prefix='cephadm clear-exporter-config',
         desc='Clear the SSL configuration used by cephadm exporter daemons')
     @service_inactive('cephadm-exporter')
-    def _clear_exporter_config(self):
+    def _clear_exporter_config(self) -> Tuple[int, str, str]:
         self._clear_exporter_config_settings()
         self.log.info('Cleared cephadm exporter configuration')
         return 0, "", ""
@@ -1005,7 +1009,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         prefix='cephadm set-exporter-config',
         desc='Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port')
     @service_inactive('cephadm-exporter')
-    def _store_exporter_config(self, inbuf=None):
+    def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
 
         if not inbuf:
             return 1, "", "JSON configuration has not been provided (-i <filename>)"
@@ -1031,7 +1035,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     @orchestrator._cli_read_command(
         'cephadm get-exporter-config',
         desc='Show the current cephadm-exporter configuraion (JSON)')
-    def _show_exporter_config(self):
+    def _show_exporter_config(self) -> Tuple[int, str, str]:
         cfg = self._get_exporter_config()
         return 0, json.dumps(cfg, indent=2), ""
 
@@ -1057,7 +1061,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             return False
         return conf.last_modified > dt
 
-    def _get_connection(self, host: str):
+    def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
+                                                  'remoto.backends.LegacyModuleExecute']:
         """
         Setup a connection for running commands on remote host.
         """
@@ -1084,7 +1089,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         return conn, r
 
-    def _executable_path(self, conn, executable):
+    def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
         """
         Remote validator that accepts a connection object to ensure that a certain
         executable is available returning its full path if so.
@@ -1331,7 +1336,7 @@ To check that the host is reachable:
         return "Removed host '{}'".format(host)
 
     @trivial_completion
-    def update_host_addr(self, host, addr) -> str:
+    def update_host_addr(self, host: str, addr: str) -> str:
         self.inventory.set_addr(host, addr)
         self._reset_con(host)
         self.event.set()  # refresh stray health check
@@ -1350,19 +1355,19 @@ To check that the host is reachable:
         return list(self.inventory.all_specs())
 
     @trivial_completion
-    def add_host_label(self, host, label) -> str:
+    def add_host_label(self, host: str, label: str) -> str:
         self.inventory.add_label(host, label)
         self.log.info('Added label %s to host %s' % (label, host))
         return 'Added label %s to host %s' % (label, host)
 
     @trivial_completion
-    def remove_host_label(self, host, label) -> str:
+    def remove_host_label(self, host: str, label: str) -> str:
         self.inventory.rm_label(host, label)
         self.log.info('Removed label %s to host %s' % (label, host))
         return 'Removed label %s from host %s' % (label, host)
 
     @trivial_completion
-    def host_ok_to_stop(self, hostname: str):
+    def host_ok_to_stop(self, hostname: str) -> str:
         if hostname not in self.cache.get_hosts():
             raise OrchestratorError(f'Cannot find host "{hostname}"')
 
@@ -1393,7 +1398,7 @@ To check that the host is reachable:
             config += '\n\n' + extra.strip() + '\n'
         return config
 
-    def _invalidate_daemons_and_kick_serve(self, filter_host=None):
+    def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
         if filter_host:
             self.cache.invalidate_host_daemons(filter_host)
         else:
@@ -1518,7 +1523,7 @@ To check that the host is reachable:
         return result
 
     @trivial_completion
-    def service_action(self, action, service_name) -> List[str]:
+    def service_action(self, action: str, service_name: str) -> List[str]:
         args = []
         for host, dm in self.cache.daemons.items():
             for name, d in dm.items():
@@ -1529,14 +1534,14 @@ To check that the host is reachable:
         return self._daemon_actions(args)
 
     @forall_hosts
-    def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
+    def _daemon_actions(self, daemon_type: str, daemon_id: str, host: str, action: str) -> str:
         with set_exception_subject('daemon', DaemonDescription(
             daemon_type=daemon_type,
             daemon_id=daemon_id
         ).name()):
             return self._daemon_action(daemon_type, daemon_id, host, action)
 
-    def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str:
+    def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
         daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
             host=host,
             daemon_id=daemon_id,
@@ -1572,7 +1577,7 @@ To check that the host is reachable:
         self.events.for_daemon(name, 'INFO', msg)
         return msg
 
-    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str):
+    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
         if image is not None:
             if action != 'redeploy':
                 raise OrchestratorError(
@@ -1606,7 +1611,7 @@ To check that the host is reachable:
     def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
         return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
 
-    def _schedule_daemon_action(self, daemon_name: str, action: str):
+    def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
         dd = self.cache.get_daemon(daemon_name)
         if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
                 and not self.mgr_service.mgr_map_has_standby():
@@ -1631,7 +1636,7 @@ To check that the host is reachable:
         return self._remove_daemons(args)
 
     @trivial_completion
-    def remove_service(self, service_name) -> str:
+    def remove_service(self, service_name: str) -> str:
         self.log.info('Remove service %s' % service_name)
         self._trigger_preview_refresh(service_name=service_name)
         found = self.spec_store.rm(service_name)
@@ -1646,7 +1651,7 @@ To check that the host is reachable:
             return f'Failed to remove service. <{service_name}> was not found.'
 
     @trivial_completion
-    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
+    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
         """
         Return the storage inventory of hosts matching the given filter.
 
@@ -1675,7 +1680,7 @@ To check that the host is reachable:
         return result
 
     @trivial_completion
-    def zap_device(self, host, path) -> str:
+    def zap_device(self, host: str, path: str) -> str:
         self.log.info('Zap device %s:%s' % (host, path))
         out, err, code = self._run_cephadm(
             host, 'osd', 'ceph-volume',
@@ -1701,7 +1706,7 @@ To check that the host is reachable:
         See templates/blink_device_light_cmd.j2
         """
         @forall_hosts
-        def blink(host, dev, path):
+        def blink(host: str, dev: str, path: str) -> str:
             cmd_line = self.template.render('blink_device_light_cmd.j2',
                                             {
                                                 'on': on,
@@ -1780,7 +1785,7 @@ To check that the host is reachable:
 
     def _preview_osdspecs(self,
                           osdspecs: Optional[List[DriveGroupSpec]] = None
-                          ):
+                          ) -> dict:
         if not osdspecs:
             return {'n/a': [{'error': True,
                              'message': 'No OSDSpec or matching hosts found.'}]}
@@ -1807,7 +1812,7 @@ To check that the host is reachable:
             previews_for_specs.update({host: osd_reports})
         return previews_for_specs
 
-    def _calc_daemon_deps(self, daemon_type, daemon_id):
+    def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]:
         need = {
             'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
             'grafana': ['prometheus'],
@@ -1821,7 +1826,7 @@ To check that the host is reachable:
 
     def _create_daemon(self,
                        daemon_spec: CephadmDaemonSpec,
-                       reconfig=False,
+                       reconfig: bool = False,
                        osd_uuid_map: Optional[Dict[str, Any]] = None,
                        ) -> str:
 
@@ -1939,10 +1944,10 @@ To check that the host is reachable:
         return code == 0
 
     @forall_hosts
-    def _remove_daemons(self, name, host) -> str:
+    def _remove_daemons(self, name: str, host: str) -> str:
         return self._remove_daemon(name, host)
 
-    def _remove_daemon(self, name, host) -> str:
+    def _remove_daemon(self, name: str, host: str) -> str:
         """
         Remove a daemon
         """
@@ -1969,14 +1974,17 @@ To check that the host is reachable:
 
             return "Removed {} from host '{}'".format(name, host)
 
-    def _check_pool_exists(self, pool, service_name):
+    def _check_pool_exists(self, pool: str, service_name: str) -> None:
         logger.info(f'Checking pool "{pool}" exists for service {service_name}')
         if not self.rados.pool_exists(pool):
             raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                     f'service {service_name}')
 
-    def _add_daemon(self, daemon_type, spec,
-                    create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
+    def _add_daemon(self,
+                    daemon_type: str,
+                    spec: ServiceSpec,
+                    create_func: Callable[..., CephadmDaemonSpec],
+                    config_func: Optional[Callable] = None) -> List[str]:
         """
         Add (and place) a daemon. Require explicit host placement.  Do not
         schedule, and do not apply the related scheduling limitations.
@@ -1990,9 +1998,14 @@ To check that the host is reachable:
                                     spec.placement.hosts, count,
                                     create_func, config_func)
 
-    def _create_daemons(self, daemon_type, spec, daemons,
-                        hosts, count,
-                        create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
+    def _create_daemons(self,
+                        daemon_type: str,
+                        spec: ServiceSpec,
+                        daemons: List[DaemonDescription],
+                        hosts: List[HostPlacementSpec],
+                        count: int,
+                        create_func: Callable[..., CephadmDaemonSpec],
+                        config_func: Optional[Callable] = None) -> List[str]:
         if count > len(hosts):
             raise OrchestratorError('too few hosts: want %d, have %s' % (
                 count, hosts))
@@ -2027,14 +2040,14 @@ To check that the host is reachable:
             daemons.append(sd)
 
         @forall_hosts
-        def create_func_map(*args):
+        def create_func_map(*args: Any) -> str:
             daemon_spec = create_func(*args)
             return self._create_daemon(daemon_spec)
 
         return create_func_map(args)
 
     @trivial_completion
-    def apply_mon(self, spec) -> str:
+    def apply_mon(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2058,7 +2071,7 @@ To check that the host is reachable:
 
         return self._apply_service_spec(cast(ServiceSpec, spec))
 
-    def _plan(self, spec: ServiceSpec):
+    def _plan(self, spec: ServiceSpec) -> dict:
         if spec.service_type == 'osd':
             return {'service_name': spec.service_name(),
                     'service_type': spec.service_type,
@@ -2140,7 +2153,7 @@ To check that the host is reachable:
         return results
 
     @trivial_completion
-    def apply_mgr(self, spec) -> str:
+    def apply_mgr(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2152,11 +2165,11 @@ To check that the host is reachable:
         return self._apply(spec)
 
     @trivial_completion
-    def add_rgw(self, spec) -> List[str]:
+    def add_rgw(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)
 
     @trivial_completion
-    def apply_rgw(self, spec) -> str:
+    def apply_rgw(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2165,23 +2178,23 @@ To check that the host is reachable:
         return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)
 
     @trivial_completion
-    def apply_iscsi(self, spec) -> str:
+    def apply_iscsi(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
-    def add_rbd_mirror(self, spec) -> List[str]:
+    def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
 
     @trivial_completion
-    def apply_rbd_mirror(self, spec) -> str:
+    def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
-    def add_nfs(self, spec) -> List[str]:
+    def add_nfs(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)
 
     @trivial_completion
-    def apply_nfs(self, spec) -> str:
+    def apply_nfs(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     def _get_dashboard_url(self):
@@ -2189,11 +2202,11 @@ To check that the host is reachable:
         return self.get('mgr_map').get('services', {}).get('dashboard', '')
 
     @trivial_completion
-    def add_prometheus(self, spec) -> List[str]:
+    def add_prometheus(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
 
     @trivial_completion
-    def apply_prometheus(self, spec) -> str:
+    def apply_prometheus(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2203,7 +2216,7 @@ To check that the host is reachable:
                                 self.node_exporter_service.prepare_create)
 
     @trivial_completion
-    def apply_node_exporter(self, spec) -> str:
+    def apply_node_exporter(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2213,7 +2226,7 @@ To check that the host is reachable:
                                 self.crash_service.prepare_create)
 
     @trivial_completion
-    def apply_crash(self, spec) -> str:
+    def apply_crash(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2251,10 +2264,10 @@ To check that the host is reachable:
                                 self.cephadm_exporter_service.prepare_create)
 
     @trivial_completion
-    def apply_cephadm_exporter(self, spec) -> str:
+    def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    def _get_container_image_info(self, image_name) -> ContainerInspectInfo:
+    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
         # pick a random host...
         host = None
         for host_name in self.inventory.keys():
@@ -2288,7 +2301,7 @@ To check that the host is reachable:
             raise OrchestratorError(msg)
 
     @trivial_completion
-    def upgrade_check(self, image, version) -> str:
+    def upgrade_check(self, image: str, version: str) -> str:
         if version:
             target_name = self.container_image_base + ':v' + version
         elif image:
@@ -2298,7 +2311,7 @@ To check that the host is reachable:
 
         image_info = self._get_container_image_info(target_name)
         self.log.debug(f'image info {image} -> {image_info}')
-        r = {
+        r: dict = {
             'target_name': target_name,
             'target_id': image_info.image_id,
             'target_version': image_info.ceph_version,
@@ -2325,7 +2338,7 @@ To check that the host is reachable:
         return self.upgrade.upgrade_status()
 
     @trivial_completion
-    def upgrade_start(self, image, version) -> str:
+    def upgrade_start(self, image: str, version: str) -> str:
         return self.upgrade.upgrade_start(image, version)
 
     @trivial_completion
@@ -2375,7 +2388,7 @@ To check that the host is reachable:
         return "Scheduled OSD(s) for removal"
 
     @trivial_completion
-    def stop_remove_osds(self, osd_ids: List[str]):
+    def stop_remove_osds(self, osd_ids: List[str]) -> str:
         """
         Stops a `removal` process for a List of OSDs.
         This will revert their weight and remove it from the osds_to_remove queue
@@ -2392,7 +2405,7 @@ To check that the host is reachable:
         return "Stopped OSD(s) removal"
 
     @trivial_completion
-    def remove_osds_status(self):
+    def remove_osds_status(self) -> List[OSD]:
         """
         The CLI call to retrieve an osd removal report
         """
index a6e8f03cc04992ec8eaa1333fa78beab3d376164..6c3514ce3482d428b5f94ebdb8421cca8649167d 100644 (file)
@@ -17,6 +17,7 @@ class IscsiService(CephService):
 
     def config(self, spec: IscsiServiceSpec) -> None:
         assert self.TYPE == spec.service_type
+        assert spec.pool
         self.mgr._check_pool_exists(spec.pool, spec.service_name())
 
         logger.info('Saving service %s spec with placement %s' % (
index 3eaf50cac6892804c94b3cdc279a5edeff206d8d..0323b4110d5a4ea279658400c9fec5446f271c72 100644 (file)
@@ -20,6 +20,7 @@ class NFSService(CephService):
 
     def config(self, spec: NFSServiceSpec) -> None:
         assert self.TYPE == spec.service_type
+        assert spec.pool
         self.mgr._check_pool_exists(spec.pool, spec.service_name())
 
         logger.info('Saving service %s spec with placement %s' % (
index 23193fbb7074a4b020ca9acd4a49c765df439445..8dd49c1e414cb220ae43bcc1a017939f841d11b6 100644 (file)
@@ -356,6 +356,8 @@ class RemoveUtil(object):
 
             if not osd.exists:
                 continue
+            assert osd.fullname is not None
+            assert osd.hostname is not None
             self.mgr._remove_daemon(osd.fullname, osd.hostname)
             logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
             logger.debug(f"Removing {osd.osd_id} from the queue.")
index b67853c86f6923c300c6f74be7d68e7c76d4fd8b..7cc61269860b0729101493d4c3cd10279a40d0ab 100644 (file)
@@ -827,19 +827,19 @@ class TestCephadm(object):
                 raise Exception("boom: connection is dead")
             else:
                 conn.fuse = True
-            return '{}', None, 0
+            return '{}', [], 0
         with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
             with mock.patch("remoto.process.check", _check):
                 with with_host(cephadm_module, 'test', refresh_hosts=False):
                     code, out, err = cephadm_module.check_host('test')
                     # First should succeed.
-                    assert err is None
+                    assert err is ''
 
                     # On second it should attempt to reuse the connection, where the
                     # connection is "down" so will recreate the connection. The old
                     # code will blow up here triggering the BOOM!
                     code, out, err = cephadm_module.check_host('test')
-                    assert err is None
+                    assert err is ''
 
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
     @mock.patch("remoto.process.check")