self._task_delete("/api/pool/" + pool['pool'])
self.assertStatus(204)
+ def test_pool_create_with_quotas(self):
+ pools = [
+ {
+ 'pool_data': {
+ 'pool': 'dashboard_pool_quota1',
+ 'pg_num': '10',
+ 'pool_type': 'replicated',
+ },
+ 'pool_quotas_to_check': {
+ 'quota_max_objects': 0,
+ 'quota_max_bytes': 0,
+ }
+ },
+ {
+ 'pool_data': {
+ 'pool': 'dashboard_pool_quota2',
+ 'pg_num': '10',
+ 'pool_type': 'replicated',
+ 'quota_max_objects': 1024,
+ 'quota_max_bytes': 1000,
+ },
+ 'pool_quotas_to_check': {
+ 'quota_max_objects': 1024,
+ 'quota_max_bytes': 1000,
+ }
+ }
+ ]
+
+ for pool in pools:
+ pool_name = pool['pool_data']['pool']
+ with self.__create_pool(pool_name, pool['pool_data']):
+ self._validate_pool_properties(pool['pool_quotas_to_check'],
+ self._get_pool(pool_name))
+
def test_pool_update_metadata(self):
pool_name = 'pool_update_metadata'
with self.__create_pool(pool_name):
'compression_required_ratio': None,
}, self._get_pool(pool_name))
+ def test_pool_update_quotas(self):
+ pool_name = 'pool_update_quotas'
+ with self.__create_pool(pool_name):
+ properties = {
+ 'quota_max_objects': 1024,
+ 'quota_max_bytes': 1000,
+ }
+ self._task_put('/api/pool/' + pool_name, properties)
+ time.sleep(5)
+ self._validate_pool_properties(properties, self._get_pool(pool_name))
+
def test_pool_create_fail(self):
data = {'pool_type': u'replicated', 'rule_name': u'dnf', 'pg_num': u'8', 'pool': u'sadfs'}
self._task_post('/api/pool/', data)
def set_key(key, value):
CephService.send_command('mon', 'osd pool set', pool=pool, var=key, val=str(value))
+ quotas = {}
+ quotas['max_objects'] = kwargs.pop('quota_max_objects', None)
+ quotas['max_bytes'] = kwargs.pop('quota_max_bytes', None)
+ self._set_quotas(pool, quotas)
+
for key, value in kwargs.items():
if key == 'pool':
update_name = True
if update_name:
CephService.send_command('mon', 'osd pool rename', srcpool=pool, destpool=destpool)
+ def _set_quotas(self, pool, quotas):
+ for field, value in quotas.items():
+ if value is not None:
+ CephService.send_command('mon', 'osd pool set-quota',
+ pool=pool, field=field, val=str(value))
+
def _handle_update_compression_args(self, options, kwargs):
if kwargs.get('compression_mode') == 'unset' and options is not None:
def reset_arg(arg, value):