import subprocess
from ceph.deployment import inventory, translate
+from ceph.deployment.drive_group import DriveGroupSpecs
from ceph.deployment.drive_selection import selector
from mgr_module import MgrModule
def create_osds(self, drive_groups):
return self.get_hosts().then(lambda hosts: self.call_inventory(hosts, drive_groups))
-
- def _prepare_deployment(self, all_hosts, drive_groups, inventory_list):
- # type: (List[orchestrator.InventoryNode], List[orchestrator.DriveGroupSpecs], List[orchestrator.InventoryNode] -> orchestrator.Completion
+ def _prepare_deployment(self,
+ all_hosts, # type: List[orchestrator.InventoryNode]
+ drive_groups, # type: List[DriveGroupSpecs]
+ inventory_list # type: List[orchestrator.InventoryNode]
+ ):
+ # type: (...) -> orchestrator.Completion
for drive_group in drive_groups:
self.log.info("Processing DriveGroup {}".format(drive_group))
for _inventory in inventory_list:
if _inventory.name == hostname:
return _inventory
+ raise OrchestratorError("No inventory found for host: {}".format(hostname))
cmds = []
# 3) iterate over matching_host and call DriveSelection and to_ceph_volume
for host in matching_hosts:
inventory_for_host = _find_inv_for_host(host, inventory_list)
drive_selection = selector.DriveSelection(drive_group, inventory_for_host.devices)
- cmd = translate.ToCephVolume(drive_group, drive_selection).run()
+ cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
if not cmd:
self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.name))
continue
split_cmd = cmd.split(' ')
_cmd = ['--config-and-keyring', '-', '--']
_cmd.extend(split_cmd)
- out, code = self._run_ceph_daemon(
+ out, err, code = self._run_cephadm(
host, 'osd', 'ceph-volume',
_cmd,
stdin=j)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_create_osds(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec('test', DeviceSelection(paths=['']))
- c = cephadm_module.create_osds(dg)
- assert wait(cephadm_module, c) == "Created osd(s) on host 'test'"
+ dg = DriveGroupSpec('test', data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds([dg])
+ assert wait(cephadm_module, c) == ["Created osd(s) on host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
ceph orchestrator osd create host:device1,device2,...
"""
- # TODO: try if inbuf file is yaml of json
if inbuf:
try:
- dgs = DriveGroupSpecs(json.loads(inbuf))
+            dgs = DriveGroupSpecs(yaml.safe_load(inbuf))
drive_groups = dgs.drive_groups
except ValueError as e:
msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
import fnmatch
+from ceph.deployment.inventory import Device
try:
from typing import Optional, List, Dict, Any
except ImportError:
"""
ephemeral drive group device specification
"""
- #: List of absolute paths to the devices.
- self.paths = [] if paths is None else paths # type: List[str]
+ #: List of Device objects for devices paths.
+ self.paths = [] if paths is None else [Device(path) for path in paths] # type: List[Device]
#: A wildcard string. e.g: "SDD*" or "SanDisk SD8SN8U5"
self.model = model
class DriveGroupSpecs(object):
+ """ Container class to parse drivegroups """
- def __init__(self, drive_group_json: dict):
- self.drive_group_json: dict = drive_group_json
- self.drive_groups: list = list()
+ def __init__(self, drive_group_json):
+ # type: (dict) -> None
+ self.drive_group_json = drive_group_json
+ self.drive_groups = list() # type: list
self.build_drive_groups()
- def build_drive_groups(self) -> list:
+ def build_drive_groups(self):
for drive_group_name, drive_group_spec in self.drive_group_json.items():
self.drive_groups.append(DriveGroupSpec.from_json
(drive_group_spec, name=drive_group_name))
- def __repr__(self) -> str:
+ def __repr__(self):
return ", ".join([repr(x) for x in self.drive_groups])
@classmethod
def from_json(cls, json_drive_group, name=None):
- # type: (dict) -> DriveGroupSpec
+ # type: (dict, Optional[str]) -> DriveGroupSpec
"""
Initialize 'Drive group' structure
json_drive_group.items()}
if not args:
raise DriveGroupValidationError("Didn't find Drivegroup specs")
- return DriveGroupSpec(**args, name=name) # noqa, that's no syntax error
+ return DriveGroupSpec(name=name, **args)
except (KeyError, TypeError) as e:
raise DriveGroupValidationError(str(e))
def hosts(self, all_hosts):
# type: (List[str]) -> List[str]
- return fnmatch.filter(all_hosts, self.host_pattern)
+ return fnmatch.filter(all_hosts, self.host_pattern) # type: ignore
def validate(self, all_hosts):
# type: (List[str]) -> None
self.disks = disks.copy()
self.spec = spec
- if self.spec.data_devices.paths:
- self._data = self.spec.data_devices.paths
- self._db = []
- self._wal = []
- self._journal = []
+ if self.spec.data_devices.paths: # type: ignore
+ # re: type: ignore there is *always* a path attribute assigned to DeviceSelection
+ # it's just None if actual drivegroups are used
+ self._data = self.spec.data_devices.paths # type: ignore
+ self._db = [] # type: List
+ self._wal = [] # type: List
+ self._journal = [] # type: List
else:
self._data = self.assign_devices(self.spec.data_devices)
self._wal = self.assign_devices(self.spec.wal_devices)
logger = logging.getLogger(__name__)
-class ToCephVolume(object):
+class to_ceph_volume(object):
def __init__(self,
spec, # type: DriveGroupSpec
assert ret.is_matchable is False
-
-
class TestDriveSelection(object):
testdata = [
import pytest
-import mock
from ceph.deployment import drive_selection, translate
+from ceph.deployment.inventory import Device
from ceph.tests.utils import _mk_inventory, _mk_device
-from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
+from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupSpecs, \
+ DeviceSelection, DriveGroupValidationError
def test_DriveGroup():
- dg_json = {
- 'host_pattern': 'hostname',
- 'data_devices': {'paths': ['/dev/sda']}
- }
+ dg_json = {'testing_drivegroup':
+ {'host_pattern': 'hostname',
+ 'data_devices': {'paths': ['/dev/sda']}
+ }
+ }
- dg = DriveGroupSpec.from_json(dg_json)
- assert dg.hosts(['hostname']) == ['hostname']
- assert dg.data_devices.paths == ['/dev/sda']
+ dgs = DriveGroupSpecs(dg_json)
+ for dg in dgs.drive_groups:
+ assert dg.hosts(['hostname']) == ['hostname']
+ assert dg.name == 'testing_drivegroup'
+ assert all([isinstance(x, Device) for x in dg.data_devices.paths])
+ assert dg.data_devices.paths[0].path == '/dev/sda'
def test_DriveGroup_fail():
def test_drive_selection():
devs = DeviceSelection(paths=['/dev/sda'])
spec = DriveGroupSpec('node_name', data_devices=devs)
- assert spec.data_devices.paths == ['/dev/sda']
+ assert all([isinstance(x, Device) for x in spec.data_devices.paths])
+ assert spec.data_devices.paths[0].path == '/dev/sda'
with pytest.raises(DriveGroupValidationError, match='exclusive'):
DeviceSelection(paths=['/dev/sda'], rotational=False)
)
inventory = _mk_inventory(_mk_device()*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
+ cmd = translate.to_ceph_volume(spec, sel).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd'
)
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --yes --no-systemd'
+ cmd = translate.to_ceph_volume(spec, sel).run()
+ assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --yes --no-systemd')
def test_ceph_volume_command_2():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --yes --no-systemd'
+ cmd = translate.to_ceph_volume(spec, sel).run()
+ assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
+ '--yes --no-systemd')
def test_ceph_volume_command_3():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --dmcrypt --yes --no-systemd'
+ cmd = translate.to_ceph_volume(spec, sel).run()
+ assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd '
+ '--wal-devices /dev/sde /dev/sdf --dmcrypt '
+ '--yes --no-systemd')
def test_ceph_volume_command_4():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --block-wal-size 500M --block-db-size 500M --dmcrypt --osds-per-device 3 --yes --no-systemd'
+ cmd = translate.to_ceph_volume(spec, sel).run()
+ assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
+ '--block-wal-size 500M --block-db-size 500M --dmcrypt '
+ '--osds-per-device 3 --yes --no-systemd')
def test_ceph_volume_command_5():
)
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
+ cmd = translate.to_ceph_volume(spec, sel).run()
assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd'
)
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.ToCephVolume(spec, sel).run()
- assert cmd == 'lvm batch --no-auto /dev/sdc /dev/sdd --journal-size 500M --journal-devices /dev/sda /dev/sdb --filestore --yes --no-systemd'
+ cmd = translate.to_ceph_volume(spec, sel).run()
+ assert cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
+ '--journal-size 500M --journal-devices /dev/sda /dev/sdb '
+ '--filestore --yes --no-systemd')
--- /dev/null
+from ceph.deployment.inventory import Devices, Device
+
+
+def _mk_device(rotational=True,
+ locked=False,
+ size="394.27 GB"):
+ return [Device(
+ path='??',
+ sys_api={
+ "rotational": '1' if rotational else '0',
+ "vendor": "Vendor",
+ "human_readable_size": size,
+ "partitions": {},
+ "locked": int(locked),
+ "sectorsize": "512",
+ "removable": "0",
+ "path": "??",
+ "support_discard": "",
+ "model": "Model",
+ "ro": "0",
+ "nr_requests": "128",
+            "size": 423347879936  # ignore conversion from human_readable_size
+ },
+ available=not locked,
+ rejected_reasons=['locked'] if locked else [],
+ lvs=[],
+ device_id="Model-Vendor-foobar"
+ )]
+
+
+def _mk_inventory(devices):
+ devs = []
+    for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z') + 1))):
+ dev = Device.from_json(dev_.to_json())
+ dev.path = '/dev/sd' + name
+ dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name)
+ devs.append(dev)
+ return Devices(devices=devs)