except _ValError as e:
self.assertEqual(data, str(e))
+ def assertSchemaBody(self, schema):
+ self.assertSchema(self.jsonBody(), schema)
+
def assertBody(self, body):
self.assertEqual(self._resp.text, body)
def _validate_json(val, schema, path=[]):
"""
>>> d = {'a': 1, 'b': 'x', 'c': range(10)}
- ... ds = JObj({'a': JLeaf(int), 'b': JLeaf(str), 'c': JList(JLeaf(int))})
+ ... ds = JObj({'a': int, 'b': str, 'c': JList(int)})
... _validate_json(d, ds)
True
"""
raise _ValError('val not of type {}'.format(schema.typ), path)
return True
if isinstance(schema, JList):
+ if not isinstance(val, list):
+ raise _ValError('val="{}" is not a list'.format(val), path)
return all(_validate_json(e, schema.elem_typ, path + [i]) for i, e in enumerate(val))
if isinstance(schema, JTuple):
return all(_validate_json(val[i], typ, path + [i])
return True
elif val is None:
raise _ValError('val is None', path)
+ if not hasattr(val, 'keys'):
+ raise _ValError('val="{}" is not a dict'.format(val), path)
missing_keys = set(schema.sub_elems.keys()).difference(set(val.keys()))
if missing_keys:
raise _ValError('missing keys: {}'.format(missing_keys), path)
_validate_json(val[sub_elem_name], sub_elem, path + [sub_elem_name])
for sub_elem_name, sub_elem in schema.sub_elems.items()
)
+ if schema in [str, int, float, bool, six.string_types]:
+ return _validate_json(val, JLeaf(schema), path)
assert False, str(path)
import six
-from .helper import DashboardTestCase
+from .helper import DashboardTestCase, JObj, JList
log = logging.getLogger(__name__)
@classmethod
def tearDownClass(cls):
super(PoolTest, cls).tearDownClass()
- for name in ['dashboard_pool1', 'dashboard_pool2', 'dashboard_pool3']:
+ for name in ['dashboard_pool1', 'dashboard_pool2', 'dashboard_pool3', 'dashboard_pool_update1']:
cls._ceph_cmd(['osd', 'pool', 'delete', name, name, '--yes-i-really-really-mean-it'])
cls._ceph_cmd(['osd', 'erasure-code-profile', 'rm', 'ecprofile'])
+ pool_schema = JObj(sub_elems={
+ 'pool_name': str,
+ 'type': str,
+ 'application_metadata': JList(str),
+ 'flags': int,
+ 'flags_names': str,
+ }, allow_unknown=True)
+
@DashboardTestCase.RunAs('test', 'test', [{'pool': ['create', 'update', 'delete']}])
def test_read_access_permissions(self):
self._get('/api/pool')
cluster_pools = self.ceph_cluster.mon_manager.list_pools()
self.assertEqual(len(cluster_pools), len(data))
+ self.assertSchemaBody(JList(self.pool_schema))
for pool in data:
- self.assertIn('pool_name', pool)
- self.assertIn('type', pool)
- self.assertIn('application_metadata', pool)
- self.assertIsInstance(pool['application_metadata'], list)
- self.assertIn('flags', pool)
- self.assertIn('flags_names', pool)
self.assertNotIn('stats', pool)
self.assertIn(pool['pool_name'], cluster_pools)
self._post('/api/pool/', data)
self.assertStatus(201)
- pool = self._get("/api/pool/" + data['pool'])
- self.assertStatus(200)
- try:
- for k, v in data.items():
- if k == 'pool_type':
- self.assertEqual(pool['type'], data['pool_type'])
- elif k == 'pg_num':
- self.assertEqual(pool[k], int(v), '{}: {} != {}'.format(k, pool[k], v))
- elif k == 'application_metadata':
- self.assertIsInstance(pool[k], list)
- self.assertEqual(pool[k],
- data['application_metadata'])
- elif k == 'pool':
- self.assertEqual(pool['pool_name'], v)
- elif k in ['compression_mode', 'compression_algorithm']:
- self.assertEqual(pool['options'][k], data[k])
- elif k == 'compression_max_blob_size':
- self.assertEqual(pool['options'][k], int(data[k]))
- elif k == 'compression_required_ratio':
- self.assertEqual(pool['options'][k], float(data[k]))
- else:
- self.assertEqual(pool[k], v, '{}: {} != {}'.format(k, pool[k], v))
-
- except Exception:
- log.exception("test_pool_create: pool=%s", pool)
- raise
+ self._check_pool_properties(data)
self._delete("/api/pool/" + data['pool'])
self.assertStatus(204)
log.exception("test_pool_create: data=%s", data)
raise
+ def _check_pool_properties(self, data, pool_name=None):
+ if not pool_name:
+ pool_name = data['pool']
+ pool = self._get("/api/pool/" + pool_name)
+ self.assertStatus(200)
+ self.assertSchemaBody(self.pool_schema)
+ try:
+ for k, v in data.items():
+ if k == 'pool_type':
+ self.assertEqual(pool['type'], data['pool_type'])
+ elif k == 'pg_num':
+ self.assertEqual(pool[k], int(v), '{}: {} != {}'.format(k, pool[k], v))
+ elif k == 'application_metadata':
+ self.assertIsInstance(pool[k], list)
+ self.assertEqual(pool[k],
+ data['application_metadata'])
+ elif k == 'pool':
+ self.assertEqual(pool['pool_name'], v)
+ elif k in ['compression_mode', 'compression_algorithm']:
+ self.assertEqual(pool['options'][k], data[k])
+ elif k == 'compression_max_blob_size':
+ self.assertEqual(pool['options'][k], int(data[k]))
+ elif k == 'compression_required_ratio':
+ self.assertEqual(pool['options'][k], float(data[k]))
+ else:
+ self.assertEqual(pool[k], v, '{}: {} != {}'.format(k, pool[k], v))
+
+ except Exception:
+ log.exception("test_pool_create: pool=%s", pool)
+ raise
+
def test_pool_create(self):
self._ceph_cmd(['osd', 'crush', 'rule', 'create-erasure', 'ecrule'])
self._ceph_cmd(
for data in pools:
self._pool_create(data)
+ def test_update(self):
+ pool = [
+ {
+ 'pool': 'dashboard_pool_update1',
+ 'pg_num': '10',
+ 'pool_type': 'replicated',
+ },
+ {
+ 'application_metadata': ['rbd', 'sth'],
+ },
+ {
+ 'pg_num': '12',
+ },
+ {
+ 'application_metadata': ['rgw'],
+ },
+ {
+ 'compression_algorithm': 'zstd',
+ 'compression_mode': 'aggressive',
+ 'compression_max_blob_size': '10000000',
+ 'compression_required_ratio': '0.8',
+ }
+
+ ]
+ self._post('/api/pool/', pool[0])
+ self.assertStatus(201)
+
+ self._check_pool_properties(pool[0])
+
+ for data in pool[1:]:
+ self._put('/api/pool/' + pool[0]['pool'], data)
+ self._check_pool_properties(data, pool_name=pool[0]['pool'])
+
+ self._delete("/api/pool/" + pool[0]['pool'])
+ self.assertStatus(204)
+
def test_pool_create_fail(self):
data = {'pool_type': u'replicated', 'rule_name': u'dnf', 'pg_num': u'8', 'pool': u'sadfs'}
self._post('/api/pool/', data)
})
def test_pool_info(self):
- info_data = self._get("/api/pool/_info")
- self.assertEqual(set(info_data),
- {'pool_names', 'crush_rules_replicated', 'crush_rules_erasure',
- 'is_all_bluestore', 'compression_algorithms', 'compression_modes',
- 'osd_count'})
- self.assertTrue(all(isinstance(n, six.string_types) for n in info_data['pool_names']))
- self.assertTrue(
- all(isinstance(n, dict) for n in info_data['crush_rules_replicated']))
- self.assertTrue(
- all(isinstance(n, dict) for n in info_data['crush_rules_erasure']))
- self.assertIsInstance(info_data['is_all_bluestore'], bool)
- self.assertIsInstance(info_data['osd_count'], int)
- self.assertTrue(
- all(isinstance(n, six.string_types) for n in info_data['compression_algorithms']))
- self.assertTrue(
- all(isinstance(n, six.string_types) for n in info_data['compression_modes']))
+ self._get("/api/pool/_info")
+ self.assertSchemaBody(JObj({
+ 'pool_names': JList(six.string_types),
+ 'compression_algorithms': JList(six.string_types),
+ 'compression_modes': JList(six.string_types),
+ 'is_all_bluestore': bool,
+ 'osd_count': int,
+ 'crush_rules_replicated': JList(JObj({}, allow_unknown=True)),
+ 'crush_rules_erasure': JList(JObj({}, allow_unknown=True)),
+ }))
return CephService.send_command('mon', 'osd pool delete', pool=pool_name, pool2=pool_name,
sure='--yes-i-really-really-mean-it')
+ def set(self, pool_name, flags=None, application_metadata=None, **kwargs):
+ self._set_pool_values(pool_name, application_metadata, flags, True, kwargs)
+
@handle_send_command_error('pool')
def create(self, pool, pg_num, pool_type, erasure_code_profile=None, flags=None,
application_metadata=None, rule_name=None, **kwargs):
pgp_num=int(pg_num), pool_type=pool_type, erasure_code_profile=ecp,
rule=rule_name)
+ self._set_pool_values(pool, application_metadata, flags, False, kwargs)
+
+ def _set_pool_values(self, pool, application_metadata, flags, update_existing, kwargs):
if flags and 'ec_overwrites' in flags:
CephService.send_command('mon', 'osd pool set', pool=pool, var='allow_ec_overwrites',
val='true')
-
if application_metadata:
- for app in application_metadata:
- CephService.send_command('mon', 'osd pool application enable', pool=pool, app=app,
+ def set_app(what, app):
+ CephService.send_command('mon', 'osd pool application ' + what, pool=pool, app=app,
force='--yes-i-really-mean-it')
+ if update_existing:
+ original_app_metadata = set(
+ self.get(pool, 'application_metadata')['application_metadata'])
+ else:
+ original_app_metadata = set()
+
+ for app in original_app_metadata - set(application_metadata):
+ set_app('disable', app)
+ for app in set(application_metadata) - original_app_metadata:
+ set_app('enable', app)
+
for key, value in kwargs.items():
CephService.send_command('mon', 'osd pool set', pool=pool, var=key, val=value)