import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Coroutine, Awaitable
+ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
import datetime
import os
break
if not host:
raise OrchestratorError('no hosts defined')
- r = CephadmServe(self)._registry_login(host, url, username, password)
+ r = self.wait_async(CephadmServe(self)._registry_login(host, url, username, password))
if r is not None:
return 1, '', r
# if logins succeeded, store info
def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
"""Check whether we can access and manage a remote host"""
try:
- out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True)
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True))
if code:
return 1, '', ('check-host failed:\n' + '\n'.join(err))
except OrchestratorError:
'cephadm prepare-host')
def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
"""Prepare a remote host for use with cephadm"""
- out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True)
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True))
if code:
return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
# if we have an outstanding health alert for this host, give the
@forall_hosts
def run(h: str) -> str:
- return self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')
+ return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
return HandleCommandResult(stdout='\n'.join(run(host)))
> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
raise OrchestratorError(msg)
- out, err, code = CephadmServe(self)._run_cephadm(
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
host, cephadmNoImage, 'check-host',
['--expect-hostname', host],
addr=addr,
- error_ok=True, no_fsid=True)
+ error_ok=True, no_fsid=True))
if code:
msg = 'check-host failed:\n' + '\n'.join(err)
# err will contain stdout and stderr, so we filter on the message text to
msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
# call the host-maintenance function
- _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
- ["enter"],
- error_ok=True)
+ _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
+ ["enter"],
+ error_ok=True))
returned_msg = _err[0].split('\n')[-1]
if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
raise OrchestratorError(
if tgt_host['status'] != "maintenance":
raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
- outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
- ['exit'],
- error_ok=True)
+ outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
+ ['exit'],
+ error_ok=True))
returned_msg = errs[0].split('\n')[-1]
if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
raise OrchestratorError(
if daemon_spec.daemon_type != 'osd':
daemon_spec = self.cephadm_services[daemon_type_to_service(
daemon_spec.daemon_type)].prepare_create(daemon_spec)
- return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))
+ return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
actions = {
'start': ['reset-failed', 'start'],
name = daemon_spec.name()
for a in actions[action]:
try:
- out, err, code = CephadmServe(self)._run_cephadm(
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
daemon_spec.host, name, 'unit',
- ['--name', name, a])
+ ['--name', name, a]))
except Exception:
self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
self.cache.invalidate_host_daemons(daemon_spec.host)
msg = ''
for h, ls in osds_msg.items():
msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
- raise OrchestratorError(f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
+ raise OrchestratorError(
+ f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
found = self.spec_store.rm(service_name)
if found and service_name.startswith('osd.'):
f"OSD{'s' if len(active_osds) > 1 else ''}"
f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
- out, err, code = CephadmServe(self)._run_cephadm(
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
host, 'osd', 'ceph-volume',
['--', 'lvm', 'zap', '--destroy', path],
- error_ok=True)
+ error_ok=True))
self.cache.invalidate_host_devices(host)
self.cache.invalidate_host_networks(host)
host=host)
cmd_args = shlex.split(cmd_line)
- out, err, code = CephadmServe(self)._run_cephadm(
+ out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
host, 'osd', 'shell', ['--'] + cmd_args,
- error_ok=True)
+ error_ok=True))
if code:
raise OrchestratorError(
'Unable to affect %s light for %s:%s. Command: %s' % (
@ forall_hosts
def create_func_map(*args: Any) -> str:
daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
- return CephadmServe(self)._create_daemon(daemon_spec)
+ return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
return create_func_map(args)
else:
raise OrchestratorError('must specify either image or version')
- image_info = CephadmServe(self)._get_container_image_info(target_name)
+ image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
ceph_image_version = image_info.ceph_version
if not ceph_image_version:
if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
self.log.debug(f"Logging `{host}` into custom registry")
- r = self._registry_login(host, self.mgr.registry_url,
- self.mgr.registry_username, self.mgr.registry_password)
+ r = self.mgr.wait_async(self._registry_login(host, self.mgr.registry_url,
+ self.mgr.registry_username, self.mgr.registry_password))
if r:
bad_hosts.append(r)
self.log.debug(' checking %s' % host)
try:
addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
- out, err, code = self._run_cephadm(
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
host, cephadmNoImage, 'check-host', [],
- error_ok=True, no_fsid=True)
+ error_ok=True, no_fsid=True))
self.mgr.cache.update_last_host_check(host)
self.mgr.cache.save_host(host)
if code:
def _refresh_host_daemons(self, host: str) -> Optional[str]:
try:
- ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
+ ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
except OrchestratorError as e:
return str(e)
self.mgr._process_ls_output(host, ls)
def _refresh_facts(self, host: str) -> Optional[str]:
try:
- val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
+ val = self.mgr.wait_async(self._run_cephadm_json(
+ host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
except OrchestratorError as e:
return str(e)
try:
try:
- devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
- inventory_args)
+ devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+ inventory_args))
except OrchestratorError as e:
if 'unrecognized arguments: --filter-for-batch' in str(e):
rerun_args = inventory_args.copy()
rerun_args.remove('--filter-for-batch')
- devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
- rerun_args)
+ devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+ rerun_args))
else:
raise
def _refresh_host_networks(self, host: str) -> Optional[str]:
try:
- networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
+ networks = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'mon', 'list-networks', [], no_fsid=True))
except OrchestratorError as e:
return str(e)
try:
daemon_spec = svc.prepare_create(daemon_spec)
- self._create_daemon(daemon_spec)
+ self.mgr.wait_async(self._create_daemon(daemon_spec))
r = True
progress_done += 1
update_progress()
digests: Dict[str, ContainerInspectInfo] = {}
for container_image_ref in set(settings.values()):
if not is_repo_digest(container_image_ref):
- image_info = self._get_container_image_info(container_image_ref)
+ image_info = self.mgr.wait_async(
+ self._get_container_image_info(container_image_ref))
if image_info.repo_digests:
# FIXME: we assume the first digest here is the best
assert is_repo_digest(image_info.repo_digests[0]), image_info
# FIXME: we assume the first digest here is the best
self.mgr.set_container_image(entity, image_info.repo_digests[0])
- def _create_daemon(self,
- daemon_spec: CephadmDaemonDeploySpec,
- reconfig: bool = False,
- osd_uuid_map: Optional[Dict[str, Any]] = None,
- ) -> str:
+ async def _create_daemon(self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ reconfig: bool = False,
+ osd_uuid_map: Optional[Dict[str, Any]] = None,
+ ) -> str:
with set_exception_subject('service', orchestrator.DaemonDescription(
daemon_type=daemon_spec.daemon_type,
daemon_spec.extra_args.append('--allow-ptrace')
if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
- self._registry_login(daemon_spec.host, self.mgr.registry_url,
- self.mgr.registry_username, self.mgr.registry_password)
+ await self._registry_login(daemon_spec.host, self.mgr.registry_url,
+ self.mgr.registry_username, self.mgr.registry_password)
self.log.info('%s daemon %s on %s' % (
'Reconfiguring' if reconfig else 'Deploying',
daemon_spec.name(), daemon_spec.host))
- out, err, code = self._run_cephadm(
+ out, err, code = await self._run_cephadm(
daemon_spec.host, daemon_spec.name(), 'deploy',
[
'--name', daemon_spec.name(),
# we can delete a mon instances data.
args = ['--name', name, '--force']
self.log.info('Removing daemon %s from %s' % (name, host))
- out, err, code = self._run_cephadm(
- host, name, 'rm-daemon', args)
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
+ host, name, 'rm-daemon', args))
if not code:
# remove item from cache
self.mgr.cache.rm_daemon(host, name)
return "Removed {} from host '{}'".format(name, host)
- def _run_cephadm_json(self,
- host: str,
- entity: Union[CephadmNoImage, str],
- command: str,
- args: List[str],
- no_fsid: Optional[bool] = False,
- image: Optional[str] = "",
- ) -> Any:
+ async def _run_cephadm_json(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ no_fsid: Optional[bool] = False,
+ image: Optional[str] = "",
+ ) -> Any:
try:
- out, err, code = self._run_cephadm(
+ out, err, code = await self._run_cephadm(
host, entity, command, args, no_fsid=no_fsid, image=image)
if code:
raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
self.log.exception(f'{msg}: {"".join(out)}')
raise OrchestratorError(msg)
- def _run_cephadm(self,
- host: str,
- entity: Union[CephadmNoImage, str],
- command: str,
- args: List[str],
- addr: Optional[str] = "",
- stdin: Optional[str] = "",
- no_fsid: Optional[bool] = False,
- error_ok: Optional[bool] = False,
- image: Optional[str] = "",
- env_vars: Optional[List[str]] = None,
- ) -> Tuple[List[str], List[str], int]:
+ async def _run_cephadm(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ addr: Optional[str] = "",
+ stdin: Optional[str] = "",
+ no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
+ image: Optional[str] = "",
+ env_vars: Optional[List[str]] = None,
+ ) -> Tuple[List[str], List[str], int]:
"""
Run cephadm on the remote host with the given command + args
:env_vars: in format -> [KEY=VALUE, ..]
"""
- self.mgr.ssh.remote_connection(host, addr)
+ await self.mgr.ssh._remote_connection(host, addr)
self.log.debug(f"_run_cephadm : command = {command}")
self.log.debug(f"_run_cephadm : args = {args}")
self.log.debug('stdin: %s' % stdin)
cmd = ['which', 'python3']
- python = self.mgr.ssh.check_execute_command(host, cmd, addr=addr)
+ python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
cmd = [python, self.mgr.cephadm_binary_path] + final_args
try:
- out, err, code = self.mgr.ssh.execute_command(
+ out, err, code = await self.mgr.ssh._execute_command(
host, cmd, stdin=stdin, addr=addr)
if code == 2:
ls_cmd = ['ls', self.mgr.cephadm_binary_path]
- out_ls, err_ls, code_ls = self.mgr.ssh.execute_command(host, ls_cmd, addr=addr)
+ out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
if code_ls == 2:
- self._deploy_cephadm_binary(host, addr)
- out, err, code = self.mgr.ssh.execute_command(
+ await self._deploy_cephadm_binary(host, addr)
+ out, err, code = await self.mgr.ssh._execute_command(
host, cmd, stdin=stdin, addr=addr)
except Exception as e:
- self.mgr.ssh._reset_con(host)
+ await self.mgr.ssh._reset_con(host)
if error_ok:
return [], [str(e)], 1
raise
elif self.mgr.mode == 'cephadm-package':
try:
cmd = ['/usr/bin/cephadm'] + final_args
- out, err, code = self.mgr.ssh.execute_command(
+ out, err, code = await self.mgr.ssh._execute_command(
host, cmd, stdin=stdin, addr=addr)
except Exception as e:
- self.mgr.ssh._reset_con(host)
+ await self.mgr.ssh._reset_con(host)
if error_ok:
return [], [str(e)], 1
raise
f'cephadm exited with an error code: {code}, stderr: {err}')
return [out], [err], code
- def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
+ async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
# pick a random host...
host = None
for host_name in self.mgr.inventory.keys():
if not host:
raise OrchestratorError('no hosts defined')
if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
- self._registry_login(host, self.mgr.registry_url,
- self.mgr.registry_username, self.mgr.registry_password)
+ await self._registry_login(host, self.mgr.registry_url,
+ self.mgr.registry_username, self.mgr.registry_password)
pullargs: List[str] = []
if self.mgr.registry_insecure:
pullargs.append("--insecure")
- j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
+ j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
r = ContainerInspectInfo(
j['image_id'],
return r
# function responsible for logging single host into custom registry
- def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
+ async def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
# want to pass info over stdin rather than through normal list of args
args_str = json.dumps({
'username': username,
'password': password,
})
- out, err, code = self._run_cephadm(
+ out, err, code = await self._run_cephadm(
host, 'mon', 'registry-login',
['--registry-json', '-'], stdin=args_str, error_ok=True)
if code:
return f"Host {host} failed to login to {url} as {username} with given password"
return None
- def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
+ async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
# Use tee (from coreutils) to create a copy of cephadm on the target machine
self.log.info(f"Deploying cephadm binary to {host}")
- self.mgr.ssh.write_remote_file(host, self.mgr.cephadm_binary_path,
- self.mgr._cephadm.encode('utf-8'), addr=addr)
+ await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
+ self.mgr._cephadm.encode('utf-8'), addr=addr)
from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
-from cephadm.utils import forall_hosts
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed
logger.info(
f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
- @forall_hosts
- def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+ async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
# skip this host if there has been no change in inventory
if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
))
start_ts = datetime_now()
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
- ret_msg = self.create_single_host(
+ ret_msg = await self.create_single_host(
drive_group, host, cmd,
replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
)
self.mgr.cache.save_host(host)
return ret_msg
- ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
+ async def all_hosts() -> List[Optional[str]]:
+ return [await create_from_spec_one(h, ds) for h, ds in self.prepare_drivegroup(drive_group)]
+
+ ret = self.mgr.wait_async(all_hosts())
return ", ".join(filter(None, ret))
- def create_single_host(self,
- drive_group: DriveGroupSpec,
- host: str, cmd: str, replace_osd_ids: List[str],
- env_vars: Optional[List[str]] = None) -> str:
- out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
+ async def create_single_host(self,
+ drive_group: DriveGroupSpec,
+ host: str, cmd: str, replace_osd_ids: List[str],
+ env_vars: Optional[List[str]] = None) -> str:
+ out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
if code == 1 and ', it is already prepared' in '\n'.join(err):
# HACK: when we create against an existing LV, ceph-volume
raise RuntimeError(
'cephadm exited with an error code: %d, stderr:%s' % (
code, '\n'.join(err)))
- return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
- replace_osd_ids)
+ return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
+ replace_osd_ids)
- def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
- replace_osd_ids: Optional[List[str]] = None) -> str:
+ async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
+ replace_osd_ids: Optional[List[str]] = None) -> str:
if replace_osd_ids is None:
replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
assert replace_osd_ids is not None
# check result: lvm
- osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+ osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
host, 'osd', 'ceph-volume',
[
'--',
daemon_type='osd',
)
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
- CephadmServe(self.mgr)._create_daemon(
+ await CephadmServe(self.mgr)._create_daemon(
daemon_spec,
osd_uuid_map=osd_uuid_map)
# check result: raw
- raw_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+ raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
host, 'osd', 'ceph-volume',
[
'--',
daemon_type='osd',
)
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
- CephadmServe(self.mgr)._create_daemon(
+ await CephadmServe(self.mgr)._create_daemon(
daemon_spec,
osd_uuid_map=osd_uuid_map)
continue
# get preview data from ceph-volume
- out, err, code = self._run_ceph_volume_command(host, cmd)
+ out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
if out:
try:
concat_out: Dict[str, Any] = json.loads(' '.join(out))
matching_specs.append(spec)
return matching_specs
- def _run_ceph_volume_command(self, host: str,
- cmd: str, env_vars: Optional[List[str]] = None
- ) -> Tuple[List[str], List[str], int]:
+ async def _run_ceph_volume_command(self, host: str,
+ cmd: str, env_vars: Optional[List[str]] = None
+ ) -> Tuple[List[str], List[str], int]:
self.mgr.inventory.assert_host(host)
# get bootstrap key
split_cmd = cmd.split(' ')
_cmd = ['--config-json', '-', '--']
_cmd.extend(split_cmd)
- out, err, code = CephadmServe(self.mgr)._run_cephadm(
+ out, err, code = await CephadmServe(self.mgr)._run_cephadm(
host, 'osd', 'ceph-volume',
_cmd,
env_vars=env_vars,
def zap_osd(self, osd: "OSD") -> str:
"Zaps all devices that are associated with an OSD"
if osd.hostname is not None:
- out, err, code = CephadmServe(self.mgr)._run_cephadm(
+ out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
osd.hostname, 'osd', 'ceph-volume',
['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
- error_ok=True)
+ error_ok=True))
self.mgr.cache.invalidate_host_devices(osd.hostname)
if code:
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
if ret != 0:
self.mgr.log.debug(f"ran {cmd_args} with mon_command")
if not error_ok:
- self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+ self.mgr.log.error(
+ f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
return False
self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
return True
def _run_cephadm(ret):
- def foo(s, host, entity, cmd, e, **kwargs):
+ async def foo(s, host, entity, cmd, e, **kwargs):
if cmd == 'gather-facts':
return '{}', '', 0
return [ret], '', 0
}
if ops:
for op in ops:
- out = CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, [])
+ out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
to_update[op](host, out)
m.cache.last_daemon_update[host] = datetime_now()
m.cache.last_facts_update[host] = datetime_now()
assert not retval.result_str()
assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
- @mock.patch("cephadm.ssh.SSHManager.remote_connection")
- @mock.patch("cephadm.ssh.SSHManager.execute_command")
- @mock.patch("cephadm.ssh.SSHManager.check_execute_command")
- @mock.patch("cephadm.ssh.SSHManager.write_remote_file")
+ @mock.patch("cephadm.ssh.SSHManager._remote_connection")
+ @mock.patch("cephadm.ssh.SSHManager._execute_command")
+ @mock.patch("cephadm.ssh.SSHManager._check_execute_command")
+ @mock.patch("cephadm.ssh.SSHManager._write_remote_file")
def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module):
_write_file.return_value = None
check_execute_command.return_value = ''
CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
_write_file.assert_called_with('test', '/etc/ceph/ceph.conf', b'',
- 0o644, 0, 0)
+ 0o644, 0, 0, None)
assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test')
cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
_write_file.assert_called_with('test', '/etc/ceph/ceph.conf',
- b'\n\n[mon]\nk=v\n', 0o644, 0, 0)
+ b'\n\n[mon]\nk=v\n', 0o644, 0, 0, None)
# reload
cephadm_module.cache.last_client_files = {}
@pytest.mark.skipif(ConnectionLost is None, reason='no asyncssh')
class TestWithSSH:
- @mock.patch("cephadm.ssh.SSHManager.execute_command")
- @mock.patch("cephadm.ssh.SSHManager.check_execute_command")
+ @mock.patch("cephadm.ssh.SSHManager._execute_command")
+ @mock.patch("cephadm.ssh.SSHManager._check_execute_command")
def test_offline(self, check_execute_command, execute_command, cephadm_module):
check_execute_command.return_value = ''
execute_command.return_value = '', '', 0
self.error: Optional[str] = error
self.paused: bool = paused or False
self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
- self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
+ self.fs_original_allow_standby_replay: Optional[Dict[str,
+ bool]] = fs_original_allow_standby_replay
def to_json(self) -> dict:
return {
continue
if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
- self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
+ self.mgr.log.info(
+ 'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
time.sleep(10)
continue_upgrade = False
continue
if len(mdsmap['up']) == 0:
- self.mgr.log.warning("Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
+ self.mgr.log.warning(
+ "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
# This can happen because the current version MDS have
# incompatible compatsets; the mons will not do any promotions.
# We must upgrade to continue.
logger.info('Upgrade: First pull of %s' % target_image)
self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image)
try:
- target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
- target_image)
+ target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
+ target_image))
except OrchestratorError as e:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',
self._update_upgrade_progress(done / len(daemons))
# make sure host has latest container image
- out, errs, code = CephadmServe(self.mgr)._run_cephadm(
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
d.hostname, '', 'inspect-image', [],
- image=target_image, no_fsid=True, error_ok=True)
+ image=target_image, no_fsid=True, error_ok=True))
if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
logger.info('Upgrade: Pulling %s on %s' % (target_image,
d.hostname))
self.upgrade_info_str = 'Pulling %s image on host %s' % (
target_image, d.hostname)
- out, errs, code = CephadmServe(self.mgr)._run_cephadm(
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
d.hostname, '', 'pull', [],
- image=target_image, no_fsid=True, error_ok=True)
+ image=target_image, no_fsid=True, error_ok=True))
if code:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',