mgr/cephadm: make OSDService.create_from_spec_one async
author     Sebastian Wagner <sewagner@redhat.com>
           Wed, 10 Nov 2021 17:01:37 +0000 (18:01 +0100)
committer  Sebastian Wagner <sewagner@redhat.com>
           Thu, 18 Nov 2021 15:19:42 +0000 (16:19 +0100)
And gather all results, plus a lot of mechanical adjustments.

Signed-off-by: Sebastian Wagner <sewagner@redhat.com>
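
The wait_async helper this commit leans on is not part of the diff below; a
minimal sketch of the pattern, assuming the mgr owns an asyncio event loop
running in a daemon thread (names illustrative, not the exact mgr code):

    import asyncio
    from threading import Thread
    from typing import Awaitable, TypeVar

    T = TypeVar('T')

    class EventLoopThread(Thread):
        """Run an asyncio loop in a daemon thread so synchronous mgr
        code can block on coroutines."""

        def __init__(self) -> None:
            self._loop = asyncio.new_event_loop()
            super().__init__(target=self._loop.run_forever, daemon=True)
            self.start()

        def get_result(self, coro: Awaitable[T]) -> T:
            # Submit the coroutine to the loop thread and block until
            # it finishes, returning its result to the sync caller.
            return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

Every self.wait_async(...) / self.mgr.wait_async(...) call introduced below
is this submit-and-block step; the coroutines themselves (_run_cephadm and
friends) execute on the loop thread.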
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_ssh.py
src/pybind/mgr/cephadm/upgrade.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index ba28fa6e39bec5574d308802de50e06bf6ffb61e..db7568a3e980fead33035601414d80c473cf773e 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -11,7 +11,7 @@ from threading import Event
 
 import string
 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
-    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Coroutine, Awaitable
+    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
 
 import datetime
 import os
@@ -966,7 +966,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             break
         if not host:
             raise OrchestratorError('no hosts defined')
-        r = CephadmServe(self)._registry_login(host, url, username, password)
+        r = self.wait_async(CephadmServe(self)._registry_login(host, url, username, password))
         if r is not None:
             return 1, '', r
         # if logins succeeded, store info
@@ -982,10 +982,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         """Check whether we can access and manage a remote host"""
         try:
-            out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
-                                                             ['--expect-hostname', host],
-                                                             addr=addr,
-                                                             error_ok=True, no_fsid=True)
+            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
+                                                                             ['--expect-hostname', host],
+                                                                             addr=addr,
+                                                                             error_ok=True, no_fsid=True))
             if code:
                 return 1, '', ('check-host failed:\n' + '\n'.join(err))
         except OrchestratorError:
@@ -1004,10 +1004,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         'cephadm prepare-host')
     def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         """Prepare a remote host for use with cephadm"""
-        out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
-                                                         ['--expect-hostname', host],
-                                                         addr=addr,
-                                                         error_ok=True, no_fsid=True)
+        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
+                                                                         ['--expect-hostname', host],
+                                                                         addr=addr,
+                                                                         error_ok=True, no_fsid=True))
         if code:
             return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
         # if we have an outstanding health alert for this host, give the
@@ -1179,7 +1179,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         @forall_hosts
         def run(h: str) -> str:
-            return self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')
+            return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
 
         return HandleCommandResult(stdout='\n'.join(run(host)))
 
@@ -1314,11 +1314,11 @@ Then run the following:
 > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
             raise OrchestratorError(msg)
 
-        out, err, code = CephadmServe(self)._run_cephadm(
+        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
             host, cephadmNoImage, 'check-host',
             ['--expect-hostname', host],
             addr=addr,
-            error_ok=True, no_fsid=True)
+            error_ok=True, no_fsid=True))
         if code:
             msg = 'check-host failed:\n' + '\n'.join(err)
             # err will contain stdout and stderr, so we filter on the message text to
@@ -1585,9 +1585,9 @@ Then run the following:
                     msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
 
             # call the host-maintenance function
-            _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
-                                                                ["enter"],
-                                                                error_ok=True)
+            _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
+                                                                                ["enter"],
+                                                                                error_ok=True))
             returned_msg = _err[0].split('\n')[-1]
             if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
                 raise OrchestratorError(
@@ -1636,9 +1636,9 @@ Then run the following:
         if tgt_host['status'] != "maintenance":
             raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
 
-        outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
-                                                            ['exit'],
-                                                            error_ok=True)
+        outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
+                                                                            ['exit'],
+                                                                            error_ok=True))
         returned_msg = errs[0].split('\n')[-1]
         if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
             raise OrchestratorError(
@@ -1824,7 +1824,7 @@ Then run the following:
             if daemon_spec.daemon_type != 'osd':
                 daemon_spec = self.cephadm_services[daemon_type_to_service(
                     daemon_spec.daemon_type)].prepare_create(daemon_spec)
-            return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))
+            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
 
         actions = {
             'start': ['reset-failed', 'start'],
@@ -1834,9 +1834,9 @@ Then run the following:
         name = daemon_spec.name()
         for a in actions[action]:
             try:
-                out, err, code = CephadmServe(self)._run_cephadm(
+                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                     daemon_spec.host, name, 'unit',
-                    ['--name', name, a])
+                    ['--name', name, a]))
             except Exception:
                 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
         self.cache.invalidate_host_daemons(daemon_spec.host)
@@ -1935,7 +1935,8 @@ Then run the following:
                 msg = ''
                 for h, ls in osds_msg.items():
                     msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
-                raise OrchestratorError(f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
+                raise OrchestratorError(
+                    f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
 
         found = self.spec_store.rm(service_name)
         if found and service_name.startswith('osd.'):
@@ -2039,10 +2040,10 @@ Then run the following:
                     f"OSD{'s' if len(active_osds) > 1 else ''}"
                     f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
 
-        out, err, code = CephadmServe(self)._run_cephadm(
+        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
             host, 'osd', 'ceph-volume',
             ['--', 'lvm', 'zap', '--destroy', path],
-            error_ok=True)
+            error_ok=True))
 
         self.cache.invalidate_host_devices(host)
         self.cache.invalidate_host_networks(host)
@@ -2079,9 +2080,9 @@ Then run the following:
                                             host=host)
             cmd_args = shlex.split(cmd_line)
 
-            out, err, code = CephadmServe(self)._run_cephadm(
+            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                 host, 'osd', 'shell', ['--'] + cmd_args,
-                error_ok=True)
+                error_ok=True))
             if code:
                 raise OrchestratorError(
                     'Unable to affect %s light for %s:%s. Command: %s' % (
@@ -2301,7 +2302,7 @@ Then run the following:
         @ forall_hosts
         def create_func_map(*args: Any) -> str:
             daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
-            return CephadmServe(self)._create_daemon(daemon_spec)
+            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
 
         return create_func_map(args)
 
@@ -2527,7 +2528,7 @@ Then run the following:
         else:
             raise OrchestratorError('must specify either image or version')
 
-        image_info = CephadmServe(self)._get_container_image_info(target_name)
+        image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
 
         ceph_image_version = image_info.ceph_version
         if not ceph_image_version:
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index 80de87d404ceb4496dc05b0f8cbb948748e6f327..f4d0b4384e6fd309524b559d39f904bd13754808 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -298,8 +298,8 @@ class CephadmServe:
 
             if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                 self.log.debug(f"Logging `{host}` into custom registry")
-                r = self._registry_login(host, self.mgr.registry_url,
-                                         self.mgr.registry_username, self.mgr.registry_password)
+                r = self.mgr.wait_async(self._registry_login(host, self.mgr.registry_url,
+                                                             self.mgr.registry_username, self.mgr.registry_password))
                 if r:
                     bad_hosts.append(r)
 
@@ -364,9 +364,9 @@ class CephadmServe:
         self.log.debug(' checking %s' % host)
         try:
             addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
-            out, err, code = self._run_cephadm(
+            out, err, code = self.mgr.wait_async(self._run_cephadm(
                 host, cephadmNoImage, 'check-host', [],
-                error_ok=True, no_fsid=True)
+                error_ok=True, no_fsid=True))
             self.mgr.cache.update_last_host_check(host)
             self.mgr.cache.save_host(host)
             if code:
@@ -382,7 +382,7 @@ class CephadmServe:
 
     def _refresh_host_daemons(self, host: str) -> Optional[str]:
         try:
-            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
+            ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
         except OrchestratorError as e:
             return str(e)
         self.mgr._process_ls_output(host, ls)
@@ -390,7 +390,8 @@ class CephadmServe:
 
     def _refresh_facts(self, host: str) -> Optional[str]:
         try:
-            val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
+            val = self.mgr.wait_async(self._run_cephadm_json(
+                host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
         except OrchestratorError as e:
             return str(e)
 
@@ -408,14 +409,14 @@ class CephadmServe:
 
         try:
             try:
-                devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
-                                                 inventory_args)
+                devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+                                                                     inventory_args))
             except OrchestratorError as e:
                 if 'unrecognized arguments: --filter-for-batch' in str(e):
                     rerun_args = inventory_args.copy()
                     rerun_args.remove('--filter-for-batch')
-                    devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
-                                                     rerun_args)
+                    devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+                                                                         rerun_args))
                 else:
                     raise
 
@@ -432,7 +433,8 @@ class CephadmServe:
 
     def _refresh_host_networks(self, host: str) -> Optional[str]:
         try:
-            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
+            networks = self.mgr.wait_async(self._run_cephadm_json(
+                host, 'mon', 'list-networks', [], no_fsid=True))
         except OrchestratorError as e:
             return str(e)
 
@@ -833,7 +835,7 @@ class CephadmServe:
 
                 try:
                     daemon_spec = svc.prepare_create(daemon_spec)
-                    self._create_daemon(daemon_spec)
+                    self.mgr.wait_async(self._create_daemon(daemon_spec))
                     r = True
                     progress_done += 1
                     update_progress()
@@ -1017,7 +1019,8 @@ class CephadmServe:
         digests: Dict[str, ContainerInspectInfo] = {}
         for container_image_ref in set(settings.values()):
             if not is_repo_digest(container_image_ref):
-                image_info = self._get_container_image_info(container_image_ref)
+                image_info = self.mgr.wait_async(
+                    self._get_container_image_info(container_image_ref))
                 if image_info.repo_digests:
                     # FIXME: we assume the first digest here is the best
                     assert is_repo_digest(image_info.repo_digests[0]), image_info
@@ -1030,11 +1033,11 @@ class CephadmServe:
                     # FIXME: we assume the first digest here is the best
                     self.mgr.set_container_image(entity, image_info.repo_digests[0])
 
-    def _create_daemon(self,
-                       daemon_spec: CephadmDaemonDeploySpec,
-                       reconfig: bool = False,
-                       osd_uuid_map: Optional[Dict[str, Any]] = None,
-                       ) -> str:
+    async def _create_daemon(self,
+                             daemon_spec: CephadmDaemonDeploySpec,
+                             reconfig: bool = False,
+                             osd_uuid_map: Optional[Dict[str, Any]] = None,
+                             ) -> str:
 
         with set_exception_subject('service', orchestrator.DaemonDescription(
                 daemon_type=daemon_spec.daemon_type,
@@ -1075,14 +1078,14 @@ class CephadmServe:
                     daemon_spec.extra_args.append('--allow-ptrace')
 
                 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
-                    self._registry_login(daemon_spec.host, self.mgr.registry_url,
-                                         self.mgr.registry_username, self.mgr.registry_password)
+                    await self._registry_login(daemon_spec.host, self.mgr.registry_url,
+                                               self.mgr.registry_username, self.mgr.registry_password)
 
                 self.log.info('%s daemon %s on %s' % (
                     'Reconfiguring' if reconfig else 'Deploying',
                     daemon_spec.name(), daemon_spec.host))
 
-                out, err, code = self._run_cephadm(
+                out, err, code = await self._run_cephadm(
                     daemon_spec.host, daemon_spec.name(), 'deploy',
                     [
                         '--name', daemon_spec.name(),
@@ -1154,8 +1157,8 @@ class CephadmServe:
             # we can delete a mon instances data.
             args = ['--name', name, '--force']
             self.log.info('Removing daemon %s from %s' % (name, host))
-            out, err, code = self._run_cephadm(
-                host, name, 'rm-daemon', args)
+            out, err, code = self.mgr.wait_async(self._run_cephadm(
+                host, name, 'rm-daemon', args))
             if not code:
                 # remove item from cache
                 self.mgr.cache.rm_daemon(host, name)
@@ -1167,16 +1170,16 @@ class CephadmServe:
 
             return "Removed {} from host '{}'".format(name, host)
 
-    def _run_cephadm_json(self,
-                          host: str,
-                          entity: Union[CephadmNoImage, str],
-                          command: str,
-                          args: List[str],
-                          no_fsid: Optional[bool] = False,
-                          image: Optional[str] = "",
-                          ) -> Any:
+    async def _run_cephadm_json(self,
+                                host: str,
+                                entity: Union[CephadmNoImage, str],
+                                command: str,
+                                args: List[str],
+                                no_fsid: Optional[bool] = False,
+                                image: Optional[str] = "",
+                                ) -> Any:
         try:
-            out, err, code = self._run_cephadm(
+            out, err, code = await self._run_cephadm(
                 host, entity, command, args, no_fsid=no_fsid, image=image)
             if code:
                 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
@@ -1189,18 +1192,18 @@ class CephadmServe:
             self.log.exception(f'{msg}: {"".join(out)}')
             raise OrchestratorError(msg)
 
-    def _run_cephadm(self,
-                     host: str,
-                     entity: Union[CephadmNoImage, str],
-                     command: str,
-                     args: List[str],
-                     addr: Optional[str] = "",
-                     stdin: Optional[str] = "",
-                     no_fsid: Optional[bool] = False,
-                     error_ok: Optional[bool] = False,
-                     image: Optional[str] = "",
-                     env_vars: Optional[List[str]] = None,
-                     ) -> Tuple[List[str], List[str], int]:
+    async def _run_cephadm(self,
+                           host: str,
+                           entity: Union[CephadmNoImage, str],
+                           command: str,
+                           args: List[str],
+                           addr: Optional[str] = "",
+                           stdin: Optional[str] = "",
+                           no_fsid: Optional[bool] = False,
+                           error_ok: Optional[bool] = False,
+                           image: Optional[str] = "",
+                           env_vars: Optional[List[str]] = None,
+                           ) -> Tuple[List[str], List[str], int]:
         """
         Run cephadm on the remote host with the given command + args
 
@@ -1209,7 +1212,7 @@ class CephadmServe:
         :env_vars: in format -> [KEY=VALUE, ..]
         """
 
-        self.mgr.ssh.remote_connection(host, addr)
+        await self.mgr.ssh._remote_connection(host, addr)
 
         self.log.debug(f"_run_cephadm : command = {command}")
         self.log.debug(f"_run_cephadm : args = {args}")
@@ -1253,22 +1256,22 @@ class CephadmServe:
                 self.log.debug('stdin: %s' % stdin)
 
             cmd = ['which', 'python3']
-            python = self.mgr.ssh.check_execute_command(host, cmd, addr=addr)
+            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
             cmd = [python, self.mgr.cephadm_binary_path] + final_args
 
             try:
-                out, err, code = self.mgr.ssh.execute_command(
+                out, err, code = await self.mgr.ssh._execute_command(
                     host, cmd, stdin=stdin, addr=addr)
                 if code == 2:
                     ls_cmd = ['ls', self.mgr.cephadm_binary_path]
-                    out_ls, err_ls, code_ls = self.mgr.ssh.execute_command(host, ls_cmd, addr=addr)
+                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
                     if code_ls == 2:
-                        self._deploy_cephadm_binary(host, addr)
-                        out, err, code = self.mgr.ssh.execute_command(
+                        await self._deploy_cephadm_binary(host, addr)
+                        out, err, code = await self.mgr.ssh._execute_command(
                             host, cmd, stdin=stdin, addr=addr)
 
             except Exception as e:
-                self.mgr.ssh._reset_con(host)
+                await self.mgr.ssh._reset_con(host)
                 if error_ok:
                     return [], [str(e)], 1
                 raise
@@ -1276,10 +1279,10 @@ class CephadmServe:
         elif self.mgr.mode == 'cephadm-package':
             try:
                 cmd = ['/usr/bin/cephadm'] + final_args
-                out, err, code = self.mgr.ssh.execute_command(
+                out, err, code = await self.mgr.ssh._execute_command(
                     host, cmd, stdin=stdin, addr=addr)
             except Exception as e:
-                self.mgr.ssh._reset_con(host)
+                await self.mgr.ssh._reset_con(host)
                 if error_ok:
                     return [], [str(e)], 1
                 raise
@@ -1296,7 +1299,7 @@ class CephadmServe:
                 f'cephadm exited with an error code: {code}, stderr: {err}')
         return [out], [err], code
 
-    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
+    async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
         # pick a random host...
         host = None
         for host_name in self.mgr.inventory.keys():
@@ -1305,14 +1308,14 @@ class CephadmServe:
         if not host:
             raise OrchestratorError('no hosts defined')
         if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
-            self._registry_login(host, self.mgr.registry_url,
-                                 self.mgr.registry_username, self.mgr.registry_password)
+            await self._registry_login(host, self.mgr.registry_url,
+                                       self.mgr.registry_username, self.mgr.registry_password)
 
         pullargs: List[str] = []
         if self.mgr.registry_insecure:
             pullargs.append("--insecure")
 
-        j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
+        j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
 
         r = ContainerInspectInfo(
             j['image_id'],
@@ -1323,7 +1326,7 @@ class CephadmServe:
         return r
 
     # function responsible for logging single host into custom registry
-    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
+    async def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
         self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
         # want to pass info over stdin rather than through normal list of args
         args_str = json.dumps({
@@ -1331,15 +1334,15 @@ class CephadmServe:
             'username': username,
             'password': password,
         })
-        out, err, code = self._run_cephadm(
+        out, err, code = await self._run_cephadm(
             host, 'mon', 'registry-login',
             ['--registry-json', '-'], stdin=args_str, error_ok=True)
         if code:
             return f"Host {host} failed to login to {url} as {username} with given password"
         return None
 
-    def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
+    async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
         # Use tee (from coreutils) to create a copy of cephadm on the target machine
         self.log.info(f"Deploying cephadm binary to {host}")
-        self.mgr.ssh.write_remote_file(host, self.mgr.cephadm_binary_path,
-                                       self.mgr._cephadm.encode('utf-8'), addr=addr)
+        await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
+                                              self.mgr._cephadm.encode('utf-8'), addr=addr)
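
The serve.py changes follow one calling convention: code already running as a
coroutine awaits _run_cephadm and friends directly, while code on a
synchronous mgr thread goes through mgr.wait_async. A self-contained toy
version of the two call sites (illustrative only):

    import asyncio

    async def run_cmd(host: str) -> int:
        # Stand-in for the now-async _run_cephadm.
        await asyncio.sleep(0)
        return 0

    async def async_caller() -> int:
        # Already on the event loop: await directly.
        return await run_cmd('host1')

    def sync_caller(loop: asyncio.AbstractEventLoop) -> int:
        # Synchronous thread: submit to the loop and block, which is
        # the role mgr.wait_async plays in the hunks above.
        return asyncio.run_coroutine_threadsafe(run_cmd('host1'), loop).result()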
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 347256f51d55459ed5ba667c8d08d7d15fdce9a8..50011b6b2d4f69a93238ca2a9816da8e7d310c02 100644
--- a/src/pybind/mgr/cephadm/services/osd.py
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -12,7 +12,6 @@ from ceph.utils import datetime_to_str, str_to_datetime
 from datetime import datetime
 import orchestrator
 from cephadm.serve import CephadmServe
-from cephadm.utils import forall_hosts
 from ceph.utils import datetime_now
 from orchestrator import OrchestratorError, DaemonDescription
 from mgr_module import MonCommandFailed
@@ -35,8 +34,7 @@ class OSDService(CephService):
             logger.info(
                 f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
 
-        @forall_hosts
-        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
             # skip this host if there has been no change in inventory
             if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                 self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
@@ -60,7 +58,7 @@ class OSDService(CephService):
             ))
             start_ts = datetime_now()
             env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
-            ret_msg = self.create_single_host(
+            ret_msg = await self.create_single_host(
                 drive_group, host, cmd,
                 replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
             )
@@ -70,14 +68,17 @@ class OSDService(CephService):
             self.mgr.cache.save_host(host)
             return ret_msg
 
-        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
+        async def all_hosts() -> List[Optional[str]]:
+            return [await create_from_spec_one(h, ds) for h, ds in self.prepare_drivegroup(drive_group)]
+
+        ret = self.mgr.wait_async(all_hosts())
         return ", ".join(filter(None, ret))
 
-    def create_single_host(self,
-                           drive_group: DriveGroupSpec,
-                           host: str, cmd: str, replace_osd_ids: List[str],
-                           env_vars: Optional[List[str]] = None) -> str:
-        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
+    async def create_single_host(self,
+                                 drive_group: DriveGroupSpec,
+                                 host: str, cmd: str, replace_osd_ids: List[str],
+                                 env_vars: Optional[List[str]] = None) -> str:
+        out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
 
         if code == 1 and ', it is already prepared' in '\n'.join(err):
             # HACK: when we create against an existing LV, ceph-volume
@@ -89,18 +90,18 @@ class OSDService(CephService):
             raise RuntimeError(
                 'cephadm exited with an error code: %d, stderr:%s' % (
                     code, '\n'.join(err)))
-        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
-                                                         replace_osd_ids)
+        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
+                                                               replace_osd_ids)
 
-    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
-                                             replace_osd_ids: Optional[List[str]] = None) -> str:
+    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
+                                                   replace_osd_ids: Optional[List[str]] = None) -> str:
 
         if replace_osd_ids is None:
             replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
             assert replace_osd_ids is not None
 
         # check result: lvm
-        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
             host, 'osd', 'ceph-volume',
             [
                 '--',
@@ -139,12 +140,12 @@ class OSDService(CephService):
                     daemon_type='osd',
                 )
                 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
-                CephadmServe(self.mgr)._create_daemon(
+                await CephadmServe(self.mgr)._create_daemon(
                     daemon_spec,
                     osd_uuid_map=osd_uuid_map)
 
         # check result: raw
-        raw_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
             host, 'osd', 'ceph-volume',
             [
                 '--',
@@ -176,7 +177,7 @@ class OSDService(CephService):
                 daemon_type='osd',
             )
             daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
-            CephadmServe(self.mgr)._create_daemon(
+            await CephadmServe(self.mgr)._create_daemon(
                 daemon_spec,
                 osd_uuid_map=osd_uuid_map)
 
@@ -279,7 +280,7 @@ class OSDService(CephService):
                     continue
 
                 # get preview data from ceph-volume
-                out, err, code = self._run_ceph_volume_command(host, cmd)
+                out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                 if out:
                     try:
                         concat_out: Dict[str, Any] = json.loads(' '.join(out))
@@ -322,9 +323,9 @@ class OSDService(CephService):
                 matching_specs.append(spec)
         return matching_specs
 
-    def _run_ceph_volume_command(self, host: str,
-                                 cmd: str, env_vars: Optional[List[str]] = None
-                                 ) -> Tuple[List[str], List[str], int]:
+    async def _run_ceph_volume_command(self, host: str,
+                                       cmd: str, env_vars: Optional[List[str]] = None
+                                       ) -> Tuple[List[str], List[str], int]:
         self.mgr.inventory.assert_host(host)
 
         # get bootstrap key
@@ -341,7 +342,7 @@ class OSDService(CephService):
         split_cmd = cmd.split(' ')
         _cmd = ['--config-json', '-', '--']
         _cmd.extend(split_cmd)
-        out, err, code = CephadmServe(self.mgr)._run_cephadm(
+        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
             host, 'osd', 'ceph-volume',
             _cmd,
             env_vars=env_vars,
@@ -516,10 +517,10 @@ class RemoveUtil(object):
     def zap_osd(self, osd: "OSD") -> str:
         "Zaps all devices that are associated with an OSD"
         if osd.hostname is not None:
-            out, err, code = CephadmServe(self.mgr)._run_cephadm(
+            out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                 osd.hostname, 'osd', 'ceph-volume',
                 ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
-                error_ok=True)
+                error_ok=True))
             self.mgr.cache.invalidate_host_devices(osd.hostname)
             if code:
                 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
@@ -556,7 +557,8 @@ class RemoveUtil(object):
         if ret != 0:
             self.mgr.log.debug(f"ran {cmd_args} with mon_command")
             if not error_ok:
-                self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+                self.mgr.log.error(
+                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
             return False
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True
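
Note that the new all_hosts() coroutine in this file awaits each host one
after another. A variant that overlaps the per-host work with asyncio.gather
(a sketch of an alternative, not what this patch does) could look like:

    import asyncio
    from typing import Awaitable, Callable, Iterable, List, Optional, Tuple

    async def gather_per_host(
        create_one: Callable[[str, object], Awaitable[Optional[str]]],
        pairs: Iterable[Tuple[str, object]],
    ) -> List[Optional[str]]:
        # Schedule every per-host coroutine up front, then wait for all
        # of them; results come back in input order, matching the
        # sequential version's ", ".join(...) expectations.
        return list(await asyncio.gather(*(create_one(h, ds) for h, ds in pairs)))

Here create_one stands in for the create_from_spec_one coroutine defined in
the hunk above.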
diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py
index 8254f142d5b169bf6a88a7e46d43ae741397d477..57c5780347fee97b94fdd5dfd1b3a9ec1953d9c9 100644
--- a/src/pybind/mgr/cephadm/tests/fixtures.py
+++ b/src/pybind/mgr/cephadm/tests/fixtures.py
@@ -23,7 +23,7 @@ def get_ceph_option(_, key):
 
 
 def _run_cephadm(ret):
-    def foo(s, host, entity, cmd, e, **kwargs):
+    async def foo(s, host, entity, cmd, e, **kwargs):
         if cmd == 'gather-facts':
             return '{}', '', 0
         return [ret], '', 0
@@ -58,7 +58,7 @@ def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = N
     }
     if ops:
         for op in ops:
-            out = CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, [])
+            out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
             to_update[op](host, out)
     m.cache.last_daemon_update[host] = datetime_now()
     m.cache.last_facts_update[host] = datetime_now()
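
Because _run_cephadm is now a coroutine function, anything standing in for it
in tests must be one too; a plain def would hand callers a tuple where they
expect an awaitable. A minimal illustration (hypothetical test code, not part
of this patch):

    import asyncio

    async def fake_run_cephadm(*args, **kwargs):
        # Coroutine stand-in: callers can await it, or drive it through
        # wait_async, exactly like the real _run_cephadm.
        return [''], [''], 0

    assert asyncio.run(fake_run_cephadm()) == ([''], [''], 0)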
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 0e6a46e737df65071da7273319b63fb8b26a182d..e03ecddc9a84eb6b4d32af7e3eff232506711e51 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -1424,10 +1424,10 @@ class TestCephadm(object):
         assert not retval.result_str()
         assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
 
-    @mock.patch("cephadm.ssh.SSHManager.remote_connection")
-    @mock.patch("cephadm.ssh.SSHManager.execute_command")
-    @mock.patch("cephadm.ssh.SSHManager.check_execute_command")
-    @mock.patch("cephadm.ssh.SSHManager.write_remote_file")
+    @mock.patch("cephadm.ssh.SSHManager._remote_connection")
+    @mock.patch("cephadm.ssh.SSHManager._execute_command")
+    @mock.patch("cephadm.ssh.SSHManager._check_execute_command")
+    @mock.patch("cephadm.ssh.SSHManager._write_remote_file")
     def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module):
         _write_file.return_value = None
         check_execute_command.return_value = ''
@@ -1446,7 +1446,7 @@ class TestCephadm(object):
 
             CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
             _write_file.assert_called_with('test', '/etc/ceph/ceph.conf', b'',
-                                           0o644, 0, 0)
+                                           0o644, 0, 0, None)
 
             assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test')
 
@@ -1454,7 +1454,7 @@ class TestCephadm(object):
             cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
             CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
             _write_file.assert_called_with('test', '/etc/ceph/ceph.conf',
-                                           b'\n\n[mon]\nk=v\n', 0o644, 0, 0)
+                                           b'\n\n[mon]\nk=v\n', 0o644, 0, 0, None)
 
             # reload
             cephadm_module.cache.last_client_files = {}
diff --git a/src/pybind/mgr/cephadm/tests/test_ssh.py b/src/pybind/mgr/cephadm/tests/test_ssh.py
index 83690b207dfdd39c20d019b6a203171b2db821a0..5c10c04b094471a855774aba85aae29bcba4af6e 100644
--- a/src/pybind/mgr/cephadm/tests/test_ssh.py
+++ b/src/pybind/mgr/cephadm/tests/test_ssh.py
@@ -22,8 +22,8 @@ from cephadm.tests.fixtures import with_host, wait
 
 @pytest.mark.skipif(ConnectionLost is None, reason='no asyncssh')
 class TestWithSSH:
-    @mock.patch("cephadm.ssh.SSHManager.execute_command")
-    @mock.patch("cephadm.ssh.SSHManager.check_execute_command")
+    @mock.patch("cephadm.ssh.SSHManager._execute_command")
+    @mock.patch("cephadm.ssh.SSHManager._check_execute_command")
     def test_offline(self, check_execute_command, execute_command, cephadm_module):
         check_execute_command.return_value = ''
         execute_command.return_value = '', '', 0
diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py
index 0236dbe1c14cb23431a6a4bc8ff8ae1e6a6ac9ad..487d9c84d67654b39d4b4802161034a595e03edd 100644
--- a/src/pybind/mgr/cephadm/upgrade.py
+++ b/src/pybind/mgr/cephadm/upgrade.py
@@ -55,7 +55,8 @@ class UpgradeState:
         self.error: Optional[str] = error
         self.paused: bool = paused or False
         self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
-        self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
+        self.fs_original_allow_standby_replay: Optional[Dict[str,
+                                                             bool]] = fs_original_allow_standby_replay
 
     def to_json(self) -> dict:
         return {
@@ -442,13 +443,15 @@ class CephadmUpgrade:
                 continue
 
             if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
-                self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
+                self.mgr.log.info(
+                    'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                 time.sleep(10)
                 continue_upgrade = False
                 continue
 
             if len(mdsmap['up']) == 0:
-                self.mgr.log.warning("Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
+                self.mgr.log.warning(
+                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                 # This can happen because the current version MDS have
                 # incompatible compatsets; the mons will not do any promotions.
                 # We must upgrade to continue.
@@ -531,8 +534,8 @@ class CephadmUpgrade:
             logger.info('Upgrade: First pull of %s' % target_image)
             self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image)
             try:
-                target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
-                    target_image)
+                target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
+                    target_image))
             except OrchestratorError as e:
                 self._fail_upgrade('UPGRADE_FAILED_PULL', {
                     'severity': 'warning',
@@ -703,17 +706,17 @@ class CephadmUpgrade:
                 self._update_upgrade_progress(done / len(daemons))
 
                 # make sure host has latest container image
-                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
+                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                     d.hostname, '', 'inspect-image', [],
-                    image=target_image, no_fsid=True, error_ok=True)
+                    image=target_image, no_fsid=True, error_ok=True))
                 if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                     logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                                d.hostname))
                     self.upgrade_info_str = 'Pulling %s image on host %s' % (
                         target_image, d.hostname)
-                    out, errs, code = CephadmServe(self.mgr)._run_cephadm(
+                    out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                         d.hostname, '', 'pull', [],
-                        image=target_image, no_fsid=True, error_ok=True)
+                        image=target_image, no_fsid=True, error_ok=True))
                     if code:
                         self._fail_upgrade('UPGRADE_FAILED_PULL', {
                             'severity': 'warning',