+import asyncio
+import concurrent.futures
import json
import errno
import ipaddress
import shlex
from collections import defaultdict
from configparser import ConfigParser
+from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory, NamedTemporaryFile
from threading import Event
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
+ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
+ Awaitable, Iterator
import datetime
import os
serve = CephadmServe(self)
serve.serve()
- def wait_async(self, coro: Awaitable[T]) -> T:
- return self.event_loop.get_result(coro)
+ def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
+ if not timeout:
+ timeout = self.default_cephadm_command_timeout
+            # enforce a lower bound of 60 seconds in case users accidentally
+            # set the default timeout to something unreasonably low, for
+            # example because they thought it was in minutes rather than
+            # seconds
+            if timeout < 60:
+                self.log.info(f'Found default timeout set to {timeout} seconds. Using 60 second minimum instead.')
+                timeout = 60
+ return self.event_loop.get_result(coro, timeout)
+
+ @contextmanager
+ def async_timeout_handler(self, host: Optional[str] = '',
+ cmd: Optional[str] = '',
+ timeout: Optional[int] = None) -> Iterator[None]:
+        # this is meant to catch timeouts raised by async cephadm commands
+        # (asyncio.TimeoutError, or the concurrent.futures.TimeoutError raised by
+        # Future.result() on python versions where the two are distinct classes)
+        # and convert them into an OrchestratorError, which much of the cephadm
+        # codebase is better equipped to handle. If the command being run, the
+        # host it is run on, or the timeout being used are provided, they will
+        # be included in the OrchestratorError's message
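+        #
+        # Typical usage (illustrative sketch, mirroring the call sites below):
+        #
+        #     with self.async_timeout_handler(host, 'cephadm check-host'):
+        #         out, err, code = self.wait_async(
+        #             CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', []))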
+ try:
+ yield
+        except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
+ err_str: str = ''
+ if cmd:
+ err_str = f'Command "{cmd}" timed out '
+ else:
+ err_str = 'Command timed out '
+ if host:
+ err_str += f'on host {host} '
+ if timeout:
+ err_str += f'(non-default {timeout} second timeout)'
+ else:
+ err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)')
+ raise OrchestratorError(err_str)
def set_container_image(self, entity: str, image: str) -> None:
self.check_mon_command({
break
if not host:
raise OrchestratorError('no hosts defined')
- r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
+ with self.async_timeout_handler(host, 'cephadm registry-login'):
+ r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
if r is not None:
return 1, '', r
# if logins succeeded, store info
def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
"""Check whether we can access and manage a remote host"""
try:
- out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True))
+ with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
+ out, err, code = self.wait_async(
+ CephadmServe(self)._run_cephadm(
+ host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
+ addr=addr, error_ok=True, no_fsid=True))
if code:
return 1, '', ('check-host failed:\n' + '\n'.join(err))
except ssh.HostConnectionError as e:
'cephadm prepare-host')
def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
"""Prepare a remote host for use with cephadm"""
- out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True))
+ with self.async_timeout_handler(host, 'cephadm prepare-host'):
+ out, err, code = self.wait_async(
+ CephadmServe(self)._run_cephadm(
+ host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
+ addr=addr, error_ok=True, no_fsid=True))
if code:
return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
# if we have an outstanding health alert for this host, give the
@forall_hosts
def run(h: str) -> str:
- return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
+ with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
+ return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
return HandleCommandResult(stdout='\n'.join(run(host)))
f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
ip_addr = self.inventory.get_addr(host)
try:
- out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
- host, cephadmNoImage, 'check-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True))
+ with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+ host, cephadmNoImage, 'check-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True))
if code:
msg = 'check-host failed:\n' + '\n'.join(err)
# err will contain stdout and stderr, so we filter on the message text to
msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
# call the host-maintenance function
- _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
- ["enter"],
- error_ok=True))
+ with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
+ _out, _err, _code = self.wait_async(
+ CephadmServe(self)._run_cephadm(
+ hostname, cephadmNoImage, "host-maintenance",
+ ["enter"],
+ error_ok=True))
returned_msg = _err[0].split('\n')[-1]
if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
raise OrchestratorError(
if tgt_host['status'] != "maintenance":
raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
- outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
- ['exit'],
- error_ok=True))
+ with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
+ outs, errs, _code = self.wait_async(
+ CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
+ 'host-maintenance', ['exit'], error_ok=True))
returned_msg = errs[0].split('\n')[-1]
if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
raise OrchestratorError(
:param hostname: (str) host name
"""
self.log.info(f'disk rescan request sent to host "{hostname}"')
- _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
- [],
- no_fsid=True,
- error_ok=True))
+ with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
+ _out, _err, _code = self.wait_async(
+ CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
+ [], no_fsid=True, error_ok=True))
if not _err:
raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
if daemon_spec.daemon_type != 'osd':
daemon_spec = self.cephadm_services[daemon_type_to_service(
daemon_spec.daemon_type)].prepare_create(daemon_spec)
- self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
+ with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+ self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
# try to be clever, or fall back to restarting the daemon
rc = -1
# prepare_create function
daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(
daemon_spec)
- return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
+ with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+ return self.wait_async(
+ CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
actions = {
'start': ['reset-failed', 'start'],
name = daemon_spec.name()
for a in actions[action]:
try:
- out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
- daemon_spec.host, name, 'unit',
- ['--name', name, a]))
+ with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+ daemon_spec.host, name, 'unit',
+ ['--name', name, a]))
except Exception:
self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
self.cache.invalidate_host_daemons(daemon_spec.host)
f"OSD{'s' if len(active_osds) > 1 else ''}"
f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
- out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
- host, 'osd', 'ceph-volume',
- ['--', 'lvm', 'zap', '--destroy', path],
- error_ok=True))
+ cv_args = ['--', 'lvm', 'zap', '--destroy', path]
+ with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+ host, 'osd', 'ceph-volume', cv_args, error_ok=True))
self.cache.invalidate_host_devices(host)
self.cache.invalidate_host_networks(host)
host=host)
cmd_args = shlex.split(cmd_line)
- out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
- host, 'osd', 'shell', ['--'] + cmd_args,
- error_ok=True))
+ with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
+ host, 'osd', 'shell', ['--'] + cmd_args,
+ error_ok=True))
if code:
raise OrchestratorError(
'Unable to affect %s light for %s:%s. Command: %s' % (
@ forall_hosts
def create_func_map(*args: Any) -> str:
daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
- return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
+ with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+ return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
return create_func_map(args)
else:
raise OrchestratorError('must specify either image or version')
- image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
+ with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'):
+ image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
ceph_image_version = image_info.ceph_version
if not ceph_image_version:
if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
self.log.debug(f"Logging `{host}` into custom registry")
- r = self.mgr.wait_async(self._registry_login(
- host, json.loads(str(self.mgr.get_store('registry_credentials')))))
+ with self.mgr.async_timeout_handler(host, 'cephadm registry-login'):
+ r = self.mgr.wait_async(self._registry_login(
+ host, json.loads(str(self.mgr.get_store('registry_credentials')))))
if r:
bad_hosts.append(r)
self.log.debug(' checking %s' % host)
try:
addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
- out, err, code = self.mgr.wait_async(self._run_cephadm(
- host, cephadmNoImage, 'check-host', [],
- error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ with self.mgr.async_timeout_handler(host, 'cephadm check-host'):
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
+ host, cephadmNoImage, 'check-host', [],
+ error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
self.mgr.cache.update_last_host_check(host)
self.mgr.cache.save_host(host)
if code:
def _refresh_host_daemons(self, host: str) -> Optional[str]:
try:
- ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [],
- no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ with self.mgr.async_timeout_handler(host, 'cephadm ls'):
+ ls = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'mon', 'ls', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
except OrchestratorError as e:
return str(e)
self.mgr._process_ls_output(host, ls)
def _refresh_facts(self, host: str) -> Optional[str]:
try:
- val = self.mgr.wait_async(self._run_cephadm_json(
- host, cephadmNoImage, 'gather-facts', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ with self.mgr.async_timeout_handler(host, 'cephadm gather-facts'):
+ val = self.mgr.wait_async(self._run_cephadm_json(
+ host, cephadmNoImage, 'gather-facts', [],
+ no_fsid=True, log_output=self.mgr.log_refresh_metadata))
except OrchestratorError as e:
return str(e)
try:
try:
- devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
- inventory_args, log_output=self.mgr.log_refresh_metadata))
+ with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
+ devices = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'osd', 'ceph-volume', inventory_args, log_output=self.mgr.log_refresh_metadata))
except OrchestratorError as e:
if 'unrecognized arguments: --filter-for-batch' in str(e):
rerun_args = inventory_args.copy()
rerun_args.remove('--filter-for-batch')
- devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
- rerun_args, log_output=self.mgr.log_refresh_metadata))
+ with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
+ devices = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'osd', 'ceph-volume', rerun_args, log_output=self.mgr.log_refresh_metadata))
else:
raise
def _refresh_host_networks(self, host: str) -> Optional[str]:
try:
- networks = self.mgr.wait_async(self._run_cephadm_json(
- host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ with self.mgr.async_timeout_handler(host, 'cephadm list-networks'):
+ networks = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
except OrchestratorError as e:
return str(e)
try:
daemon_spec = svc.prepare_create(daemon_spec)
- self.mgr.wait_async(self._create_daemon(daemon_spec))
+                with self.mgr.async_timeout_handler(slot.hostname, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
+                    self.mgr.wait_async(self._create_daemon(daemon_spec))
r = True
progress_done += 1
update_progress()
digests: Dict[str, ContainerInspectInfo] = {}
for container_image_ref in set(settings.values()):
if not is_repo_digest(container_image_ref):
- image_info = self.mgr.wait_async(
- self._get_container_image_info(container_image_ref))
+ with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {container_image_ref})'):
+ image_info = self.mgr.wait_async(
+ self._get_container_image_info(container_image_ref))
if image_info.repo_digests:
# FIXME: we assume the first digest here is the best
assert is_repo_digest(image_info.repo_digests[0]), image_info
args = ['--name', name, '--force']
self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
- out, err, code = self.mgr.wait_async(self._run_cephadm(
- host, name, 'rm-daemon', args))
+ with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'):
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
+ host, name, 'rm-daemon', args))
if not code:
# remove item from cache
self.mgr.cache.rm_daemon(host, name)
final_args += ['--no-cgroups-split']
if not timeout:
- # 15 minute global timeout if no timeout was passed
+ # default global timeout if no timeout was passed
timeout = self.mgr.default_cephadm_command_timeout
+            # enforce a lower bound of 60 seconds in case users accidentally
+            # set the default timeout to something unreasonably low, for
+            # example because they thought it was in minutes rather than
+            # seconds
+            if timeout < 60:
+                self.log.info(f'Found default timeout set to {timeout} seconds. Using 60 second minimum instead.')
+                timeout = 60
+        # subtract a small amount so the timeout passed to the cephadm binary
+        # has a chance to fire before the asyncio based timeout in the mgr
+        # module does
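+        # (e.g. with the default 15 minute (900 second) timeout, the binary is
+        # invoked with "--timeout 895" while wait_async keeps the full default
+        # for its asyncio-side timeout)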
+ timeout -= 5
final_args += ['--timeout', str(timeout)]
# subcommand
for h, ds in self.prepare_drivegroup(drive_group)]
return await gather(*futures)
- ret = self.mgr.wait_async(all_hosts())
+        with self.mgr.async_timeout_handler(cmd='cephadm deploy (osd daemon)'):
+ ret = self.mgr.wait_async(all_hosts())
return ", ".join(filter(None, ret))
async def create_single_host(self,
# get preview data from ceph-volume
for cmd in cmds:
- out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
+ with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
+ out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
if out:
try:
concat_out: Dict[str, Any] = json.loads(' '.join(out))
cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
if not osd.no_destroy:
cmd.append('--destroy')
- out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
- osd.hostname, 'osd', 'ceph-volume',
- cmd,
- error_ok=True))
+ with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
+ out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ osd.hostname, 'osd', 'ceph-volume',
+ cmd,
+ error_ok=True))
self.mgr.cache.invalidate_host_devices(osd.hostname)
if code:
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
super().__init__(target=self._loop.run_forever)
self.start()
- def get_result(self, coro: Awaitable[T]) -> T:
- return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
+ def get_result(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
+ # useful to note: This "run_coroutine_threadsafe" returns a
+ # concurrent.futures.Future, rather than an asyncio.Future. They are
+ # fairly similar but have a few differences, notably in our case
+ # that the result function of a concurrent.futures.Future accepts
+ # a timeout argument
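+        # (asyncio.Future.result() takes no timeout and raises InvalidStateError
+        # if the result isn't ready yet, whereas concurrent.futures.Future.result()
+        # blocks the calling thread and raises concurrent.futures.TimeoutError
+        # once the given timeout expires)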
+ future = asyncio.run_coroutine_threadsafe(coro, self._loop)
+ try:
+ return future.result(timeout)
+ except asyncio.TimeoutError:
+ # try to cancel the task before raising the exception further up
+ future.cancel()
+ raise
class SSHManager:
host: str,
addr: Optional[str] = None,
) -> "SSHClientConnection":
- return self.mgr.wait_async(self._remote_connection(host, addr))
+ with self.mgr.async_timeout_handler(host, f'ssh {host} (addr {addr})'):
+ return self.mgr.wait_async(self._remote_connection(host, addr))
async def _execute_command(self,
host: str,
addr: Optional[str] = None,
log_command: Optional[bool] = True
) -> Tuple[str, str, int]:
- return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command))
+ with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+ return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command))
async def _check_execute_command(self,
host: str,
addr: Optional[str] = None,
log_command: Optional[bool] = True,
) -> str:
- return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
+ with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+ return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
async def _write_remote_file(self,
host: str,
gid: Optional[int] = None,
addr: Optional[str] = None,
) -> None:
- self.mgr.wait_async(self._write_remote_file(
- host, path, content, mode, uid, gid, addr))
+ with self.mgr.async_timeout_handler(host, f'writing file {path}'):
+ self.mgr.wait_async(self._write_remote_file(
+ host, path, content, mode, uid, gid, addr))
async def _reset_con(self, host: str) -> None:
conn = self.cons.get(host)
del self.cons[host]
def reset_con(self, host: str) -> None:
- self.mgr.wait_async(self._reset_con(host))
+ with self.mgr.async_timeout_handler(cmd=f'resetting ssh connection to {host}'):
+ self.mgr.wait_async(self._reset_con(host))
def _reset_cons(self) -> None:
for host, conn in self.cons.items():
class MockEventLoopThread:
- def get_result(self, coro):
+ def get_result(self, coro, timeout):
if sys.version_info >= (3, 7):
return asyncio.run(coro)
+import asyncio
import json
import logging
assert cephadm_module.inventory.get_addr('host5') == '1.2.3.5'
with pytest.raises(OrchestratorError):
cephadm_module.inventory.get_addr('host5.domain')
+
+ def test_async_timeout_handler(self, cephadm_module):
+ cephadm_module.default_cephadm_command_timeout = 900
+
+ async def _timeout():
+ raise asyncio.TimeoutError
+
+ with pytest.raises(OrchestratorError, match=r'Command timed out \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler():
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command timed out on host hostA \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler('hostA'):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command "testing" timed out \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler(cmd='testing'):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command "testing" timed out on host hostB \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler('hostB', 'testing'):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command timed out \(non-default 111 second timeout\)'):
+ with cephadm_module.async_timeout_handler(timeout=111):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command "very slow" timed out on host hostC \(non-default 999 second timeout\)'):
+ with cephadm_module.async_timeout_handler('hostC', 'very slow', 999):
+ cephadm_module.wait_async(_timeout())
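+
+    def test_wait_async_default_timeout_lower_bound(self, cephadm_module):
+        # Illustrative sketch (not part of the original change): the default
+        # timeout should be clamped to the 60 second minimum before being
+        # handed to the event loop. Records the timeout by temporarily swapping
+        # in a stand-in for event_loop.get_result.
+        recorded = {}
+
+        def fake_get_result(coro, timeout):
+            recorded['timeout'] = timeout
+            coro.close()  # avoid "coroutine was never awaited" warnings
+
+        async def _noop():
+            return None
+
+        cephadm_module.default_cephadm_command_timeout = 5
+        original_get_result = cephadm_module.event_loop.get_result
+        try:
+            cephadm_module.event_loop.get_result = fake_get_result
+            cephadm_module.wait_async(_noop())
+        finally:
+            cephadm_module.event_loop.get_result = original_get_result
+        assert recorded['timeout'] == 60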
self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
else:
self.upgrade_state = None
+ self.upgrade_info_str: str = ''
@property
def target_image(self) -> str:
raise OrchestratorError(
'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
try:
- target_id, target_version, target_digests = self.mgr.wait_async(
- CephadmServe(self.mgr)._get_container_image_info(target_name))
+            with self.mgr.async_timeout_handler(cmd='cephadm inspect-image'):
+ target_id, target_version, target_digests = self.mgr.wait_async(
+ CephadmServe(self.mgr)._get_container_image_info(target_name))
except OrchestratorError as e:
raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
# what we need to do here is build a list of daemons that must already be upgraded
assert d.hostname is not None
# make sure host has latest container image
- out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
- d.hostname, '', 'inspect-image', [],
- image=target_image, no_fsid=True, error_ok=True))
+ with self.mgr.async_timeout_handler(d.hostname, 'cephadm inspect-image'):
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ d.hostname, '', 'inspect-image', [],
+ image=target_image, no_fsid=True, error_ok=True))
if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
logger.info('Upgrade: Pulling %s on %s' % (target_image,
d.hostname))
self.upgrade_info_str = 'Pulling %s image on host %s' % (
target_image, d.hostname)
- out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
- d.hostname, '', 'pull', [],
- image=target_image, no_fsid=True, error_ok=True))
+ with self.mgr.async_timeout_handler(d.hostname, 'cephadm pull'):
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ d.hostname, '', 'pull', [],
+ image=target_image, no_fsid=True, error_ok=True))
if code:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',
logger.info('Upgrade: First pull of %s' % target_image)
self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
try:
- target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
- target_image))
+                with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_image})'):
+ target_id, target_version, target_digests = self.mgr.wait_async(
+ CephadmServe(self.mgr)._get_container_image_info(target_image))
except OrchestratorError as e:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',