}))
@contextmanager
- def __create_pool(self, name, data=None):
- pool_data = data or {
+ def __yield_pool(self, name=None, data=None, deletion_name=None):
+ """
+ Use either just a name or whole description of a pool to create one.
+ This also validates the correct creation and deletion after the pool was used.
+
+ :param name: Name of the pool
+ :param data: Describes the pool in full length
+ :param deletion_name: Only needed if the pool was renamed
+ :return:
+ """
+ data = self._create_pool(name, data)
+ yield data
+ self._delete_pool(deletion_name or data['pool'])
+
+ def _create_pool(self, name, data):
+ data = data or {
'pool': name,
'pg_num': '4',
'pool_type': 'replicated',
'rbd_qos_iops_limit': 5000,
}
}
- self._task_post('/api/pool/', pool_data)
+ self._task_post('/api/pool/', data)
self.assertStatus(201)
- time.sleep(5)
- self._validate_pool_properties(pool_data, self._get_pool(name))
- yield pool_data
+ self._validate_pool_properties(data, self._get_pool(data['pool']))
+ return data
+
+ def _delete_pool(self, name):
self._task_delete('/api/pool/' + name)
self.assertStatus(204)
for p in ['pg_num', pgp_prop]: # Should have the same values
self.assertEqual(pool[p], int(value), '{}: {} != {}'.format(p, pool[p], value))
- @classmethod
- def tearDownClass(cls):
- super(PoolTest, cls).tearDownClass()
- for name in ['dashboard_pool1', 'dashboard_pool2', 'dashboard_pool3']:
- cls._ceph_cmd(['osd', 'pool', 'delete', name, name, '--yes-i-really-really-mean-it'])
- cls._ceph_cmd(['osd', 'erasure-code-profile', 'rm', 'ecprofile'])
-
@DashboardTestCase.RunAs('test', 'test', [{'pool': ['create', 'update', 'delete']}])
def test_read_access_permissions(self):
self._get('/api/pool')
self.assertNotIn('flags_names', pool)
self.assertSchema(pool['configuration'], self.pool_rbd_conf_schema)
- def test_pool_create(self):
- self._ceph_cmd(['osd', 'crush', 'rule', 'create-erasure', 'ecrule'])
- self._ceph_cmd(
- ['osd', 'erasure-code-profile', 'set', 'ecprofile', 'crush-failure-domain=osd'])
-
- pool = {
+ def test_pool_create_with_two_applications(self):
+ self.__yield_pool(None, {
'pool': 'dashboard_pool1',
'pg_num': '10',
'pool_type': 'replicated',
'application_metadata': ['rbd', 'sth'],
- }
- self._task_post('/api/pool/', pool)
- self.assertStatus(201)
- self._validate_pool_properties(pool, self._get_pool(pool['pool']))
- self._task_delete("/api/pool/" + pool['pool'])
- self.assertStatus(204)
+ })
- pool = {
+ def test_pool_create_with_ecp_and_rule(self):
+ self._ceph_cmd(['osd', 'crush', 'rule', 'create-erasure', 'ecrule'])
+ self._ceph_cmd(
+ ['osd', 'erasure-code-profile', 'set', 'ecprofile', 'crush-failure-domain=osd'])
+ self.__yield_pool(None, {
'pool': 'dashboard_pool2',
'pg_num': '10',
'pool_type': 'erasure',
'application_metadata': ['rbd'],
'erasure_code_profile': 'ecprofile',
'crush_rule': 'ecrule',
- }
- self._task_post('/api/pool/', pool)
- self.assertStatus(201)
- self._validate_pool_properties(pool, self._get_pool(pool['pool']))
- self._task_delete("/api/pool/" + pool['pool'])
- self.assertStatus(204)
+ })
+ self._ceph_cmd(['osd', 'erasure-code-profile', 'rm', 'ecprofile'])
+ def test_pool_create_with_compression(self):
pool = {
'pool': 'dashboard_pool3',
'pg_num': '10',
'rbd_qos_iops_limit': None,
},
}
- expected_configuration = [{
- 'name': 'rbd_qos_bps_limit',
- 'source': 1,
- 'value': '2048',
- }, {
- 'name': 'rbd_qos_iops_limit',
- 'source': 0,
- 'value': '0',
- }]
- self._task_post('/api/pool/', pool)
- self.assertStatus(201)
- new_pool = self._get_pool(pool['pool'])
- self._validate_pool_properties(pool, new_pool)
- for conf in expected_configuration:
- self.assertIn(conf, new_pool['configuration'])
-
- self._task_delete("/api/pool/" + pool['pool'])
- self.assertStatus(204)
+ with self.__yield_pool(None, pool):
+ expected_configuration = [{
+ 'name': 'rbd_qos_bps_limit',
+ 'source': 1,
+ 'value': '2048',
+ }, {
+ 'name': 'rbd_qos_iops_limit',
+ 'source': 0,
+ 'value': '0',
+ }]
+ new_pool = self._get_pool(pool['pool'])
+ for conf in expected_configuration:
+ self.assertIn(conf, new_pool['configuration'])
def test_pool_create_with_quotas(self):
pools = [
for pool in pools:
pool_name = pool['pool_data']['pool']
- with self.__create_pool(pool_name, pool['pool_data']):
+ with self.__yield_pool(pool_name, pool['pool_data']):
self._validate_pool_properties(pool['pool_quotas_to_check'],
self._get_pool(pool_name))
def test_pool_update_metadata(self):
pool_name = 'pool_update_metadata'
- with self.__create_pool(pool_name):
+ with self.__yield_pool(pool_name):
props = {'application_metadata': ['rbd', 'sth']}
self._task_put('/api/pool/{}'.format(pool_name), props)
time.sleep(5)
def test_pool_update_configuration(self):
pool_name = 'pool_update_configuration'
- with self.__create_pool(pool_name):
+ with self.__yield_pool(pool_name):
configuration = {
'rbd_qos_bps_limit': 1024,
'rbd_qos_iops_limit': None,
def test_pool_update_compression(self):
pool_name = 'pool_update_compression'
- with self.__create_pool(pool_name):
+ with self.__yield_pool(pool_name):
properties = {
'compression_algorithm': 'zstd',
'compression_mode': 'aggressive',
def test_pool_update_unset_compression(self):
pool_name = 'pool_update_unset_compression'
- with self.__create_pool(pool_name):
+ with self.__yield_pool(pool_name):
self._task_put('/api/pool/' + pool_name, {'compression_mode': 'unset'})
time.sleep(5)
self._validate_pool_properties({
def test_pool_update_quotas(self):
pool_name = 'pool_update_quotas'
- with self.__create_pool(pool_name):
+ with self.__yield_pool(pool_name):
properties = {
'quota_max_objects': 1024,
'quota_max_bytes': 1000,