Also: Added tox.
Co-authored-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
.. automethod:: Orchestrator.create_osds
.. automethod:: Orchestrator.replace_osds
.. automethod:: Orchestrator.remove_osds
-.. autoclass:: OsdCreationSpec
+.. autoclass:: DeviceSelection
.. autoclass:: DriveGroupSpec
Upgrades
add_subdirectory(dashboard)
add_subdirectory(insights)
add_subdirectory(ansible)
+add_subdirectory(orchestrator_cli)
return ansible_operation
- def create_osds(self, osd_spec):
+ def create_osds(self, drive_group, all_hosts):
"""
Create one or more OSDs within a single Drive Group.
finer-grained OSD feature enablement (choice of backing store,
compression/encryption, etc).
- :param osd_spec: OsdCreationSpec
+ :param drive_group: DriveGroupSpec
+ :param all_hosts: TODO, this is required because the orchestrator methods are not composable
"""
def verify_config(self):
Please see the ceph-mgr module developer's guide for more information.
"""
-import time
-
try:
- from typing import TypeVar, Generic, List
+ from typing import TypeVar, Generic, List, Optional, Union
T = TypeVar('T')
G = Generic[T]
except ImportError:
T, G = object, object
+import time
+
class _Completion(G):
@property
"""
raise NotImplementedError()
+ def add_host(self, host):
+ # type: (str) -> WriteCompletion
+ """
+ Add a host to the orchestrator inventory.
+ :param host: hostname
+ """
+ raise NotImplementedError()
+
+ def remove_host(self, host):
+ # type: (str) -> WriteCompletion
+ """
+ Remove a host from the orchestrator inventory.
+ :param host: hostname
+ """
+ raise NotImplementedError()
+
+ def get_hosts(self):
+ # type: () -> ReadCompletion[List[InventoryNode]]
+ """
+ Report the hosts in the cluster.
+
+ The default implementation is slow, as it fetches the entire device inventory (via get_inventory) just to report host names.
+ :return: list of InventoryNodes
+ """
+ return self.get_inventory()
+
def get_inventory(self, node_filter=None):
# type: (InventoryFilter) -> ReadCompletion[List[InventoryNode]]
"""
assert not (service_name and service_id)
raise NotImplementedError()
- def create_osds(self, osd_spec):
- # type: (OsdCreationSpec) -> WriteCompletion
+ def create_osds(self, drive_group, all_hosts):
+ # type: (DriveGroupSpec, List[str]) -> WriteCompletion
"""
Create one or more OSDs within a single Drive Group.
finer-grained OSD feature enablement (choice of backing store,
compression/encryption, etc).
- :param osd_spec: OsdCreationSpec
+ :param drive_group: DriveGroupSpec
+ :param all_hosts: TODO, this is required because the orchestrator methods are not composable
"""
raise NotImplementedError()
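Until that TODO is resolved, callers resolve the host list themselves and pass it in alongside the spec. A minimal caller-side sketch (assuming a hypothetical handle 'orch' with a wait() helper like the CLI module's below; host and device names are made up):

    # Resolve the cluster's hosts first (see get_hosts above)...
    host_completion = orch.get_hosts()
    orch.wait([host_completion])
    all_hosts = [h.name for h in host_completion.result]

    # ...then hand the spec plus the full host list to create_osds.
    spec = DriveGroupSpec('osd-node*',
                          data_devices=DeviceSelection(paths=['/dev/sdb']))
    orch.wait([orch.create_osds(spec, all_hosts)])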
- def replace_osds(self, osd_spec):
- # type: (OsdCreationSpec) -> WriteCompletion
+ def replace_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> WriteCompletion
"""
Like create_osds, but the osd_id_claims must be fully
populated.
return {k: v for (k, v) in out.items() if v is not None}
+class DeviceSelection(object):
+ def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None):
+ # type: (Optional[List[str]], Optional[str], Optional[str], Optional[bool], Optional[int]) -> None
+ """
+ ephemeral drive group device specification
+
+ :param paths: abs paths to the devices.
+ :param id_model: A wildcard string, e.g. "SSD*"
+ :param size: Size specification of format LOW:HIGH.
+ Can also take the form :HIGH, LOW:
+ or an exact value (as ceph-volume inventory reports)
+ :param rotates: is the drive rotating or not
+ :param count: if present, limit the number of matched drives to this number.
+
+ Any subset of these attributes (even none) may be present in the
+ device specification structure.
+
+ TODO: translate from the user interface (Drive Groups) to an actual list of devices.
+ """
+ if paths is None:
+ paths = []
+ self.paths = paths # type: List[str]
+ if self.paths and any(p is not None for p in [id_model, size, rotates, count]):
+ raise TypeError('`paths` and other parameters are mutually exclusive')
+
+ self.id_model = id_model
+ self.size = size
+ self.rotates = rotates
+ self.count = count
+
+ @classmethod
+ def from_json(cls, device_spec):
+ return cls(**device_spec)
+
+
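As a quick illustration of the selection forms described in the docstring (all device values hypothetical); note that path-based and property-based selection cannot be combined:

    # Select explicitly by path:
    DeviceSelection(paths=['/dev/sda', '/dev/sdb'])

    # Select by property: at most four non-rotating drives sized 200G:2T
    # (the LOW:HIGH form from the docstring).
    DeviceSelection(size='200G:2T', rotates=False, count=4)

    # Combining paths with any other filter is rejected:
    # DeviceSelection(paths=['/dev/sda'], rotates=False)  # raises TypeError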
class DriveGroupSpec(object):
"""
Describe a drive group in the same form that ceph-volume
understands.
"""
- def __init__(self, devices):
- self.devices = devices
+ def __init__(self, host_pattern, data_devices, db_devices=None, wal_devices=None, journal_devices=None,
+ osds_per_device=None, objectstore='bluestore', encrypted=False, db_slots=None,
+ wal_slots=None):
+ # type: (str, DeviceSelection, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[int], str, bool, Optional[int], Optional[int]) -> None
+ # The concept of applying a drive group to a set of hosts is
+ # tightly linked to the drive group itself
+ #
+ # An fnmatch pattern to select hosts. Can also be a single host.
+ self.host_pattern = host_pattern
-class OsdCreationSpec(object):
- """
- Used during OSD creation.
+ self.data_devices = data_devices
+ self.db_devices = db_devices
+ self.wal_devices = wal_devices
+ self.journal_devices = journal_devices
- The drive names used here may be ephemeral.
- """
- def __init__(self):
- self.format = None # filestore, bluestore
+ # Number of osd daemons per "DATA" device.
+ # To fully utilize NVMe devices, multiple OSDs are required.
+ self.osds_per_device = osds_per_device
- self.node = None # name of a node
+ assert objectstore in ('filestore', 'bluestore')
+ self.objectstore = objectstore
- # List of device names
- self.drive_group = None
+ self.encrypted = encrypted
+ self.db_slots = db_slots
+ self.wal_slots = wal_slots
+
+ # FIXME: needs ceph-volume support
# Optional: mapping of drive to OSD ID, used when the
# created OSDs are meant to replace previous OSDs on
# the same node.
self.osd_id_claims = {}
- # Arbitrary JSON-serializable object.
- # Maybe your orchestrator knows how to do something
- # special like encrypting drives
- self.extended = {}
+ @classmethod
+ def from_json(cls, json_drive_group):
+ """
+ Initialize and verify 'Drive group' structure
+ :param json_drive_group: A dict parsed from a JSON Drive Group
+ specification
+ """
+ args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
+ json_drive_group.items()}
+ return cls(**args)
+
+ def hosts(self, all_hosts):
+ import fnmatch
+ return fnmatch.filter(all_hosts, self.host_pattern)
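Tying the two classes together, a sketch of the JSON round-trip (hypothetical values): from_json turns every *_devices key into a DeviceSelection, and hosts() expands the fnmatch pattern against the caller-supplied host list:

    dg_json = {
        'host_pattern': 'osd-node[1-3]',               # fnmatch pattern
        'data_devices': {'rotates': True},             # spinners for data
        'db_devices': {'rotates': False, 'count': 1},  # one SSD for the DB
        'objectstore': 'bluestore',
    }
    dg = DriveGroupSpec.from_json(dg_json)
    assert isinstance(dg.db_devices, DeviceSelection)
    assert dg.hosts(['osd-node1', 'osd-node9']) == ['osd-node1']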
class StatelessServiceSpec(object):
--- /dev/null
+set(MGR_ORCHESTRATOR_CLI_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-orchestrator_cli-virtualenv)
+
+add_custom_target(mgr-orchestrator_cli-test-venv
+ COMMAND ${CMAKE_SOURCE_DIR}/src/tools/setup-virtualenv.sh --python=${MGR_PYTHON_EXECUTABLE} ${MGR_ORCHESTRATOR_CLI_VIRTUALENV}
+ WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/src/pybind/mgr/orchestrator_cli
+ COMMENT "orchestrator_cli tests virtualenv is being created")
+add_dependencies(tests mgr-orchestrator_cli-test-venv)
+from __future__ import absolute_import
+import os
-from .module import OrchestratorCli
+if 'UNITTEST' not in os.environ:
+ from .module import OrchestratorCli
+else:
+ import sys
+ import mock
+ sys.path.append("..")
+ sys.modules['ceph_module'] = mock.Mock()
return HandleCommandResult(-errno.EINVAL,
stderr="Invalid device spec, should be <node>:<device>")
- spec = orchestrator.OsdCreationSpec()
- spec.node = node_name
- spec.format = "bluestore"
- spec.drive_group = orchestrator.DriveGroupSpec([block_device])
+ devs = orchestrator.DeviceSelection(paths=[block_device])
+ spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
- completion = self.create_osds(spec)
+ # TODO: Remove this and make the orchestrator composable
+ host_completion = self.get_hosts()
+ self.wait([host_completion])
+ all_hosts = [h.name for h in host_completion.result]
+
+ completion = self.create_osds(spec, all_hosts)
self._orchestrator_wait([completion])
return HandleCommandResult()
--- /dev/null
+#!/usr/bin/env bash
+
+# run from ./ or from ../
+: ${MGR_ORCHESTRATOR_CLI_VIRTUALENV:=/tmp/mgr-orchestrator_cli-virtualenv}
+: ${WITH_PYTHON2:=ON}
+: ${WITH_PYTHON3:=ON}
+: ${CEPH_BUILD_DIR:=$PWD/.tox}
+test -d orchestrator_cli && cd orchestrator_cli
+
+if [ -e tox.ini ]; then
+ TOX_PATH=$(readlink -f tox.ini)
+else
+ TOX_PATH=$(readlink -f $(dirname $0)/tox.ini)
+fi
+
+# tox.ini will take care of this.
+unset PYTHONPATH
+export CEPH_BUILD_DIR=$CEPH_BUILD_DIR
+
+if [ -f ${MGR_ORCHESTRATOR_CLI_VIRTUALENV}/bin/activate ]
+then
+ source ${MGR_ORCHESTRATOR_CLI_VIRTUALENV}/bin/activate
+fi
+
+if [ "$WITH_PYTHON2" = "ON" ]; then
+ ENV_LIST+="py27"
+fi
+if [ "$WITH_PYTHON3" = "ON" ]; then
+ ENV_LIST+=",py3"
+fi
+
+tox -c ${TOX_PATH} -e ${ENV_LIST}
--- /dev/null
+from __future__ import absolute_import
+import pytest
+
+
+from orchestrator import DriveGroupSpec, DeviceSelection
+
+
+def test_DriveGroup():
+ dg_json = {
+ 'host_pattern': 'hostname',
+ 'data_devices': {'paths': ['/dev/sda']}
+ }
+
+ dg = DriveGroupSpec.from_json(dg_json)
+ assert dg.hosts(['hostname']) == ['hostname']
+ assert dg.data_devices.paths == ['/dev/sda']
+
+
+def test_DriveGroup_fail():
+ with pytest.raises(TypeError):
+ DriveGroupSpec.from_json({})
+
+
+def test_drivegroup_pattern():
+ dg = DriveGroupSpec('node[1-3]', DeviceSelection())
+ assert dg.hosts(['node{}'.format(i) for i in range(10)]) == ['node1', 'node2', 'node3']
+
+
+def test_drive_selection():
+ devs = DeviceSelection(paths=['/dev/sda'])
+ spec = DriveGroupSpec('node_name', data_devices=devs)
+ assert spec.data_devices.paths == ['/dev/sda']
+
+ with pytest.raises(TypeError, match='exclusive'):
+ DeviceSelection(paths=['/dev/sda'], rotates=False)
+
--- /dev/null
+[tox]
+envlist = py27,py3
+skipsdist = true
+toxworkdir = {env:CEPH_BUILD_DIR}/orchestrator_cli
+minversion = 2.5
+
+[testenv]
+deps =
+ pytest
+ mock
+ requests-mock
+setenv=
+ UNITTEST = true
+ py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2
+ py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3
+
+commands=
+ {envbindir}/py.test .
import functools
import os
import uuid
-
-from mgr_module import MgrModule
-
-import orchestrator
+try:
+ from typing import List
+except ImportError:
+ pass # just for type checking
try:
from kubernetes import client, config
client = None
config = None
+from mgr_module import MgrModule
+import orchestrator
+
from .rook_cluster import RookCluster
lambda: self.rook_cluster.rm_service(service_type, service_id), None,
"Removing {0} services for {1}".format(service_type, service_id))
- def create_osds(self, spec):
- # Validate spec.node
- if not self.rook_cluster.node_exists(spec.node):
+ def create_osds(self, drive_group, all_hosts):
+ # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
+
+ assert len(drive_group.hosts(all_hosts)) == 1
+ if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(spec.node))
+ "cluster".format(drive_group.hosts(all_hosts)))
# Validate whether cluster CRD can accept individual OSD
# creations (i.e. not useAllDevices)
"support OSD creation.")
def execute():
- self.rook_cluster.add_osds(spec)
+ self.rook_cluster.add_osds(drive_group, all_hosts)
def is_complete():
# Find OSD pods on this host
pods = self._k8s.list_namespaced_pod("rook-ceph",
label_selector="rook_cluster=rook-ceph,app=rook-ceph-osd",
field_selector="spec.nodeName={0}".format(
- spec.node
+ drive_group.hosts(all_hosts)[0]
)).items
for p in pods:
pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
continue
metadata = self.get_metadata('osd', "%s" % osd_id)
- if metadata and metadata['devices'] in spec.drive_group.devices:
+ if metadata and metadata['devices'] in drive_group.data_devices.paths:
found.append(osd_id)
else:
self.log.info("ignoring osd {0} {1}".format(
return RookWriteCompletion(execute, is_complete,
"Creating OSD on {0}:{1}".format(
- spec.node,
- spec.drive_group.devices
+ drive_group.hosts(all_hosts)[0],
+ drive_group.data_devices.paths
))
except ImportError:
ApiException = None
+try:
+ import orchestrator
+except ImportError:
+ pass # just used for type checking.
+
+
ROOK_SYSTEM_NS = "rook-ceph-system"
ROOK_API_VERSION = "v1"
ROOK_API_NAME = "ceph.rook.io/%s" % ROOK_API_VERSION
else:
return True
- def add_osds(self, spec):
+ def add_osds(self, drive_group, all_hosts):
+ # type: (orchestrator.DriveGroupSpec, List[str]) -> None
"""
Rook currently (0.8) can only do single-drive OSDs, so we
treat all drive groups as just a list of individual OSDs.
"""
- # assert isinstance(spec, orchestrator.OsdSpec)
-
- block_devices = spec.drive_group.devices
+ block_devices = drive_group.data_devices.paths
- assert spec.format in ("bluestore", "filestore")
+ assert drive_group.objectstore in ("bluestore", "filestore")
# The CRD looks something like this:
# nodes:
current_nodes = current_cluster['spec']['storage'].get('nodes', [])
- if spec.node not in [n['name'] for n in current_nodes]:
+ if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
patch.append({
"op": "add", "path": "/spec/storage/nodes/-", "value": {
- "name": spec.node,
+ "name": drive_group.hosts(all_hosts)[0],
"devices": [{'name': d} for d in block_devices],
"storeConfig": {
- "storeType": spec.format
+ "storeType": drive_group.objectstore
}
}
})
node_idx = None
current_node = None
for i, c in enumerate(current_nodes):
- if c['name'] == spec.node:
+ if c['name'] == drive_group.hosts(all_hosts)[0]:
current_node = c
node_idx = i
break
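For reference, the element that the patch above appends to /spec/storage/nodes has this shape (node and device names hypothetical):

    {
        "op": "add", "path": "/spec/storage/nodes/-",
        "value": {
            "name": "osd-node1",
            "devices": [{"name": "/dev/sdb"}, {"name": "/dev/sdc"}],
            "storeConfig": {"storeType": "bluestore"},
        },
    }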
def add_stateless_service(self, service_type, spec):
raise NotImplementedError(service_type)
- def create_osds(self, spec):
- raise NotImplementedError(str(spec))
+ def create_osds(self, drive_group, all_hosts):
+ raise NotImplementedError(str(drive_group))
def service_action(self, action, service_type, service_name=None, service_id=None):
return TestWriteCompletion(
list(APPEND tox_tests run-tox-mgr-ansible)
set(MGR_ANSIBLE_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-ansible-virtualenv)
list(APPEND env_vars_for_tox_tests MGR_ANSIBLE_VIRTUALENV=${MGR_ANSIBLE_VIRTUALENV})
+
+ add_test(NAME run-tox-mgr-orchestrator_cli COMMAND bash ${CMAKE_SOURCE_DIR}/src/pybind/mgr/orchestrator_cli/run-tox.sh)
+ list(APPEND tox_tests run-tox-mgr-orchestrator_cli)
+ set(MGR_ORCHESTRATOR_CLI_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-orchestrator_cli-virtualenv)
+ list(APPEND env_vars_for_tox_tests MGR_ORCHESTRATOR_CLI_VIRTUALENV=${MGR_ORCHESTRATOR_CLI_VIRTUALENV})
endif()
set_property(