mgr/cephadm: asyncio based universal timeout for ssh/cephadm commands
author    Adam King <adking@redhat.com>
          Mon, 20 Mar 2023 19:31:12 +0000 (15:31 -0400)
committer Adam King <adking@redhat.com>
          Mon, 22 May 2023 20:42:36 +0000 (16:42 -0400)
Since we already make use of asyncio for our ssh commands,
we can use asyncio's timeout when waiting for the concurrent futures to
complete as a way to have universal timeouts on our cephadm commands.

This change also creates a contextmanager that will catch any asyncio.TimeoutError.
Using the contextmanager along with calls to the wait_async function
will catch any timeout exception raised and convert it into an appropriate
OrchestratorError that includes information about what timed out and where,
if that was provided (the host the command ran on, the command itself).
This allows us to guarantee a background ssh command eventually returns,
and lets us inform users of any timeouts by raising a health warning or
logging the error instead of sitting idle indefinitely.

Fixes: https://tracker.ceph.com/issues/54024
Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit adf13c82ebb75a132709aa0b0bbc1d4fef7643a7)
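
For illustration, a minimal standalone sketch of the pattern this commit
applies (all names here are hypothetical stand-ins, not cephadm code; the
real pieces are asyncio.run_coroutine_threadsafe and the timeout argument
of concurrent.futures.Future.result, used the same way as in the ssh.py
hunk below):

    import asyncio
    from concurrent.futures import TimeoutError as FuturesTimeoutError
    from contextlib import contextmanager
    from threading import Thread
    from typing import Any, Awaitable, Iterator, Optional


    class CommandTimeoutError(Exception):
        """Stand-in for OrchestratorError in this sketch."""


    class EventLoopThread(Thread):
        def __init__(self) -> None:
            self._loop = asyncio.new_event_loop()
            super().__init__(target=self._loop.run_forever, daemon=True)
            self.start()

        def get_result(self, coro: Awaitable[Any], timeout: Optional[int] = None) -> Any:
            # run_coroutine_threadsafe returns a concurrent.futures.Future,
            # whose result() takes a timeout; it signals expiry with
            # concurrent.futures.TimeoutError (an alias of asyncio.TimeoutError
            # on some Python versions)
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
            try:
                return future.result(timeout)
            except FuturesTimeoutError:
                future.cancel()  # don't leave the abandoned task running
                raise


    @contextmanager
    def timeout_handler(host: str = '', cmd: str = '') -> Iterator[None]:
        # convert the low-level timeout into an error naming what was
        # running and where, as async_timeout_handler does below
        try:
            yield
        except FuturesTimeoutError:
            msg = f'Command "{cmd}" timed out' if cmd else 'Command timed out'
            if host:
                msg += f' on host {host}'
            raise CommandTimeoutError(msg)


    async def slow_ssh_command() -> str:
        await asyncio.sleep(30)  # stand-in for an ssh/cephadm call
        return 'done'


    loop = EventLoopThread()
    with timeout_handler(host='host1', cmd='sleep 30'):
        loop.get_result(slow_ssh_command(), timeout=1)  # raises CommandTimeoutError

Note also the serve.py hunk below, which passes the same default timeout,
minus a small buffer, to the cephadm binary itself so that the remote-side
timeout has a chance to fire (and produce a more specific error) before
the mgr-side asyncio timeout does.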

src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/ssh.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/upgrade.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index c62065342908518523f9577d59e58620e0f14147..51856097c8872fef742e99bf094490fb60868624 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import errno
 import ipaddress
@@ -6,6 +7,7 @@ import re
 import shlex
 from collections import defaultdict
 from configparser import ConfigParser
+from contextlib import contextmanager
 from functools import wraps
 from tempfile import TemporaryDirectory, NamedTemporaryFile
 from threading import Event
@@ -14,7 +16,8 @@ from cephadm.service_discovery import ServiceDiscovery
 
 import string
 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
-    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
+    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
+    Awaitable, Iterator
 
 import datetime
 import os
@@ -694,8 +697,41 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         serve = CephadmServe(self)
         serve.serve()
 
-    def wait_async(self, coro: Awaitable[T]) -> T:
-        return self.event_loop.get_result(coro)
+    def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
+        if not timeout:
+            timeout = self.default_cephadm_command_timeout
+            # put a lower bound of 60 seconds in case users
+            # accidentally set it to something unreasonable.
+            # For example if they thought it was in minutes
+            # rather than seconds
+            if timeout < 60:
+                self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
+                timeout = 60
+        return self.event_loop.get_result(coro, timeout)
+
+    @contextmanager
+    def async_timeout_handler(self, host: Optional[str] = '',
+                              cmd: Optional[str] = '',
+                              timeout: Optional[int] = None) -> Iterator[None]:
+        # this is meant to catch asyncio.TimeoutError and convert it into an
+        # OrchestratorError which much of the cephadm codebase is better equipped to handle.
+        # If the command being run, the host it is run on, or the timeout being used
+        # are provided, that will be included in the OrchestratorError's message
+        try:
+            yield
+        except asyncio.TimeoutError:
+            err_str: str = ''
+            if cmd:
+                err_str = f'Command "{cmd}" timed out '
+            else:
+                err_str = 'Command timed out '
+            if host:
+                err_str += f'on host {host} '
+            if timeout:
+                err_str += f'(non-default {timeout} second timeout)'
+            else:
+                err_str += f'(default {self.default_cephadm_command_timeout} second timeout)'
+            raise OrchestratorError(err_str)
 
     def set_container_image(self, entity: str, image: str) -> None:
         self.check_mon_command({
@@ -1132,7 +1168,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             break
         if not host:
             raise OrchestratorError('no hosts defined')
-        r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
+        with self.async_timeout_handler(host, 'cephadm registry-login'):
+            r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
         if r is not None:
             return 1, '', r
         # if logins succeeded, store info
@@ -1146,10 +1183,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
     def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         """Check whether we can access and manage a remote host"""
         try:
-            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
-                                                                             ['--expect-hostname', host],
-                                                                             addr=addr,
-                                                                             error_ok=True, no_fsid=True))
+            with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
+                out, err, code = self.wait_async(
+                    CephadmServe(self)._run_cephadm(
+                        host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
+                        addr=addr, error_ok=True, no_fsid=True))
             if code:
                 return 1, '', ('check-host failed:\n' + '\n'.join(err))
         except ssh.HostConnectionError as e:
@@ -1172,10 +1210,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         'cephadm prepare-host')
     def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         """Prepare a remote host for use with cephadm"""
-        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
-                                                                         ['--expect-hostname', host],
-                                                                         addr=addr,
-                                                                         error_ok=True, no_fsid=True))
+        with self.async_timeout_handler(host, 'cephadm prepare-host'):
+            out, err, code = self.wait_async(
+                CephadmServe(self)._run_cephadm(
+                    host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
+                    addr=addr, error_ok=True, no_fsid=True))
         if code:
             return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
         # if we have an outstanding health alert for this host, give the
@@ -1347,7 +1386,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         @forall_hosts
         def run(h: str) -> str:
-            return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
+            with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
+                return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
 
         return HandleCommandResult(stdout='\n'.join(run(host)))
 
@@ -1501,11 +1541,12 @@ Then run the following:
                 f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
             ip_addr = self.inventory.get_addr(host)
         try:
-            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
-                host, cephadmNoImage, 'check-host',
-                ['--expect-hostname', host],
-                addr=addr,
-                error_ok=True, no_fsid=True))
+            with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
+                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+                    host, cephadmNoImage, 'check-host',
+                    ['--expect-hostname', host],
+                    addr=addr,
+                    error_ok=True, no_fsid=True))
             if code:
                 msg = 'check-host failed:\n' + '\n'.join(err)
                 # err will contain stdout and stderr, so we filter on the message text to
@@ -1801,9 +1842,12 @@ Then run the following:
                     msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
 
             # call the host-maintenance function
-            _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
-                                                                                ["enter"],
-                                                                                error_ok=True))
+            with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
+                _out, _err, _code = self.wait_async(
+                    CephadmServe(self)._run_cephadm(
+                        hostname, cephadmNoImage, "host-maintenance",
+                        ["enter"],
+                        error_ok=True))
             returned_msg = _err[0].split('\n')[-1]
             if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
                 raise OrchestratorError(
@@ -1852,9 +1896,10 @@ Then run the following:
         if tgt_host['status'] != "maintenance":
             raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
 
-        outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
-                                                                            ['exit'],
-                                                                            error_ok=True))
+        with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
+            outs, errs, _code = self.wait_async(
+                CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
+                                                'host-maintenance', ['exit'], error_ok=True))
         returned_msg = errs[0].split('\n')[-1]
         if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
             raise OrchestratorError(
@@ -1897,10 +1942,10 @@ Then run the following:
         :param hostname: (str) host name
         """
         self.log.info(f'disk rescan request sent to host "{hostname}"')
-        _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
-                                                                            [],
-                                                                            no_fsid=True,
-                                                                            error_ok=True))
+        with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
+            _out, _err, _code = self.wait_async(
+                CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
+                                                [], no_fsid=True, error_ok=True))
         if not _err:
             raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
 
@@ -2107,7 +2152,8 @@ Then run the following:
         if daemon_spec.daemon_type != 'osd':
             daemon_spec = self.cephadm_services[daemon_type_to_service(
                 daemon_spec.daemon_type)].prepare_create(daemon_spec)
-        self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
+        with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+            self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
 
         # try to be clever, or fall back to restarting the daemon
         rc = -1
@@ -2163,7 +2209,9 @@ Then run the following:
                 # prepare_create function
                 daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(
                     daemon_spec)
-            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
+            with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+                return self.wait_async(
+                    CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
 
         actions = {
             'start': ['reset-failed', 'start'],
@@ -2173,9 +2221,10 @@ Then run the following:
         name = daemon_spec.name()
         for a in actions[action]:
             try:
-                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
-                    daemon_spec.host, name, 'unit',
-                    ['--name', name, a]))
+                with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
+                    out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+                        daemon_spec.host, name, 'unit',
+                        ['--name', name, a]))
             except Exception:
                 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
         self.cache.invalidate_host_daemons(daemon_spec.host)
@@ -2392,10 +2441,10 @@ Then run the following:
                     f"OSD{'s' if len(active_osds) > 1 else ''}"
                     f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
 
-        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            ['--', 'lvm', 'zap', '--destroy', path],
-            error_ok=True))
+        cv_args = ['--', 'lvm', 'zap', '--destroy', path]
+        with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
+            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+                host, 'osd', 'ceph-volume', cv_args, error_ok=True))
 
         self.cache.invalidate_host_devices(host)
         self.cache.invalidate_host_networks(host)
@@ -2432,9 +2481,10 @@ Then run the following:
                                             host=host)
             cmd_args = shlex.split(cmd_line)
 
-            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
-                host, 'osd', 'shell', ['--'] + cmd_args,
-                error_ok=True))
+            with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
+                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+                    host, 'osd', 'shell', ['--'] + cmd_args,
+                    error_ok=True))
             if code:
                 raise OrchestratorError(
                     'Unable to affect %s light for %s:%s. Command: %s' % (
@@ -2700,7 +2750,8 @@ Then run the following:
         @ forall_hosts
         def create_func_map(*args: Any) -> str:
             daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
-            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
+            with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+                return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
 
         return create_func_map(args)
 
@@ -3075,7 +3126,8 @@ Then run the following:
         else:
             raise OrchestratorError('must specify either image or version')
 
-        image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
+        with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'):
+            image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
 
         ceph_image_version = image_info.ceph_version
         if not ceph_image_version:
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index 6ab77daf5143a022c5a51eb1b8d85044234bc45a..f102d3e4f7b34d262c93458d5689b103ed5afbd0 100644 (file)
@@ -281,8 +281,9 @@ class CephadmServe:
 
             if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
                 self.log.debug(f"Logging `{host}` into custom registry")
-                r = self.mgr.wait_async(self._registry_login(
-                    host, json.loads(str(self.mgr.get_store('registry_credentials')))))
+                with self.mgr.async_timeout_handler(host, 'cephadm registry-login'):
+                    r = self.mgr.wait_async(self._registry_login(
+                        host, json.loads(str(self.mgr.get_store('registry_credentials')))))
                 if r:
                     bad_hosts.append(r)
 
@@ -327,9 +328,10 @@ class CephadmServe:
         self.log.debug(' checking %s' % host)
         try:
             addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
-            out, err, code = self.mgr.wait_async(self._run_cephadm(
-                host, cephadmNoImage, 'check-host', [],
-                error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+            with self.mgr.async_timeout_handler(host, 'cephadm check-host'):
+                out, err, code = self.mgr.wait_async(self._run_cephadm(
+                    host, cephadmNoImage, 'check-host', [],
+                    error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
             self.mgr.cache.update_last_host_check(host)
             self.mgr.cache.save_host(host)
             if code:
@@ -345,8 +347,9 @@ class CephadmServe:
 
     def _refresh_host_daemons(self, host: str) -> Optional[str]:
         try:
-            ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [],
-                                                            no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+            with self.mgr.async_timeout_handler(host, 'cephadm ls'):
+                ls = self.mgr.wait_async(self._run_cephadm_json(
+                    host, 'mon', 'ls', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
         except OrchestratorError as e:
             return str(e)
         self.mgr._process_ls_output(host, ls)
@@ -354,8 +357,10 @@ class CephadmServe:
 
     def _refresh_facts(self, host: str) -> Optional[str]:
         try:
-            val = self.mgr.wait_async(self._run_cephadm_json(
-                host, cephadmNoImage, 'gather-facts', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+            with self.mgr.async_timeout_handler(host, 'cephadm gather-facts'):
+                val = self.mgr.wait_async(self._run_cephadm_json(
+                    host, cephadmNoImage, 'gather-facts', [],
+                    no_fsid=True, log_output=self.mgr.log_refresh_metadata))
         except OrchestratorError as e:
             return str(e)
 
@@ -373,14 +378,16 @@ class CephadmServe:
 
         try:
             try:
-                devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
-                                                                     inventory_args, log_output=self.mgr.log_refresh_metadata))
+                with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
+                    devices = self.mgr.wait_async(self._run_cephadm_json(
+                        host, 'osd', 'ceph-volume', inventory_args, log_output=self.mgr.log_refresh_metadata))
             except OrchestratorError as e:
                 if 'unrecognized arguments: --filter-for-batch' in str(e):
                     rerun_args = inventory_args.copy()
                     rerun_args.remove('--filter-for-batch')
-                    devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
-                                                                         rerun_args, log_output=self.mgr.log_refresh_metadata))
+                    with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
+                        devices = self.mgr.wait_async(self._run_cephadm_json(
+                            host, 'osd', 'ceph-volume', rerun_args, log_output=self.mgr.log_refresh_metadata))
                 else:
                     raise
 
@@ -397,8 +404,9 @@ class CephadmServe:
 
     def _refresh_host_networks(self, host: str) -> Optional[str]:
         try:
-            networks = self.mgr.wait_async(self._run_cephadm_json(
-                host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+            with self.mgr.async_timeout_handler(host, 'cephadm list-networks'):
+                networks = self.mgr.wait_async(self._run_cephadm_json(
+                    host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
         except OrchestratorError as e:
             return str(e)
 
@@ -841,7 +849,8 @@ class CephadmServe:
 
                 try:
                     daemon_spec = svc.prepare_create(daemon_spec)
-                    self.mgr.wait_async(self._create_daemon(daemon_spec))
+                    with self.mgr.async_timeout_handler(slot.hostname, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+                        self.mgr.wait_async(self._create_daemon(daemon_spec))
                     r = True
                     progress_done += 1
                     update_progress()
@@ -1068,8 +1077,9 @@ class CephadmServe:
         digests: Dict[str, ContainerInspectInfo] = {}
         for container_image_ref in set(settings.values()):
             if not is_repo_digest(container_image_ref):
-                image_info = self.mgr.wait_async(
-                    self._get_container_image_info(container_image_ref))
+                with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {container_image_ref})'):
+                    image_info = self.mgr.wait_async(
+                        self._get_container_image_info(container_image_ref))
                 if image_info.repo_digests:
                     # FIXME: we assume the first digest here is the best
                     assert is_repo_digest(image_info.repo_digests[0]), image_info
@@ -1365,8 +1375,9 @@ class CephadmServe:
                 args = ['--name', name, '--force']
 
             self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
-            out, err, code = self.mgr.wait_async(self._run_cephadm(
-                host, name, 'rm-daemon', args))
+            with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'):
+                out, err, code = self.mgr.wait_async(self._run_cephadm(
+                    host, name, 'rm-daemon', args))
             if not code:
                 # remove item from cache
                 self.mgr.cache.rm_daemon(host, name)
@@ -1460,8 +1471,19 @@ class CephadmServe:
             final_args += ['--no-cgroups-split']
 
         if not timeout:
-            # 15 minute global timeout if no timeout was passed
+            # default global timeout if no timeout was passed
             timeout = self.mgr.default_cephadm_command_timeout
+            # put a lower bound of 60 seconds in case users
+            # accidentally set it to something unreasonable.
+            # For example if they thought it was in minutes
+            # rather than seconds
+            if timeout < 60:
+                self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
+                timeout = 60
+            # subtract a small amount to give this timeout
+            # in the binary a chance to actually happen over
+            # the asyncio based timeout in the mgr module
+            timeout -= 5
         final_args += ['--timeout', str(timeout)]
 
         # subcommand
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 9f404646f384b28cbf035d4cf37c955128b3f39e..d777d7fc6fe43c4381571da439ec6ae01176f88e 100644 (file)
@@ -74,7 +74,8 @@ class OSDService(CephService):
                        for h, ds in self.prepare_drivegroup(drive_group)]
             return await gather(*futures)
 
-        ret = self.mgr.wait_async(all_hosts())
+        with self.mgr.async_timeout_handler(cmd='cephadm deploy (osd daemon)'):
+            ret = self.mgr.wait_async(all_hosts())
         return ", ".join(filter(None, ret))
 
     async def create_single_host(self,
@@ -308,7 +309,8 @@ class OSDService(CephService):
 
                 # get preview data from ceph-volume
                 for cmd in cmds:
-                    out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
+                    with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
+                        out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                     if out:
                         try:
                             concat_out: Dict[str, Any] = json.loads(' '.join(out))
@@ -545,10 +547,11 @@ class RemoveUtil(object):
             cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
             if not osd.no_destroy:
                 cmd.append('--destroy')
-            out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
-                osd.hostname, 'osd', 'ceph-volume',
-                cmd,
-                error_ok=True))
+            with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
+                out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+                    osd.hostname, 'osd', 'ceph-volume',
+                    cmd,
+                    error_ok=True))
             self.mgr.cache.invalidate_host_devices(osd.hostname)
             if code:
                 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
diff --git a/src/pybind/mgr/cephadm/ssh.py b/src/pybind/mgr/cephadm/ssh.py
index a5b6fb30495d456b2932b903e69227099a60edc2..560b8b826d5f0b6a23cd424c33bb19a3d32f9d84 100644 (file)
@@ -52,8 +52,19 @@ class EventLoopThread(Thread):
         super().__init__(target=self._loop.run_forever)
         self.start()
 
-    def get_result(self, coro: Awaitable[T]) -> T:
-        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
+    def get_result(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
+        # useful to note: This "run_coroutine_threadsafe" returns a
+        # concurrent.futures.Future, rather than an asyncio.Future. They are
+        # fairly similar but have a few differences, notably in our case
+        # that the result function of a concurrent.futures.Future accepts
+        # a timeout argument
+        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
+        try:
+            return future.result(timeout)
+        except asyncio.TimeoutError:
+            # try to cancel the task before raising the exception further up
+            future.cancel()
+            raise
 
 
 class SSHManager:
@@ -135,7 +146,8 @@ class SSHManager:
                           host: str,
                           addr: Optional[str] = None,
                           ) -> "SSHClientConnection":
-        return self.mgr.wait_async(self._remote_connection(host, addr))
+        with self.mgr.async_timeout_handler(host, f'ssh {host} (addr {addr})'):
+            return self.mgr.wait_async(self._remote_connection(host, addr))
 
     async def _execute_command(self,
                                host: str,
@@ -188,7 +200,8 @@ class SSHManager:
                         addr: Optional[str] = None,
                         log_command: Optional[bool] = True
                         ) -> Tuple[str, str, int]:
-        return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command))
+        with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+            return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command))
 
     async def _check_execute_command(self,
                                      host: str,
@@ -211,7 +224,8 @@ class SSHManager:
                               addr: Optional[str] = None,
                               log_command: Optional[bool] = True,
                               ) -> str:
-        return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
+        with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+            return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
 
     async def _write_remote_file(self,
                                  host: str,
@@ -259,8 +273,9 @@ class SSHManager:
                           gid: Optional[int] = None,
                           addr: Optional[str] = None,
                           ) -> None:
-        self.mgr.wait_async(self._write_remote_file(
-            host, path, content, mode, uid, gid, addr))
+        with self.mgr.async_timeout_handler(host, f'writing file {path}'):
+            self.mgr.wait_async(self._write_remote_file(
+                host, path, content, mode, uid, gid, addr))
 
     async def _reset_con(self, host: str) -> None:
         conn = self.cons.get(host)
@@ -270,7 +285,8 @@ class SSHManager:
             del self.cons[host]
 
     def reset_con(self, host: str) -> None:
-        self.mgr.wait_async(self._reset_con(host))
+        with self.mgr.async_timeout_handler(cmd=f'resetting ssh connection to {host}'):
+            self.mgr.wait_async(self._reset_con(host))
 
     def _reset_cons(self) -> None:
         for host, conn in self.cons.items():
diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py
index 7b5cc459fbcf5ae13a6a67c4c573fdf6b009dd7c..869c7f7cea47cb9189d9adf394ed9128f7d67fcc 100644 (file)
@@ -50,7 +50,7 @@ def match_glob(val, pat):
 
 
 class MockEventLoopThread:
-    def get_result(self, coro):
+    def get_result(self, coro, timeout):
         if sys.version_info >= (3, 7):
             return asyncio.run(coro)
 
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index db50e7f130bd75d84950f9873691bf1cda23b55d..37aff06756cc018c5b288c717ace72831ecf55fd 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import logging
 
@@ -2294,3 +2295,33 @@ Traceback (most recent call last):
         assert cephadm_module.inventory.get_addr('host5') == '1.2.3.5'
         with pytest.raises(OrchestratorError):
             cephadm_module.inventory.get_addr('host5.domain')
+
+    def test_async_timeout_handler(self, cephadm_module):
+        cephadm_module.default_cephadm_command_timeout = 900
+
+        async def _timeout():
+            raise asyncio.TimeoutError
+
+        with pytest.raises(OrchestratorError, match=r'Command timed out \(default 900 second timeout\)'):
+            with cephadm_module.async_timeout_handler():
+                cephadm_module.wait_async(_timeout())
+
+        with pytest.raises(OrchestratorError, match=r'Command timed out on host hostA \(default 900 second timeout\)'):
+            with cephadm_module.async_timeout_handler('hostA'):
+                cephadm_module.wait_async(_timeout())
+
+        with pytest.raises(OrchestratorError, match=r'Command "testing" timed out \(default 900 second timeout\)'):
+            with cephadm_module.async_timeout_handler(cmd='testing'):
+                cephadm_module.wait_async(_timeout())
+
+        with pytest.raises(OrchestratorError, match=r'Command "testing" timed out on host hostB \(default 900 second timeout\)'):
+            with cephadm_module.async_timeout_handler('hostB', 'testing'):
+                cephadm_module.wait_async(_timeout())
+
+        with pytest.raises(OrchestratorError, match=r'Command timed out \(non-default 111 second timeout\)'):
+            with cephadm_module.async_timeout_handler(timeout=111):
+                cephadm_module.wait_async(_timeout())
+
+        with pytest.raises(OrchestratorError, match=r'Command "very slow" timed out on host hostC \(non-default 999 second timeout\)'):
+            with cephadm_module.async_timeout_handler('hostC', 'very slow', 999):
+                cephadm_module.wait_async(_timeout())
diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py
index 5f96a419ea0f969ef04b9927675e85d764608001..552964c845871475221ad9c55a179f07f5a1c0f9 100644 (file)
@@ -135,6 +135,7 @@ class CephadmUpgrade:
             self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
         else:
             self.upgrade_state = None
+        self.upgrade_info_str: str = ''
 
     @property
     def target_image(self) -> str:
@@ -385,8 +386,9 @@ class CephadmUpgrade:
             raise OrchestratorError(
                 'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
         try:
-            target_id, target_version, target_digests = self.mgr.wait_async(
-                CephadmServe(self.mgr)._get_container_image_info(target_name))
+            with self.mgr.async_timeout_handler(cmd='cephadm inspect-image'):
+                target_id, target_version, target_digests = self.mgr.wait_async(
+                    CephadmServe(self.mgr)._get_container_image_info(target_name))
         except OrchestratorError as e:
             raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
         # what we need to do here is build a list of daemons that must already be upgraded
@@ -860,17 +862,19 @@ class CephadmUpgrade:
             assert d.hostname is not None
 
             # make sure host has latest container image
-            out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
-                d.hostname, '', 'inspect-image', [],
-                image=target_image, no_fsid=True, error_ok=True))
+            with self.mgr.async_timeout_handler(d.hostname, 'cephadm inspect-image'):
+                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+                    d.hostname, '', 'inspect-image', [],
+                    image=target_image, no_fsid=True, error_ok=True))
             if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                 logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                            d.hostname))
                 self.upgrade_info_str = 'Pulling %s image on host %s' % (
                     target_image, d.hostname)
-                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
-                    d.hostname, '', 'pull', [],
-                    image=target_image, no_fsid=True, error_ok=True))
+                with self.mgr.async_timeout_handler(d.hostname, 'cephadm pull'):
+                    out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+                        d.hostname, '', 'pull', [],
+                        image=target_image, no_fsid=True, error_ok=True))
                 if code:
                     self._fail_upgrade('UPGRADE_FAILED_PULL', {
                         'severity': 'warning',
@@ -1075,8 +1079,9 @@ class CephadmUpgrade:
             logger.info('Upgrade: First pull of %s' % target_image)
             self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
             try:
-                target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
-                    target_image))
+                with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_image})'):
+                    target_id, target_version, target_digests = self.mgr.wait_async(
+                        CephadmServe(self.mgr)._get_container_image_info(target_image))
             except OrchestratorError as e:
                 self._fail_upgrade('UPGRADE_FAILED_PULL', {
                     'severity': 'warning',