]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: move _run_cephadm to serve.py
authorSebastian Wagner <sebastian.wagner@suse.com>
Wed, 6 Jan 2021 12:07:51 +0000 (13:07 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Mon, 11 Jan 2021 13:58:26 +0000 (14:58 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_migration.py
src/pybind/mgr/cephadm/tests/test_upgrade.py
src/pybind/mgr/cephadm/upgrade.py
src/pybind/mgr/cephadm/utils.py

index 37cb9fd2da48350df81ee39119aad996df3096b2..639faebf787a2e21f78bb7cc1e33993b0a99395d 100644 (file)
@@ -12,7 +12,7 @@ from threading import Event
 
 import string
 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
-    Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple
+    Any, Set, TYPE_CHECKING, cast, Iterator, NamedTuple
 
 import datetime
 import os
@@ -55,7 +55,7 @@ from .schedule import HostAssignment
 from .inventory import Inventory, SpecStore, HostCache, EventStore
 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
 from .template import TemplateMgr
-from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
+from .utils import forall_hosts, cephadmNoImage
 
 try:
     import remoto
@@ -481,7 +481,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             'username': username,
             'password': password,
         })
-        out, err, code = self._run_cephadm(
+        out, err, code = CephadmServe(self)._run_cephadm(
             host, 'mon', 'registry-login',
             ['--registry-json', '-'], stdin=args_str, error_ok=True)
         if code:
@@ -918,10 +918,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         'Check whether we can access and manage a remote host')
     def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
         try:
-            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
-                                               ['--expect-hostname', host],
-                                               addr=addr,
-                                               error_ok=True, no_fsid=True)
+            out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
+                                                             ['--expect-hostname', host],
+                                                             addr=addr,
+                                                             error_ok=True, no_fsid=True)
             if code:
                 return 1, '', ('check-host failed:\n' + '\n'.join(err))
         except OrchestratorError as e:
@@ -942,10 +942,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         'name=addr,type=CephString,req=false',
         'Prepare a remote host for use with cephadm')
     def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
-        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
-                                           ['--expect-hostname', host],
-                                           addr=addr,
-                                           error_ok=True, no_fsid=True)
+        out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
+                                                         ['--expect-hostname', host],
+                                                         addr=addr,
+                                                         error_ok=True, no_fsid=True)
         if code:
             return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
         # if we have an outstanding health alert for this host, give the
@@ -1221,102 +1221,6 @@ To check that the host is reachable:
 
         return image
 
-    def _run_cephadm(self,
-                     host: str,
-                     entity: Union[CephadmNoImage, str],
-                     command: str,
-                     args: List[str],
-                     addr: Optional[str] = "",
-                     stdin: Optional[str] = "",
-                     no_fsid: Optional[bool] = False,
-                     error_ok: Optional[bool] = False,
-                     image: Optional[str] = "",
-                     env_vars: Optional[List[str]] = None,
-                     ) -> Tuple[List[str], List[str], int]:
-        """
-        Run cephadm on the remote host with the given command + args
-
-        :env_vars: in format -> [KEY=VALUE, ..]
-        """
-        self.log.debug(f"_run_cephadm : command = {command}")
-        self.log.debug(f"_run_cephadm : args = {args}")
-
-        bypass_image = ('cephadm-exporter',)
-
-        with self._remote_connection(host, addr) as tpl:
-            conn, connr = tpl
-            assert image or entity
-            # Skip the image check for daemons deployed that are not ceph containers
-            if not str(entity).startswith(bypass_image):
-                if not image and entity is not cephadmNoImage:
-                    image = self._get_container_image(entity)
-
-            final_args = []
-
-            if env_vars:
-                for env_var_pair in env_vars:
-                    final_args.extend(['--env', env_var_pair])
-
-            if image:
-                final_args.extend(['--image', image])
-            final_args.append(command)
-
-            if not no_fsid:
-                final_args += ['--fsid', self._cluster_fsid]
-
-            if self.container_init:
-                final_args += ['--container-init']
-
-            final_args += args
-
-            self.log.debug('args: %s' % (' '.join(final_args)))
-            if self.mode == 'root':
-                if stdin:
-                    self.log.debug('stdin: %s' % stdin)
-                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
-                if stdin:
-                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
-                script += self._cephadm
-                python = connr.choose_python()
-                if not python:
-                    raise RuntimeError(
-                        'unable to find python on %s (tried %s in %s)' % (
-                            host, remotes.PYTHONS, remotes.PATH))
-                try:
-                    out, err, code = remoto.process.check(
-                        conn,
-                        [python, '-u'],
-                        stdin=script.encode('utf-8'))
-                except RuntimeError as e:
-                    self._reset_con(host)
-                    if error_ok:
-                        return [], [str(e)], 1
-                    raise
-            elif self.mode == 'cephadm-package':
-                try:
-                    out, err, code = remoto.process.check(
-                        conn,
-                        ['sudo', '/usr/bin/cephadm'] + final_args,
-                        stdin=stdin)
-                except RuntimeError as e:
-                    self._reset_con(host)
-                    if error_ok:
-                        return [], [str(e)], 1
-                    raise
-            else:
-                assert False, 'unsupported mode'
-
-            self.log.debug('code: %d' % code)
-            if out:
-                self.log.debug('out: %s' % '\n'.join(out))
-            if err:
-                self.log.debug('err: %s' % '\n'.join(err))
-            if code and not error_ok:
-                raise OrchestratorError(
-                    'cephadm exited with an error code: %d, stderr:%s' % (
-                        code, '\n'.join(err)))
-            return out, err, code
-
     def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
         """
         Returns all usable hosts that went through _refresh_host_daemons().
@@ -1340,10 +1244,10 @@ To check that the host is reachable:
         :param host: host name
         """
         assert_valid_host(spec.hostname)
-        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
-                                           ['--expect-hostname', spec.hostname],
-                                           addr=spec.addr,
-                                           error_ok=True, no_fsid=True)
+        out, err, code = CephadmServe(self)._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
+                                                         ['--expect-hostname', spec.hostname],
+                                                         addr=spec.addr,
+                                                         error_ok=True, no_fsid=True)
         if code:
             # err will contain stdout and stderr, so we filter on the message text to 
             # only show the errors
@@ -1490,9 +1394,9 @@ To check that the host is reachable:
                 raise OrchestratorError(msg, errno=rc)
 
             # call the host-maintenance function
-            out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
-                                                 ["enter"],
-                                                 error_ok=True)
+            out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
+                                                               ["enter"],
+                                                               error_ok=True)
             if out:
                 raise OrchestratorError(
                     f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
@@ -1541,9 +1445,9 @@ To check that the host is reachable:
         if tgt_host['status'] != "maintenance":
             raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
 
-        out, _err, _code = self._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
-                                             ['exit'],
-                                             error_ok=True)
+        out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
+                                                           ['exit'],
+                                                           error_ok=True)
         if out:
             raise OrchestratorError(
                 f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
@@ -1757,7 +1661,7 @@ To check that the host is reachable:
         name = daemon_spec.name()
         for a in actions[action]:
             try:
-                out, err, code = self._run_cephadm(
+                out, err, code = CephadmServe(self)._run_cephadm(
                     host, name, 'unit',
                     ['--name', name, a])
             except Exception:
@@ -1872,7 +1776,7 @@ To check that the host is reachable:
     @trivial_completion
     def zap_device(self, host: str, path: str) -> str:
         self.log.info('Zap device %s:%s' % (host, path))
-        out, err, code = self._run_cephadm(
+        out, err, code = CephadmServe(self)._run_cephadm(
             host, 'osd', 'ceph-volume',
             ['--', 'lvm', 'zap', '--destroy', path],
             error_ok=True)
@@ -1907,7 +1811,7 @@ To check that the host is reachable:
                                             host=host)
             cmd_args = shlex.split(cmd_line)
 
-            out, err, code = self._run_cephadm(
+            out, err, code = CephadmServe(self)._run_cephadm(
                 host, 'osd', 'shell', ['--'] + cmd_args,
                 error_ok=True)
             if code:
@@ -2338,7 +2242,7 @@ To check that the host is reachable:
         if self.cache.host_needs_registry_login(host) and self.registry_url:
             self._registry_login(host, self.registry_url,
                                  self.registry_username, self.registry_password)
-        out, err, code = self._run_cephadm(
+        out, err, code = CephadmServe(self)._run_cephadm(
             host, '', 'pull', [],
             image=image_name,
             no_fsid=True,
index 20dc1c0325cfcaccee44ab8fa1ab0eac17be0e39..83d39712ad487920a5349acb8d059a7fbf13a70c 100644 (file)
@@ -2,7 +2,9 @@ import datetime
 import json
 import logging
 from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any, Union, Tuple
+
+from cephadm import remotes
 
 try:
     import remoto
@@ -19,8 +21,8 @@ import orchestrator
 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent
 from cephadm.services.cephadmservice import CephadmDaemonSpec
 from cephadm.schedule import HostAssignment
-from cephadm.upgrade import CEPH_UPGRADE_ORDER
-from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest
+from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
+    CephadmNoImage, CEPH_UPGRADE_ORDER
 from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
 
 if TYPE_CHECKING:
@@ -193,7 +195,7 @@ class CephadmServe:
             return None
         self.log.debug(' checking %s' % host)
         try:
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 host, cephadmNoImage, 'check-host', [],
                 error_ok=True, no_fsid=True)
             self.mgr.cache.update_last_host_check(host)
@@ -211,7 +213,7 @@ class CephadmServe:
 
     def _refresh_host_daemons(self, host: str) -> Optional[str]:
         try:
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 host, 'mon', 'ls', [], no_fsid=True)
             if code:
                 return 'host %s cephadm ls returned %d: %s' % (
@@ -268,7 +270,7 @@ class CephadmServe:
 
     def _refresh_facts(self, host: str) -> Optional[str]:
         try:
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 host, cephadmNoImage, 'gather-facts', [],
                 error_ok=True, no_fsid=True)
 
@@ -283,7 +285,7 @@ class CephadmServe:
 
     def _refresh_host_devices(self, host: str) -> Optional[str]:
         try:
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 host, 'osd',
                 'ceph-volume',
                 ['--', 'inventory', '--format=json', '--filter-for-batch'])
@@ -298,7 +300,7 @@ class CephadmServe:
         except Exception as e:
             return 'host %s ceph-volume inventory failed: %s' % (host, e)
         try:
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 host, 'mon',
                 'list-networks',
                 [],
@@ -801,7 +803,7 @@ class CephadmServe:
                 'Reconfiguring' if reconfig else 'Deploying',
                 daemon_spec.name(), daemon_spec.host))
 
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 daemon_spec.host, daemon_spec.name(), 'deploy',
                 [
                     '--name', daemon_spec.name(),
@@ -850,7 +852,7 @@ class CephadmServe:
 
             args = ['--name', name, '--force']
             self.log.info('Removing daemon %s from %s' % (name, host))
-            out, err, code = self.mgr._run_cephadm(
+            out, err, code = self._run_cephadm(
                 host, name, 'rm-daemon', args)
             if not code:
                 # remove item from cache
@@ -860,3 +862,101 @@ class CephadmServe:
             self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon)
 
             return "Removed {} from host '{}'".format(name, host)
+
+    def _run_cephadm(self,
+                     host: str,
+                     entity: Union[CephadmNoImage, str],
+                     command: str,
+                     args: List[str],
+                     addr: Optional[str] = "",
+                     stdin: Optional[str] = "",
+                     no_fsid: Optional[bool] = False,
+                     error_ok: Optional[bool] = False,
+                     image: Optional[str] = "",
+                     env_vars: Optional[List[str]] = None,
+                     ) -> Tuple[List[str], List[str], int]:
+        """
+        Run cephadm on the remote host with the given command + args
+
+        Important: You probably don't want to run _run_cephadm from CLI handlers
+
+        :env_vars: in format -> [KEY=VALUE, ..]
+        """
+        self.log.debug(f"_run_cephadm : command = {command}")
+        self.log.debug(f"_run_cephadm : args = {args}")
+
+        bypass_image = ('cephadm-exporter',)
+
+        with self.mgr._remote_connection(host, addr) as tpl:
+            conn, connr = tpl
+            assert image or entity
+            # Skip the image check for daemons deployed that are not ceph containers
+            if not str(entity).startswith(bypass_image):
+                if not image and entity is not cephadmNoImage:
+                    image = self.mgr._get_container_image(entity)
+
+            final_args = []
+
+            if env_vars:
+                for env_var_pair in env_vars:
+                    final_args.extend(['--env', env_var_pair])
+
+            if image:
+                final_args.extend(['--image', image])
+            final_args.append(command)
+
+            if not no_fsid:
+                final_args += ['--fsid', self.mgr._cluster_fsid]
+
+            if self.mgr.container_init:
+                final_args += ['--container-init']
+
+            final_args += args
+
+            self.log.debug('args: %s' % (' '.join(final_args)))
+            if self.mgr.mode == 'root':
+                if stdin:
+                    self.log.debug('stdin: %s' % stdin)
+                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
+                if stdin:
+                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
+                script += self.mgr._cephadm
+                python = connr.choose_python()
+                if not python:
+                    raise RuntimeError(
+                        'unable to find python on %s (tried %s in %s)' % (
+                            host, remotes.PYTHONS, remotes.PATH))
+                try:
+                    out, err, code = remoto.process.check(
+                        conn,
+                        [python, '-u'],
+                        stdin=script.encode('utf-8'))
+                except RuntimeError as e:
+                    self.mgr._reset_con(host)
+                    if error_ok:
+                        return [], [str(e)], 1
+                    raise
+            elif self.mgr.mode == 'cephadm-package':
+                try:
+                    out, err, code = remoto.process.check(
+                        conn,
+                        ['sudo', '/usr/bin/cephadm'] + final_args,
+                        stdin=stdin)
+                except RuntimeError as e:
+                    self.mgr._reset_con(host)
+                    if error_ok:
+                        return [], [str(e)], 1
+                    raise
+            else:
+                assert False, 'unsupported mode'
+
+            self.log.debug('code: %d' % code)
+            if out:
+                self.log.debug('out: %s' % '\n'.join(out))
+            if err:
+                self.log.debug('err: %s' % '\n'.join(err))
+            if code and not error_ok:
+                raise OrchestratorError(
+                    'cephadm exited with an error code: %d, stderr:%s' % (
+                        code, '\n'.join(err)))
+            return out, err, code
index 70364cc5048086c49c7cc62327b7a46e8c2f8fdd..06080c16ed75a7f42ac6eb0f838ccb74951b79a8 100644 (file)
@@ -67,7 +67,7 @@ class OSDService(CephService):
                     code, '\n'.join(err)))
 
         # check result
-        out, err, code = self.mgr._run_cephadm(
+        out, err, code = CephadmServe(self.mgr)._run_cephadm(
             host, 'osd', 'ceph-volume',
             [
                 '--',
@@ -263,7 +263,7 @@ class OSDService(CephService):
         split_cmd = cmd.split(' ')
         _cmd = ['--config-json', '-', '--']
         _cmd.extend(split_cmd)
-        out, err, code = self.mgr._run_cephadm(
+        out, err, code = CephadmServe(self.mgr)._run_cephadm(
             host, 'osd', 'ceph-volume',
             _cmd,
             env_vars=env_vars,
index e5b99226017dd704d23d972bd4370b455bbe5096..c296ccdbadaafda118bd6422dd024f100f4e5d3f 100644 (file)
@@ -81,7 +81,7 @@ class TestCephadm(object):
         new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
         match_glob(new_mgr, 'myhost.*')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_host(self, cephadm_module):
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
         with with_host(cephadm_module, 'test'):
@@ -100,7 +100,7 @@ class TestCephadm(object):
             assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
     def test_service_ls(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -159,13 +159,13 @@ class TestCephadm(object):
                             del o['events']  # delete it, as it contains a timestamp
                     assert out == expected
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_device_ls(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             c = cephadm_module.get_inventory()
             assert wait(cephadm_module, c) == [InventoryHost('test')]
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
                 name='rgw.myrgw.foobar',
@@ -184,7 +184,7 @@ class TestCephadm(object):
             c = cephadm_module.list_daemons()
             assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
     def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
         cephadm_module.service_cache_timeout = 10
@@ -209,7 +209,7 @@ class TestCephadm(object):
 
                 CephadmServe(cephadm_module)._check_daemons()
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
     def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
         cephadm_module.service_cache_timeout = 10
@@ -243,7 +243,7 @@ class TestCephadm(object):
             'redeploy'
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
@@ -258,7 +258,7 @@ class TestCephadm(object):
 
                 assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.return_value = ('{}', '', 0)
 
@@ -294,7 +294,7 @@ class TestCephadm(object):
                                                 stdin='{"config": "\\n\\n[mon]\\nk=v\\n", "keyring": ""}',
                                                 image='')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
@@ -315,7 +315,7 @@ class TestCephadm(object):
                         {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
                         None)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_mon_add(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -327,7 +327,7 @@ class TestCephadm(object):
                 c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
                 wait(cephadm_module, c)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_mgr_update(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -387,7 +387,7 @@ class TestCephadm(object):
         with pytest.raises(OrchestratorError):
             out = cephadm_module.osd_service.find_destroyed_osds()
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.return_value = ('{}', '', 0)
         with with_host(cephadm_module, 'test'):
@@ -426,7 +426,7 @@ class TestCephadm(object):
             _run_cephadm.assert_called_with(
                 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
     def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -438,7 +438,7 @@ class TestCephadm(object):
             assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
             _save_spec.assert_called_with(spec)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_create_osds(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
@@ -447,7 +447,7 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == "Created no osd(s) on host test; already created?"
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_prepare_drivegroup(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
@@ -473,7 +473,7 @@ class TestCephadm(object):
              "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
         with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
@@ -483,7 +483,7 @@ class TestCephadm(object):
             out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
             assert out in exp_command
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
                 name='osd.0',
@@ -522,7 +522,7 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == []
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
     def test_rgw_update(self, cephadm_module):
         with with_host(cephadm_module, 'host1'):
@@ -541,7 +541,7 @@ class TestCephadm(object):
                 assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
                 assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
                 name='rgw.myrgw.myhost.myid',
@@ -577,14 +577,14 @@ class TestCephadm(object):
         ]
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
     def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
         with with_host(cephadm_module, 'test'):
             with with_daemon(cephadm_module, spec, meth, 'test'):
                 pass
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_nfs(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -605,7 +605,7 @@ class TestCephadm(object):
             # automatically.
             assert_rm_service(cephadm_module, 'nfs.name')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_iscsi(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -641,7 +641,7 @@ class TestCephadm(object):
             'ident'
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
         _run_cephadm.return_value = '{}', '', 0
         with with_host(cephadm_module, 'test'):
@@ -651,7 +651,7 @@ class TestCephadm(object):
             _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                                             '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
         _run_cephadm.return_value = '{}', '', 0
         with with_host(cephadm_module, 'test'):
@@ -661,7 +661,7 @@ class TestCephadm(object):
             _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                                             '--', 'echo', 'hello'], error_ok=True)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
         _run_cephadm.return_value = '{}', '', 0
         with with_host(cephadm_module, 'mgr0'):
@@ -744,14 +744,14 @@ class TestCephadm(object):
         ]
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
     def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, spec, meth, 'test'):
                 pass
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
     def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
         spec = ServiceSpec(
@@ -890,7 +890,7 @@ class TestCephadm(object):
         with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
             assert m.manage_etc_ceph_ceph_conf is True
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         def check_registry_credentials(url, username, password):
             assert cephadm_module.get_module_option('registry_url') == url
@@ -937,7 +937,7 @@ class TestCephadm(object):
             assert err == 'Host test failed to login to fail-url as fail-user with given password'
             check_registry_credentials('json-url', 'json-user', 'json-pass')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
         'image_id': 'image_id',
                     'repo_digest': 'image@repo_digest',
     })))
index f46c1024f34e5c6319327a10787f900523579e4d..049743a1fb51cc8df99d9f25fad727cee18f7f37 100644 (file)
@@ -12,7 +12,7 @@ from cephadm.serve import CephadmServe
 from tests import mock
 
 
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
 @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
 def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'host1', refresh_hosts=False):
@@ -60,7 +60,7 @@ def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
                 hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])]
 
 
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
 def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'host1'):
         cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({
@@ -92,7 +92,7 @@ def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator):
         )
 
 
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
 def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'host1'):
         cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon', json.dumps({
@@ -135,7 +135,7 @@ def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator):
         )
 
 
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
 def test_migrate_service_id_mds_one(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'host1'):
         cephadm_module.set_store(SPEC_STORE_PREFIX + 'mds', json.dumps({
index 8316670856f3a708592e991f1b9e1285ca0c99ba..baa6c50a1a961ee1b96dc82cbe37df4364af96d8 100644 (file)
@@ -10,7 +10,7 @@ from cephadm.serve import CephadmServe
 from .fixtures import _run_cephadm, wait, cephadm_module, with_host, with_service
 
 
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
 def test_upgrade_start(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'test'):
         assert wait(cephadm_module, cephadm_module.upgrade_start(
@@ -26,7 +26,7 @@ def test_upgrade_start(cephadm_module: CephadmOrchestrator):
         assert wait(cephadm_module, cephadm_module.upgrade_stop()) == 'Stopped upgrade to image_id'
 
 
-@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
 @pytest.mark.parametrize("use_repo_digest",
                          [
                              False,
@@ -52,7 +52,7 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
 
             cephadm_module._mon_command_mock_versions = _versions_mock
 
-            with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+            with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
                 'image_id': 'image_id',
                 'repo_digest': 'to_image@repo_digest',
             }))):
@@ -61,7 +61,7 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
 
             assert cephadm_module.upgrade_status is not None
 
-            with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+            with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
                 json.dumps([
                     dict(
                         name=list(cephadm_module.cache.daemons['test'].keys())[0],
index 45c352b503ab48043eef89ed652dbf18be431841..77727e45f54193a74e494431e6d4cd7cac898652 100644 (file)
@@ -2,20 +2,17 @@ import json
 import logging
 import time
 import uuid
-from typing import TYPE_CHECKING, Optional, Dict, NamedTuple
+from typing import TYPE_CHECKING, Optional, Dict
 
 import orchestrator
-from cephadm.utils import name_to_config_section
+from cephadm.serve import CephadmServe
+from cephadm.utils import name_to_config_section, CEPH_UPGRADE_ORDER
 from orchestrator import OrchestratorError, DaemonDescription
 
 if TYPE_CHECKING:
     from .module import CephadmOrchestrator
 
 
-# ceph daemon types that use the ceph container image.
-# NOTE: listed in upgrade order!
-CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
-
 logger = logging.getLogger(__name__)
 
 
@@ -297,13 +294,13 @@ class CephadmUpgrade:
                     continue
 
                 # make sure host has latest container image
-                out, err, code = self.mgr._run_cephadm(
+                out, err, code = CephadmServe(self.mgr)._run_cephadm(
                     d.hostname, '', 'inspect-image', [],
                     image=target_image, no_fsid=True, error_ok=True)
                 if code or json.loads(''.join(out)).get('image_id') != target_id:
                     logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                                d.hostname))
-                    out, err, code = self.mgr._run_cephadm(
+                    out, err, code = CephadmServe(self.mgr)._run_cephadm(
                         d.hostname, '', 'pull', [],
                         image=target_image, no_fsid=True, error_ok=True)
                     if code:
index 0680df0099c41a72c6cc0dd19c304accb952cb2b..2cb7f8dd13bbde314c936afcd49796d7be475ebb 100644 (file)
@@ -19,6 +19,11 @@ class CephadmNoImage(Enum):
     token = 1
 
 
+# ceph daemon types that use the ceph container image.
+# NOTE: listed in upgrade order!
+CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
+
+
 # Used for _run_cephadm used for check-host etc that don't require an --image parameter
 cephadmNoImage = CephadmNoImage.token