"""
return self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs)
+ def _test_orchestrator_cmd_result(self, *args, **kwargs):
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd_result("test_orchestrator", *args, **kwargs)
+
def setUp(self):
super(TestOrchestratorCli, self).setUp()
evs = json.loads(self._progress_cmd('json'))['completed']
self.assertEqual(len(evs), 1)
self.assertIn('update_mgrs', evs[0]['message'])
+
+ def test_load_data(self):
+ data = {
+ 'inventory': [
+ {
+ 'name': 'host0',
+ 'devices': [
+ {
+ 'type': 'hdd',
+ 'id': '/dev/sda',
+ 'size': 1024**4 * 4,
+ 'rotates': True
+ }
+ ]
+ },
+ {
+ 'name': 'host1',
+ 'devices': [
+ {
+ 'type': 'hdd',
+ 'id': '/dev/sda',
+ 'size': 1024**4 * 4,
+ 'rotates': True
+ }
+ ]
+ }
+ ],
+ 'services': [
+ {
+ 'nodename': 'host0',
+ 'service_type': 'mon',
+ 'service_instance': 'a'
+ },
+ {
+ 'nodename': 'host1',
+ 'service_type': 'osd',
+ 'service_instance': '1'
+ }
+ ]
+ }
+
+ ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin=json.dumps(data))
+ self.assertEqual(ret, 0)
+ out = self._orch_cmd('device', 'ls', '--format=json')
+ inventory = data['inventory']
+ inventory_result = json.loads(out)
+ self.assertEqual(len(inventory), len(inventory_result))
+
+ out = self._orch_cmd('device', 'ls', 'host0', '--format=json')
+ inventory_result = json.loads(out)
+ self.assertEqual(len(inventory_result), 1)
+ self.assertEqual(inventory_result[0]['name'], 'host0')
+
+ out = self._orch_cmd('service', 'ls', '--format=json')
+ services = data['services']
+ services_result = json.loads(out)
+ self.assertEqual(len(services), len(services_result))
+
+ out = self._orch_cmd('service', 'ls', 'host0', '--format=json')
+ services_result = json.loads(out)
+ self.assertEqual(len(services_result), 1)
+ self.assertEqual(services_result[0]['nodename'], 'host0')
+
+ # test invalid input file: invalid json
+ json_str = '{ "inventory: '
+ ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin=json_str)
+ self.assertEqual(ret, errno.EINVAL)
+
+ # test invalid input file: missing key
+ json_str = '{ "inventory": [{"devices": []}] }'
+ ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin=json_str)
+ self.assertEqual(ret, errno.EINVAL)
+
+ # load empty data for other tests
+ ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin='{}')
+ self.assertEqual(ret, 0)
Please see the ceph-mgr module developer's guide for more information.
"""
+import copy
import sys
import time
import fnmatch
+from functools import wraps
import uuid
import string
import random
self.label = None
+def handle_type_error(method):
+ @wraps(method)
+ def inner(cls, *args, **kwargs):
+ try:
+ return method(cls, *args, **kwargs)
+ except TypeError as e:
+ error_msg = '{}: {}'.format(cls.__name__, e)
+ raise OrchestratorValidationError(error_msg)
+ return inner
+
+
class ServiceDescription(object):
"""
For responding to queries about the status of a particular service,
return {k: v for (k, v) in out.items() if v is not None}
@classmethod
+ @handle_type_error
def from_json(cls, data):
return cls(**data)
available=self.available, dev_id=self.dev_id,
extended=self.extended)
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data):
+ return cls(**data)
+
@classmethod
def from_ceph_volume_inventory(cls, data):
# TODO: change InventoryDevice itself to mirror c-v inventory closely!
def to_json(self):
return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
+ @classmethod
+ def from_json(cls, data):
+ try:
+ _data = copy.deepcopy(data)
+ name = _data.pop('name')
+ devices = [InventoryDevice.from_json(device)
+ for device in _data.pop('devices')]
+ if _data:
+ error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
+ raise OrchestratorValidationError(error_msg)
+ return cls(name, devices)
+ except KeyError as e:
+ error_msg = '{} is required for {}'.format(e, cls.__name__)
+ raise OrchestratorValidationError(error_msg)
+
@classmethod
def from_nested_items(cls, hosts):
devs = InventoryDevice.from_ceph_volume_inventory_list
from __future__ import absolute_import
-
import json
import pytest
+from orchestrator import ReadCompletion, raise_if_exception, RGWSpec
+from orchestrator import InventoryNode, InventoryDevice, ServiceDescription
+from orchestrator import OrchestratorValidationError
+
+
+def _test_resource(data, resource_class, extra):
+ # create the instance with normal way
+ rsc = resource_class(**data)
+ if hasattr(rsc, 'pretty_print'):
+ assert rsc.pretty_print()
+
+ # ensure we can deserialize and serialize
+ rsc = resource_class.from_json(data)
+ rsc.to_json()
+
+ # if there is an unexpected data provided
+ data.update(extra)
+ with pytest.raises(OrchestratorValidationError):
+ resource_class.from_json(data)
-from orchestrator import InventoryDevice, ReadCompletion, raise_if_exception, RGWSpec
-def test_inventory_device():
- i_d = InventoryDevice()
- s = i_d.pretty_print()
- assert len(s)
+def test_inventory():
+ json_data = {
+ 'name': 'host0',
+ 'devices': [
+ {
+ 'type': 'hdd',
+ 'id': '/dev/sda',
+ 'size': 1024,
+ 'rotates': True
+ }
+ ]
+ }
+ _test_resource(json_data, InventoryNode, {'abc': False})
+ for devices in json_data['devices']:
+ _test_resource(devices, InventoryDevice, {'abc': False})
+
+ json_data = [{}, {'name': 'host0'}, {'devices': []}]
+ for data in json_data:
+ with pytest.raises(OrchestratorValidationError):
+ InventoryNode.from_json(data)
+
+
+def test_service_description():
+ json_data = {
+ 'nodename': 'test',
+ 'service_type': 'mon',
+ 'service_instance': 'a'
+ }
+ _test_resource(json_data, ServiceDescription, {'abc': False})
def test_raise():
+import errno
import json
import re
import os
import uuid
from subprocess import check_output, CalledProcessError
+import six
+
+from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule, PersistentStoreDict
import orchestrator
return all(c.is_complete for c in completions)
+ @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
+ def _load_data(self, inbuf):
+ try:
+ data = json.loads(inbuf)
+ self._init_data(data)
+ return HandleCommandResult()
+ except json.decoder.JSONDecodeError as e:
+ msg = 'Invalid JSON file: {}'.format(e)
+ return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
+ except orchestrator.OrchestratorValidationError as e:
+ return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
+
def available(self):
return True, ""
self._initialized = threading.Event()
self._shutdown = threading.Event()
+ self._init_data({})
def shutdown(self):
self._shutdown.set()
self._shutdown.wait(5)
+ def _init_data(self, data=None):
+ self._inventory = [orchestrator.InventoryNode.from_json(inventory_node)
+ for inventory_node in data.get('inventory', [])]
+ self._services = [orchestrator.ServiceDescription.from_json(service)
+ for service in data.get('services', [])]
+
@deferred_read
def get_inventory(self, node_filter=None, refresh=False):
"""
"""
if node_filter and node_filter.nodes is not None:
assert isinstance(node_filter.nodes, list)
+
+ if self._inventory:
+ if node_filter:
+ return list(filter(lambda node: node.name in node_filter.nodes,
+ self._inventory))
+ return self._inventory
+
try:
c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
except OSError:
it returns the mgr we're running in.
"""
if service_type:
- assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
+ support_services = ("mds", "osd", "mon", "rgw", "mgr", "iscsi")
+ assert service_type in support_services, service_type + " unsupported"
+
+ if self._services:
+ if node_name:
+ return list(filter(lambda svc: svc.nodename == node_name, self._services))
+ return self._services
out = map(str, check_output(['ps', 'aux']).splitlines())
types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
@deferred_read
def get_hosts(self):
+ if self._inventory:
+ return self._inventory
return [orchestrator.InventoryNode('localhost', [])]
@deferred_write("add_host")
raise orchestrator.NoOrchestrator()
if host == 'raise_import_error':
raise ImportError("test_orchestrator not enabled")
- assert isinstance(host, str)
+ assert isinstance(host, six.string_types)
@deferred_write("remove_host")
def remove_host(self, host):
- assert isinstance(host, str)
+ assert isinstance(host, six.string_types)
@deferred_write("update_mgrs")
def update_mgrs(self, num, hosts):