executing_tasks = [task for task in _res['executing_tasks'] if
task['metadata'] == task_metadata]
finished_tasks = [task for task in _res['finished_tasks'] if
- task['metadata'] == task_metadata]
+ task['metadata'] == task_metadata]
if not executing_tasks and finished_tasks:
res_task = finished_tasks[0]
return obj
return None
+
# TODO: pass defaults=(False,) to namedtuple() if python3.7
class JLeaf(namedtuple('JLeaf', ['typ', 'none'])):
    """Schema leaf: a plain Python type, optionally also accepting None."""

    def __new__(cls, typ, none=False):
        # ``none`` defaults to False so most call sites can omit it
        # (namedtuple itself has no field defaults before Python 3.7).
        return super(JLeaf, cls).__new__(cls, typ, none)
+
# Schema combinators (see JList(str) / JList(JObj(...)) usages below):
# JList  - homogeneous list; every element must match ``elem_typ``.
# JTuple - fixed sequence; element i must match ``elem_typs[i]``.
# JUnion - value must match at least one schema in ``elem_typs``.
JList = namedtuple('JList', ['elem_typ'])
# Bug fix: the typename was mistakenly 'JList', which made
# repr(JTuple(...)) misleading and broke pickling of JTuple instances.
JTuple = namedtuple('JTuple', ['elem_typs'])
JUnion = namedtuple('JUnion', ['elem_typs'])
+
class JObj(namedtuple('JObj', ['sub_elems', 'allow_unknown', 'none', 'unknown_schema'])):
def __new__(cls, sub_elems, allow_unknown=False, none=False, unknown_schema=None):
"""
def rm_dir(self, path, expectedStatus=200):
self._delete("/api/cephfs/{}/tree".format(self.get_fs_id()),
- params={'path': path})
+ params={'path': path})
self.assertStatus(expectedStatus)
def get_root_directory(self, expectedStatus=200):
self.assertEqual(len(snapshots), 0)
self._delete("/api/cephfs/{}/snapshot".format(fs_id),
- params={'path': '/movies/dune', 'name': 'test'})
+ params={'path': '/movies/dune', 'name': 'test'})
self.assertStatus(200)
data = self.ls_dir('/movies', 1)
self._ceph_cmd(['config', 'set', 'mon', config_name, 'true'])
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- [{'section': 'mon', 'value': 'true'}],
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ [{'section': 'mon', 'value': 'true'}],
+ timeout=30,
+ period=1)
self._ceph_cmd(['config', 'set', 'mon', config_name, 'false'])
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- [{'section': 'mon', 'value': 'false'}],
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ [{'section': 'mon', 'value': 'false'}],
+ timeout=30,
+ period=1)
# restore value
if orig_value:
})
self.assertStatus(201)
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- expected_result,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ expected_result,
+ timeout=30,
+ period=1)
# reset original value
self._clear_all_values_for_config_option(config_name)
})
self.assertStatus(201)
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- expected_result,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ expected_result,
+ timeout=30,
+ period=1)
# delete it and check if it's deleted
self._delete('/api/cluster_conf/{}?section={}'.format(config_name, 'mon'))
self.assertStatus(204)
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- None,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ None,
+ timeout=30,
+ period=1)
# reset original value
self._clear_all_values_for_config_option(config_name)
# check if config option value is still the original one
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- orig_value,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ orig_value,
+ timeout=30,
+ period=1)
def test_create_two_values(self):
config_name = 'debug_ms'
})
self.assertStatus(201)
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- expected_result,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ expected_result,
+ timeout=30,
+ period=1)
# reset original value
self._clear_all_values_for_config_option(config_name)
expected_result = [{'section': 'mon', 'value': '0/3'}]
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- expected_result,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ expected_result,
+ timeout=30,
+ period=1)
# reset original value
self._clear_all_values_for_config_option(config_name)
self.assertStatus(201)
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- expected_result,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ expected_result,
+ timeout=30,
+ period=1)
# reset original value
self._clear_all_values_for_config_option(config_name)
for config_name, value in expected_result.items():
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- [value],
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ [value],
+ timeout=30,
+ period=1)
# reset original value
self._clear_all_values_for_config_option(config_name)
# check if config option values are still the original ones
for config_name, value in orig_values.items():
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- value,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ value,
+ timeout=30,
+ period=1)
def test_bulk_set_cant_update_at_runtime_partial(self):
config_options = {
# check if config option values are still the original ones
for config_name, value in orig_values.items():
self.wait_until_equal(
- lambda: self._get_config_by_name(config_name),
- value,
- timeout=30,
- period=1)
+ lambda: self._get_config_by_name(config_name),
+ value,
+ timeout=30,
+ period=1)
def test_check_existence(self):
"""
self.assertIn('services', data)
self.assertIn('type', data)
self.assertIn('desc', data)
- self.assertIn(data['type'], ['str', 'bool', 'float', 'int', 'size', 'uint', 'addr', 'addrvec', 'uuid',
- 'secs'])
+ self.assertIn(data['type'], ['str', 'bool', 'float', 'int', 'size', 'uint', 'addr',
+ 'addrvec', 'uuid', 'secs'])
if 'value' in data:
self.assertIn('source', data)
'names': JList(str),
'nodes': JList(JObj({}, allow_unknown=True))
}))
-
get_data = self._get('/api/erasure_code_profile/default')
self.assertEqual(get_data, default[0])
-
def test_create(self):
data = {'name': 'ecp32', 'k': 3, 'm': 2}
self._post('/api/erasure_code_profile', data)
'directory': str,
'nodes': JList(JObj({}, allow_unknown=True))
}))
-
cls._rados_cmd(['-p', 'ganesha', '-N', 'ganesha2', 'create', 'conf-node1'])
cls._rados_cmd(['-p', 'ganesha', '-N', 'ganesha2', 'create', 'conf-node2'])
cls._rados_cmd(['-p', 'ganesha', '-N', 'ganesha2', 'create', 'conf-node3'])
- cls._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace', 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
+ cls._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace',
+ 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
# RGW setup
cls._radosgw_admin_cmd([
def tearDownClass(cls):
super(GaneshaTest, cls).tearDownClass()
cls._radosgw_admin_cmd(['user', 'rm', '--uid', 'admin', '--purge-data'])
- cls._ceph_cmd(['osd', 'pool', 'delete', 'ganesha', 'ganesha', '--yes-i-really-really-mean-it'])
+ cls._ceph_cmd(['osd', 'pool', 'delete', 'ganesha', 'ganesha',
+ '--yes-i-really-really-mean-it'])
@DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['create', 'update', 'delete']}])
def test_read_access_permissions(self):
@classmethod
def create_export(cls, path, cluster_id, daemons, fsal, sec_label_xattr=None):
if fsal == 'CEPH':
- fsal = {"name": "CEPH", "user_id": "admin", "fs_name": None, "sec_label_xattr": sec_label_xattr}
+ fsal = {"name": "CEPH", "user_id": "admin", "fs_name": None,
+ "sec_label_xattr": sec_label_xattr}
pseudo = "/cephfs{}".format(path)
else:
fsal = {"name": "RGW", "rgw_user_id": "admin"}
exports = self._get("/api/nfs-ganesha/export")
self.assertEqual(len(exports), 0)
- data = self.create_export(cephfs_path, 'cluster1', ['node1', 'node2'], 'CEPH', "security.selinux")
+ data = self.create_export(cephfs_path, 'cluster1', ['node1', 'node2'], 'CEPH',
+ "security.selinux")
exports = self._get("/api/nfs-ganesha/export")
self.assertEqual(len(exports), 1)
"setting or deploy an NFS-Ganesha cluster with the Orchestrator."),
data['message'])
- self._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace', 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
+ self._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace',
+ 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
def test_valid_status(self):
data = self._get('/api/nfs-ganesha/status')
# -*- coding: utf-8 -*-
from __future__ import absolute_import
+
import json
from .helper import DashboardTestCase, JList, JObj
self._get('/api/monitor')
self.assertStatus(403)
-
def test_monitor_default(self):
data = self._get("/api/monitor")
self.assertStatus(200)
URL_INVENTORY = '/api/orchestrator/inventory'
URL_OSD = '/api/orchestrator/osd'
-
@property
def test_data_inventory(self):
return test_data['inventory']
data = self._get(self.URL_INVENTORY)
self.assertStatus(200)
- sorting_key = lambda node: node['name']
+ def sorting_key(node):
+ return node['name']
+
test_inventory = sorted(self.test_data_inventory, key=sorting_key)
resp_inventory = sorted(data, key=sorting_key)
self.assertEqual(len(test_inventory), len(resp_inventory))
data = self._get('/api/osd/0/histogram')
self.assertStatus(200)
self.assert_in_and_not_none(data['osd'], ['op_w_latency_in_bytes_histogram',
- 'op_r_latency_out_bytes_histogram'])
+ 'op_r_latency_out_bytes_histogram'])
def test_scrub(self):
self._post('/api/osd/0/scrub?deep=False')
}, allow_unknown=True)
pool_list_stat_schema = JObj(sub_elems={
- 'latest': JUnion([int,float]),
+ 'latest': JUnion([int, float]),
'rate': float,
'rates': JList(JAny(none=False)),
})
img = self._get('/api/block/image/{}%2F{}'.format(pool, name))
self._task_post("/api/block/image/{}%2F{}/move_trash".format(pool, name),
- {'delay': delay})
+ {'delay': delay})
self.assertStatus([200, 201])
return img['id']
res = self.create_image('rbd', None, 'test_rbd_twice', 10240)
self.assertStatus(400)
self.assertEqual(res, {"code": '17', 'status': 400, "component": "rbd",
- "detail": "[errno 17] RBD image already exists (error creating image)",
+ "detail": "[errno 17] RBD image already exists (error creating "
+ "image)",
'task': {'name': 'rbd/create',
'metadata': {'pool_name': 'rbd', 'namespace': None,
'image_name': 'test_rbd_twice'}}})
def test_clone_format_version(self):
config_name = 'rbd_default_clone_format'
+
def _get_config_by_name(conf_name):
data = self._get('/api/cluster_conf/{}'.format(conf_name))
if 'value' in data:
'value': value
})
self.wait_until_equal(
- lambda: _get_config_by_name(config_name),
- value,
- timeout=60)
+ lambda: _get_config_by_name(config_name),
+ value,
+ timeout=60)
clone_format_version = self._get('/api/block/image/clone_format_version')
self.assertEqual(clone_format_version, 1)
self.assertStatus(200)
'value': value
})
self.wait_until_equal(
- lambda: _get_config_by_name(config_name),
- value,
- timeout=60)
+ lambda: _get_config_by_name(config_name),
+ value,
+ timeout=60)
clone_format_version = self._get('/api/block/image/clone_format_version')
self.assertEqual(clone_format_version, 2)
self.assertStatus(200)
'value': value
})
self.wait_until_equal(
- lambda: _get_config_by_name(config_name),
- None,
- timeout=60)
+ lambda: _get_config_by_name(config_name),
+ None,
+ timeout=60)
def test_image_with_namespace(self):
self.create_namespace('rbd', 'ns')
self._get('/api/summary')
self.assertHeaders({
'server': 'Ceph-Dashboard'
- })
\ No newline at end of file
+ })
{[base-lint]deps}
commands =
flake8
+ flake8 --config=tox.ini ../../../../qa/tasks/mgr/dashboard
isort . --check
{[base-pylint]commands}
{[base-rst]commands}