for drive_group in drive_groups:
self.log.info("Processing DriveGroup {}".format(drive_group))
# 1) use the placement's host_pattern (an fnmatch filter) to determine matching_hosts
- matching_hosts = drive_group.hosts([x.hostname for x in all_hosts])
+ matching_hosts = drive_group.placement.pattern_matches_hosts([x.hostname for x in all_hosts])
# 2) Map the inventory to the InventoryHost object
# FIXME: lazy-load the inventory from an InventoryHost object;
# this would save one call to the inventory (at least externally)
drive_selection = selector.DriveSelection(drive_group, inventory_for_host.devices)
cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
if not cmd:
- self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.name))
+ self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
continue
cmds.append((host, cmd))
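# A minimal sketch of the loop above (editor's example; the hostnames,
# pattern and ceph-volume output are hypothetical). The placement's
# host_pattern picks the hosts, DriveSelection matches the host's
# inventory, and translate.to_ceph_volume() renders the final command:
#
#   dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='osd-*'),
#                       data_devices=DeviceSelection(all=True))
#   dg.placement.pattern_matches_hosts(['osd-1', 'mon-1'])  # -> ['osd-1']
#   sel = selector.DriveSelection(dg, inventory_for_host.devices)
#   translate.to_ceph_volume(dg, sel).run()  # e.g. 'lvm batch --no-auto /dev/sda'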
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec('test', data_devices=DeviceSelection(paths=['']))
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds([dg])
assert wait(cephadm_module, c) == ["Created no osd(s) on host test; already created?"]
return HandleCommandResult(-errno.EINVAL, stderr=msg)
devs = DeviceSelection(paths=block_devices)
- drive_groups = [DriveGroupSpec(host_name, data_devices=devs)]
+ drive_groups = [DriveGroupSpec(placement=PlacementSpec(host_pattern=host_name), data_devices=devs)]
else:
return HandleCommandResult(-errno.EINVAL, stderr=usage)
def execute(all_hosts_):
# type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
all_hosts = [h.hostname for h in all_hosts_]
+ matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)
- assert len(drive_group.hosts(all_hosts)) == 1
+ assert len(matching_hosts) == 1
- if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
+ if not self.rook_cluster.node_exists(matching_hosts[0]):
raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(drive_group.hosts(all_hosts)))
+ "cluster".format(matching_hosts))
# Validate whether cluster CRD can accept individual OSD
# creations (i.e. not useAllDevices)
return orchestrator.Completion.with_progress(
message="Creating OSD on {0}:{1}".format(
- drive_group.hosts(all_hosts),
+ matching_hosts,
targets),
mgr=self,
on_complete=lambda _: self.rook_cluster.add_osds(drive_group, all_hosts),
@deferred_read
def has_osds(all_hosts):
+ matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)
+
# Find OSD pods on this host
pod_osd_ids = set()
pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
field_selector="spec.nodeName={0}".format(
- drive_group.hosts(all_hosts)[0]
+ matching_hosts[0]
)).items
for p in pods:
pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
new_cluster.spec.storage.nodes = ccl.NodesList()
current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
+ matching_host = drive_group.placement.pattern_matches_hosts(all_hosts)[0]
- if drive_group.hosts(all_hosts)[0] not in [n.name for n in current_nodes]:
+ if matching_host not in [n.name for n in current_nodes]:
pd = ccl.NodesItem(
- name=drive_group.hosts(all_hosts)[0],
+ name=matching_host,
config=ccl.Config(
storeType=drive_group.objectstore
)
else:
for _node in new_cluster.spec.storage.nodes:
current_node = _node # type: ccl.NodesItem
- if current_node.name == drive_group.hosts(all_hosts)[0]:
+ if current_node.name == matching_host:
if block_devices:
if not hasattr(current_node, 'devices'):
current_node.devices = ccl.DevicesList()
-import fnmatch
from ceph.deployment.inventory import Device
-from ceph.deployment.service_spec import ServiceSpecValidationError
+from ceph.deployment.service_spec import ServiceSpecValidationError, ServiceSpec, PlacementSpec
try:
from typing import Optional, List, Dict, Any
""" Container class to parse drivegroups """
def __init__(self, drive_group_json):
- # type: (dict) -> None
+ # type: (list) -> None
self.drive_group_json = drive_group_json
self.drive_groups = list() # type: List[DriveGroupSpec]
self.build_drive_groups()
def build_drive_groups(self):
- for drive_group_name, drive_group_spec in self.drive_group_json.items():
- self.drive_groups.append(DriveGroupSpec.from_json
- (drive_group_spec, name=drive_group_name))
+ self.drive_groups = list(map(DriveGroupSpec.from_json, self.drive_group_json))
def __repr__(self):
return ", ".join([repr(x) for x in self.drive_groups])
-class DriveGroupSpec(object):
+class DriveGroupSpec(ServiceSpec):
"""
Describe a drive group in the same form that ceph-volume
understands.
_supported_features = [
"encrypted", "block_wal_size", "osds_per_device",
- "db_slots", "wal_slots", "block_db_size", "host_pattern",
+ "db_slots", "wal_slots", "block_db_size", "placement", "service_id",
"data_devices", "db_devices", "wal_devices", "journal_devices",
"data_directories", "osds_per_device", "objectstore", "osd_id_claims",
"journal_size"
]
def __init__(self,
- host_pattern=None, # type: str
- name=None, # type: str
+ placement=None, # type: Optional[PlacementSpec]
+ service_id=None, # type: str
data_devices=None, # type: Optional[DeviceSelection]
db_devices=None, # type: Optional[DeviceSelection]
wal_devices=None, # type: Optional[DeviceSelection]
journal_size=None, # type: Optional[int]
):
- #: A name for the drive group. Since we can have multiple
- # drive groups in a cluster we need a way to identify them.
- self.name = name
-
- # concept of applying a drive group to a (set) of hosts is tightly
- # linked to the drive group itself
- #
- #: An fnmatch pattern to select hosts. Can also be a single host.
- self.host_pattern = host_pattern
+ super(DriveGroupSpec, self).__init__('osd', service_id=service_id, placement=placement)
#: A :class:`ceph.deployment.drive_group.DeviceSelection`
self.data_devices = data_devices
self.osd_id_claims = osd_id_claims
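# With the ServiceSpec base class in place, a drive group is constructed
# like this (editor's sketch; ids and selections are hypothetical):
#
#   dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='node*'),
#                       service_id='default_dg',
#                       data_devices=DeviceSelection(rotational=True))
#   dg.service_type  # -> 'osd', fixed by the super().__init__() call above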
@classmethod
- def from_json(cls, json_drive_group, name=None):
- # type: (dict, Optional[str]) -> DriveGroupSpec
+ def from_json(cls, json_drive_group):
+ # type: (dict) -> DriveGroupSpec
"""
Initialize 'Drive group' structure
from ceph.deployment.drive_selection import SizeMatcher
json_drive_group[key] = SizeMatcher.str_to_byte(json_drive_group[key])
+ if 'placement' in json_drive_group:
+ json_drive_group['placement'] = PlacementSpec.from_json(json_drive_group['placement'])
+
try:
args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
json_drive_group.items()}
if not args:
raise DriveGroupValidationError("Didn't find Drivegroup specs")
- return DriveGroupSpec(name=name, **args)
+ return DriveGroupSpec(**args)
except (KeyError, TypeError) as e:
raise DriveGroupValidationError(str(e))
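# from_json() now lifts the nested 'placement' dict into a PlacementSpec
# before the remaining keys are mapped (editor's example; the values are
# hypothetical but mirror the test fixtures below):
#
#   DriveGroupSpec.from_json({
#       'placement': {'host_pattern': 'data*'},
#       'data_devices': {'size': '30G:50G'},
#   })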
- def hosts(self, all_hosts):
- # type: (List[str]) -> List[str]
- return fnmatch.filter(all_hosts, self.host_pattern) # type: ignore
-
def validate(self, all_hosts):
# type: (List[str]) -> None
- if not isinstance(self.host_pattern, six.string_types):
+ if not isinstance(self.placement.host_pattern, six.string_types):
raise DriveGroupValidationError('host_pattern must be of type string')
specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
if self.objectstore not in ('filestore', 'bluestore'):
raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
- if not self.hosts(all_hosts):
+ if not self.placement.pattern_matches_hosts(all_hosts):
raise DriveGroupValidationError(
- "host_pattern '{}' does not match any hosts".format(self.host_pattern))
+ "host_pattern '{}' does not match any hosts".format(self.placement.host_pattern))
if self.block_wal_size is not None and type(self.block_wal_size) != int:
raise DriveGroupValidationError('block_wal_size must be of type int')
if 'objectstore' in keys and self.objectstore == 'bluestore':
keys.remove('objectstore')
return "DriveGroupSpec(name={}->{})".format(
- self.name,
+ self.service_id,
', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
)
+import fnmatch
import re
from collections import namedtuple
from typing import Optional, Dict, Any, List
For APIs that need to specify a host subset
"""
- def __init__(self, label=None, hosts=None, count=None, all_hosts=False):
- # type: (Optional[str], Optional[List], Optional[int], bool) -> None
+ def __init__(self, label=None, hosts=None, count=None, all_hosts=False, host_pattern=None):
+ # type: (Optional[str], Optional[List], Optional[int], bool, Optional[str]) -> None
if all_hosts and (count or hosts or label):
raise ServiceSpecValidationError('cannot combine all:true and count|hosts|label')
self.label = label
self.count = count # type: Optional[int]
self.all_hosts = all_hosts # type: bool
+ #: An fnmatch pattern to select hosts. Can also be a single host.
+ self.host_pattern = host_pattern
+
def is_empty(self):
return not self.all_hosts and \
self.label is None and \
not self.hosts and \
+ not self.host_pattern and \
self.count is None
def set_hosts(self, hosts):
# To backpopulate the .hosts attribute when using labels or count
# in the orchestrator backend.
self.hosts = hosts
+ def pattern_matches_hosts(self, all_hosts):
+ # type: (List[str]) -> List[str]
+ return fnmatch.filter(all_hosts, self.host_pattern) # type: ignore
+
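# pattern_matches_hosts() delegates to fnmatch, so shell-style wildcards
# and character ranges both apply (editor's example; hostnames are
# hypothetical):
#
#   fnmatch.filter(['node1', 'node2', 'data1'], 'node*')  # -> ['node1', 'node2']
#   fnmatch.filter(['node1', 'node4'], 'node[1-3]')       # -> ['node1']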
def pretty_str(self):
kv = []
if self.count:
kv.append('%s' % ','.join([str(h) for h in self.hosts]))
if self.all_hosts:
kv.append('all:true')
+ if self.host_pattern:
+ kv.append('host_pattern:{}'.format(self.host_pattern))
return ' '.join(kv)
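# For instance (editor's example), PlacementSpec(host_pattern='node*').pretty_str()
# now renders as 'host_pattern:node*'.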
def __repr__(self):
'label': self.label,
'hosts': [host.to_json() for host in self.hosts] if self.hosts else [],
'count': self.count,
- 'all_hosts': self.all_hosts
+ 'all_hosts': self.all_hosts,
+ 'host_pattern': self.host_pattern,
}
def validate(self):
raise ServiceSpecValidationError('Host and label are mutually exclusive')
if self.count is not None and self.count <= 0:
raise ServiceSpecValidationError("num/count must be > 1")
+ if self.host_pattern and (self.hosts or self.label or self.all_hosts):
+ raise ServiceSpecValidationError(
+ 'host_pattern is mutually exclusive with hosts, label and all_hosts')
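# Consequently, a placement that combines host_pattern with any other
# field now fails validation (editor's sketch; the label value is
# hypothetical):
#
#   PlacementSpec(label='ssd', host_pattern='node*').validate()
#   # raises ServiceSpecValidationError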
@classmethod
def from_string(cls, arg):
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from ceph.deployment import drive_selection
+from ceph.deployment.service_spec import PlacementSpec
from ceph.tests.factories import InventoryFactory
from ceph.tests.utils import _mk_inventory, _mk_device
osds_per_device='',
disk_format='bluestore'):
raw_sample_bluestore = {
- 'host_pattern': 'data*',
+ 'placement': {'host_pattern': 'data*'},
'data_devices': {
'size': '30G:50G',
'model': '42-RGB',
'encrypted': True,
}
raw_sample_filestore = {
- 'host_pattern': 'data*',
+ 'placement': {'host_pattern': 'data*'},
'objectstore': 'filestore',
'data_devices': {
'size': '30G:50G',
raw_sample = raw_sample_bluestore
if empty:
- raw_sample = {'host_pattern': 'data*'}
+ raw_sample = {'placement': {'host_pattern': 'data*'}}
dgo = DriveGroupSpec.from_json(raw_sample)
return dgo
testdata = [
(
- DriveGroupSpec(host_pattern='*', data_devices=DeviceSelection(all=True)),
+ DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), data_devices=DeviceSelection(all=True)),
_mk_inventory(_mk_device() * 5),
['/dev/sda', '/dev/sdb', '/dev/sdc', '/dev/sdd', '/dev/sde'], []
),
(
DriveGroupSpec(
- host_pattern='*',
+ placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(all=True, limit=3),
db_devices=DeviceSelection(all=True)
),
),
(
DriveGroupSpec(
- host_pattern='*',
+ placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(rotational=True),
db_devices=DeviceSelection(rotational=False)
),
),
(
DriveGroupSpec(
- host_pattern='*',
+ placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(rotational=True),
db_devices=DeviceSelection(rotational=False)
),
),
(
DriveGroupSpec(
- host_pattern='*',
+ placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(rotational=True),
db_devices=DeviceSelection(rotational=False)
),
from ceph.deployment import drive_selection, translate
from ceph.deployment.inventory import Device
+from ceph.deployment.service_spec import PlacementSpec
from ceph.tests.utils import _mk_inventory, _mk_device
from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupSpecs, \
DeviceSelection, DriveGroupValidationError
def test_DriveGroup():
- dg_json = {'testing_drivegroup':
- {'host_pattern': 'hostname',
- 'data_devices': {'paths': ['/dev/sda']}
- }
- }
+ dg_json = [
+ {
+ 'service_id': 'testing_drivegroup',
+ 'placement': {'host_pattern': 'hostname'},
+ 'data_devices': {'paths': ['/dev/sda']}
+ }
+ ]
dgs = DriveGroupSpecs(dg_json)
for dg in dgs.drive_groups:
- assert dg.hosts(['hostname']) == ['hostname']
- assert dg.name == 'testing_drivegroup'
+ assert dg.placement.pattern_matches_hosts(['hostname']) == ['hostname']
+ assert dg.service_id == 'testing_drivegroup'
assert all([isinstance(x, Device) for x in dg.data_devices.paths])
assert dg.data_devices.paths[0].path == '/dev/sda'
def test_drivegroup_pattern():
- dg = DriveGroupSpec('node[1-3]', DeviceSelection(all=True))
- assert dg.hosts(['node{}'.format(i) for i in range(10)]) == ['node1', 'node2', 'node3']
+ dg = DriveGroupSpec(PlacementSpec(host_pattern='node[1-3]'), data_devices=DeviceSelection(all=True))
+ assert dg.placement.pattern_matches_hosts(['node{}'.format(i) for i in range(10)]) == ['node1', 'node2', 'node3']
def test_drive_selection():
devs = DeviceSelection(paths=['/dev/sda'])
- spec = DriveGroupSpec('node_name', data_devices=devs)
+ spec = DriveGroupSpec(PlacementSpec(host_pattern='node_name'), data_devices=devs)
assert all([isinstance(x, Device) for x in spec.data_devices.paths])
assert spec.data_devices.paths[0].path == '/dev/sda'
def test_ceph_volume_command_0():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(all=True)
)
inventory = _mk_inventory(_mk_device()*2)
def test_ceph_volume_command_1():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(rotational=True),
db_devices=DeviceSelection(rotational=False)
)
def test_ceph_volume_command_2():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
wal_devices=DeviceSelection(size='10G')
def test_ceph_volume_command_3():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
wal_devices=DeviceSelection(size='10G'),
def test_ceph_volume_command_4():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
wal_devices=DeviceSelection(size='10G'),
def test_ceph_volume_command_5():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(rotational=True),
objectstore='filestore'
)
def test_ceph_volume_command_6():
- spec = DriveGroupSpec(host_pattern='*',
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(rotational=False),
journal_devices=DeviceSelection(rotational=True),
journal_size='500M',