From 1bd19fc967bcc7c2abfb6e24512c55663c030135 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Mon, 4 May 2020 13:12:38 +0200
Subject: [PATCH] mgr/cephadm: move OSD related code to OSDService

Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/cephadm/module.py              | 205 +----------------
 .../mgr/cephadm/services/cephadmservice.py    |  12 +
 src/pybind/mgr/cephadm/services/nfs.py        |   1 +
 src/pybind/mgr/cephadm/services/osd.py        | 212 +++++++++++++++++-
 src/pybind/mgr/cephadm/tests/test_cephadm.py  |  16 +-
 5 files changed, 243 insertions(+), 203 deletions(-)
 create mode 100644 src/pybind/mgr/cephadm/services/cephadmservice.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 541737bd5e9..d1699d3bcad 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -26,13 +26,12 @@ import shutil
 import subprocess
 import uuid
 
-from ceph.deployment import inventory, translate
+from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.service_spec import \
     HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
-from mgr_module import MgrModule, HandleCommandResult, MonCommandFailed
+from mgr_module import MgrModule, HandleCommandResult
 
 import orchestrator
 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
     CLICommandMeta
@@ -40,7 +39,7 @@ from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpe
 from . import remotes
 from . import utils
 from .services.nfs import NFSGanesha
-from .services.osd import RemoveUtil, OSDRemoval
+from .services.osd import RemoveUtil, OSDRemoval, OSDService
 from .inventory import Inventory, SpecStore, HostCache
 
 try:
@@ -420,6 +419,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # in-memory only.
         self.offline_hosts: Set[str] = set()
 
+        # services:
+        self.osd_service = OSDService(self)
+
     def shutdown(self):
         self.log.debug('shutdown')
         self._worker_pool.close()
@@ -1430,7 +1432,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             sd.container_image_id = d.get('container_image_id')
             sd.version = d.get('version')
             if sd.daemon_type == 'osd':
-                sd.osdspec_affinity = self.get_osdspec_affinity(sd.daemon_id)
+                sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
             if 'state' in d:
                 sd.status_desc = d['state']
                 sd.status = {
@@ -1755,197 +1757,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_drivegroups(self, specs: List[DriveGroupSpec]):
         return [self._apply(spec) for spec in specs]
 
-    def get_osdspec_affinity(self, osd_id: str) -> str:
-        return self.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
-
-    def find_destroyed_osds(self) -> Dict[str, List[str]]:
-        osd_host_map: Dict[str, List[str]] = dict()
-        try:
-            ret, out, err = self.check_mon_command({
-                'prefix': 'osd tree',
-                'states': ['destroyed'],
-                'format': 'json'
-            })
-        except MonCommandFailed as e:
-            logger.exception('osd tree failed')
-            raise OrchestratorError(str(e))
-        try:
-            tree = json.loads(out)
-        except json.decoder.JSONDecodeError:
-            self.log.exception(f"Could not decode json -> {out}")
-            return osd_host_map
-
-        nodes = tree.get('nodes', {})
-        for node in nodes:
-            if node.get('type') == 'host':
-                osd_host_map.update(
-                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
-                )
-        return osd_host_map
-
     @trivial_completion
     def create_osds(self, drive_group: DriveGroupSpec):
-        self.log.debug(f"Processing DriveGroup {drive_group}")
-        ret = []
-        drive_group.osd_id_claims = self.find_destroyed_osds()
-        self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
-        for host, drive_selection in self.prepare_drivegroup(drive_group):
-            self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
-            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
-                                                     drive_group.osd_id_claims.get(host, []))
-            if not cmd:
-                self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
-                continue
-            ret_msg = self._create_osd(host, cmd,
-                                       replace_osd_ids=drive_group.osd_id_claims.get(host, []))
-            ret.append(ret_msg)
-        return ", ".join(ret)
-
-    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
-        # 1) use fn_filter to determine matching_hosts
-        matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
-        # 2) Map the inventory to the InventoryHost object
-        host_ds_map = []
-
-        # set osd_id_claims
-
-        def _find_inv_for_host(hostname: str, inventory_dict: dict):
-            # This is stupid and needs to be loaded with the host
-            for _host, _inventory in inventory_dict.items():
-                if _host == hostname:
-                    return _inventory
-            raise OrchestratorError("No inventory found for host: {}".format(hostname))
-
-        # 3) iterate over matching_host and call DriveSelection
-        self.log.debug(f"Checking matching hosts -> {matching_hosts}")
-        for host in matching_hosts:
-            inventory_for_host = _find_inv_for_host(host, self.cache.devices)
-            self.log.debug(f"Found inventory for host {inventory_for_host}")
-            drive_selection = DriveSelection(drive_group, inventory_for_host)
-            self.log.debug(f"Found drive selection {drive_selection}")
-            host_ds_map.append((host, drive_selection))
-        return host_ds_map
-
-    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
-                                      drive_selection: DriveSelection,
-                                      osd_id_claims: Optional[List[str]] = None,
-                                      preview: bool = False) -> Optional[str]:
-        self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
-        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
-        self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
-        return cmd
+        return self.osd_service.create_from_spec(drive_group)
 
+    # @trivial_completion
     def preview_drivegroups(self, drive_group_name: Optional[str] = None,
                             dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
-        # find drivegroups
-        if drive_group_name:
-            drive_groups = cast(List[DriveGroupSpec],
-                                self.spec_store.find(service_name=drive_group_name))
-        elif dg_specs:
-            drive_groups = dg_specs
-        else:
-            drive_groups = []
-        ret_all = []
-        for drive_group in drive_groups:
-            drive_group.osd_id_claims = self.find_destroyed_osds()
-            self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
-            # prepare driveselection
-            for host, ds in self.prepare_drivegroup(drive_group):
-                cmd = self.driveselection_to_ceph_volume(drive_group, ds,
-                                                         drive_group.osd_id_claims.get(host, []), preview=True)
-                if not cmd:
-                    self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
-                    continue
-                out, err, code = self._run_ceph_volume_command(host, cmd)
-                if out:
-                    concat_out = json.loads(" ".join(out))
-                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
-        return ret_all
-
-    def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
-        self.inventory.assert_host(host)
-
-        # get bootstrap key
-        ret, keyring, err = self.check_mon_command({
-            'prefix': 'auth get',
-            'entity': 'client.bootstrap-osd',
-        })
-
-        # generate config
-        ret, config, err = self.check_mon_command({
-            "prefix": "config generate-minimal-conf",
-        })
-
-        j = json.dumps({
-            'config': config,
-            'keyring': keyring,
-        })
-
-        split_cmd = cmd.split(' ')
-        _cmd = ['--config-json', '-', '--']
-        _cmd.extend(split_cmd)
-        out, err, code = self._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            _cmd,
-            stdin=j,
-            error_ok=True)
-        return out, err, code
-
-    def _create_osd(self, host, cmd, replace_osd_ids=None):
-        out, err, code = self._run_ceph_volume_command(host, cmd)
-
-        if code == 1 and ', it is already prepared' in '\n'.join(err):
-            # HACK: when we create against an existing LV, ceph-volume
-            # returns an error and the above message. To make this
-            # command idempotent, tolerate this "error" and continue.
-            self.log.debug('the device was already prepared; continuing')
-            code = 0
-        if code:
-            raise RuntimeError(
-                'cephadm exited with an error code: %d, stderr:%s' % (
-                    code, '\n'.join(err)))
-
-        # check result
-        out, err, code = self._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            [
-                '--',
-                'lvm', 'list',
-                '--format', 'json',
-            ])
-        before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
-        osds_elems = json.loads('\n'.join(out))
-        fsid = self._cluster_fsid
-        osd_uuid_map = self.get_osd_uuid_map()
-        created = []
-        for osd_id, osds in osds_elems.items():
-            for osd in osds:
-                if osd['tags']['ceph.cluster_fsid'] != fsid:
-                    self.log.debug('mismatched fsid, skipping %s' % osd)
-                    continue
-                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
-                    # if it exists but is part of the replacement operation, don't skip
-                    continue
-                if osd_id not in osd_uuid_map:
-                    self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
-                    continue
-                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
-                    self.log.debug('mismatched osd uuid (cluster has %s, osd '
-                                   'has %s)' % (
-                                       osd_uuid_map.get(osd_id),
-                                       osd['tags']['ceph.osd_fsid']))
-                    continue
-
-                created.append(osd_id)
-                self._create_daemon(
-                    'osd', osd_id, host,
-                    osd_uuid_map=osd_uuid_map)
-
-        if created:
-            self.cache.invalidate_host_devices(host)
-            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
-        else:
-            return "Created no osd(s) on host %s; already created?" % host
+        return self.osd_service.preview_drivegroups(drive_group_name, dg_specs)
 
     def _calc_daemon_deps(self, daemon_type, daemon_id):
         need = {
@@ -2092,7 +1911,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.invalidate_host_daemons(host)
         return "Removed {} from host '{}'".format(name, host)
 
-    def _apply_service(self, spec):
+    def _apply_service(self, spec) -> bool:
         """
         Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.
@@ -2106,7 +1925,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         create_fns = {
             'mon': self._create_mon,
             'mgr': self._create_mgr,
-            'osd': self.create_osds,
+            'osd': self.create_osds,  # osds work a bit different.
             'mds': self._create_mds,
             'rgw': self._create_rgw,
             'rbd-mirror': self._create_rbd_mirror,
diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py
new file mode 100644
index 00000000000..54cee382065
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/cephadmservice.py
@@ -0,0 +1,12 @@
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+
+class CephadmService:
+    """
+    Base class for service types. Often providing a create() and config() fn.
+    """
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr: "CephadmOrchestrator" = mgr
\ No newline at end of file
diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py
index 2eaa9344ab6..b636dbaad2a 100644
--- a/src/pybind/mgr/cephadm/services/nfs.py
+++ b/src/pybind/mgr/cephadm/services/nfs.py
@@ -10,6 +10,7 @@ from orchestrator import OrchestratorError
 
 from .. import utils
+from .cephadmservice import CephadmService
 
 logger = logging.getLogger(__name__)
 
 class NFSGanesha(object):
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 2ead0cb8077..cf61feb6a09 100644
--- a/src/pybind/mgr/cephadm/services/osd.py
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -1,16 +1,224 @@
 import datetime
 import json
 import logging
-import time
+from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
 
-from typing import List, Dict, Any, Set, Union
+from ceph.deployment import translate
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.drive_selection import DriveSelection
 
 import orchestrator
 from orchestrator import OrchestratorError
+from mgr_module import MonCommandFailed
+
+from cephadm.services.cephadmservice import CephadmService
+
 
 logger = logging.getLogger(__name__)
 
 
+class OSDService(CephadmService):
+    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+        logger.debug(f"Processing DriveGroup {drive_group}")
+        ret = []
+        drive_group.osd_id_claims = self.find_destroyed_osds()
+        logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+        for host, drive_selection in self.prepare_drivegroup(drive_group):
+            logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
+            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
+                                                     drive_group.osd_id_claims.get(host, []))
+            if not cmd:
+                logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
+                continue
+            ret_msg = self.create(host, cmd,
+                                  replace_osd_ids=drive_group.osd_id_claims.get(host, []))
+            ret.append(ret_msg)
+        return ", ".join(ret)
+
+    def create(self, host: str, cmd: str, replace_osd_ids=None) -> str:
+        out, err, code = self._run_ceph_volume_command(host, cmd)
+
+        if code == 1 and ', it is already prepared' in '\n'.join(err):
+            # HACK: when we create against an existing LV, ceph-volume
+            # returns an error and the above message. To make this
+            # command idempotent, tolerate this "error" and continue.
+            logger.debug('the device was already prepared; continuing')
+            code = 0
+        if code:
+            raise RuntimeError(
+                'cephadm exited with an error code: %d, stderr:%s' % (
+                    code, '\n'.join(err)))
+
+        # check result
+        out, err, code = self.mgr._run_cephadm(
+            host, 'osd', 'ceph-volume',
+            [
+                '--',
+                'lvm', 'list',
+                '--format', 'json',
+            ])
+        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
+        osds_elems = json.loads('\n'.join(out))
+        fsid = self.mgr._cluster_fsid
+        osd_uuid_map = self.mgr.get_osd_uuid_map()
+        created = []
+        for osd_id, osds in osds_elems.items():
+            for osd in osds:
+                if osd['tags']['ceph.cluster_fsid'] != fsid:
+                    logger.debug('mismatched fsid, skipping %s' % osd)
+                    continue
+                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
+                    # if it exists but is part of the replacement operation, don't skip
+                    continue
+                if osd_id not in osd_uuid_map:
+                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+                    continue
+                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
+                    logger.debug('mismatched osd uuid (cluster has %s, osd '
+                                 'has %s)' % (
+                                     osd_uuid_map.get(osd_id),
+                                     osd['tags']['ceph.osd_fsid']))
+                    continue
+
+                created.append(osd_id)
+                self.mgr._create_daemon(
+                    'osd', osd_id, host,
+                    osd_uuid_map=osd_uuid_map)
+
+        if created:
+            self.mgr.cache.invalidate_host_devices(host)
+            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
+        else:
+            return "Created no osd(s) on host %s; already created?" % host
+
+    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[
+            Tuple[str, DriveSelection]]:
+        # 1) use fn_filter to determine matching_hosts
+        matching_hosts = drive_group.placement.pattern_matches_hosts(
+            [x for x in self.mgr.cache.get_hosts()])
+        # 2) Map the inventory to the InventoryHost object
+        host_ds_map = []
+
+        # set osd_id_claims
+
+        def _find_inv_for_host(hostname: str, inventory_dict: dict):
+            # This is stupid and needs to be loaded with the host
+            for _host, _inventory in inventory_dict.items():
+                if _host == hostname:
+                    return _inventory
+            raise OrchestratorError("No inventory found for host: {}".format(hostname))
+
+        # 3) iterate over matching_host and call DriveSelection
+        logger.debug(f"Checking matching hosts -> {matching_hosts}")
+        for host in matching_hosts:
+            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
+            logger.debug(f"Found inventory for host {inventory_for_host}")
+            drive_selection = DriveSelection(drive_group, inventory_for_host)
+            logger.debug(f"Found drive selection {drive_selection}")
+            host_ds_map.append((host, drive_selection))
+        return host_ds_map
+
+    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+                                      drive_selection: DriveSelection,
+                                      osd_id_claims: Optional[List[str]] = None,
+                                      preview: bool = False) -> Optional[str]:
+        logger.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
+        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
+                                                      osd_id_claims, preview=preview).run()
+        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
+        return cmd
+
+    def preview_drivegroups(self, drive_group_name: Optional[str] = None,
+                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[
+            Dict[str, Dict[Any, Any]]]:
+        # find drivegroups
+        if drive_group_name:
+            drive_groups = cast(List[DriveGroupSpec],
+                                self.mgr.spec_store.find(service_name=drive_group_name))
+        elif dg_specs:
+            drive_groups = dg_specs
+        else:
+            drive_groups = []
+        ret_all = []
+        for drive_group in drive_groups:
+            drive_group.osd_id_claims = self.find_destroyed_osds()
+            logger.info(
+                f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+            # prepare driveselection
+            for host, ds in self.prepare_drivegroup(drive_group):
+                cmd = self.driveselection_to_ceph_volume(drive_group, ds,
+                                                         drive_group.osd_id_claims.get(host,
+                                                                                       []),
+                                                         preview=True)
+                if not cmd:
+                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
+                        drive_group.service_name()))
+                    continue
+                out, err, code = self._run_ceph_volume_command(host, cmd)
+                if out:
+                    concat_out = json.loads(" ".join(out))
+                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id,
+                                    'host': host})
+        return ret_all
+
+    def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
+        self.mgr.inventory.assert_host(host)
+
+        # get bootstrap key
+        ret, keyring, err = self.mgr.check_mon_command({
+            'prefix': 'auth get',
+            'entity': 'client.bootstrap-osd',
+        })
+
+        # generate config
+        ret, config, err = self.mgr.check_mon_command({
+            "prefix": "config generate-minimal-conf",
+        })
+
+        j = json.dumps({
+            'config': config,
+            'keyring': keyring,
+        })
+
+        split_cmd = cmd.split(' ')
+        _cmd = ['--config-json', '-', '--']
+        _cmd.extend(split_cmd)
+        out, err, code = self.mgr._run_cephadm(
+            host, 'osd', 'ceph-volume',
+            _cmd,
+            stdin=j,
+            error_ok=True)
+        return out, err, code
+
+    def get_osdspec_affinity(self, osd_id: str) -> str:
+        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
+
+    def find_destroyed_osds(self) -> Dict[str, List[str]]:
+        osd_host_map: Dict[str, List[str]] = dict()
+        try:
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'osd tree',
+                'states': ['destroyed'],
+                'format': 'json'
+            })
+        except MonCommandFailed as e:
+            logger.exception('osd tree failed')
+            raise OrchestratorError(str(e))
+        try:
+            tree = json.loads(out)
+        except json.decoder.JSONDecodeError:
+            logger.exception(f"Could not decode json -> {out}")
+            return osd_host_map
+
+        nodes = tree.get('nodes', {})
+        for node in nodes:
+            if node.get('type') == 'host':
+                osd_host_map.update(
+                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
+                )
+        return osd_host_map
+
+
 class OSDRemoval(object):
     def __init__(self, osd_id: str,
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 8226bdb2ed7..ad79a8f2826 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -242,14 +242,14 @@ class TestCephadm(object):
         }
         json_out = json.dumps(dict_out)
         _mon_cmd.return_value = (0, json_out, '')
-        out = cephadm_module.find_destroyed_osds()
+        out = cephadm_module.osd_service.find_destroyed_osds()
         assert out == {'host1': ['0']}
 
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
     def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
         _mon_cmd.return_value = (1, "", "fail_msg")
         with pytest.raises(OrchestratorError):
-            out = cephadm_module.find_destroyed_osds()
+            out = cephadm_module.osd_service.find_destroyed_osds()
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
@@ -285,7 +285,7 @@ class TestCephadm(object):
     def test_prepare_drivegroup(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
-            out = cephadm_module.prepare_drivegroup(dg)
+            out = cephadm_module.osd_service.prepare_drivegroup(dg)
             assert len(out) == 1
             f1 = out[0]
             assert f1[0] == 'test'
@@ -310,13 +310,13 @@ class TestCephadm(object):
             dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
             ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
             preview = preview
-            out = cephadm_module.driveselection_to_ceph_volume(dg, ds, [], preview)
+            out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
             assert out in exp_command
 
     @mock.patch("cephadm.module.SpecStore.find")
-    @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
-    @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
+    @mock.patch("cephadm.services.osd.OSDService.prepare_drivegroup")
+    @mock.patch("cephadm.services.osd.OSDService.driveselection_to_ceph_volume")
+    @mock.patch("cephadm.services.osd.OSDService._run_ceph_volume_command")
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
@@ -324,7 +324,7 @@ class TestCephadm(object):
             _find_store.return_value = [dg]
             _prepare_dg.return_value = [('host1', 'ds_dummy')]
             _run_c_v_command.return_value = ("{}", '', 0)
-            cephadm_module.preview_drivegroups(drive_group_name='foo')
+            cephadm_module.osd_service.preview_drivegroups(drive_group_name='foo')
             _find_store.assert_called_once_with(service_name='foo')
            _prepare_dg.assert_called_once_with(dg)
             _run_c_v_command.assert_called_once()
-- 
2.39.5