From b97894f4cd6fd70880fff405f1371be32ae0218f Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 10 Nov 2021 18:01:37 +0100 Subject: [PATCH] mgr/cephadm: make OSDService.create_from_spec_one async And gather all results. Plus a lot of mechanical adjustments Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/module.py | 59 ++++----- src/pybind/mgr/cephadm/serve.py | 127 ++++++++++--------- src/pybind/mgr/cephadm/services/osd.py | 54 ++++---- src/pybind/mgr/cephadm/tests/fixtures.py | 4 +- src/pybind/mgr/cephadm/tests/test_cephadm.py | 12 +- src/pybind/mgr/cephadm/tests/test_ssh.py | 4 +- src/pybind/mgr/cephadm/upgrade.py | 21 +-- 7 files changed, 145 insertions(+), 136 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index ba28fa6e39b..db7568a3e98 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -11,7 +11,7 @@ from threading import Event import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ - Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Coroutine, Awaitable + Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable import datetime import os @@ -966,7 +966,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, break if not host: raise OrchestratorError('no hosts defined') - r = CephadmServe(self)._registry_login(host, url, username, password) + r = self.wait_async(CephadmServe(self)._registry_login(host, url, username, password)) if r is not None: return 1, '', r # if logins succeeded, store info @@ -982,10 +982,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: """Check whether we can access and manage a remote host""" try: - out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True) + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', + ['--expect-hostname', host], + addr=addr, + error_ok=True, no_fsid=True)) if code: return 1, '', ('check-host failed:\n' + '\n'.join(err)) except OrchestratorError: @@ -1004,10 +1004,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'cephadm prepare-host') def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: """Prepare a remote host for use with cephadm""" - out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True) + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host', + ['--expect-hostname', host], + addr=addr, + error_ok=True, no_fsid=True)) if code: return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) # if we have an outstanding health alert for this host, give the @@ -1179,7 +1179,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, @forall_hosts def run(h: str) -> str: - return self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd') + return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')) return HandleCommandResult(stdout='\n'.join(run(host))) @@ -1314,11 +1314,11 @@ Then run the following: > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}''' raise OrchestratorError(msg) - out, err, code = CephadmServe(self)._run_cephadm( + out, err, code = 
self.wait_async(CephadmServe(self)._run_cephadm( host, cephadmNoImage, 'check-host', ['--expect-hostname', host], addr=addr, - error_ok=True, no_fsid=True) + error_ok=True, no_fsid=True)) if code: msg = 'check-host failed:\n' + '\n'.join(err) # err will contain stdout and stderr, so we filter on the message text to @@ -1585,9 +1585,9 @@ Then run the following: msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc) # call the host-maintenance function - _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance", - ["enter"], - error_ok=True) + _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance", + ["enter"], + error_ok=True)) returned_msg = _err[0].split('\n')[-1] if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): raise OrchestratorError( @@ -1636,9 +1636,9 @@ Then run the following: if tgt_host['status'] != "maintenance": raise OrchestratorError(f"Host {hostname} is not in maintenance mode") - outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', - ['exit'], - error_ok=True) + outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', + ['exit'], + error_ok=True)) returned_msg = errs[0].split('\n')[-1] if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): raise OrchestratorError( @@ -1824,7 +1824,7 @@ Then run the following: if daemon_spec.daemon_type != 'osd': daemon_spec = self.cephadm_services[daemon_type_to_service( daemon_spec.daemon_type)].prepare_create(daemon_spec) - return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')) + return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))) actions = { 'start': ['reset-failed', 'start'], @@ -1834,9 +1834,9 @@ Then run the following: name = daemon_spec.name() for a in actions[action]: try: - out, err, code = CephadmServe(self)._run_cephadm( + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( daemon_spec.host, name, 'unit', - ['--name', name, a]) + ['--name', name, a])) except Exception: self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed') self.cache.invalidate_host_daemons(daemon_spec.host) @@ -1935,7 +1935,8 @@ Then run the following: msg = '' for h, ls in osds_msg.items(): msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}' - raise OrchestratorError(f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}') + raise OrchestratorError( + f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}') found = self.spec_store.rm(service_name) if found and service_name.startswith('osd.'): @@ -2039,10 +2040,10 @@ Then run the following: f"OSD{'s' if len(active_osds) > 1 else ''}" f" ({', '.join(active_osds)}). 
Use 'ceph orch osd rm' first.") - out, err, code = CephadmServe(self)._run_cephadm( + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( host, 'osd', 'ceph-volume', ['--', 'lvm', 'zap', '--destroy', path], - error_ok=True) + error_ok=True)) self.cache.invalidate_host_devices(host) self.cache.invalidate_host_networks(host) @@ -2079,9 +2080,9 @@ Then run the following: host=host) cmd_args = shlex.split(cmd_line) - out, err, code = CephadmServe(self)._run_cephadm( + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( host, 'osd', 'shell', ['--'] + cmd_args, - error_ok=True) + error_ok=True)) if code: raise OrchestratorError( 'Unable to affect %s light for %s:%s. Command: %s' % ( @@ -2301,7 +2302,7 @@ Then run the following: @ forall_hosts def create_func_map(*args: Any) -> str: daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args) - return CephadmServe(self)._create_daemon(daemon_spec) + return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec)) return create_func_map(args) @@ -2527,7 +2528,7 @@ Then run the following: else: raise OrchestratorError('must specify either image or version') - image_info = CephadmServe(self)._get_container_image_info(target_name) + image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name)) ceph_image_version = image_info.ceph_version if not ceph_image_version: diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 80de87d404c..f4d0b4384e6 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -298,8 +298,8 @@ class CephadmServe: if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url: self.log.debug(f"Logging `{host}` into custom registry") - r = self._registry_login(host, self.mgr.registry_url, - self.mgr.registry_username, self.mgr.registry_password) + r = self.mgr.wait_async(self._registry_login(host, self.mgr.registry_url, + self.mgr.registry_username, self.mgr.registry_password)) if r: bad_hosts.append(r) @@ -364,9 +364,9 @@ class CephadmServe: self.log.debug(' checking %s' % host) try: addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host - out, err, code = self._run_cephadm( + out, err, code = self.mgr.wait_async(self._run_cephadm( host, cephadmNoImage, 'check-host', [], - error_ok=True, no_fsid=True) + error_ok=True, no_fsid=True)) self.mgr.cache.update_last_host_check(host) self.mgr.cache.save_host(host) if code: @@ -382,7 +382,7 @@ class CephadmServe: def _refresh_host_daemons(self, host: str) -> Optional[str]: try: - ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True) + ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)) except OrchestratorError as e: return str(e) self.mgr._process_ls_output(host, ls) @@ -390,7 +390,8 @@ class CephadmServe: def _refresh_facts(self, host: str) -> Optional[str]: try: - val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True) + val = self.mgr.wait_async(self._run_cephadm_json( + host, cephadmNoImage, 'gather-facts', [], no_fsid=True)) except OrchestratorError as e: return str(e) @@ -408,14 +409,14 @@ class CephadmServe: try: try: - devices = self._run_cephadm_json(host, 'osd', 'ceph-volume', - inventory_args) + devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume', + inventory_args)) except OrchestratorError as e: if 'unrecognized arguments: --filter-for-batch' in str(e): rerun_args = inventory_args.copy() 
rerun_args.remove('--filter-for-batch') - devices = self._run_cephadm_json(host, 'osd', 'ceph-volume', - rerun_args) + devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume', + rerun_args)) else: raise @@ -432,7 +433,8 @@ class CephadmServe: def _refresh_host_networks(self, host: str) -> Optional[str]: try: - networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True) + networks = self.mgr.wait_async(self._run_cephadm_json( + host, 'mon', 'list-networks', [], no_fsid=True)) except OrchestratorError as e: return str(e) @@ -833,7 +835,7 @@ class CephadmServe: try: daemon_spec = svc.prepare_create(daemon_spec) - self._create_daemon(daemon_spec) + self.mgr.wait_async(self._create_daemon(daemon_spec)) r = True progress_done += 1 update_progress() @@ -1017,7 +1019,8 @@ class CephadmServe: digests: Dict[str, ContainerInspectInfo] = {} for container_image_ref in set(settings.values()): if not is_repo_digest(container_image_ref): - image_info = self._get_container_image_info(container_image_ref) + image_info = self.mgr.wait_async( + self._get_container_image_info(container_image_ref)) if image_info.repo_digests: # FIXME: we assume the first digest here is the best assert is_repo_digest(image_info.repo_digests[0]), image_info @@ -1030,11 +1033,11 @@ class CephadmServe: # FIXME: we assume the first digest here is the best self.mgr.set_container_image(entity, image_info.repo_digests[0]) - def _create_daemon(self, - daemon_spec: CephadmDaemonDeploySpec, - reconfig: bool = False, - osd_uuid_map: Optional[Dict[str, Any]] = None, - ) -> str: + async def _create_daemon(self, + daemon_spec: CephadmDaemonDeploySpec, + reconfig: bool = False, + osd_uuid_map: Optional[Dict[str, Any]] = None, + ) -> str: with set_exception_subject('service', orchestrator.DaemonDescription( daemon_type=daemon_spec.daemon_type, @@ -1075,14 +1078,14 @@ class CephadmServe: daemon_spec.extra_args.append('--allow-ptrace') if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url: - self._registry_login(daemon_spec.host, self.mgr.registry_url, - self.mgr.registry_username, self.mgr.registry_password) + await self._registry_login(daemon_spec.host, self.mgr.registry_url, + self.mgr.registry_username, self.mgr.registry_password) self.log.info('%s daemon %s on %s' % ( 'Reconfiguring' if reconfig else 'Deploying', daemon_spec.name(), daemon_spec.host)) - out, err, code = self._run_cephadm( + out, err, code = await self._run_cephadm( daemon_spec.host, daemon_spec.name(), 'deploy', [ '--name', daemon_spec.name(), @@ -1154,8 +1157,8 @@ class CephadmServe: # we can delete a mon instances data. 
args = ['--name', name, '--force'] self.log.info('Removing daemon %s from %s' % (name, host)) - out, err, code = self._run_cephadm( - host, name, 'rm-daemon', args) + out, err, code = self.mgr.wait_async(self._run_cephadm( + host, name, 'rm-daemon', args)) if not code: # remove item from cache self.mgr.cache.rm_daemon(host, name) @@ -1167,16 +1170,16 @@ class CephadmServe: return "Removed {} from host '{}'".format(name, host) - def _run_cephadm_json(self, - host: str, - entity: Union[CephadmNoImage, str], - command: str, - args: List[str], - no_fsid: Optional[bool] = False, - image: Optional[str] = "", - ) -> Any: + async def _run_cephadm_json(self, + host: str, + entity: Union[CephadmNoImage, str], + command: str, + args: List[str], + no_fsid: Optional[bool] = False, + image: Optional[str] = "", + ) -> Any: try: - out, err, code = self._run_cephadm( + out, err, code = await self._run_cephadm( host, entity, command, args, no_fsid=no_fsid, image=image) if code: raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}') @@ -1189,18 +1192,18 @@ class CephadmServe: self.log.exception(f'{msg}: {"".join(out)}') raise OrchestratorError(msg) - def _run_cephadm(self, - host: str, - entity: Union[CephadmNoImage, str], - command: str, - args: List[str], - addr: Optional[str] = "", - stdin: Optional[str] = "", - no_fsid: Optional[bool] = False, - error_ok: Optional[bool] = False, - image: Optional[str] = "", - env_vars: Optional[List[str]] = None, - ) -> Tuple[List[str], List[str], int]: + async def _run_cephadm(self, + host: str, + entity: Union[CephadmNoImage, str], + command: str, + args: List[str], + addr: Optional[str] = "", + stdin: Optional[str] = "", + no_fsid: Optional[bool] = False, + error_ok: Optional[bool] = False, + image: Optional[str] = "", + env_vars: Optional[List[str]] = None, + ) -> Tuple[List[str], List[str], int]: """ Run cephadm on the remote host with the given command + args @@ -1209,7 +1212,7 @@ class CephadmServe: :env_vars: in format -> [KEY=VALUE, ..] 
""" - self.mgr.ssh.remote_connection(host, addr) + await self.mgr.ssh._remote_connection(host, addr) self.log.debug(f"_run_cephadm : command = {command}") self.log.debug(f"_run_cephadm : args = {args}") @@ -1253,22 +1256,22 @@ class CephadmServe: self.log.debug('stdin: %s' % stdin) cmd = ['which', 'python3'] - python = self.mgr.ssh.check_execute_command(host, cmd, addr=addr) + python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr) cmd = [python, self.mgr.cephadm_binary_path] + final_args try: - out, err, code = self.mgr.ssh.execute_command( + out, err, code = await self.mgr.ssh._execute_command( host, cmd, stdin=stdin, addr=addr) if code == 2: ls_cmd = ['ls', self.mgr.cephadm_binary_path] - out_ls, err_ls, code_ls = self.mgr.ssh.execute_command(host, ls_cmd, addr=addr) + out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr) if code_ls == 2: - self._deploy_cephadm_binary(host, addr) - out, err, code = self.mgr.ssh.execute_command( + await self._deploy_cephadm_binary(host, addr) + out, err, code = await self.mgr.ssh._execute_command( host, cmd, stdin=stdin, addr=addr) except Exception as e: - self.mgr.ssh._reset_con(host) + await self.mgr.ssh._reset_con(host) if error_ok: return [], [str(e)], 1 raise @@ -1276,10 +1279,10 @@ class CephadmServe: elif self.mgr.mode == 'cephadm-package': try: cmd = ['/usr/bin/cephadm'] + final_args - out, err, code = self.mgr.ssh.execute_command( + out, err, code = await self.mgr.ssh._execute_command( host, cmd, stdin=stdin, addr=addr) except Exception as e: - self.mgr.ssh._reset_con(host) + await self.mgr.ssh._reset_con(host) if error_ok: return [], [str(e)], 1 raise @@ -1296,7 +1299,7 @@ class CephadmServe: f'cephadm exited with an error code: {code}, stderr: {err}') return [out], [err], code - def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: + async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: # pick a random host... 
host = None for host_name in self.mgr.inventory.keys(): @@ -1305,14 +1308,14 @@ class CephadmServe: if not host: raise OrchestratorError('no hosts defined') if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url: - self._registry_login(host, self.mgr.registry_url, - self.mgr.registry_username, self.mgr.registry_password) + await self._registry_login(host, self.mgr.registry_url, + self.mgr.registry_username, self.mgr.registry_password) pullargs: List[str] = [] if self.mgr.registry_insecure: pullargs.append("--insecure") - j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True) + j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True) r = ContainerInspectInfo( j['image_id'], @@ -1323,7 +1326,7 @@ class CephadmServe: return r # function responsible for logging single host into custom registry - def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]: + async def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]: self.log.debug(f"Attempting to log host {host} into custom registry @ {url}") # want to pass info over stdin rather than through normal list of args args_str = json.dumps({ @@ -1331,15 +1334,15 @@ class CephadmServe: 'username': username, 'password': password, }) - out, err, code = self._run_cephadm( + out, err, code = await self._run_cephadm( host, 'mon', 'registry-login', ['--registry-json', '-'], stdin=args_str, error_ok=True) if code: return f"Host {host} failed to login to {url} as {username} with given password" return None - def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None: + async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None: # Use tee (from coreutils) to create a copy of cephadm on the target machine self.log.info(f"Deploying cephadm binary to {host}") - self.mgr.ssh.write_remote_file(host, self.mgr.cephadm_binary_path, - self.mgr._cephadm.encode('utf-8'), addr=addr) + await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path, + self.mgr._cephadm.encode('utf-8'), addr=addr) diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 347256f51d5..50011b6b2d4 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -12,7 +12,6 @@ from ceph.utils import datetime_to_str, str_to_datetime from datetime import datetime import orchestrator from cephadm.serve import CephadmServe -from cephadm.utils import forall_hosts from ceph.utils import datetime_now from orchestrator import OrchestratorError, DaemonDescription from mgr_module import MonCommandFailed @@ -35,8 +34,7 @@ class OSDService(CephService): logger.info( f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}") - @forall_hosts - def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]: + async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]: # skip this host if there has been no change in inventory if not self.mgr.cache.osdspec_needs_apply(host, drive_group): self.mgr.log.debug("skipping apply of %s on %s (no change)" % ( @@ -60,7 +58,7 @@ class OSDService(CephService): )) start_ts = datetime_now() env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"] - ret_msg = self.create_single_host( + ret_msg = await 
self.create_single_host( drive_group, host, cmd, replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars ) @@ -70,14 +68,17 @@ class OSDService(CephService): self.mgr.cache.save_host(host) return ret_msg - ret = create_from_spec_one(self.prepare_drivegroup(drive_group)) + async def all_hosts() -> List[Optional[str]]: + return [await create_from_spec_one(h, ds) for h, ds in self.prepare_drivegroup(drive_group)] + + ret = self.mgr.wait_async(all_hosts()) return ", ".join(filter(None, ret)) - def create_single_host(self, - drive_group: DriveGroupSpec, - host: str, cmd: str, replace_osd_ids: List[str], - env_vars: Optional[List[str]] = None) -> str: - out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars) + async def create_single_host(self, + drive_group: DriveGroupSpec, + host: str, cmd: str, replace_osd_ids: List[str], + env_vars: Optional[List[str]] = None) -> str: + out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars) if code == 1 and ', it is already prepared' in '\n'.join(err): # HACK: when we create against an existing LV, ceph-volume @@ -89,18 +90,18 @@ class OSDService(CephService): raise RuntimeError( 'cephadm exited with an error code: %d, stderr:%s' % ( code, '\n'.join(err))) - return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(), - replace_osd_ids) + return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(), + replace_osd_ids) - def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str, - replace_osd_ids: Optional[List[str]] = None) -> str: + async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str, + replace_osd_ids: Optional[List[str]] = None) -> str: if replace_osd_ids is None: replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host) assert replace_osd_ids is not None # check result: lvm - osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json( + osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json( host, 'osd', 'ceph-volume', [ '--', @@ -139,12 +140,12 @@ class OSDService(CephService): daemon_type='osd', ) daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) - CephadmServe(self.mgr)._create_daemon( + await CephadmServe(self.mgr)._create_daemon( daemon_spec, osd_uuid_map=osd_uuid_map) # check result: raw - raw_elems: dict = CephadmServe(self.mgr)._run_cephadm_json( + raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json( host, 'osd', 'ceph-volume', [ '--', @@ -176,7 +177,7 @@ class OSDService(CephService): daemon_type='osd', ) daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) - CephadmServe(self.mgr)._create_daemon( + await CephadmServe(self.mgr)._create_daemon( daemon_spec, osd_uuid_map=osd_uuid_map) @@ -279,7 +280,7 @@ class OSDService(CephService): continue # get preview data from ceph-volume - out, err, code = self._run_ceph_volume_command(host, cmd) + out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd)) if out: try: concat_out: Dict[str, Any] = json.loads(' '.join(out)) @@ -322,9 +323,9 @@ class OSDService(CephService): matching_specs.append(spec) return matching_specs - def _run_ceph_volume_command(self, host: str, - cmd: str, env_vars: Optional[List[str]] = None - ) -> Tuple[List[str], List[str], int]: + async def _run_ceph_volume_command(self, host: str, + cmd: str, env_vars: Optional[List[str]] = None + ) -> Tuple[List[str], List[str], int]: self.mgr.inventory.assert_host(host) # get bootstrap key @@ 
-341,7 +342,7 @@ class OSDService(CephService): split_cmd = cmd.split(' ') _cmd = ['--config-json', '-', '--'] _cmd.extend(split_cmd) - out, err, code = CephadmServe(self.mgr)._run_cephadm( + out, err, code = await CephadmServe(self.mgr)._run_cephadm( host, 'osd', 'ceph-volume', _cmd, env_vars=env_vars, @@ -516,10 +517,10 @@ class RemoveUtil(object): def zap_osd(self, osd: "OSD") -> str: "Zaps all devices that are associated with an OSD" if osd.hostname is not None: - out, err, code = CephadmServe(self.mgr)._run_cephadm( + out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( osd.hostname, 'osd', 'ceph-volume', ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)], - error_ok=True) + error_ok=True)) self.mgr.cache.invalidate_host_devices(osd.hostname) if code: raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) @@ -556,7 +557,8 @@ class RemoveUtil(object): if ret != 0: self.mgr.log.debug(f"ran {cmd_args} with mon_command") if not error_ok: - self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") + self.mgr.log.error( + f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") return False self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") return True diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 8254f142d5b..57c5780347f 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -23,7 +23,7 @@ def get_ceph_option(_, key): def _run_cephadm(ret): - def foo(s, host, entity, cmd, e, **kwargs): + async def foo(s, host, entity, cmd, e, **kwargs): if cmd == 'gather-facts': return '{}', '', 0 return [ret], '', 0 @@ -58,7 +58,7 @@ def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = N } if ops: for op in ops: - out = CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []) + out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, [])) to_update[op](host, out) m.cache.last_daemon_update[host] = datetime_now() m.cache.last_facts_update[host] = datetime_now() diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 0e6a46e737d..e03ecddc9a8 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -1424,10 +1424,10 @@ class TestCephadm(object): assert not retval.result_str() assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance' - @mock.patch("cephadm.ssh.SSHManager.remote_connection") - @mock.patch("cephadm.ssh.SSHManager.execute_command") - @mock.patch("cephadm.ssh.SSHManager.check_execute_command") - @mock.patch("cephadm.ssh.SSHManager.write_remote_file") + @mock.patch("cephadm.ssh.SSHManager._remote_connection") + @mock.patch("cephadm.ssh.SSHManager._execute_command") + @mock.patch("cephadm.ssh.SSHManager._check_execute_command") + @mock.patch("cephadm.ssh.SSHManager._write_remote_file") def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module): _write_file.return_value = None check_execute_command.return_value = '' @@ -1446,7 +1446,7 @@ class TestCephadm(object): CephadmServe(cephadm_module)._refresh_hosts_and_daemons() _write_file.assert_called_with('test', '/etc/ceph/ceph.conf', b'', - 0o644, 0, 0) + 0o644, 0, 0, None) assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test') @@ -1454,7 +1454,7 @@ class TestCephadm(object): 
cephadm_module._set_extra_ceph_conf('[mon]\nk=v') CephadmServe(cephadm_module)._refresh_hosts_and_daemons() _write_file.assert_called_with('test', '/etc/ceph/ceph.conf', - b'\n\n[mon]\nk=v\n', 0o644, 0, 0) + b'\n\n[mon]\nk=v\n', 0o644, 0, 0, None) # reload cephadm_module.cache.last_client_files = {} diff --git a/src/pybind/mgr/cephadm/tests/test_ssh.py b/src/pybind/mgr/cephadm/tests/test_ssh.py index 83690b207df..5c10c04b094 100644 --- a/src/pybind/mgr/cephadm/tests/test_ssh.py +++ b/src/pybind/mgr/cephadm/tests/test_ssh.py @@ -22,8 +22,8 @@ from cephadm.tests.fixtures import with_host, wait @pytest.mark.skipif(ConnectionLost is None, reason='no asyncssh') class TestWithSSH: - @mock.patch("cephadm.ssh.SSHManager.execute_command") - @mock.patch("cephadm.ssh.SSHManager.check_execute_command") + @mock.patch("cephadm.ssh.SSHManager._execute_command") + @mock.patch("cephadm.ssh.SSHManager._check_execute_command") def test_offline(self, check_execute_command, execute_command, cephadm_module): check_execute_command.return_value = '' execute_command.return_value = '', '', 0 diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 0236dbe1c14..487d9c84d67 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -55,7 +55,8 @@ class UpgradeState: self.error: Optional[str] = error self.paused: bool = paused or False self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds - self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay + self.fs_original_allow_standby_replay: Optional[Dict[str, + bool]] = fs_original_allow_standby_replay def to_json(self) -> dict: return { @@ -442,13 +443,15 @@ class CephadmUpgrade: continue if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1): - self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name)) + self.mgr.log.info( + 'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name)) time.sleep(10) continue_upgrade = False continue if len(mdsmap['up']) == 0: - self.mgr.log.warning("Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction") + self.mgr.log.warning( + "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction") # This can happen because the current version MDS have # incompatible compatsets; the mons will not do any promotions. # We must upgrade to continue. 
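Note on the pattern repeated in the upgrade.py hunks above and below: each call into a now-async CephadmServe method gets wrapped in self.mgr.wait_async(...), which blocks the synchronous mgr thread until the coroutine completes. A minimal sketch of how such a sync-to-async bridge can be built, assuming a dedicated event-loop thread; the class and method names below are illustrative, not a verbatim copy of cephadm's helper in ssh.py:

import asyncio
from threading import Thread
from typing import Any, Coroutine, TypeVar

T = TypeVar('T')

class EventLoopThread(Thread):
    """Run an asyncio event loop on a dedicated daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        super().__init__(target=self._loop.run_forever)
        self.daemon = True
        self.start()

    def get_result(self, coro: Coroutine[Any, Any, T]) -> T:
        # Hand the coroutine to the loop thread and block the calling
        # (synchronous) thread until it returns or raises.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

With a helper like this, a wait_async() method reduces to a one-liner (return self.event_loop.get_result(coro)), which is why every converted call site in this patch is a mechanical one-line wrap.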
@@ -531,8 +534,8 @@ class CephadmUpgrade: logger.info('Upgrade: First pull of %s' % target_image) self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image) try: - target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info( - target_image) + target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info( + target_image)) except OrchestratorError as e: self._fail_upgrade('UPGRADE_FAILED_PULL', { 'severity': 'warning', @@ -703,17 +706,17 @@ class CephadmUpgrade: self._update_upgrade_progress(done / len(daemons)) # make sure host has latest container image - out, errs, code = CephadmServe(self.mgr)._run_cephadm( + out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( d.hostname, '', 'inspect-image', [], - image=target_image, no_fsid=True, error_ok=True) + image=target_image, no_fsid=True, error_ok=True)) if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])): logger.info('Upgrade: Pulling %s on %s' % (target_image, d.hostname)) self.upgrade_info_str = 'Pulling %s image on host %s' % ( target_image, d.hostname) - out, errs, code = CephadmServe(self.mgr)._run_cephadm( + out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( d.hostname, '', 'pull', [], - image=target_image, no_fsid=True, error_ok=True) + image=target_image, no_fsid=True, error_ok=True)) if code: self._fail_upgrade('UPGRADE_FAILED_PULL', { 'severity': 'warning', -- 2.39.5
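A note on the new all_hosts() helper in osd.py: the list comprehension [await create_from_spec_one(h, ds) for h, ds in self.prepare_drivegroup(drive_group)] awaits each host in turn, so the per-host ceph-volume runs are serialized even though they are now coroutines. If the concurrency that the removed @forall_hosts thread pool provided is wanted, asyncio.gather can schedule all hosts before awaiting any of them. A self-contained sketch of that variant, in which create_from_spec_one is a stand-in stub rather than the real implementation:

import asyncio
from typing import Any, List, Optional, Tuple

async def create_from_spec_one(host: str, drive_selection: Any) -> Optional[str]:
    # Stand-in for the real per-host OSD deployment coroutine.
    await asyncio.sleep(0.1)
    return f'Created osd(s) on host {host}'

async def all_hosts(host_ds: List[Tuple[str, Any]]) -> List[Optional[str]]:
    # gather() starts every coroutine before awaiting, so slow hosts
    # overlap instead of running one after another.
    return await asyncio.gather(*(create_from_spec_one(h, ds) for h, ds in host_ds))

print(asyncio.run(all_hosts([('host1', None), ('host2', None)])))

Either way, the collected per-host messages still funnel through ", ".join(filter(None, ret)) unchanged.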