import subprocess
import uuid
-from ceph.deployment import inventory, translate
+from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.service_spec import \
HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
-from mgr_module import MgrModule, HandleCommandResult, MonCommandFailed
+from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
CLICommandMeta
from . import remotes
from . import utils
from .services.nfs import NFSGanesha
-from .services.osd import RemoveUtil, OSDRemoval
+from .services.osd import RemoveUtil, OSDRemoval, OSDService
from .inventory import Inventory, SpecStore, HostCache
try:
# in-memory only.
self.offline_hosts: Set[str] = set()
+ # services:
+ self.osd_service = OSDService(self)
+
def shutdown(self):
self.log.debug('shutdown')
self._worker_pool.close()
sd.container_image_id = d.get('container_image_id')
sd.version = d.get('version')
if sd.daemon_type == 'osd':
- sd.osdspec_affinity = self.get_osdspec_affinity(sd.daemon_id)
+ sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
if 'state' in d:
sd.status_desc = d['state']
sd.status = {
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
return [self._apply(spec) for spec in specs]
- def get_osdspec_affinity(self, osd_id: str) -> str:
- return self.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
-
- def find_destroyed_osds(self) -> Dict[str, List[str]]:
- osd_host_map: Dict[str, List[str]] = dict()
- try:
- ret, out, err = self.check_mon_command({
- 'prefix': 'osd tree',
- 'states': ['destroyed'],
- 'format': 'json'
- })
- except MonCommandFailed as e:
- logger.exception('osd tree failed')
- raise OrchestratorError(str(e))
- try:
- tree = json.loads(out)
- except json.decoder.JSONDecodeError:
- self.log.exception(f"Could not decode json -> {out}")
- return osd_host_map
-
- nodes = tree.get('nodes', {})
- for node in nodes:
- if node.get('type') == 'host':
- osd_host_map.update(
- {node.get('name'): [str(_id) for _id in node.get('children', list())]}
- )
- return osd_host_map
-
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
- self.log.debug(f"Processing DriveGroup {drive_group}")
- ret = []
- drive_group.osd_id_claims = self.find_destroyed_osds()
- self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
- for host, drive_selection in self.prepare_drivegroup(drive_group):
- self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
- cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
- drive_group.osd_id_claims.get(host, []))
- if not cmd:
- self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
- continue
- ret_msg = self._create_osd(host, cmd,
- replace_osd_ids=drive_group.osd_id_claims.get(host, []))
- ret.append(ret_msg)
- return ", ".join(ret)
-
- def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
- # 1) use fn_filter to determine matching_hosts
- matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
- # 2) Map the inventory to the InventoryHost object
- host_ds_map = []
-
- # set osd_id_claims
-
- def _find_inv_for_host(hostname: str, inventory_dict: dict):
- # This is stupid and needs to be loaded with the host
- for _host, _inventory in inventory_dict.items():
- if _host == hostname:
- return _inventory
- raise OrchestratorError("No inventory found for host: {}".format(hostname))
-
- # 3) iterate over matching_host and call DriveSelection
- self.log.debug(f"Checking matching hosts -> {matching_hosts}")
- for host in matching_hosts:
- inventory_for_host = _find_inv_for_host(host, self.cache.devices)
- self.log.debug(f"Found inventory for host {inventory_for_host}")
- drive_selection = DriveSelection(drive_group, inventory_for_host)
- self.log.debug(f"Found drive selection {drive_selection}")
- host_ds_map.append((host, drive_selection))
- return host_ds_map
-
- def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
- drive_selection: DriveSelection,
- osd_id_claims: Optional[List[str]] = None,
- preview: bool = False) -> Optional[str]:
- self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
- cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
- self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
- return cmd
+ return self.osd_service.create_from_spec(drive_group)
+ # @trivial_completion
def preview_drivegroups(self, drive_group_name: Optional[str] = None,
dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
- # find drivegroups
- if drive_group_name:
- drive_groups = cast(List[DriveGroupSpec],
- self.spec_store.find(service_name=drive_group_name))
- elif dg_specs:
- drive_groups = dg_specs
- else:
- drive_groups = []
- ret_all = []
- for drive_group in drive_groups:
- drive_group.osd_id_claims = self.find_destroyed_osds()
- self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
- # prepare driveselection
- for host, ds in self.prepare_drivegroup(drive_group):
- cmd = self.driveselection_to_ceph_volume(drive_group, ds,
- drive_group.osd_id_claims.get(host, []), preview=True)
- if not cmd:
- self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
- continue
- out, err, code = self._run_ceph_volume_command(host, cmd)
- if out:
- concat_out = json.loads(" ".join(out))
- ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
- return ret_all
-
- def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
- self.inventory.assert_host(host)
-
- # get bootstrap key
- ret, keyring, err = self.check_mon_command({
- 'prefix': 'auth get',
- 'entity': 'client.bootstrap-osd',
- })
-
- # generate config
- ret, config, err = self.check_mon_command({
- "prefix": "config generate-minimal-conf",
- })
-
- j = json.dumps({
- 'config': config,
- 'keyring': keyring,
- })
-
- split_cmd = cmd.split(' ')
- _cmd = ['--config-json', '-', '--']
- _cmd.extend(split_cmd)
- out, err, code = self._run_cephadm(
- host, 'osd', 'ceph-volume',
- _cmd,
- stdin=j,
- error_ok=True)
- return out, err, code
-
- def _create_osd(self, host, cmd, replace_osd_ids=None):
- out, err, code = self._run_ceph_volume_command(host, cmd)
-
- if code == 1 and ', it is already prepared' in '\n'.join(err):
- # HACK: when we create against an existing LV, ceph-volume
- # returns an error and the above message. To make this
- # command idempotent, tolerate this "error" and continue.
- self.log.debug('the device was already prepared; continuing')
- code = 0
- if code:
- raise RuntimeError(
- 'cephadm exited with an error code: %d, stderr:%s' % (
- code, '\n'.join(err)))
-
- # check result
- out, err, code = self._run_cephadm(
- host, 'osd', 'ceph-volume',
- [
- '--',
- 'lvm', 'list',
- '--format', 'json',
- ])
- before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
- osds_elems = json.loads('\n'.join(out))
- fsid = self._cluster_fsid
- osd_uuid_map = self.get_osd_uuid_map()
- created = []
- for osd_id, osds in osds_elems.items():
- for osd in osds:
- if osd['tags']['ceph.cluster_fsid'] != fsid:
- self.log.debug('mismatched fsid, skipping %s' % osd)
- continue
- if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
- # if it exists but is part of the replacement operation, don't skip
- continue
- if osd_id not in osd_uuid_map:
- self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
- continue
- if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
- self.log.debug('mismatched osd uuid (cluster has %s, osd '
- 'has %s)' % (
- osd_uuid_map.get(osd_id),
- osd['tags']['ceph.osd_fsid']))
- continue
-
- created.append(osd_id)
- self._create_daemon(
- 'osd', osd_id, host,
- osd_uuid_map=osd_uuid_map)
-
- if created:
- self.cache.invalidate_host_devices(host)
- return "Created osd(s) %s on host '%s'" % (','.join(created), host)
- else:
- return "Created no osd(s) on host %s; already created?" % host
+ return self.osd_service.preview_drivegroups(drive_group_name, dg_specs)
def _calc_daemon_deps(self, daemon_type, daemon_id):
need = {
self.cache.invalidate_host_daemons(host)
return "Removed {} from host '{}'".format(name, host)
- def _apply_service(self, spec):
+ def _apply_service(self, spec) -> bool:
"""
Schedule a service. Deploy new daemons or remove old ones, depending
on the target label and count specified in the placement.
create_fns = {
'mon': self._create_mon,
'mgr': self._create_mgr,
- 'osd': self.create_osds,
+            'osd': self.create_osds,  # OSDs work a bit differently; creation is handled by OSDService
'mds': self._create_mds,
'rgw': self._create_rgw,
'rbd-mirror': self._create_rbd_mirror,
--- /dev/null
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+class CephadmService:
+ """
+    Base class for service types. Subclasses typically provide create() and config() methods.
+ """
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr: "CephadmOrchestrator" = mgr
\ No newline at end of file
from .. import utils
+from .cephadmservice import CephadmService
logger = logging.getLogger(__name__)
class NFSGanesha(object):
import datetime
import json
import logging
-import time
-from typing import List, Dict, Any, Set, Union
+from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
+from ceph.deployment import translate
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.drive_selection import DriveSelection
import orchestrator
from orchestrator import OrchestratorError
+from mgr_module import MonCommandFailed
+
+from cephadm.services.cephadmservice import CephadmService
+
logger = logging.getLogger(__name__)
+class OSDService(CephadmService):
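+    """
+    OSD orchestration: translate DriveGroupSpecs into ceph-volume commands,
+    run them on the matching hosts and deploy daemons for the created OSDs.
+    """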
+ def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+ logger.debug(f"Processing DriveGroup {drive_group}")
+ ret = []
+ drive_group.osd_id_claims = self.find_destroyed_osds()
+ logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+ for host, drive_selection in self.prepare_drivegroup(drive_group):
+ logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
+ cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
+ drive_group.osd_id_claims.get(host, []))
+ if not cmd:
+ logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
+ continue
+ ret_msg = self.create(host, cmd,
+ replace_osd_ids=drive_group.osd_id_claims.get(host, []))
+ ret.append(ret_msg)
+ return ", ".join(ret)
+
+    def create(self, host: str, cmd: str, replace_osd_ids: Optional[List[str]] = None) -> str:
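+        """
+        Run the given ceph-volume command on the host, then inspect
+        `ceph-volume lvm list` to find the OSDs that were actually created
+        and deploy a daemon for each of them.
+        """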
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+
+ if code == 1 and ', it is already prepared' in '\n'.join(err):
+ # HACK: when we create against an existing LV, ceph-volume
+ # returns an error and the above message. To make this
+ # command idempotent, tolerate this "error" and continue.
+ logger.debug('the device was already prepared; continuing')
+ code = 0
+ if code:
+ raise RuntimeError(
+ 'cephadm exited with an error code: %d, stderr:%s' % (
+ code, '\n'.join(err)))
+
+ # check result
+ out, err, code = self.mgr._run_cephadm(
+ host, 'osd', 'ceph-volume',
+ [
+ '--',
+ 'lvm', 'list',
+ '--format', 'json',
+ ])
+ before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
+ osds_elems = json.loads('\n'.join(out))
+ fsid = self.mgr._cluster_fsid
+ osd_uuid_map = self.mgr.get_osd_uuid_map()
+ created = []
+ for osd_id, osds in osds_elems.items():
+ for osd in osds:
+ if osd['tags']['ceph.cluster_fsid'] != fsid:
+ logger.debug('mismatched fsid, skipping %s' % osd)
+ continue
+                if osd_id in before_osd_uuid_map and osd_id not in (replace_osd_ids or []):
+                    # already existed before this run and is not being replaced: skip it
+ continue
+ if osd_id not in osd_uuid_map:
+ logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+ continue
+ if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
+ logger.debug('mismatched osd uuid (cluster has %s, osd '
+ 'has %s)' % (
+ osd_uuid_map.get(osd_id),
+ osd['tags']['ceph.osd_fsid']))
+ continue
+
+ created.append(osd_id)
+ self.mgr._create_daemon(
+ 'osd', osd_id, host,
+ osd_uuid_map=osd_uuid_map)
+
+ if created:
+ self.mgr.cache.invalidate_host_devices(host)
+ return "Created osd(s) %s on host '%s'" % (','.join(created), host)
+ else:
+ return "Created no osd(s) on host %s; already created?" % host
+
+    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
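+        """
+        Match the drivegroup's placement against the cached hosts and return
+        a (hostname, DriveSelection) tuple for every matching host.
+        """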
+ # 1) use fn_filter to determine matching_hosts
+ matching_hosts = drive_group.placement.pattern_matches_hosts(
+ [x for x in self.mgr.cache.get_hosts()])
+ # 2) Map the inventory to the InventoryHost object
+ host_ds_map = []
+
+ # set osd_id_claims
+
+ def _find_inv_for_host(hostname: str, inventory_dict: dict):
+            # TODO: this lookup is a workaround; the inventory should be loaded together with the host
+ for _host, _inventory in inventory_dict.items():
+ if _host == hostname:
+ return _inventory
+ raise OrchestratorError("No inventory found for host: {}".format(hostname))
+
+ # 3) iterate over matching_host and call DriveSelection
+ logger.debug(f"Checking matching hosts -> {matching_hosts}")
+ for host in matching_hosts:
+ inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
+ logger.debug(f"Found inventory for host {inventory_for_host}")
+ drive_selection = DriveSelection(drive_group, inventory_for_host)
+ logger.debug(f"Found drive selection {drive_selection}")
+ host_ds_map.append((host, drive_selection))
+ return host_ds_map
+
+ def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+ drive_selection: DriveSelection,
+ osd_id_claims: Optional[List[str]] = None,
+ preview: bool = False) -> Optional[str]:
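+        """
+        Render the ceph-volume command for this selection; returns None when
+        the selection contains no data devices.
+        """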
+ logger.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
+ cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
+ osd_id_claims, preview=preview).run()
+ logger.debug(f"Resulting ceph-volume cmd: {cmd}")
+ return cmd
+
+ def preview_drivegroups(self, drive_group_name: Optional[str] = None,
+                            dg_specs: Optional[List[DriveGroupSpec]] = None
+                            ) -> List[Dict[str, Dict[Any, Any]]]:
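+        """
+        Resolve the given drivegroups, run their ceph-volume commands in
+        preview mode and return the parsed output per drivegroup and host.
+        """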
+ # find drivegroups
+ if drive_group_name:
+ drive_groups = cast(List[DriveGroupSpec],
+ self.mgr.spec_store.find(service_name=drive_group_name))
+ elif dg_specs:
+ drive_groups = dg_specs
+ else:
+ drive_groups = []
+ ret_all = []
+ for drive_group in drive_groups:
+ drive_group.osd_id_claims = self.find_destroyed_osds()
+ logger.info(
+ f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+ # prepare driveselection
+ for host, ds in self.prepare_drivegroup(drive_group):
+                cmd = self.driveselection_to_ceph_volume(
+                    drive_group, ds,
+                    drive_group.osd_id_claims.get(host, []),
+                    preview=True)
+ if not cmd:
+ logger.debug("No data_devices, skipping DriveGroup: {}".format(
+ drive_group.service_name()))
+ continue
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+ if out:
+ concat_out = json.loads(" ".join(out))
+ ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id,
+ 'host': host})
+ return ret_all
+
+ def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
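+        """
+        Run ceph-volume on the host via cephadm, passing the bootstrap-osd
+        keyring and a minimal ceph.conf on stdin via --config-json.
+        """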
+ self.mgr.inventory.assert_host(host)
+
+ # get bootstrap key
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get',
+ 'entity': 'client.bootstrap-osd',
+ })
+
+ # generate config
+ ret, config, err = self.mgr.check_mon_command({
+ "prefix": "config generate-minimal-conf",
+ })
+
+ j = json.dumps({
+ 'config': config,
+ 'keyring': keyring,
+ })
+
+ split_cmd = cmd.split(' ')
+ _cmd = ['--config-json', '-', '--']
+ _cmd.extend(split_cmd)
+ out, err, code = self.mgr._run_cephadm(
+ host, 'osd', 'ceph-volume',
+ _cmd,
+ stdin=j,
+ error_ok=True)
+ return out, err, code
+
+ def get_osdspec_affinity(self, osd_id: str) -> str:
+ return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
+
+ def find_destroyed_osds(self) -> Dict[str, List[str]]:
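+        """
+        Return a hostname -> [osd_id, ...] map of OSDs currently in the
+        'destroyed' state, as reported by `osd tree`.
+        """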
+ osd_host_map: Dict[str, List[str]] = dict()
+ try:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'osd tree',
+ 'states': ['destroyed'],
+ 'format': 'json'
+ })
+ except MonCommandFailed as e:
+ logger.exception('osd tree failed')
+ raise OrchestratorError(str(e))
+ try:
+ tree = json.loads(out)
+ except json.decoder.JSONDecodeError:
+ logger.exception(f"Could not decode json -> {out}")
+ return osd_host_map
+
+        nodes = tree.get('nodes', [])  # 'osd tree' reports nodes as a list
+ for node in nodes:
+ if node.get('type') == 'host':
+ osd_host_map.update(
+ {node.get('name'): [str(_id) for _id in node.get('children', list())]}
+ )
+ return osd_host_map
+
+
class OSDRemoval(object):
def __init__(self,
osd_id: str,
}
json_out = json.dumps(dict_out)
_mon_cmd.return_value = (0, json_out, '')
- out = cephadm_module.find_destroyed_osds()
+ out = cephadm_module.osd_service.find_destroyed_osds()
assert out == {'host1': ['0']}
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
_mon_cmd.return_value = (1, "", "fail_msg")
with pytest.raises(OrchestratorError):
- out = cephadm_module.find_destroyed_osds()
+ out = cephadm_module.osd_service.find_destroyed_osds()
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_prepare_drivegroup(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
- out = cephadm_module.prepare_drivegroup(dg)
+ out = cephadm_module.osd_service.prepare_drivegroup(dg)
assert len(out) == 1
f1 = out[0]
assert f1[0] == 'test'
dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
- out = cephadm_module.driveselection_to_ceph_volume(dg, ds, [], preview)
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
assert out in exp_command
@mock.patch("cephadm.module.SpecStore.find")
- @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
- @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
- @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
+ @mock.patch("cephadm.services.osd.OSDService.prepare_drivegroup")
+ @mock.patch("cephadm.services.osd.OSDService.driveselection_to_ceph_volume")
+ @mock.patch("cephadm.services.osd.OSDService._run_ceph_volume_command")
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
with self._with_host(cephadm_module, 'test'):
_find_store.return_value = [dg]
_prepare_dg.return_value = [('host1', 'ds_dummy')]
_run_c_v_command.return_value = ("{}", '', 0)
- cephadm_module.preview_drivegroups(drive_group_name='foo')
+ cephadm_module.osd_service.preview_drivegroups(drive_group_name='foo')
_find_store.assert_called_once_with(service_name='foo')
_prepare_dg.assert_called_once_with(dg)
_run_c_v_command.assert_called_once()