From 2982e54deed0464a52d2bd55d674e7cdd280b9e1 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 6 Jan 2021 13:07:51 +0100 Subject: [PATCH] mgr/cephadm: move _run_cephadm to serve.py Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/module.py | 146 +++--------------- src/pybind/mgr/cephadm/serve.py | 120 ++++++++++++-- src/pybind/mgr/cephadm/services/osd.py | 4 +- src/pybind/mgr/cephadm/tests/test_cephadm.py | 58 +++---- .../mgr/cephadm/tests/test_migration.py | 8 +- src/pybind/mgr/cephadm/tests/test_upgrade.py | 8 +- src/pybind/mgr/cephadm/upgrade.py | 13 +- src/pybind/mgr/cephadm/utils.py | 5 + 8 files changed, 184 insertions(+), 178 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 37cb9fd2da483..639faebf787a2 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -12,7 +12,7 @@ from threading import Event import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ - Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple + Any, Set, TYPE_CHECKING, cast, Iterator, NamedTuple import datetime import os @@ -55,7 +55,7 @@ from .schedule import HostAssignment from .inventory import Inventory, SpecStore, HostCache, EventStore from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade from .template import TemplateMgr -from .utils import forall_hosts, CephadmNoImage, cephadmNoImage +from .utils import forall_hosts, cephadmNoImage try: import remoto @@ -481,7 +481,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'username': username, 'password': password, }) - out, err, code = self._run_cephadm( + out, err, code = CephadmServe(self)._run_cephadm( host, 'mon', 'registry-login', ['--registry-json', '-'], stdin=args_str, error_ok=True) if code: @@ -918,10 +918,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'Check whether we can access and manage a remote host') def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: try: - out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True) + out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', + ['--expect-hostname', host], + addr=addr, + error_ok=True, no_fsid=True) if code: return 1, '', ('check-host failed:\n' + '\n'.join(err)) except OrchestratorError as e: @@ -942,10 +942,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'name=addr,type=CephString,req=false', 'Prepare a remote host for use with cephadm') def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: - out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host', - ['--expect-hostname', host], - addr=addr, - error_ok=True, no_fsid=True) + out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host', + ['--expect-hostname', host], + addr=addr, + error_ok=True, no_fsid=True) if code: return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) # if we have an outstanding health alert for this host, give the @@ -1221,102 +1221,6 @@ To check that the host is reachable: return image - def _run_cephadm(self, - host: str, - entity: Union[CephadmNoImage, str], - command: str, - args: List[str], - addr: Optional[str] = "", - stdin: Optional[str] = "", - no_fsid: Optional[bool] = False, - error_ok: Optional[bool] = False, - image: Optional[str] = "", - env_vars: Optional[List[str]] = None, - ) -> Tuple[List[str], List[str], int]: - """ - Run cephadm on the remote host with the given command + args - - :env_vars: in format -> [KEY=VALUE, ..] - """ - self.log.debug(f"_run_cephadm : command = {command}") - self.log.debug(f"_run_cephadm : args = {args}") - - bypass_image = ('cephadm-exporter',) - - with self._remote_connection(host, addr) as tpl: - conn, connr = tpl - assert image or entity - # Skip the image check for daemons deployed that are not ceph containers - if not str(entity).startswith(bypass_image): - if not image and entity is not cephadmNoImage: - image = self._get_container_image(entity) - - final_args = [] - - if env_vars: - for env_var_pair in env_vars: - final_args.extend(['--env', env_var_pair]) - - if image: - final_args.extend(['--image', image]) - final_args.append(command) - - if not no_fsid: - final_args += ['--fsid', self._cluster_fsid] - - if self.container_init: - final_args += ['--container-init'] - - final_args += args - - self.log.debug('args: %s' % (' '.join(final_args))) - if self.mode == 'root': - if stdin: - self.log.debug('stdin: %s' % stdin) - script = 'injected_argv = ' + json.dumps(final_args) + '\n' - if stdin: - script += 'injected_stdin = ' + json.dumps(stdin) + '\n' - script += self._cephadm - python = connr.choose_python() - if not python: - raise RuntimeError( - 'unable to find python on %s (tried %s in %s)' % ( - host, remotes.PYTHONS, remotes.PATH)) - try: - out, err, code = remoto.process.check( - conn, - [python, '-u'], - stdin=script.encode('utf-8')) - except RuntimeError as e: - self._reset_con(host) - if error_ok: - return [], [str(e)], 1 - raise - elif self.mode == 'cephadm-package': - try: - out, err, code = remoto.process.check( - conn, - ['sudo', '/usr/bin/cephadm'] + final_args, - stdin=stdin) - except RuntimeError as e: - self._reset_con(host) - if error_ok: - return [], [str(e)], 1 - raise - else: - assert False, 'unsupported mode' - - self.log.debug('code: %d' % code) - if out: - self.log.debug('out: %s' % '\n'.join(out)) - if err: - self.log.debug('err: %s' % '\n'.join(err)) - if code and not error_ok: - raise OrchestratorError( - 'cephadm exited with an error code: %d, stderr:%s' % ( - code, '\n'.join(err))) - return out, err, code - def _hosts_with_daemon_inventory(self) -> List[HostSpec]: """ Returns all usable hosts that went through _refresh_host_daemons(). @@ -1340,10 +1244,10 @@ To check that the host is reachable: :param host: host name """ assert_valid_host(spec.hostname) - out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host', - ['--expect-hostname', spec.hostname], - addr=spec.addr, - error_ok=True, no_fsid=True) + out, err, code = CephadmServe(self)._run_cephadm(spec.hostname, cephadmNoImage, 'check-host', + ['--expect-hostname', spec.hostname], + addr=spec.addr, + error_ok=True, no_fsid=True) if code: # err will contain stdout and stderr, so we filter on the message text to # only show the errors @@ -1490,9 +1394,9 @@ To check that the host is reachable: raise OrchestratorError(msg, errno=rc) # call the host-maintenance function - out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, "host-maintenance", - ["enter"], - error_ok=True) + out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance", + ["enter"], + error_ok=True) if out: raise OrchestratorError( f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}") @@ -1541,9 +1445,9 @@ To check that the host is reachable: if tgt_host['status'] != "maintenance": raise OrchestratorError(f"Host {hostname} is not in maintenance mode") - out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', - ['exit'], - error_ok=True) + out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', + ['exit'], + error_ok=True) if out: raise OrchestratorError( f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}") @@ -1757,7 +1661,7 @@ To check that the host is reachable: name = daemon_spec.name() for a in actions[action]: try: - out, err, code = self._run_cephadm( + out, err, code = CephadmServe(self)._run_cephadm( host, name, 'unit', ['--name', name, a]) except Exception: @@ -1872,7 +1776,7 @@ To check that the host is reachable: @trivial_completion def zap_device(self, host: str, path: str) -> str: self.log.info('Zap device %s:%s' % (host, path)) - out, err, code = self._run_cephadm( + out, err, code = CephadmServe(self)._run_cephadm( host, 'osd', 'ceph-volume', ['--', 'lvm', 'zap', '--destroy', path], error_ok=True) @@ -1907,7 +1811,7 @@ To check that the host is reachable: host=host) cmd_args = shlex.split(cmd_line) - out, err, code = self._run_cephadm( + out, err, code = CephadmServe(self)._run_cephadm( host, 'osd', 'shell', ['--'] + cmd_args, error_ok=True) if code: @@ -2338,7 +2242,7 @@ To check that the host is reachable: if self.cache.host_needs_registry_login(host) and self.registry_url: self._registry_login(host, self.registry_url, self.registry_username, self.registry_password) - out, err, code = self._run_cephadm( + out, err, code = CephadmServe(self)._run_cephadm( host, '', 'pull', [], image=image_name, no_fsid=True, diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 20dc1c0325cfc..83d39712ad487 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -2,7 +2,9 @@ import datetime import json import logging from collections import defaultdict -from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any +from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any, Union, Tuple + +from cephadm import remotes try: import remoto @@ -19,8 +21,8 @@ import orchestrator from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent from cephadm.services.cephadmservice import CephadmDaemonSpec from cephadm.schedule import HostAssignment -from cephadm.upgrade import CEPH_UPGRADE_ORDER -from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest +from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \ + CephadmNoImage, CEPH_UPGRADE_ORDER from orchestrator._interface import daemon_type_to_service, service_to_daemon_types if TYPE_CHECKING: @@ -193,7 +195,7 @@ class CephadmServe: return None self.log.debug(' checking %s' % host) try: - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( host, cephadmNoImage, 'check-host', [], error_ok=True, no_fsid=True) self.mgr.cache.update_last_host_check(host) @@ -211,7 +213,7 @@ class CephadmServe: def _refresh_host_daemons(self, host: str) -> Optional[str]: try: - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( host, 'mon', 'ls', [], no_fsid=True) if code: return 'host %s cephadm ls returned %d: %s' % ( @@ -268,7 +270,7 @@ class CephadmServe: def _refresh_facts(self, host: str) -> Optional[str]: try: - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( host, cephadmNoImage, 'gather-facts', [], error_ok=True, no_fsid=True) @@ -283,7 +285,7 @@ class CephadmServe: def _refresh_host_devices(self, host: str) -> Optional[str]: try: - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( host, 'osd', 'ceph-volume', ['--', 'inventory', '--format=json', '--filter-for-batch']) @@ -298,7 +300,7 @@ class CephadmServe: except Exception as e: return 'host %s ceph-volume inventory failed: %s' % (host, e) try: - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( host, 'mon', 'list-networks', [], @@ -801,7 +803,7 @@ class CephadmServe: 'Reconfiguring' if reconfig else 'Deploying', daemon_spec.name(), daemon_spec.host)) - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( daemon_spec.host, daemon_spec.name(), 'deploy', [ '--name', daemon_spec.name(), @@ -850,7 +852,7 @@ class CephadmServe: args = ['--name', name, '--force'] self.log.info('Removing daemon %s from %s' % (name, host)) - out, err, code = self.mgr._run_cephadm( + out, err, code = self._run_cephadm( host, name, 'rm-daemon', args) if not code: # remove item from cache @@ -860,3 +862,101 @@ class CephadmServe: self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon) return "Removed {} from host '{}'".format(name, host) + + def _run_cephadm(self, + host: str, + entity: Union[CephadmNoImage, str], + command: str, + args: List[str], + addr: Optional[str] = "", + stdin: Optional[str] = "", + no_fsid: Optional[bool] = False, + error_ok: Optional[bool] = False, + image: Optional[str] = "", + env_vars: Optional[List[str]] = None, + ) -> Tuple[List[str], List[str], int]: + """ + Run cephadm on the remote host with the given command + args + + Important: You probably don't want to run _run_cephadm from CLI handlers + + :env_vars: in format -> [KEY=VALUE, ..] + """ + self.log.debug(f"_run_cephadm : command = {command}") + self.log.debug(f"_run_cephadm : args = {args}") + + bypass_image = ('cephadm-exporter',) + + with self.mgr._remote_connection(host, addr) as tpl: + conn, connr = tpl + assert image or entity + # Skip the image check for daemons deployed that are not ceph containers + if not str(entity).startswith(bypass_image): + if not image and entity is not cephadmNoImage: + image = self.mgr._get_container_image(entity) + + final_args = [] + + if env_vars: + for env_var_pair in env_vars: + final_args.extend(['--env', env_var_pair]) + + if image: + final_args.extend(['--image', image]) + final_args.append(command) + + if not no_fsid: + final_args += ['--fsid', self.mgr._cluster_fsid] + + if self.mgr.container_init: + final_args += ['--container-init'] + + final_args += args + + self.log.debug('args: %s' % (' '.join(final_args))) + if self.mgr.mode == 'root': + if stdin: + self.log.debug('stdin: %s' % stdin) + script = 'injected_argv = ' + json.dumps(final_args) + '\n' + if stdin: + script += 'injected_stdin = ' + json.dumps(stdin) + '\n' + script += self.mgr._cephadm + python = connr.choose_python() + if not python: + raise RuntimeError( + 'unable to find python on %s (tried %s in %s)' % ( + host, remotes.PYTHONS, remotes.PATH)) + try: + out, err, code = remoto.process.check( + conn, + [python, '-u'], + stdin=script.encode('utf-8')) + except RuntimeError as e: + self.mgr._reset_con(host) + if error_ok: + return [], [str(e)], 1 + raise + elif self.mgr.mode == 'cephadm-package': + try: + out, err, code = remoto.process.check( + conn, + ['sudo', '/usr/bin/cephadm'] + final_args, + stdin=stdin) + except RuntimeError as e: + self.mgr._reset_con(host) + if error_ok: + return [], [str(e)], 1 + raise + else: + assert False, 'unsupported mode' + + self.log.debug('code: %d' % code) + if out: + self.log.debug('out: %s' % '\n'.join(out)) + if err: + self.log.debug('err: %s' % '\n'.join(err)) + if code and not error_ok: + raise OrchestratorError( + 'cephadm exited with an error code: %d, stderr:%s' % ( + code, '\n'.join(err))) + return out, err, code diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 70364cc504808..06080c16ed75a 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -67,7 +67,7 @@ class OSDService(CephService): code, '\n'.join(err))) # check result - out, err, code = self.mgr._run_cephadm( + out, err, code = CephadmServe(self.mgr)._run_cephadm( host, 'osd', 'ceph-volume', [ '--', @@ -263,7 +263,7 @@ class OSDService(CephService): split_cmd = cmd.split(' ') _cmd = ['--config-json', '-', '--'] _cmd.extend(split_cmd) - out, err, code = self.mgr._run_cephadm( + out, err, code = CephadmServe(self.mgr)._run_cephadm( host, 'osd', 'ceph-volume', _cmd, env_vars=env_vars, diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index e5b99226017dd..c296ccdbadaaf 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -81,7 +81,7 @@ class TestCephadm(object): new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing) match_glob(new_mgr, 'myhost.*') - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_host(self, cephadm_module): assert wait(cephadm_module, cephadm_module.get_hosts()) == [] with with_host(cephadm_module, 'test'): @@ -100,7 +100,7 @@ class TestCephadm(object): assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')] assert wait(cephadm_module, cephadm_module.get_hosts()) == [] - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_service_ls(self, cephadm_module): with with_host(cephadm_module, 'test'): @@ -159,13 +159,13 @@ class TestCephadm(object): del o['events'] # delete it, as it contains a timestamp assert out == expected - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_device_ls(self, cephadm_module): with with_host(cephadm_module, 'test'): c = cephadm_module.get_inventory() assert wait(cephadm_module, c) == [InventoryHost('test')] - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm( + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( json.dumps([ dict( name='rgw.myrgw.foobar', @@ -184,7 +184,7 @@ class TestCephadm(object): c = cephadm_module.list_daemons() assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar' - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_daemon_action(self, cephadm_module: CephadmOrchestrator): cephadm_module.service_cache_timeout = 10 @@ -209,7 +209,7 @@ class TestCephadm(object): CephadmServe(cephadm_module)._check_daemons() - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator): cephadm_module.service_cache_timeout = 10 @@ -243,7 +243,7 @@ class TestCephadm(object): 'redeploy' ] ) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action): with with_host(cephadm_module, 'test'): with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names: @@ -258,7 +258,7 @@ class TestCephadm(object): assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): _run_cephadm.return_value = ('{}', '', 0) @@ -294,7 +294,7 @@ class TestCephadm(object): stdin='{"config": "\\n\\n[mon]\\nk=v\\n", "keyring": ""}', image='') - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'test'): with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'): @@ -315,7 +315,7 @@ class TestCephadm(object): {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'}, None) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_mon_add(self, cephadm_module): with with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) @@ -327,7 +327,7 @@ class TestCephadm(object): c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps)) wait(cephadm_module, c) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_mgr_update(self, cephadm_module): with with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) @@ -387,7 +387,7 @@ class TestCephadm(object): with pytest.raises(OrchestratorError): out = cephadm_module.osd_service.find_destroyed_osds() - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator): _run_cephadm.return_value = ('{}', '', 0) with with_host(cephadm_module, 'test'): @@ -426,7 +426,7 @@ class TestCephadm(object): _run_cephadm.assert_called_with( 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json']) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.module.SpecStore.save") def test_apply_osd_save_placement(self, _save_spec, cephadm_module): with with_host(cephadm_module, 'test'): @@ -438,7 +438,7 @@ class TestCephadm(object): assert wait(cephadm_module, c) == ['Scheduled osd.foo update...'] _save_spec.assert_called_with(spec) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_create_osds(self, cephadm_module): with with_host(cephadm_module, 'test'): dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), @@ -447,7 +447,7 @@ class TestCephadm(object): out = wait(cephadm_module, c) assert out == "Created no osd(s) on host test; already created?" - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_prepare_drivegroup(self, cephadm_module): with with_host(cephadm_module, 'test'): dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), @@ -473,7 +473,7 @@ class TestCephadm(object): "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"), ] ) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command): with with_host(cephadm_module, 'test'): dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec( @@ -483,7 +483,7 @@ class TestCephadm(object): out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview) assert out in exp_command - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm( + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( json.dumps([ dict( name='osd.0', @@ -522,7 +522,7 @@ class TestCephadm(object): out = wait(cephadm_module, c) assert out == [] - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_rgw_update(self, cephadm_module): with with_host(cephadm_module, 'host1'): @@ -541,7 +541,7 @@ class TestCephadm(object): assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1') assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2') - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm( + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( json.dumps([ dict( name='rgw.myrgw.myhost.myid', @@ -577,14 +577,14 @@ class TestCephadm(object): ] ) @mock.patch("cephadm.module.CephadmOrchestrator._deploy_cephadm_binary", _deploy_cephadm_binary('test')) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module): with with_host(cephadm_module, 'test'): with with_daemon(cephadm_module, spec, meth, 'test'): pass - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock()) def test_nfs(self, cephadm_module): with with_host(cephadm_module, 'test'): @@ -605,7 +605,7 @@ class TestCephadm(object): # automatically. assert_rm_service(cephadm_module, 'nfs.name') - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock()) def test_iscsi(self, cephadm_module): with with_host(cephadm_module, 'test'): @@ -641,7 +641,7 @@ class TestCephadm(object): 'ident' ] ) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module): _run_cephadm.return_value = '{}', '', 0 with with_host(cephadm_module, 'test'): @@ -651,7 +651,7 @@ class TestCephadm(object): _run_cephadm.assert_called_with('test', 'osd', 'shell', [ '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") def test_blink_device_light_custom(self, _run_cephadm, cephadm_module): _run_cephadm.return_value = '{}', '', 0 with with_host(cephadm_module, 'test'): @@ -661,7 +661,7 @@ class TestCephadm(object): _run_cephadm.assert_called_with('test', 'osd', 'shell', [ '--', 'echo', 'hello'], error_ok=True) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module): _run_cephadm.return_value = '{}', '', 0 with with_host(cephadm_module, 'mgr0'): @@ -744,14 +744,14 @@ class TestCephadm(object): ] ) @mock.patch("cephadm.module.CephadmOrchestrator._deploy_cephadm_binary", _deploy_cephadm_binary('test')) - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'test'): with with_service(cephadm_module, spec, meth, 'test'): pass - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop") def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator): spec = ServiceSpec( @@ -890,7 +890,7 @@ class TestCephadm(object): with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m: assert m.manage_etc_ceph_ceph_conf is True - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator): def check_registry_credentials(url, username, password): assert cephadm_module.get_module_option('registry_url') == url @@ -937,7 +937,7 @@ class TestCephadm(object): assert err == 'Host test failed to login to fail-url as fail-user with given password' check_registry_credentials('json-url', 'json-user', 'json-pass') - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({ + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({ 'image_id': 'image_id', 'repo_digest': 'image@repo_digest', }))) diff --git a/src/pybind/mgr/cephadm/tests/test_migration.py b/src/pybind/mgr/cephadm/tests/test_migration.py index f46c1024f34e5..049743a1fb51c 100644 --- a/src/pybind/mgr/cephadm/tests/test_migration.py +++ b/src/pybind/mgr/cephadm/tests/test_migration.py @@ -12,7 +12,7 @@ from cephadm.serve import CephadmServe from tests import mock -@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None) def test_migrate_scheduler(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'host1', refresh_hosts=False): @@ -60,7 +60,7 @@ def test_migrate_scheduler(cephadm_module: CephadmOrchestrator): hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])] -@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'host1'): cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({ @@ -92,7 +92,7 @@ def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator): ) -@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'host1'): cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon', json.dumps({ @@ -135,7 +135,7 @@ def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator): ) -@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) def test_migrate_service_id_mds_one(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'host1'): cephadm_module.set_store(SPEC_STORE_PREFIX + 'mds', json.dumps({ diff --git a/src/pybind/mgr/cephadm/tests/test_upgrade.py b/src/pybind/mgr/cephadm/tests/test_upgrade.py index 8316670856f3a..baa6c50a1a961 100644 --- a/src/pybind/mgr/cephadm/tests/test_upgrade.py +++ b/src/pybind/mgr/cephadm/tests/test_upgrade.py @@ -10,7 +10,7 @@ from cephadm.serve import CephadmServe from .fixtures import _run_cephadm, wait, cephadm_module, with_host, with_service -@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_upgrade_start(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'test'): assert wait(cephadm_module, cephadm_module.upgrade_start( @@ -26,7 +26,7 @@ def test_upgrade_start(cephadm_module: CephadmOrchestrator): assert wait(cephadm_module, cephadm_module.upgrade_stop()) == 'Stopped upgrade to image_id' -@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @pytest.mark.parametrize("use_repo_digest", [ False, @@ -52,7 +52,7 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator): cephadm_module._mon_command_mock_versions = _versions_mock - with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({ + with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({ 'image_id': 'image_id', 'repo_digest': 'to_image@repo_digest', }))): @@ -61,7 +61,7 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator): assert cephadm_module.upgrade_status is not None - with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm( + with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( json.dumps([ dict( name=list(cephadm_module.cache.daemons['test'].keys())[0], diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 45c352b503ab4..77727e45f5419 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -2,20 +2,17 @@ import json import logging import time import uuid -from typing import TYPE_CHECKING, Optional, Dict, NamedTuple +from typing import TYPE_CHECKING, Optional, Dict import orchestrator -from cephadm.utils import name_to_config_section +from cephadm.serve import CephadmServe +from cephadm.utils import name_to_config_section, CEPH_UPGRADE_ORDER from orchestrator import OrchestratorError, DaemonDescription if TYPE_CHECKING: from .module import CephadmOrchestrator -# ceph daemon types that use the ceph container image. -# NOTE: listed in upgrade order! -CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror'] - logger = logging.getLogger(__name__) @@ -297,13 +294,13 @@ class CephadmUpgrade: continue # make sure host has latest container image - out, err, code = self.mgr._run_cephadm( + out, err, code = CephadmServe(self.mgr)._run_cephadm( d.hostname, '', 'inspect-image', [], image=target_image, no_fsid=True, error_ok=True) if code or json.loads(''.join(out)).get('image_id') != target_id: logger.info('Upgrade: Pulling %s on %s' % (target_image, d.hostname)) - out, err, code = self.mgr._run_cephadm( + out, err, code = CephadmServe(self.mgr)._run_cephadm( d.hostname, '', 'pull', [], image=target_image, no_fsid=True, error_ok=True) if code: diff --git a/src/pybind/mgr/cephadm/utils.py b/src/pybind/mgr/cephadm/utils.py index 0680df0099c41..2cb7f8dd13bbd 100644 --- a/src/pybind/mgr/cephadm/utils.py +++ b/src/pybind/mgr/cephadm/utils.py @@ -19,6 +19,11 @@ class CephadmNoImage(Enum): token = 1 +# ceph daemon types that use the ceph container image. +# NOTE: listed in upgrade order! +CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror'] + + # Used for _run_cephadm used for check-host etc that don't require an --image parameter cephadmNoImage = CephadmNoImage.token -- 2.39.5