# -*- coding: utf-8 -*-
from __future__ import absolute_import
+import json
from .helper import DashboardTestCase
+from .test_orchestrator import test_data
class HostControllerTest(DashboardTestCase):
AUTH_ROLES = ['read-only']
+ URL_HOST = '/api/host'
+
+ @classmethod
+ def setUpClass(cls):
+ super(HostControllerTest, cls).setUpClass()
+ cls._load_module("test_orchestrator")
+
+ cmd = ['orchestrator', 'set', 'backend', 'test_orchestrator']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
+
+ cmd = ['test_orchestrator', 'load_data', '-i', '-']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(test_data))
+
+ @classmethod
+ def tearDownClass(cls):
+ cmd = ['test_orchestrator', 'load_data', '-i', '-']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin='{}')
+
@DashboardTestCase.RunAs('test', 'test', ['block-manager'])
def test_access_permissions(self):
- self._get('/api/host')
+ self._get(self.URL_HOST)
self.assertStatus(403)
def test_host_list(self):
- data = self._get('/api/host')
+ data = self._get(self.URL_HOST)
self.assertStatus(200)
+ orch_hostnames = {inventory_node['name'] for inventory_node in test_data['inventory']}
+
for server in data:
self.assertIn('services', server)
self.assertIn('hostname', server)
self.assertIn('ceph_version', server)
self.assertIsNotNone(server['hostname'])
self.assertIsNotNone(server['ceph_version'])
- self.assertGreaterEqual(len(server['services']), 1)
for service in server['services']:
self.assertIn('type', service)
self.assertIn('id', service)
self.assertIsNotNone(service['type'])
self.assertIsNotNone(service['id'])
+
+ self.assertIn('sources', server)
+ in_ceph, in_orchestrator = server['sources']['ceph'], server['sources']['orchestrator']
+ if in_ceph:
+ self.assertGreaterEqual(len(server['services']), 1)
+ if not in_orchestrator:
+ self.assertNotIn(server['hostname'], orch_hostnames)
+ if in_orchestrator:
+ self.assertEqual(len(server['services']), 0)
+ self.assertIn(server['hostname'], orch_hostnames)
+
+ def test_host_list_with_sources(self):
+ data = self._get('{}?sources=orchestrator'.format(self.URL_HOST))
+ self.assertStatus(200)
+ test_hostnames = {inventory_node['name'] for inventory_node in test_data['inventory']}
+ resp_hostnames = {host['hostname'] for host in data}
+ self.assertEqual(test_hostnames, resp_hostnames)
+
+ data = self._get('{}?sources=ceph'.format(self.URL_HOST))
+ self.assertStatus(200)
+ test_hostnames = {inventory_node['name'] for inventory_node in test_data['inventory']}
+ resp_hostnames = {host['hostname'] for host in data}
+ self.assertEqual(len(test_hostnames.intersection(resp_hostnames)), 0)
--- /dev/null
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+import os
+import json
+
+from .helper import DashboardTestCase
+
+
+test_data = {
+ 'inventory': [
+ {
+ 'name': 'test-host0',
+ 'devices': [
+ {
+ 'type': 'hdd',
+ 'id': '/dev/sda',
+ 'size': 1024**4 * 4,
+ 'rotates': True
+ }
+ ]
+ },
+ {
+ 'name': 'test-host1',
+ 'devices': [
+ {
+ 'type': 'hdd',
+ 'id': '/dev/sda',
+ 'size': 1024**4 * 4,
+ 'rotates': True
+ }
+ ]
+ }
+ ],
+ 'services': [
+ {
+ 'nodename': 'test-host0',
+ 'service_type': 'mon',
+ 'service_instance': 'a'
+ },
+ {
+ 'nodename': 'test-host0',
+ 'service_type': 'mgr',
+ 'service_instance': 'x'
+ },
+ {
+ 'nodename': 'test-host0',
+ 'service_type': 'osd',
+ 'service_instance': '0'
+ },
+ {
+ 'nodename': 'test-host1',
+ 'service_type': 'osd',
+ 'service_instance': '1'
+ }
+ ]
+}
+
+
+class OrchestratorControllerTest(DashboardTestCase):
+
+ AUTH_ROLES = ['read-only']
+
+ URL_STATUS = '/api/orchestrator/status'
+ URL_INVENTORY = '/api/orchestrator/inventory'
+ URL_SERVICE = '/api/orchestrator/service'
+
+
+ @property
+ def test_data_inventory(self):
+ return test_data['inventory']
+
+ @property
+ def test_data_services(self):
+ return test_data['services']
+
+ @classmethod
+ def setUpClass(cls):
+ super(OrchestratorControllerTest, cls).setUpClass()
+ cls._load_module('test_orchestrator')
+ cmd = ['orchestrator', 'set', 'backend', 'test_orchestrator']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
+
+ cmd = ['test_orchestrator', 'load_data', '-i', '-']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(test_data))
+
+ @classmethod
+ def tearDownClass(cls):
+ cmd = ['test_orchestrator', 'load_data', '-i', '-']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin='{}')
+
+ def _validate_inventory(self, data, resp_data):
+ self.assertEqual(data['name'], resp_data['name'])
+ self.assertEqual(len(data['devices']), len(resp_data['devices']))
+
+ if not data['devices']:
+ return
+ test_devices = sorted(data['devices'], key=lambda d: d['id'])
+ resp_devices = sorted(resp_data['devices'], key=lambda d: d['id'])
+
+ for test, resp in zip(test_devices, resp_devices):
+ self._validate_device(test, resp)
+
+ def _validate_device(self, data, resp_data):
+ for key, value in data.items():
+ self.assertEqual(value, resp_data[key])
+
+ def _validate_service(self, data, resp_data):
+ for key, value in data.items():
+ self.assertEqual(value, resp_data[key])
+
+ @DashboardTestCase.RunAs('test', 'test', ['block-manager'])
+ def test_access_permissions(self):
+ self._get(self.URL_STATUS)
+ self.assertStatus(200)
+ self._get(self.URL_INVENTORY)
+ self.assertStatus(403)
+ self._get(self.URL_SERVICE)
+ self.assertStatus(403)
+
+ def test_status_get(self):
+ data = self._get(self.URL_STATUS)
+ self.assertTrue(data['available'])
+
+ def test_iventory_list(self):
+ # get all inventory
+ data = self._get(self.URL_INVENTORY)
+ self.assertStatus(200)
+
+ sorting_key = lambda node: node['name']
+ test_inventory = sorted(self.test_data_inventory, key=sorting_key)
+ resp_inventory = sorted(data, key=sorting_key)
+ self.assertEqual(len(test_inventory), len(resp_inventory))
+ for test, resp in zip(test_inventory, resp_inventory):
+ self._validate_inventory(test, resp)
+
+ # get inventory by hostname
+ node = self.test_data_inventory[-1]
+ data = self._get('{}?hostname={}'.format(self.URL_INVENTORY, node['name']))
+ self.assertStatus(200)
+ self.assertEqual(len(data), 1)
+ self._validate_inventory(node, data[0])
+
+ def test_service_list(self):
+ # get all services
+ data = self._get(self.URL_SERVICE)
+ self.assertStatus(200)
+
+ sorting_key = lambda svc: '%(nodename)s.%(service_type)s.%(service_instance)s' % svc
+ test_services = sorted(self.test_data_services, key=sorting_key)
+ resp_services = sorted(data, key=sorting_key)
+ self.assertEqual(len(test_services), len(resp_services))
+ for test, resp in zip(test_services, resp_services):
+ self._validate_service(test, resp)
+
+ # get service by hostname
+ nodename = self.test_data_services[-1]['nodename']
+ test_services = sorted(filter(lambda svc: svc['nodename'] == nodename, test_services),
+ key=sorting_key)
+ data = self._get('{}?hostname={}'.format(self.URL_SERVICE, nodename))
+ resp_services = sorted(data, key=sorting_key)
+ for test, resp in zip(test_services, resp_services):
+ self._validate_service(test, resp)