import shutil
import subprocess
-from ceph.deployment import inventory
+from ceph.deployment import inventory, translate
+from ceph.deployment.drive_selection import selector
+
from mgr_module import MgrModule
import mgr_util
import orchestrator
r[str(o['osd'])] = o['uuid']
return r
- @async_completion
- def _create_osd(self, all_hosts_, drive_group):
- all_hosts = orchestrator.InventoryNode.get_host_names(all_hosts_)
- assert len(drive_group.hosts(all_hosts)) == 1
- assert len(drive_group.data_devices.paths) > 0
- assert all(map(lambda p: isinstance(p, six.string_types),
- drive_group.data_devices.paths))
-
- host = drive_group.hosts(all_hosts)[0]
+ def call_inventory(self, hosts, drive_groups):
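+ # Chain the inventory completion into the deployment: once
+ # get_inventory() resolves, call_create receives the per-host
+ # inventory and builds the ceph-volume commands.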
+ def call_create(inventory):
+ return self._prepare_deployment(hosts, drive_groups, inventory)
+
+ return self.get_inventory().then(call_create)
+
+ def create_osds(self, drive_groups):
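+ """
+ Create OSDs on all hosts matched by the given drive groups.
+
+ :param drive_groups: list of DriveGroupSpec objects to apply
+ """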
+ return self.get_hosts().then(lambda hosts: self.call_inventory(hosts, drive_groups))
+
+ def _prepare_deployment(self, all_hosts, drive_groups, inventory_list):
+ # type: (List[orchestrator.InventoryNode], List[DriveGroupSpec], List[orchestrator.InventoryNode]) -> orchestrator.Completion
+
+ for drive_group in drive_groups:
+ self.log.info("Processing DriveGroup {}".format(drive_group))
+ # 1) use fn_filter to determine matching_hosts
+ matching_hosts = drive_group.hosts([x.name for x in all_hosts])
+ # 2) Map the inventory to the InventoryNode object
+ # FIXME: lazy-load the inventory from an InventoryNode object;
+ # this would save one call to the inventory (at least externally)
+
+ def _find_inv_for_host(hostname, inventory_list):
+ # FIXME: this is inefficient; the inventory should be loaded
+ # together with the host
+ for _inventory in inventory_list:
+ if _inventory.name == hostname:
+ return _inventory
+
+ cmds = []
+ # 3) iterate over matching_hosts and call DriveSelection and ToCephVolume
+ for host in matching_hosts:
+ inventory_for_host = _find_inv_for_host(host, inventory_list)
+ drive_selection = selector.DriveSelection(drive_group, inventory_for_host.devices)
+ cmd = translate.ToCephVolume(drive_group, drive_selection).run()
+ if not cmd:
+ self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.name))
+ continue
+ cmds.append((host, cmd))
+
+ return self._create_osd(cmds)
+
+ @async_map_completion
+ def _create_osd(self, host, cmd):
self._require_hosts(host)
# get bootstrap key
'keyring': keyring,
})
- devices = drive_group.data_devices.paths
- for device in devices:
- out, err, code = self._run_cephadm(
- host, 'osd', 'ceph-volume',
- [
- '--config-and-keyring', '-',
- '--',
- 'lvm', 'prepare',
- "--cluster-fsid", self._cluster_fsid,
- "--{}".format(drive_group.objectstore),
- "--data", device,
- ],
- stdin=j)
- self.log.debug('ceph-volume prepare: %s' % out)
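+ # cmd is a single ceph-volume invocation assembled by ToCephVolume,
+ # e.g. "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd";
+ # split it into argv form and prepend the config/keyring passthrough.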
+ split_cmd = cmd.split(' ')
+ _cmd = ['--config-and-keyring', '-', '--']
+ _cmd.extend(split_cmd)
+ out, err, code = self._run_cephadm(
+ host, 'osd', 'ceph-volume',
+ _cmd,
+ stdin=j)
+ self.log.debug('ceph-volume: %s' % out)
# check result
out, err, code = self._run_cephadm(
if osd['tags']['ceph.cluster_fsid'] != fsid:
self.log.debug('mismatched fsid, skipping %s' % osd)
continue
- if len(list(set(devices) & set(osd['devices']))) == 0 and osd.get('lv_path') not in devices:
- self.log.debug('mismatched devices, skipping %s' % osd)
- continue
if osd_id not in osd_uuid_map:
self.log.debug('osd id %d does not exist in cluster' % osd_id)
continue
return "Created osd(s) on host '{}'".format(host)
- def create_osds(self, drive_group):
- """
- Create a new osd.
-
- The orchestrator CLI currently handles a narrow form of drive
- specification defined by a single block device using bluestore.
-
- :param drive_group: osd specification
-
- TODO:
- - support full drive_group specification
- - support batch creation
- """
-
- return self.get_hosts().then(lambda hosts: self._create_osd(hosts, drive_group))
-
@with_services('osd')
def remove_osds(self, osd_ids, services):
# type: (List[str], List[orchestrator.ServiceDescription]) -> AsyncCompletion
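
For reviewers, the resulting completion chain reads as follows (an illustrative sketch only; names match the code above):

    create_osds(drive_groups)
      -> get_hosts()                    # resolves to List[InventoryNode]
      .then(call_inventory)             # captures hosts and drive_groups
        -> get_inventory()              # resolves to per-host inventories
        .then(_prepare_deployment)      # match hosts, build (host, cmd) pairs
          -> _create_osd(cmds)          # @async_map_completion fans out one
                                        # ceph-volume run per (host, cmd)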
super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
+class DriveGroupSpecs(object):
+
+ def __init__(self, drive_group_json: dict):
+ self.drive_group_json: dict = drive_group_json
+ self.drive_groups: list = list()
+ self.build_drive_groups()
+
+ def build_drive_groups(self) -> None:
+ for drive_group_name, drive_group_spec in self.drive_group_json.items():
+ self.drive_groups.append(
+ DriveGroupSpec.from_json(drive_group_spec, name=drive_group_name))
+
+ def __repr__(self) -> str:
+ return ", ".join([repr(x) for x in self.drive_groups])
+
+
class DriveGroupSpec(object):
"""
Describe a drive group in the same form that ceph-volume
]
def __init__(self,
- host_pattern, # type: str
+ host_pattern=None, # type: Optional[str]
+ name=None, # type: Optional[str]
data_devices=None, # type: Optional[DeviceSelection]
db_devices=None, # type: Optional[DeviceSelection]
wal_devices=None, # type: Optional[DeviceSelection]
journal_size=None, # type: Optional[int]
):
+ #: A name for the drive group. Since we can have multiple
+ #: drive groups in a cluster we need a way to identify them.
+ self.name = name
+
# concept of applying a drive group to a (set) of hosts is tightly
# linked to the drive group itself
#
self.osd_id_claims = osd_id_claims
@classmethod
- def from_json(cls, json_drive_group):
+ def from_json(cls, json_drive_group, name=None):
- # type: (dict) -> DriveGroupSpec
+ # type: (dict, Optional[str]) -> DriveGroupSpec
"""
Initialize 'Drive group' structure
try:
args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
json_drive_group.items()}
- return DriveGroupSpec(**args)
+ if not args:
+ raise DriveGroupValidationError("Didn't find DriveGroup specs")
+ return DriveGroupSpec(name=name, **args)
except (KeyError, TypeError) as e:
raise DriveGroupValidationError(str(e))
keys.remove('encrypted')
if 'objectstore' in keys and self.objectstore == 'bluestore':
keys.remove('objectstore')
- return "DriveGroupSpec({})".format(
+ return "DriveGroupSpec(name={}->{})".format(
+ self.name,
', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
)
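
A minimal usage sketch for the DriveGroupSpecs wrapper added above (the JSON shape is assumed from build_drive_groups, which treats each top-level key as a drive group name):

    dg_json = {
        'default': {                        # becomes DriveGroupSpec.name
            'host_pattern': '*',
            'data_devices': {'rotational': True},
        },
    }
    specs = DriveGroupSpecs(dg_json)        # one DriveGroupSpec per key
    [dg.name for dg in specs.drive_groups]  # -> ['default']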
self.disks = disks.copy()
self.spec = spec
- self._data = self.assign_devices(self.spec.data_devices)
- self._wal = self.assign_devices(self.spec.wal_devices)
- self._db = self.assign_devices(self.spec.db_devices)
- self._jornal = self.assign_devices(self.spec.journal_devices)
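+ # If the user gave explicit device paths, take them verbatim and
+ # skip the filter-based assignment below.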
+ if self.spec.data_devices.paths:
+ self._data = self.spec.data_devices.paths
+ self._db = []
+ self._wal = []
+ self._journal = []
+ else:
+ self._data = self.assign_devices(self.spec.data_devices)
+ self._wal = self.assign_devices(self.spec.wal_devices)
+ self._db = self.assign_devices(self.spec.db_devices)
+ self._journal = self.assign_devices(self.spec.journal_devices)
def data_devices(self):
# type: () -> List[Device]
def journal_devices(self):
# type: () -> List[Device]
- return self._jornal
+ return self._journal
@staticmethod
def _limit_reached(device_filter, len_devices,
return a sorted(by path) list of devices
"""
- if device_filter is None:
+ if not device_filter and not self.spec.data_devices.paths:
- logger.debug('device_filter is None')
+ logger.debug('device_filter is empty and no explicit paths given')
return []
devices = list() # type: List[Device]
- # continue criterias
+ # continue criteria
assert _filter.matcher is not None
+
+ if not disk.available:
+ logger.debug(
+ "Ignoring disk {}. Disk is not available".format(disk.path))
+ continue
+
if not _filter.matcher.compare(disk):
logger.debug(
"Ignoring disk {}. Filter did not match".format(
from ceph.deployment import drive_selection
from ceph.tests.factories import InventoryFactory
+from ceph.tests.utils import _mk_inventory, _mk_device
class TestMatcher(object):
assert ret.is_matchable is False
-def _mk_device(rotational=True, locked=False):
- return [Device(
- path='??',
- sys_api={
- "rotational": '1' if rotational else '0',
- "vendor": "Vendor",
- "human_readable_size": "394.27 GB",
- "partitions": {},
- "locked": int(locked),
- "sectorsize": "512",
- "removable": "0",
- "path": "??",
- "support_discard": "",
- "model": "Model",
- "ro": "0",
- "nr_requests": "128",
- "size": 423347879936
- },
- available=not locked,
- rejected_reasons=['locked'] if locked else [],
- lvs=[],
- device_id="Model-Vendor-foobar"
- )]
-
-
-def _mk_inventory(devices):
- devs = []
- for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z')))):
- dev = Device.from_json(dev_.to_json())
- dev.path = '/dev/sd' + name
- dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name)
- devs.append(dev)
- return Devices(devices=devs)
class TestDriveSelection(object):
import pytest
+import mock
+from ceph.deployment import drive_selection, translate
+from ceph.tests.utils import _mk_inventory, _mk_device
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
with pytest.raises(DriveGroupValidationError, match='exclusive'):
DeviceSelection(paths=['/dev/sda'], rotational=False)
+
+
+def test_ceph_volume_command_0():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(all=True)
+ )
+ inventory = _mk_inventory(_mk_device()*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd'
+
+
+def test_ceph_volume_command_1():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False)
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --yes --no-systemd'
+
+
+def test_ceph_volume_command_2():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+ db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+ wal_devices=DeviceSelection(size='10G')
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 +
+ _mk_device(rotational=False)*2 +
+ _mk_device(size="10.0 GB", rotational=False)*2
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --yes --no-systemd'
+
+
+def test_ceph_volume_command_3():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+ db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+ wal_devices=DeviceSelection(size='10G'),
+ encrypted=True
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 +
+ _mk_device(rotational=False)*2 +
+ _mk_device(size="10.0 GB", rotational=False)*2
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --dmcrypt --yes --no-systemd'
+
+
+def test_ceph_volume_command_4():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+ db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+ wal_devices=DeviceSelection(size='10G'),
+ block_db_size='500M',
+ block_wal_size='500M',
+ osds_per_device=3,
+ encrypted=True
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 +
+ _mk_device(rotational=False)*2 +
+ _mk_device(size="10.0 GB", rotational=False)*2
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --block-wal-size 500M --block-db-size 500M --dmcrypt --osds-per-device 3 --yes --no-systemd'
+
+
+def test_ceph_volume_command_5():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(rotational=True),
+ objectstore='filestore'
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd'
+
+
+def test_ceph_volume_command_6():
+ spec = DriveGroupSpec(host_pattern='*',
+ data_devices=DeviceSelection(rotational=False),
+ journal_devices=DeviceSelection(rotational=True),
+ journal_size='500M',
+ objectstore='filestore'
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmd = translate.ToCephVolume(spec, sel).run()
+ assert cmd == 'lvm batch --no-auto /dev/sdc /dev/sdd --journal-size 500M --journal-devices /dev/sda /dev/sdb --filestore --yes --no-systemd'