r[str(o['osd'])] = o['uuid']
return r
- def call_inventory(self, hosts, drive_groups):
- def call_create(inventory):
- return self._prepare_deployment(hosts, drive_groups, inventory)
-
- return self.get_inventory().then(call_create)
-
- def create_osds(self, drive_groups):
- # type: (List[DriveGroupSpec]) -> AsyncCompletion
- return self.get_hosts().then(lambda hosts: self.call_inventory(hosts, drive_groups))
-
- def _prepare_deployment(self,
- all_hosts, # type: List[orchestrator.HostSpec]
- drive_groups, # type: List[DriveGroupSpec]
- inventory_list # type: List[orchestrator.InventoryHost]
- ):
- # type: (...) -> orchestrator.Completion
-
- for drive_group in drive_groups:
- self.log.info("Processing DriveGroup {}".format(drive_group))
- # 1) use fn_filter to determine matching_hosts
- matching_hosts = drive_group.placement.pattern_matches_hosts([x.hostname for x in all_hosts])
- # 2) Map the inventory to the InventoryHost object
- # FIXME: lazy-load the inventory from a InventoryHost object;
- # this would save one call to the inventory(at least externally)
-
- def _find_inv_for_host(hostname, inventory_list):
- # This is stupid and needs to be loaded with the host
- for _inventory in inventory_list:
- if _inventory.name == hostname:
- return _inventory
- raise OrchestratorError("No inventory found for host: {}".format(hostname))
-
- cmds = []
- # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
- for host in matching_hosts:
- inventory_for_host = _find_inv_for_host(host, inventory_list)
- drive_selection = selector.DriveSelection(drive_group, inventory_for_host.devices)
- cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
- if not cmd:
- self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
- continue
- cmds.append((host, cmd))
-
- return self._create_osds(cmds)
-
- @async_map_completion
- def _create_osds(self, host, cmd):
- return self._create_osd(host, cmd)
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[orchestrator.Completion]:
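+ """Schedule a service apply for each drive group spec, collecting the resulting Completions."""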
+ completions: List[orchestrator.Completion] = list()
+ for spec in specs:
+ completions.extend(self._apply(spec))
+ return completions
+
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> orchestrator.Completion
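+ """Resolve a single DriveGroupSpec against the cached hosts and devices, then create the matching OSDs."""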
+ self.log.info("Processing DriveGroup {}".format(drive_group))
+ # 1) use fn_filter to determine matching_hosts
+ matching_hosts = drive_group.placement.pattern_matches_hosts(list(self.cache.get_hosts()))
+ # 2) Map the inventory to the InventoryHost object
+
+ def _find_inv_for_host(hostname: str, inventory_dict: dict):
+ # FIXME: the inventory should be loaded together with the host
+ try:
+ return inventory_dict[hostname]
+ except KeyError:
+ raise OrchestratorError("No inventory found for host: {}".format(hostname))
+
+ ret = []
+ # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
+ self.log.debug(f"Checking matching hosts -> {matching_hosts}")
+ for host in matching_hosts:
+ inventory_for_host = _find_inv_for_host(host, self.cache.devices)
+ self.log.debug(f"Found inventory for host {inventory_for_host}")
+ drive_selection = selector.DriveSelection(drive_group, inventory_for_host)
+ self.log.debug(f"Found drive selection {drive_selection}")
+ cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
+ self.log.debug(f"translated to cmd {cmd}")
+ if not cmd:
+ self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
+ continue
+ ret_msg = self._create_osd(host, cmd)
+ ret.append(ret_msg)
+ return trivial_result(", ".join(ret))
def _create_osd(self, host, cmd):
create_fns = {
'mon': self._create_mon,
'mgr': self._create_mgr,
+ 'osd': self.create_osds,
'mds': self._create_mds,
'rgw': self._create_rgw,
'rbd-mirror': self._create_rbd_mirror,
r = False
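+ # OSDs are handled entirely by create_osds(); skip the per-host scaling logic below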
+ if daemon_type == 'osd':
+ return not create_func(spec) # type: ignore
+
# sanity check
if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
assert r
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.SpecStore.save")
+ def test_apply_osd_save(self, _save_spec, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ json_spec = {'service_type': 'osd', 'host_pattern': 'test', 'service_id': 'foo', 'data_devices': {'all': True}}
+ spec = ServiceSpec.from_json(json_spec)
+ assert isinstance(spec, DriveGroupSpec)
+ c = cephadm_module.apply_drivegroups([spec])
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c[0]) == 'Scheduled osd update...'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.SpecStore.save")
+ def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, 'service_id': 'foo', 'data_devices': {'all': True}}
+ spec = ServiceSpec.from_json(json_spec)
+ assert isinstance(spec, DriveGroupSpec)
+ c = cephadm_module.apply_drivegroups([spec])
+ _save_spec.assert_called_with(spec)
+ assert wait(cephadm_module, c[0]) == 'Scheduled osd update...'
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
- c = cephadm_module.create_osds([dg])
- assert wait(cephadm_module, c) == ["Created no osd(s) on host test; already created?"]
+ c = cephadm_module.create_osds(dg)
+ out = wait(cephadm_module, c)
+ assert out == "Created no osd(s) on host test; already created?"
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
import logging
import time
-from ceph.deployment.drive_group import DriveGroupSpecs, DriveGroupValidationError
+from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError
from mgr_util import get_most_recent_rate
from . import ApiController, RESTController, Endpoint, Task
@raise_if_no_orchestrator
@handle_orchestrator_error('osd')
- def _create_with_drive_groups(self, drive_groups):
+ def _create_with_drive_groups(self, drive_group):
"""Create OSDs with DriveGroups."""
orch = OrchClient.instance()
try:
- orch.osds.create(DriveGroupSpecs(drive_groups).drive_groups)
+ orch.osds.create(DriveGroupSpec.from_json(drive_group))
except (ValueError, TypeError, DriveGroupValidationError) as e:
raise DashboardException(e, component='osd')
def create(self, method, data, tracking_id): # pylint: disable=W0622
if method == 'bare':
return self._create_bare(data)
- if method == 'drive_groups':
+ if method == 'drive_group':
return self._create_with_drive_groups(data)
raise DashboardException(
component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))
class OsdManager(ResourceManager):
@wait_api_result
- def create(self, drive_groups):
- return self.api.create_osds(drive_groups)
+ def create(self, drive_group):
+ return self.api.create_osds(drive_group)
@wait_api_result
def remove(self, osd_ids):
fake_client = mock.Mock()
instance.return_value = fake_client
- # Valid DriveGroups
- data = {
- 'method': 'drive_groups',
- 'data': {
- 'all_hdd': {
- 'host_pattern': '*',
- 'data_devices': {
- 'rotational': True
- }
- },
- 'b_ssd': {
- 'host_pattern': 'b',
- 'data_devices': {
- 'rotational': False
- }
- }
- },
- 'tracking_id': 'all_hdd, b_ssd'
- }
+ # Valid DriveGroup
+ data = {'method': 'drive_group',
+ 'data': {'service_type': 'osd', 'service_id': 'all_hdd',
+ 'data_devices': {'rotational': True},
+ 'host_pattern': '*'},
+ 'tracking_id': 'all_hdd, b_ssd'}
# Without orchestrator service
fake_client.available.return_value = False
self._task_post('/api/osd', data)
self.assertStatus(201)
fake_client.osds.create.assert_called_with(
- [DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
- service_id='all_hdd',
- data_devices=DeviceSelection(rotational=True)),
- DriveGroupSpec(placement=PlacementSpec(host_pattern='b'),
- service_id='b_ssd',
- data_devices=DeviceSelection(rotational=False))])
+ DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='all_hdd',
+ service_type='osd',
+ data_devices=DeviceSelection(rotational=True)))
- # Invalid DriveGroups
- data['data']['b'] = {
- 'host_pattern1': 'aa'
- }
+ @mock.patch('dashboard.controllers.orchestrator.OrchClient.instance')
+ def test_osd_create_with_invalid_drive_groups(self, instance):
+ # without orchestrator service
+ fake_client = mock.Mock()
+ instance.return_value = fake_client
+
+ # Invalid DriveGroup
+ data = {'method': 'drive_group',
+ 'data': {'service_type': 'osd', 'service_id': 'invalid_dg',
+ 'data_devices': {'rotational': True},
+ 'host_pattern_wrong': 'unknown'},
+ 'tracking_id': 'all_hdd, b_ssd'}
self._task_post('/api/osd', data)
self.assertStatus(400)
#assert action in ["start", "stop", "reload", "restart", "redeploy"]
raise NotImplementedError()
- def create_osds(self, drive_groups):
- # type: (List[DriveGroupSpec]) -> Completion
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> Completion
"""
Create one or more OSDs within a single Drive Group.

The principal argument here is the drive_group member
of OsdSpec: other fields are advisory/extensible for any
finer-grained OSD feature enablement (choice of backing store,
compression/encryption, etc).
-
- :param drive_groups: a list of DriveGroupSpec
- :param all_hosts: TODO, this is required because the orchestrator methods are not composable
- Probably this parameter can be easily removed because each orchestrator can use
- the "get_inventory" method and the "drive_group.host_pattern" attribute
- to obtain the list of hosts where to apply the operation
"""
raise NotImplementedError()
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[Completion]:
+ """ Update OSD cluster """
+ raise NotImplementedError()
+
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
force: bool = False) -> Completion:
pass # just for type checking.
-from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \
- DriveGroupSpecs
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
+
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from mgr_module import MgrModule, HandleCommandResult
return HandleCommandResult(stdout=table.get_string())
@_cli_write_command(
- 'orch osd create',
+ 'orch apply osd',
+ desc='Create OSD daemons using drive_groups')
+ def _apply_osd(self, inbuf=None):
+ # type: (Optional[str]) -> HandleCommandResult
+ """Apply DriveGroupSpecs to create OSDs"""
+ usage = """
+Usage:
+ ceph orch apply osd -i <json_file/yaml_file>
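+
+ where the file contains one drive group spec per YAML document. An
+ illustrative minimal spec (service_id and host_pattern are placeholders):
+
+ service_type: osd
+ service_id: example_drivegroup
+ placement: {host_pattern: '*'}
+ data_devices: {all: true}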
+"""
+ if not inbuf:
+ return HandleCommandResult(-errno.EINVAL, stderr=usage)
+ try:
+ drivegroups = yaml.safe_load_all(inbuf)
+ dg_specs = [ServiceSpec.from_json(dg) for dg in drivegroups]
+ except (ValueError, yaml.YAMLError) as e:
+ msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
+ return HandleCommandResult(-errno.EINVAL, stderr=msg)
+
+ completions = self.apply_drivegroups(dg_specs)
+ for completion in completions:
+ self._orchestrator_wait([completion]) # type: ignore
+ raise_if_exception(completion) # type: ignore
+ result_strings = [completion.result_str() for completion in completions]
+ return HandleCommandResult(stdout=" ".join(result_strings))
+
+ @_cli_write_command(
+ 'orch daemon add osd',
"name=svc_arg,type=CephString,req=false",
- 'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
- def _create_osd(self, svc_arg=None, inbuf=None):
- # type: (Optional[str], Optional[str]) -> HandleCommandResult
+ 'Create an OSD service with --svc_arg=host:drives')
+ def _daemon_add_osd(self, svc_arg=None):
+ # type: (Optional[str]) -> HandleCommandResult
"""Create one or more OSDs"""
usage = """
Usage:
- ceph orch osd create -i <json_file/yaml_file>
- ceph orch osd create host:device1,device2,...
+ ceph orch daemon add osd host:device1,device2,...
"""
-
- if inbuf:
- try:
- dgs = DriveGroupSpecs(yaml.load(inbuf))
- drive_groups = dgs.drive_groups
- except ValueError as e:
- msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
- return HandleCommandResult(-errno.EINVAL, stderr=msg)
-
- elif svc_arg:
- try:
- host_name, block_device = svc_arg.split(":")
- block_devices = block_device.split(',')
- except (TypeError, KeyError, ValueError):
- msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
- return HandleCommandResult(-errno.EINVAL, stderr=msg)
-
- devs = DeviceSelection(paths=block_devices)
- drive_groups = [DriveGroupSpec(placement=PlacementSpec(host_pattern=host_name), data_devices=devs)]
- else:
+ if not svc_arg:
return HandleCommandResult(-errno.EINVAL, stderr=usage)
+ try:
+ host_name, block_device = svc_arg.split(":")
+ block_devices = block_device.split(',')
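+ # build an ad-hoc drive group pinned to a single host with explicit device paths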
+ devs = DeviceSelection(paths=block_devices)
+ drive_group = DriveGroupSpec(placement=PlacementSpec(host_pattern=host_name), data_devices=devs)
+ except (TypeError, KeyError, ValueError):
+ msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
+ return HandleCommandResult(-errno.EINVAL, stderr=msg)
- completion = self.create_osds(drive_groups)
+ completion = self.create_osds(drive_group)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
-
+
@_cli_write_command(
'orch osd rm',
"name=svc_id,type=CephString,n=N "
mgr=self
)
- def create_osds(self, drive_groups):
- # type: (List[DriveGroupSpec]) -> RookCompletion
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> RookCompletion
""" Creates OSDs from a drive group specification.
- Caveat: Currently limited to a single DriveGroup.
- The orchestrator_cli expects a single completion which
- ideally represents a set of operations. This orchestrator
- doesn't support this notion, yet. Hence it's only accepting
- a single DriveGroup for now.
- You can work around it by invoking:
-
- $: ceph orch osd create -i <dg.file>
+ $: ceph orch apply osd -i <dg.file>
- multiple times. The drivegroup file must only contain one spec at a time.
+ The drivegroup file must only contain one spec at a time.
"""
- drive_group = drive_groups[0]
targets = [] # type: List[str]
if drive_group.data_devices and drive_group.data_devices.paths:
return list(filter(_filter_func, daemons))
- def create_osds(self, drive_groups):
- # type: (List[DriveGroupSpec]) -> TestCompletion
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> TestCompletion
""" Creates OSDs from a drive group specification.
- Caveat: Currently limited to a single DriveGroup.
- The orchestrator_cli expects a single completion which
- ideally represents a set of operations. This orchestrator
- doesn't support this notion, yet. Hence it's only accepting
- a single DriveGroup for now.
- You can work around it by invoking:
-
- $: ceph orch osd create -i <dg.file>
+ $: ceph orch apply osd -i <dg.file>
- multiple times. The drivegroup file must only contain one spec at a time.
+ The drivegroup file must only contain one spec at a time.
"""
- drive_group = drive_groups[0]
def run(all_hosts):
# type: (List[orchestrator.HostSpec]) -> None
message='create_osds',
mgr=self,
)
-
)
-
@deferred_write("remove_daemons")
def remove_daemons(self, names, force):
assert isinstance(names, list)
@classmethod
def from_json(cls, device_spec):
# type: (dict) -> DeviceSelection
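+ # an absent selection (e.g. a spec with no db_devices) arrives here as falsy and maps to None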
+ if not device_spec:
+ return # type: ignore
for applied_filter in list(device_spec.keys()):
if applied_filter not in cls._supported_filters:
raise DriveGroupValidationError(
return cls(**device_spec)
+ def to_json(self):
+ return self.__dict__.copy()
+
def __repr__(self):
keys = [
key for key in self._supported_filters + ['limit'] if getattr(self, key) is not None
super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
-class DriveGroupSpecs(object):
- """ Container class to parse drivegroups """
-
- def __init__(self, drive_group_json):
- # type: (list) -> None
- self.drive_group_json = drive_group_json
-
- if isinstance(self.drive_group_json, dict):
- # from legacy representation (till Octopus)
- self.drive_group_json = [
- dict(service_id=name, service_type='osd', **dg)
- for name, dg in self.drive_group_json.items()
- ]
- if not isinstance(self.drive_group_json, list):
- raise ServiceSpecValidationError('Specs needs to be a list of specs')
- dgs = list(map(DriveGroupSpec.from_json, self.drive_group_json)) # type: ignore
- self.drive_groups = dgs # type: List[DriveGroupSpec]
-
- def __repr__(self):
- return ", ".join([repr(x) for x in self.drive_groups])
-
-
class DriveGroupSpec(ServiceSpec):
"""
Describe a drive group in the same form that ceph-volume
', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
)
+ def to_json(self):
+ # type: () -> Dict[str, Any]
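+ # start from a shallow copy, then replace nested spec objects with their JSON representations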
+ c = self.__dict__.copy()
+ if self.placement:
+ c['placement'] = self.placement.to_json()
+ if self.data_devices:
+ c['data_devices'] = self.data_devices.to_json()
+ if self.db_devices:
+ c['db_devices'] = self.db_devices.to_json()
+ if self.wal_devices:
+ c['wal_devices'] = self.wal_devices.to_json()
+ return c
+
def __eq__(self, other):
return repr(self) == repr(other)
return []
devices = list() # type: List[Device]
- for disk in self.disks.devices:
+ for disk in self.disks:
logger.debug("Processing disk {}".format(disk.path))
if not disk.available:
# This disk is already taken and must not be re-assigned.
for taken_device in devices:
- if taken_device in self.disks.devices:
- self.disks.devices.remove(taken_device)
+ if taken_device in self.disks:
+ self.disks.remove(taken_device)
return sorted([x for x in devices], key=lambda dev: dev.path)
from ceph.deployment.inventory import Device
from ceph.deployment.service_spec import PlacementSpec, ServiceSpecValidationError
from ceph.tests.utils import _mk_inventory, _mk_device
-from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupSpecs, \
- DeviceSelection, DriveGroupValidationError
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \
+ DriveGroupValidationError
@pytest.mark.parametrize("test_input",
[
}
]
),
- (
- { # old style json
- 'testing_drivegroup':
- {
- 'host_pattern': 'hostname',
- 'data_devices': {'paths': ['/dev/sda']}
- }
- }
- )
-
])
-
def test_DriveGroup(test_input):
- dgs = DriveGroupSpecs(test_input)
- for dg in dgs.drive_groups:
- assert dg.placement.pattern_matches_hosts(['hostname']) == ['hostname']
- assert dg.service_id == 'testing_drivegroup'
- assert all([isinstance(x, Device) for x in dg.data_devices.paths])
- assert dg.data_devices.paths[0].path == '/dev/sda'
+ dg = [DriveGroupSpec.from_json(inp) for inp in test_input][0]
+ assert dg.placement.pattern_matches_hosts(['hostname']) == ['hostname']
+ assert dg.service_id == 'testing_drivegroup'
+ assert all([isinstance(x, Device) for x in dg.data_devices.paths])
+ assert dg.data_devices.paths[0].path == '/dev/sda'
def test_DriveGroup_fail():
dev.path = '/dev/sd' + name
dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name)
devs.append(dev)
- return Devices(devices=devs)
+ return Devices(devices=devs).devices