From adf13c82ebb75a132709aa0b0bbc1d4fef7643a7 Mon Sep 17 00:00:00 2001 From: Adam King Date: Mon, 20 Mar 2023 15:31:12 -0400 Subject: [PATCH] mgr/cephadm: asyncio based universal timeout for ssh/cephadm commands Since we already have make use of asyncio for our ssh commands, we can use asyncio's timeout on waiting for concurrent futures to complete as a way to have universal timeouts on our cephadm commands. This change also creates a contextmanager that will catch any asyncio.TimeoutError. Using the contextmanager along with calls to the wait_async function will catch any timeout exception raised and convert it into an appropriate OrchetratorError including information about what and where for the timeout if it was provided (host where run, what command). This allows us to guarantee a background ssh command eventually returns and inform users of any timeouts by raising a health warning or logging the error instead of sitting idle indefinitely Fixes: https://tracker.ceph.com/issues/54024 Signed-off-by: Adam King --- src/pybind/mgr/cephadm/module.py | 136 +++++++++++++------ src/pybind/mgr/cephadm/serve.py | 64 ++++++--- src/pybind/mgr/cephadm/services/osd.py | 15 +- src/pybind/mgr/cephadm/ssh.py | 32 +++-- src/pybind/mgr/cephadm/tests/fixtures.py | 2 +- src/pybind/mgr/cephadm/tests/test_cephadm.py | 31 +++++ src/pybind/mgr/cephadm/upgrade.py | 25 ++-- 7 files changed, 217 insertions(+), 88 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 159449fe5822f..9fbbe6f2fcef5 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -1,3 +1,4 @@ +import asyncio import json import errno import ipaddress @@ -6,6 +7,7 @@ import re import shlex from collections import defaultdict from configparser import ConfigParser +from contextlib import contextmanager from functools import wraps from tempfile import TemporaryDirectory, NamedTemporaryFile from threading import Event @@ -14,7 +16,8 @@ from cephadm.service_discovery import ServiceDiscovery import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ - Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable + Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \ + Awaitable, Iterator import datetime import os @@ -694,8 +697,41 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, serve = CephadmServe(self) serve.serve() - def wait_async(self, coro: Awaitable[T]) -> T: - return self.event_loop.get_result(coro) + def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T: + if not timeout: + timeout = self.default_cephadm_command_timeout + # put a lower bound of 60 seconds in case users + # accidentally set it to something unreasonable. + # For example if they though it was in minutes + # rather than seconds + if timeout < 60: + self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.') + timeout = 60 + return self.event_loop.get_result(coro, timeout) + + @contextmanager + def async_timeout_handler(self, host: Optional[str] = '', + cmd: Optional[str] = '', + timeout: Optional[int] = None) -> Iterator[None]: + # this is meant to catch asyncio.TimeoutError and convert it into an + # OrchestratorError which much of the cephadm codebase is better equipped to handle. + # If the command being run, the host it is run on, or the timeout being used + # are provided, that will be included in the OrchestratorError's message + try: + yield + except asyncio.TimeoutError: + err_str: str = '' + if cmd: + err_str = f'Command "{cmd}" timed out ' + else: + err_str = 'Command timed out ' + if host: + err_str += f'on host {host} ' + if timeout: + err_str += f'(non-default {timeout} second timeout)' + else: + err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)') + raise OrchestratorError(err_str) def set_container_image(self, entity: str, image: str) -> None: self.check_mon_command({ @@ -1132,7 +1168,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, break if not host: raise OrchestratorError('no hosts defined') - r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json)) + with self.async_timeout_handler(host, 'cephadm registry-login'): + r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json)) if r is not None: return 1, '', r # if logins succeeded, store info @@ -1146,10 +1183,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: """Check whether we can access and manage a remote host""" try: - out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True)) + with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'): + out, err, code = self.wait_async( + CephadmServe(self)._run_cephadm( + host, cephadmNoImage, 'check-host', ['--expect-hostname', host], + addr=addr, error_ok=True, no_fsid=True)) if code: return 1, '', ('check-host failed:\n' + '\n'.join(err)) except ssh.HostConnectionError as e: @@ -1172,10 +1210,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'cephadm prepare-host') def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: """Prepare a remote host for use with cephadm""" - out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True)) + with self.async_timeout_handler(host, 'cephadm prepare-host'): + out, err, code = self.wait_async( + CephadmServe(self)._run_cephadm( + host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host], + addr=addr, error_ok=True, no_fsid=True)) if code: return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) # if we have an outstanding health alert for this host, give the @@ -1347,7 +1386,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @forall_hosts def run(h: str) -> str: - return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')) + with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'): + return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')) return HandleCommandResult(stdout='\n'.join(run(host))) @@ -1501,11 +1541,12 @@ Then run the following: f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.') ip_addr = self.inventory.get_addr(host) try: - out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( - host, cephadmNoImage, 'check-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True)) + with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + host, cephadmNoImage, 'check-host', + ['--expect-hostname', host], + addr=addr, + error_ok=True, no_fsid=True)) if code: msg = 'check-host failed:\n' + '\n'.join(err) # err will contain stdout and stderr, so we filter on the message text to @@ -1801,9 +1842,12 @@ Then run the following: msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc) # call the host-maintenance function - _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance", - ["enter"], - error_ok=True)) + with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'): + _out, _err, _code = self.wait_async( + CephadmServe(self)._run_cephadm( + hostname, cephadmNoImage, "host-maintenance", + ["enter"], + error_ok=True)) returned_msg = _err[0].split('\n')[-1] if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it: raise OrchestratorError( @@ -1852,9 +1896,10 @@ Then run the following: if tgt_host['status'] != "maintenance": raise OrchestratorError(f"Host {hostname} is not in maintenance mode") - outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', - ['exit'], - error_ok=True)) + with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'): + outs, errs, _code = self.wait_async( + CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, + 'host-maintenance', ['exit'], error_ok=True)) returned_msg = errs[0].split('\n')[-1] if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): raise OrchestratorError( @@ -1897,10 +1942,10 @@ Then run the following: :param hostname: (str) host name """ self.log.info(f'disk rescan request sent to host "{hostname}"') - _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan", - [], - no_fsid=True, - error_ok=True)) + with self.async_timeout_handler(hostname, 'cephadm disk-rescan'): + _out, _err, _code = self.wait_async( + CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan", + [], no_fsid=True, error_ok=True)) if not _err: raise OrchestratorError('Unexpected response from cephadm disk-rescan call') @@ -2107,7 +2152,8 @@ Then run the following: if daemon_spec.daemon_type != 'osd': daemon_spec = self.cephadm_services[daemon_type_to_service( daemon_spec.daemon_type)].prepare_create(daemon_spec) - self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)) + with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): + self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)) # try to be clever, or fall back to restarting the daemon rc = -1 @@ -2163,7 +2209,9 @@ Then run the following: # prepare_create function daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config( daemon_spec) - return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))) + with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): + return self.wait_async( + CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))) actions = { 'start': ['reset-failed', 'start'], @@ -2173,9 +2221,10 @@ Then run the following: name = daemon_spec.name() for a in actions[action]: try: - out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( - daemon_spec.host, name, 'unit', - ['--name', name, a])) + with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + daemon_spec.host, name, 'unit', + ['--name', name, a])) except Exception: self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed') self.cache.invalidate_host_daemons(daemon_spec.host) @@ -2392,10 +2441,10 @@ Then run the following: f"OSD{'s' if len(active_osds) > 1 else ''}" f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.") - out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( - host, 'osd', 'ceph-volume', - ['--', 'lvm', 'zap', '--destroy', path], - error_ok=True)) + cv_args = ['--', 'lvm', 'zap', '--destroy', path] + with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + host, 'osd', 'ceph-volume', cv_args, error_ok=True)) self.cache.invalidate_host_devices(host) self.cache.invalidate_host_networks(host) @@ -2432,9 +2481,10 @@ Then run the following: host=host) cmd_args = shlex.split(cmd_line) - out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( - host, 'osd', 'shell', ['--'] + cmd_args, - error_ok=True)) + with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + host, 'osd', 'shell', ['--'] + cmd_args, + error_ok=True)) if code: raise OrchestratorError( 'Unable to affect %s light for %s:%s. Command: %s' % ( @@ -2700,7 +2750,8 @@ Then run the following: @ forall_hosts def create_func_map(*args: Any) -> str: daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args) - return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec)) + with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): + return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec)) return create_func_map(args) @@ -3075,7 +3126,8 @@ Then run the following: else: raise OrchestratorError('must specify either image or version') - image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name)) + with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'): + image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name)) ceph_image_version = image_info.ceph_version if not ceph_image_version: diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index ea8a67590a9e2..715df329577ee 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -281,8 +281,9 @@ class CephadmServe: if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'): self.log.debug(f"Logging `{host}` into custom registry") - r = self.mgr.wait_async(self._registry_login( - host, json.loads(str(self.mgr.get_store('registry_credentials'))))) + with self.mgr.async_timeout_handler(host, 'cephadm registry-login'): + r = self.mgr.wait_async(self._registry_login( + host, json.loads(str(self.mgr.get_store('registry_credentials'))))) if r: bad_hosts.append(r) @@ -327,9 +328,10 @@ class CephadmServe: self.log.debug(' checking %s' % host) try: addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host - out, err, code = self.mgr.wait_async(self._run_cephadm( - host, cephadmNoImage, 'check-host', [], - error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata)) + with self.mgr.async_timeout_handler(host, 'cephadm check-host'): + out, err, code = self.mgr.wait_async(self._run_cephadm( + host, cephadmNoImage, 'check-host', [], + error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata)) self.mgr.cache.update_last_host_check(host) self.mgr.cache.save_host(host) if code: @@ -345,8 +347,9 @@ class CephadmServe: def _refresh_host_daemons(self, host: str) -> Optional[str]: try: - ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], - no_fsid=True, log_output=self.mgr.log_refresh_metadata)) + with self.mgr.async_timeout_handler(host, 'cephadm ls'): + ls = self.mgr.wait_async(self._run_cephadm_json( + host, 'mon', 'ls', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata)) except OrchestratorError as e: return str(e) self.mgr._process_ls_output(host, ls) @@ -354,8 +357,10 @@ class CephadmServe: def _refresh_facts(self, host: str) -> Optional[str]: try: - val = self.mgr.wait_async(self._run_cephadm_json( - host, cephadmNoImage, 'gather-facts', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata)) + with self.mgr.async_timeout_handler(host, 'cephadm gather-facts'): + val = self.mgr.wait_async(self._run_cephadm_json( + host, cephadmNoImage, 'gather-facts', [], + no_fsid=True, log_output=self.mgr.log_refresh_metadata)) except OrchestratorError as e: return str(e) @@ -373,14 +378,16 @@ class CephadmServe: try: try: - devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume', - inventory_args, log_output=self.mgr.log_refresh_metadata)) + with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'): + devices = self.mgr.wait_async(self._run_cephadm_json( + host, 'osd', 'ceph-volume', inventory_args, log_output=self.mgr.log_refresh_metadata)) except OrchestratorError as e: if 'unrecognized arguments: --filter-for-batch' in str(e): rerun_args = inventory_args.copy() rerun_args.remove('--filter-for-batch') - devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume', - rerun_args, log_output=self.mgr.log_refresh_metadata)) + with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'): + devices = self.mgr.wait_async(self._run_cephadm_json( + host, 'osd', 'ceph-volume', rerun_args, log_output=self.mgr.log_refresh_metadata)) else: raise @@ -397,8 +404,9 @@ class CephadmServe: def _refresh_host_networks(self, host: str) -> Optional[str]: try: - networks = self.mgr.wait_async(self._run_cephadm_json( - host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata)) + with self.mgr.async_timeout_handler(host, 'cephadm list-networks'): + networks = self.mgr.wait_async(self._run_cephadm_json( + host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata)) except OrchestratorError as e: return str(e) @@ -847,7 +855,8 @@ class CephadmServe: try: daemon_spec = svc.prepare_create(daemon_spec) - self.mgr.wait_async(self._create_daemon(daemon_spec)) + with self.mgr.async_timeout_handler(slot.hostname, f'cephadm deploy ({daemon_spec.daemon_type} type dameon)'): + self.mgr.wait_async(self._create_daemon(daemon_spec)) r = True progress_done += 1 update_progress() @@ -1069,8 +1078,9 @@ class CephadmServe: digests: Dict[str, ContainerInspectInfo] = {} for container_image_ref in set(settings.values()): if not is_repo_digest(container_image_ref): - image_info = self.mgr.wait_async( - self._get_container_image_info(container_image_ref)) + with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {container_image_ref})'): + image_info = self.mgr.wait_async( + self._get_container_image_info(container_image_ref)) if image_info.repo_digests: # FIXME: we assume the first digest here is the best assert is_repo_digest(image_info.repo_digests[0]), image_info @@ -1366,8 +1376,9 @@ class CephadmServe: args = ['--name', name, '--force'] self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports)) - out, err, code = self.mgr.wait_async(self._run_cephadm( - host, name, 'rm-daemon', args)) + with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'): + out, err, code = self.mgr.wait_async(self._run_cephadm( + host, name, 'rm-daemon', args)) if not code: # remove item from cache self.mgr.cache.rm_daemon(host, name) @@ -1461,8 +1472,19 @@ class CephadmServe: final_args += ['--no-cgroups-split'] if not timeout: - # 15 minute global timeout if no timeout was passed + # default global timeout if no timeout was passed timeout = self.mgr.default_cephadm_command_timeout + # put a lower bound of 60 seconds in case users + # accidentally set it to something unreasonable. + # For example if they though it was in minutes + # rather than seconds + if timeout < 60: + self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.') + timeout = 60 + # subtract a small amount to give this timeout + # in the binary a chance to actually happen over + # the asyncio based timeout in the mgr module + timeout -= 5 final_args += ['--timeout', str(timeout)] # subcommand diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 9f404646f384b..d777d7fc6fe43 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -74,7 +74,8 @@ class OSDService(CephService): for h, ds in self.prepare_drivegroup(drive_group)] return await gather(*futures) - ret = self.mgr.wait_async(all_hosts()) + with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'): + ret = self.mgr.wait_async(all_hosts()) return ", ".join(filter(None, ret)) async def create_single_host(self, @@ -308,7 +309,8 @@ class OSDService(CephService): # get preview data from ceph-volume for cmd in cmds: - out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd)) + with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'): + out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd)) if out: try: concat_out: Dict[str, Any] = json.loads(' '.join(out)) @@ -545,10 +547,11 @@ class RemoveUtil(object): cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)] if not osd.no_destroy: cmd.append('--destroy') - out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( - osd.hostname, 'osd', 'ceph-volume', - cmd, - error_ok=True)) + with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'): + out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( + osd.hostname, 'osd', 'ceph-volume', + cmd, + error_ok=True)) self.mgr.cache.invalidate_host_devices(osd.hostname) if code: raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) diff --git a/src/pybind/mgr/cephadm/ssh.py b/src/pybind/mgr/cephadm/ssh.py index a5b6fb30495d4..560b8b826d5f0 100644 --- a/src/pybind/mgr/cephadm/ssh.py +++ b/src/pybind/mgr/cephadm/ssh.py @@ -52,8 +52,19 @@ class EventLoopThread(Thread): super().__init__(target=self._loop.run_forever) self.start() - def get_result(self, coro: Awaitable[T]) -> T: - return asyncio.run_coroutine_threadsafe(coro, self._loop).result() + def get_result(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T: + # useful to note: This "run_coroutine_threadsafe" returns a + # concurrent.futures.Future, rather than an asyncio.Future. They are + # fairly similar but have a few differences, notably in our case + # that the result function of a concurrent.futures.Future accepts + # a timeout argument + future = asyncio.run_coroutine_threadsafe(coro, self._loop) + try: + return future.result(timeout) + except asyncio.TimeoutError: + # try to cancel the task before raising the exception further up + future.cancel() + raise class SSHManager: @@ -135,7 +146,8 @@ class SSHManager: host: str, addr: Optional[str] = None, ) -> "SSHClientConnection": - return self.mgr.wait_async(self._remote_connection(host, addr)) + with self.mgr.async_timeout_handler(host, f'ssh {host} (addr {addr})'): + return self.mgr.wait_async(self._remote_connection(host, addr)) async def _execute_command(self, host: str, @@ -188,7 +200,8 @@ class SSHManager: addr: Optional[str] = None, log_command: Optional[bool] = True ) -> Tuple[str, str, int]: - return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command)) + with self.mgr.async_timeout_handler(host, " ".join(cmd)): + return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command)) async def _check_execute_command(self, host: str, @@ -211,7 +224,8 @@ class SSHManager: addr: Optional[str] = None, log_command: Optional[bool] = True, ) -> str: - return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command)) + with self.mgr.async_timeout_handler(host, " ".join(cmd)): + return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command)) async def _write_remote_file(self, host: str, @@ -259,8 +273,9 @@ class SSHManager: gid: Optional[int] = None, addr: Optional[str] = None, ) -> None: - self.mgr.wait_async(self._write_remote_file( - host, path, content, mode, uid, gid, addr)) + with self.mgr.async_timeout_handler(host, f'writing file {path}'): + self.mgr.wait_async(self._write_remote_file( + host, path, content, mode, uid, gid, addr)) async def _reset_con(self, host: str) -> None: conn = self.cons.get(host) @@ -270,7 +285,8 @@ class SSHManager: del self.cons[host] def reset_con(self, host: str) -> None: - self.mgr.wait_async(self._reset_con(host)) + with self.mgr.async_timeout_handler(cmd=f'resetting ssh connection to {host}'): + self.mgr.wait_async(self._reset_con(host)) def _reset_cons(self) -> None: for host, conn in self.cons.items(): diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 7b5cc459fbcf5..869c7f7cea47c 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -50,7 +50,7 @@ def match_glob(val, pat): class MockEventLoopThread: - def get_result(self, coro): + def get_result(self, coro, timeout): if sys.version_info >= (3, 7): return asyncio.run(coro) diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 2aaa191a4841d..c47425323a12b 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -1,3 +1,4 @@ +import asyncio import json import logging @@ -2261,3 +2262,33 @@ Traceback (most recent call last): assert cephadm_module.inventory.get_addr('host5') == '1.2.3.5' with pytest.raises(OrchestratorError): cephadm_module.inventory.get_addr('host5.domain') + + def test_async_timeout_handler(self, cephadm_module): + cephadm_module.default_cephadm_command_timeout = 900 + + async def _timeout(): + raise asyncio.TimeoutError + + with pytest.raises(OrchestratorError, match=r'Command timed out \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler(): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command timed out on host hostA \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler('hostA'): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command "testing" timed out \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler(cmd='testing'): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command "testing" timed out on host hostB \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler('hostB', 'testing'): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command timed out \(non-default 111 second timeout\)'): + with cephadm_module.async_timeout_handler(timeout=111): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command "very slow" timed out on host hostC \(non-default 999 second timeout\)'): + with cephadm_module.async_timeout_handler('hostC', 'very slow', 999): + cephadm_module.wait_async(_timeout()) diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 5f96a419ea0f9..552964c845871 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -135,6 +135,7 @@ class CephadmUpgrade: self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t)) else: self.upgrade_state = None + self.upgrade_info_str: str = '' @property def target_image(self) -> str: @@ -385,8 +386,9 @@ class CephadmUpgrade: raise OrchestratorError( 'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.') try: - target_id, target_version, target_digests = self.mgr.wait_async( - CephadmServe(self.mgr)._get_container_image_info(target_name)) + with self.mgr.async_timeout_handler('cephadm inspect-image'): + target_id, target_version, target_digests = self.mgr.wait_async( + CephadmServe(self.mgr)._get_container_image_info(target_name)) except OrchestratorError as e: raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}') # what we need to do here is build a list of daemons that must already be upgraded @@ -860,17 +862,19 @@ class CephadmUpgrade: assert d.hostname is not None # make sure host has latest container image - out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( - d.hostname, '', 'inspect-image', [], - image=target_image, no_fsid=True, error_ok=True)) + with self.mgr.async_timeout_handler(d.hostname, 'cephadm inspect-image'): + out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( + d.hostname, '', 'inspect-image', [], + image=target_image, no_fsid=True, error_ok=True)) if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])): logger.info('Upgrade: Pulling %s on %s' % (target_image, d.hostname)) self.upgrade_info_str = 'Pulling %s image on host %s' % ( target_image, d.hostname) - out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( - d.hostname, '', 'pull', [], - image=target_image, no_fsid=True, error_ok=True)) + with self.mgr.async_timeout_handler(d.hostname, 'cephadm pull'): + out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( + d.hostname, '', 'pull', [], + image=target_image, no_fsid=True, error_ok=True)) if code: self._fail_upgrade('UPGRADE_FAILED_PULL', { 'severity': 'warning', @@ -1075,8 +1079,9 @@ class CephadmUpgrade: logger.info('Upgrade: First pull of %s' % target_image) self.upgrade_info_str = 'Doing first pull of %s image' % (target_image) try: - target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info( - target_image)) + with self.mgr.async_timeout_handler(f'cephadm inspect-image (image {target_image})'): + target_id, target_version, target_digests = self.mgr.wait_async( + CephadmServe(self.mgr)._get_container_image_info(target_image)) except OrchestratorError as e: self._fail_upgrade('UPGRADE_FAILED_PULL', { 'severity': 'warning', -- 2.39.5