self.assertEqual(pool['type'], data['pool_type'])
elif k == 'pg_num':
self.assertEqual(pool[k], int(v), '{}: {} != {}'.format(k, pool[k], v))
+ k = 'pg_placement_num' # Should have the same value as pg_num
+ self.assertEqual(pool[k], int(v), '{}: {} != {}'.format(k, pool[k], v))
elif k == 'application_metadata':
self.assertIsInstance(pool[k], list)
self.assertEqual(pool[k],
if flags and 'ec_overwrites' in flags:
CephService.send_command('mon', 'osd pool set', pool=pool, var='allow_ec_overwrites',
val='true')
- if application_metadata:
+ if application_metadata is not None:
def set_app(what, app):
CephService.send_command('mon', 'osd pool application ' + what, pool=pool, app=app,
force='--yes-i-really-mean-it')
set_app('enable', app)
for key, value in kwargs.items():
- CephService.send_command('mon', 'osd pool set', pool=pool, var=key, val=value)
+ def set_key(key):
+ CephService.send_command('mon', 'osd pool set', pool=pool, var=key, val=str(value))
+
+ set_key(key)
+ if key == 'pg_num':
+ set_key('pgp_num')
@Endpoint()
@ReadPermission