git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: move OSD related code to OSDService
author     Sebastian Wagner <sebastian.wagner@suse.com>
           Mon, 4 May 2020 11:12:38 +0000 (13:12 +0200)
committer  Sebastian Wagner <sebastian.wagner@suse.com>
           Thu, 7 May 2020 11:04:59 +0000 (13:04 +0200)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/services/cephadmservice.py [new file with mode: 0644]
src/pybind/mgr/cephadm/services/nfs.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
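
The diff below is essentially a mechanical extraction: CephadmOrchestrator now builds an OSDService during init and forwards create_osds() and preview_drivegroups() to it, while the moved methods reach back through self.mgr for caches, mon commands and daemon creation. As a quick orientation, here is a self-contained toy sketch of that delegation pattern; the class bodies and names are illustrative stand-ins, not code from this commit:

    class OSDService:
        """Owns the OSD-specific logic that used to live on the orchestrator."""
        def __init__(self, mgr: "Orchestrator") -> None:
            self.mgr = mgr  # back-reference used for caches, mon commands, etc.

        def create_from_spec(self, drive_group: str) -> str:
            # the real method builds and runs ceph-volume commands per host;
            # this stand-in only demonstrates the call path
            return f"{self.mgr.name}: would create OSDs for {drive_group}"


    class Orchestrator:
        """Toy stand-in for CephadmOrchestrator."""
        def __init__(self, name: str) -> None:
            self.name = name
            # services:
            self.osd_service = OSDService(self)

        def create_osds(self, drive_group: str) -> str:
            # public entry point stays on the orchestrator; the work is delegated
            return self.osd_service.create_from_spec(drive_group)


    print(Orchestrator("cephadm").create_osds("default_drive_group"))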

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 541737bd5e906f7df28e5472744881e804625f24..d1699d3bcad7a869777d69172df5f0d5c80731c5 100644
@@ -26,13 +26,12 @@ import shutil
 import subprocess
 import uuid
 
-from ceph.deployment import inventory, translate
+from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.service_spec import \
     HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
 
-from mgr_module import MgrModule, HandleCommandResult, MonCommandFailed
+from mgr_module import MgrModule, HandleCommandResult
 import orchestrator
 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
     CLICommandMeta
@@ -40,7 +39,7 @@ from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpe
 from . import remotes
 from . import utils
 from .services.nfs import NFSGanesha
-from .services.osd import RemoveUtil, OSDRemoval
+from .services.osd import RemoveUtil, OSDRemoval, OSDService
 from .inventory import Inventory, SpecStore, HostCache
 
 try:
@@ -420,6 +419,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # in-memory only.
         self.offline_hosts: Set[str] = set()
 
+        # services:
+        self.osd_service = OSDService(self)
+
     def shutdown(self):
         self.log.debug('shutdown')
         self._worker_pool.close()
@@ -1430,7 +1432,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             sd.container_image_id = d.get('container_image_id')
             sd.version = d.get('version')
             if sd.daemon_type == 'osd':
-                sd.osdspec_affinity = self.get_osdspec_affinity(sd.daemon_id)
+                sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
             if 'state' in d:
                 sd.status_desc = d['state']
                 sd.status = {
@@ -1755,197 +1757,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_drivegroups(self, specs: List[DriveGroupSpec]):
         return [self._apply(spec) for spec in specs]
 
-    def get_osdspec_affinity(self, osd_id: str) -> str:
-        return self.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
-
-    def find_destroyed_osds(self) -> Dict[str, List[str]]:
-        osd_host_map: Dict[str, List[str]] = dict()
-        try:
-            ret, out, err = self.check_mon_command({
-                'prefix': 'osd tree',
-                'states': ['destroyed'],
-                'format': 'json'
-            })
-        except MonCommandFailed as e:
-            logger.exception('osd tree failed')
-            raise OrchestratorError(str(e))
-        try:
-            tree = json.loads(out)
-        except json.decoder.JSONDecodeError:
-            self.log.exception(f"Could not decode json -> {out}")
-            return osd_host_map
-
-        nodes = tree.get('nodes', {})
-        for node in nodes:
-            if node.get('type') == 'host':
-                osd_host_map.update(
-                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
-                )
-        return osd_host_map
-
     @trivial_completion
     def create_osds(self, drive_group: DriveGroupSpec):
-        self.log.debug(f"Processing DriveGroup {drive_group}")
-        ret = []
-        drive_group.osd_id_claims = self.find_destroyed_osds()
-        self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
-        for host, drive_selection in self.prepare_drivegroup(drive_group):
-            self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
-            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
-                                                     drive_group.osd_id_claims.get(host, []))
-            if not cmd:
-                self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
-                continue
-            ret_msg = self._create_osd(host, cmd,
-                                       replace_osd_ids=drive_group.osd_id_claims.get(host, []))
-            ret.append(ret_msg)
-        return ", ".join(ret)
-
-    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
-        # 1) use fn_filter to determine matching_hosts
-        matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
-        # 2) Map the inventory to the InventoryHost object
-        host_ds_map = []
-
-        # set osd_id_claims
-
-        def _find_inv_for_host(hostname: str, inventory_dict: dict):
-            # This is stupid and needs to be loaded with the host
-            for _host, _inventory in inventory_dict.items():
-                if _host == hostname:
-                    return _inventory
-            raise OrchestratorError("No inventory found for host: {}".format(hostname))
-
-        # 3) iterate over matching_host and call DriveSelection
-        self.log.debug(f"Checking matching hosts -> {matching_hosts}")
-        for host in matching_hosts:
-            inventory_for_host = _find_inv_for_host(host, self.cache.devices)
-            self.log.debug(f"Found inventory for host {inventory_for_host}")
-            drive_selection = DriveSelection(drive_group, inventory_for_host)
-            self.log.debug(f"Found drive selection {drive_selection}")
-            host_ds_map.append((host, drive_selection))
-        return host_ds_map
-
-    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
-                                      drive_selection: DriveSelection,
-                                      osd_id_claims: Optional[List[str]] = None,
-                                      preview: bool = False) -> Optional[str]:
-        self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
-        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
-        self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
-        return cmd
+        return self.osd_service.create_from_spec(drive_group)
 
+    # @trivial_completion
     def preview_drivegroups(self, drive_group_name: Optional[str] = None,
                             dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
-        # find drivegroups
-        if drive_group_name:
-            drive_groups = cast(List[DriveGroupSpec],
-                                self.spec_store.find(service_name=drive_group_name))
-        elif dg_specs:
-            drive_groups = dg_specs
-        else:
-            drive_groups = []
-        ret_all = []
-        for drive_group in drive_groups:
-            drive_group.osd_id_claims = self.find_destroyed_osds()
-            self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
-            # prepare driveselection
-            for host, ds in self.prepare_drivegroup(drive_group):
-                cmd = self.driveselection_to_ceph_volume(drive_group, ds,
-                                                         drive_group.osd_id_claims.get(host, []), preview=True)
-                if not cmd:
-                    self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
-                    continue
-                out, err, code = self._run_ceph_volume_command(host, cmd)
-                if out:
-                    concat_out = json.loads(" ".join(out))
-                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
-        return ret_all
-
-    def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
-        self.inventory.assert_host(host)
-
-        # get bootstrap key
-        ret, keyring, err = self.check_mon_command({
-            'prefix': 'auth get',
-            'entity': 'client.bootstrap-osd',
-        })
-
-        # generate config
-        ret, config, err = self.check_mon_command({
-            "prefix": "config generate-minimal-conf",
-        })
-
-        j = json.dumps({
-            'config': config,
-            'keyring': keyring,
-        })
-
-        split_cmd = cmd.split(' ')
-        _cmd = ['--config-json', '-', '--']
-        _cmd.extend(split_cmd)
-        out, err, code = self._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            _cmd,
-            stdin=j,
-            error_ok=True)
-        return out, err, code
-
-    def _create_osd(self, host, cmd, replace_osd_ids=None):
-        out, err, code = self._run_ceph_volume_command(host, cmd)
-
-        if code == 1 and ', it is already prepared' in '\n'.join(err):
-            # HACK: when we create against an existing LV, ceph-volume
-            # returns an error and the above message.  To make this
-            # command idempotent, tolerate this "error" and continue.
-            self.log.debug('the device was already prepared; continuing')
-            code = 0
-        if code:
-            raise RuntimeError(
-                'cephadm exited with an error code: %d, stderr:%s' % (
-                    code, '\n'.join(err)))
-
-        # check result
-        out, err, code = self._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            [
-                '--',
-                'lvm', 'list',
-                '--format', 'json',
-            ])
-        before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
-        osds_elems = json.loads('\n'.join(out))
-        fsid = self._cluster_fsid
-        osd_uuid_map = self.get_osd_uuid_map()
-        created = []
-        for osd_id, osds in osds_elems.items():
-            for osd in osds:
-                if osd['tags']['ceph.cluster_fsid'] != fsid:
-                    self.log.debug('mismatched fsid, skipping %s' % osd)
-                    continue
-                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
-                    # if it exists but is part of the replacement operation, don't skip
-                    continue
-                if osd_id not in osd_uuid_map:
-                    self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
-                    continue
-                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
-                    self.log.debug('mismatched osd uuid (cluster has %s, osd '
-                                   'has %s)' % (
-                                       osd_uuid_map.get(osd_id),
-                                       osd['tags']['ceph.osd_fsid']))
-                    continue
-
-                created.append(osd_id)
-                self._create_daemon(
-                    'osd', osd_id, host,
-                    osd_uuid_map=osd_uuid_map)
-
-        if created:
-            self.cache.invalidate_host_devices(host)
-            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
-        else:
-            return "Created no osd(s) on host %s; already created?" % host
+        return self.osd_service.preview_drivegroups(drive_group_name, dg_specs)
 
     def _calc_daemon_deps(self, daemon_type, daemon_id):
         need = {
@@ -2092,7 +1911,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.invalidate_host_daemons(host)
         return "Removed {} from host '{}'".format(name, host)
 
-    def _apply_service(self, spec):
+    def _apply_service(self, spec) -> bool:
         """
         Schedule a service.  Deploy new daemons or remove old ones, depending
         on the target label and count specified in the placement.
@@ -2106,7 +1925,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         create_fns = {
             'mon': self._create_mon,
             'mgr': self._create_mgr,
-            'osd': self.create_osds,
+            'osd': self.create_osds,  # OSDs work a bit differently.
             'mds': self._create_mds,
             'rgw': self._create_rgw,
             'rbd-mirror': self._create_rbd_mirror,
diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py
new file mode 100644
index 0000000..54cee38
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/cephadmservice.py
@@ -0,0 +1,12 @@
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+
+class CephadmService:
+    """
+    Base class for service types. Often providing a create() and config() fn.
+    """
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr: "CephadmOrchestrator" = mgr
\ No newline at end of file
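
The TYPE_CHECKING guard above keeps the type annotation on self.mgr without importing cephadm.module at runtime, presumably to avoid a circular import once module.py pulls in the services package. A minimal, self-contained illustration of the idiom; the manager/Manager names are made up for the example and are not part of this commit:

    # service.py - sketch of the TYPE_CHECKING idiom used in cephadmservice.py
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # evaluated only by static type checkers, so no import cycle at runtime
        from manager import Manager


    class Service:
        def __init__(self, mgr: "Manager") -> None:
            # the string annotation is resolved lazily; at runtime this is just
            # an ordinary attribute assignment
            self.mgr = mgr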
diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py
index 2eaa9344ab6de7d5ecac26aa7fb53e4465d72efb..b636dbaad2a25ecaf6cc7230d63d05871b239b68 100644
@@ -10,6 +10,7 @@ from orchestrator import OrchestratorError
 
 from .. import utils
 
+from .cephadmservice import CephadmService
 logger = logging.getLogger(__name__)
 
 class NFSGanesha(object):
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 2ead0cb8077f6508f82c8866e2e22397cfe93818..cf61feb6a0955361eaa27a08b42f5470d1c3ca5f 100644
 import datetime
 import json
 import logging
-import time
+from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
 
-from typing import List, Dict, Any, Set, Union
+from ceph.deployment import translate
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.drive_selection import DriveSelection
 
 import orchestrator
 from orchestrator import OrchestratorError
+from mgr_module import MonCommandFailed
+
+from cephadm.services.cephadmservice import CephadmService
+
 
 logger = logging.getLogger(__name__)
 
 
+class OSDService(CephadmService):
+    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+        logger.debug(f"Processing DriveGroup {drive_group}")
+        ret = []
+        drive_group.osd_id_claims = self.find_destroyed_osds()
+        logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+        for host, drive_selection in self.prepare_drivegroup(drive_group):
+            logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
+            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
+                                                     drive_group.osd_id_claims.get(host, []))
+            if not cmd:
+                logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
+                continue
+            ret_msg = self.create(host, cmd,
+                                  replace_osd_ids=drive_group.osd_id_claims.get(host, []))
+            ret.append(ret_msg)
+        return ", ".join(ret)
+
+    def create(self, host: str, cmd: str, replace_osd_ids=None) -> str:
+        out, err, code = self._run_ceph_volume_command(host, cmd)
+
+        if code == 1 and ', it is already prepared' in '\n'.join(err):
+            # HACK: when we create against an existing LV, ceph-volume
+            # returns an error and the above message.  To make this
+            # command idempotent, tolerate this "error" and continue.
+            logger.debug('the device was already prepared; continuing')
+            code = 0
+        if code:
+            raise RuntimeError(
+                'cephadm exited with an error code: %d, stderr:%s' % (
+                    code, '\n'.join(err)))
+
+        # check result
+        out, err, code = self.mgr._run_cephadm(
+            host, 'osd', 'ceph-volume',
+            [
+                '--',
+                'lvm', 'list',
+                '--format', 'json',
+            ])
+        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
+        osds_elems = json.loads('\n'.join(out))
+        fsid = self.mgr._cluster_fsid
+        osd_uuid_map = self.mgr.get_osd_uuid_map()
+        created = []
+        for osd_id, osds in osds_elems.items():
+            for osd in osds:
+                if osd['tags']['ceph.cluster_fsid'] != fsid:
+                    logger.debug('mismatched fsid, skipping %s' % osd)
+                    continue
+                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
+                    # if it exists but is part of the replacement operation, don't skip
+                    continue
+                if osd_id not in osd_uuid_map:
+                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+                    continue
+                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
+                    logger.debug('mismatched osd uuid (cluster has %s, osd '
+                                   'has %s)' % (
+                                       osd_uuid_map.get(osd_id),
+                                       osd['tags']['ceph.osd_fsid']))
+                    continue
+
+                created.append(osd_id)
+                self.mgr._create_daemon(
+                    'osd', osd_id, host,
+                    osd_uuid_map=osd_uuid_map)
+
+        if created:
+            self.mgr.cache.invalidate_host_devices(host)
+            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
+        else:
+            return "Created no osd(s) on host %s; already created?" % host
+
+    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
+        # 1) use fn_filter to determine matching_hosts
+        matching_hosts = drive_group.placement.pattern_matches_hosts(
+            [x for x in self.mgr.cache.get_hosts()])
+        # 2) Map the inventory to the InventoryHost object
+        host_ds_map = []
+
+        # set osd_id_claims
+
+        def _find_inv_for_host(hostname: str, inventory_dict: dict):
+            # This is stupid and needs to be loaded with the host
+            for _host, _inventory in inventory_dict.items():
+                if _host == hostname:
+                    return _inventory
+            raise OrchestratorError("No inventory found for host: {}".format(hostname))
+
+        # 3) iterate over matching_host and call DriveSelection
+        logger.debug(f"Checking matching hosts -> {matching_hosts}")
+        for host in matching_hosts:
+            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
+            logger.debug(f"Found inventory for host {inventory_for_host}")
+            drive_selection = DriveSelection(drive_group, inventory_for_host)
+            logger.debug(f"Found drive selection {drive_selection}")
+            host_ds_map.append((host, drive_selection))
+        return host_ds_map
+
+    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+                                      drive_selection: DriveSelection,
+                                      osd_id_claims: Optional[List[str]] = None,
+                                      preview: bool = False) -> Optional[str]:
+        logger.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
+        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
+                                                      osd_id_claims, preview=preview).run()
+        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
+        return cmd
+
+    def preview_drivegroups(self, drive_group_name: Optional[str] = None,
+                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
+        # find drivegroups
+        if drive_group_name:
+            drive_groups = cast(List[DriveGroupSpec],
+                                self.mgr.spec_store.find(service_name=drive_group_name))
+        elif dg_specs:
+            drive_groups = dg_specs
+        else:
+            drive_groups = []
+        ret_all = []
+        for drive_group in drive_groups:
+            drive_group.osd_id_claims = self.find_destroyed_osds()
+            logger.info(
+                f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+            # prepare driveselection
+            for host, ds in self.prepare_drivegroup(drive_group):
+                cmd = self.driveselection_to_ceph_volume(drive_group, ds,
+                                                         drive_group.osd_id_claims.get(host,
+                                                                                       []),
+                                                         preview=True)
+                if not cmd:
+                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
+                        drive_group.service_name()))
+                    continue
+                out, err, code = self._run_ceph_volume_command(host, cmd)
+                if out:
+                    concat_out = json.loads(" ".join(out))
+                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id,
+                                    'host': host})
+        return ret_all
+
+    def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
+        self.mgr.inventory.assert_host(host)
+
+        # get bootstrap key
+        ret, keyring, err = self.mgr.check_mon_command({
+            'prefix': 'auth get',
+            'entity': 'client.bootstrap-osd',
+        })
+
+        # generate config
+        ret, config, err = self.mgr.check_mon_command({
+            "prefix": "config generate-minimal-conf",
+        })
+
+        j = json.dumps({
+            'config': config,
+            'keyring': keyring,
+        })
+
+        split_cmd = cmd.split(' ')
+        _cmd = ['--config-json', '-', '--']
+        _cmd.extend(split_cmd)
+        out, err, code = self.mgr._run_cephadm(
+            host, 'osd', 'ceph-volume',
+            _cmd,
+            stdin=j,
+            error_ok=True)
+        return out, err, code
+
+    def get_osdspec_affinity(self, osd_id: str) -> str:
+        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
+
+    def find_destroyed_osds(self) -> Dict[str, List[str]]:
+        osd_host_map: Dict[str, List[str]] = dict()
+        try:
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'osd tree',
+                'states': ['destroyed'],
+                'format': 'json'
+            })
+        except MonCommandFailed as e:
+            logger.exception('osd tree failed')
+            raise OrchestratorError(str(e))
+        try:
+            tree = json.loads(out)
+        except json.decoder.JSONDecodeError:
+            logger.exception(f"Could not decode json -> {out}")
+            return osd_host_map
+
+        nodes = tree.get('nodes', {})
+        for node in nodes:
+            if node.get('type') == 'host':
+                osd_host_map.update(
+                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
+                )
+        return osd_host_map
+
+
 class OSDRemoval(object):
     def __init__(self,
                  osd_id: str,
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 8226bdb2ed74f018749d44e85d12e29ec0a499db..ad79a8f28263d880b9a98ccb85331cad595544ea 100644
@@ -242,14 +242,14 @@ class TestCephadm(object):
         }
         json_out = json.dumps(dict_out)
         _mon_cmd.return_value = (0, json_out, '')
-        out = cephadm_module.find_destroyed_osds()
+        out = cephadm_module.osd_service.find_destroyed_osds()
         assert out == {'host1': ['0']}
 
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
     def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
         _mon_cmd.return_value = (1, "", "fail_msg")
         with pytest.raises(OrchestratorError):
-            out = cephadm_module.find_destroyed_osds()
+            out = cephadm_module.osd_service.find_destroyed_osds()
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
@@ -285,7 +285,7 @@ class TestCephadm(object):
     def test_prepare_drivegroup(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
-            out = cephadm_module.prepare_drivegroup(dg)
+            out = cephadm_module.osd_service.prepare_drivegroup(dg)
             assert len(out) == 1
             f1 = out[0]
             assert f1[0] == 'test'
@@ -310,13 +310,13 @@ class TestCephadm(object):
             dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
             ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
             preview = preview
-            out = cephadm_module.driveselection_to_ceph_volume(dg, ds, [], preview)
+            out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
             assert out in exp_command
 
     @mock.patch("cephadm.module.SpecStore.find")
-    @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
-    @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
+    @mock.patch("cephadm.services.osd.OSDService.prepare_drivegroup")
+    @mock.patch("cephadm.services.osd.OSDService.driveselection_to_ceph_volume")
+    @mock.patch("cephadm.services.osd.OSDService._run_ceph_volume_command")
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
@@ -324,7 +324,7 @@ class TestCephadm(object):
             _find_store.return_value = [dg]
             _prepare_dg.return_value = [('host1', 'ds_dummy')]
             _run_c_v_command.return_value = ("{}", '', 0)
-            cephadm_module.preview_drivegroups(drive_group_name='foo')
+            cephadm_module.osd_service.preview_drivegroups(drive_group_name='foo')
             _find_store.assert_called_once_with(service_name='foo')
             _prepare_dg.assert_called_once_with(dg)
             _run_c_v_command.assert_called_once()