self._delete('/api/user/user1')
self.assertStatus(204)
+ def test_crd_disabled_user(self):
+ self._create_user(username='klara',
+ password='123456789',
+ name='Klara Musterfrau',
+ email='klara@musterfrau.com',
+ roles=['administrator'],
+ enabled=False)
+ self.assertStatus(201)
+ user = self.jsonBody()
+
+ # Restart dashboard module.
+ self._unload_module('dashboard')
+ self._load_module('dashboard')
+
+ self._get('/api/user/klara')
+ self.assertStatus(200)
+ self.assertJsonBody({
+ 'username': 'klara',
+ 'name': 'Klara Musterfrau',
+ 'email': 'klara@musterfrau.com',
+ 'roles': ['administrator'],
+ 'lastUpdate': user['lastUpdate'],
+ 'enabled': False
+ })
+
+ self._delete('/api/user/klara')
+ self.assertStatus(204)
+
def test_list_users(self):
self._get('/api/user')
self.assertStatus(200)
cls.setup_mgrs()
+ @classmethod
+ def _unload_module(cls, module_name):
+ def is_disabled():
+ enabled_modules = json.loads(cls.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'mgr', 'module', 'ls'))['enabled_modules']
+ return module_name not in enabled_modules
+
+ if is_disabled():
+ return
+
+ log.info("Unloading Mgr module %s ...", module_name)
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'disable', module_name)
+ cls.wait_until_true(is_disabled, timeout=30)
+
@classmethod
def _load_module(cls, module_name):
loaded = json.loads(cls.mgr_cluster.mon_manager.raw_cluster_cmd(
- "mgr", "module", "ls"))['enabled_modules']
+ "mgr", "module", "ls"))['enabled_modules']
if module_name in loaded:
# The enable command is idempotent, but our wait for a restart
# isn't, so let's return now if it's already loaded
# check if the the module is configured as an always on module
mgr_daemons = json.loads(cls.mgr_cluster.mon_manager.raw_cluster_cmd(
- "mgr", "metadata"))
+ "mgr", "metadata"))
for daemon in mgr_daemons:
if daemon["name"] == initial_mgr_map["active_name"]:
if module_name in always_on:
return
+ log.info("Loading Mgr module %s ...", module_name)
initial_gid = initial_mgr_map['active_gid']
- cls.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable",
- module_name, "--force")
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd(
+ "mgr", "module", "enable", module_name, "--force")
# Wait for the module to load
def has_restarted():
done = mgr_map['active_gid'] != initial_gid and mgr_map['available']
if done:
log.info("Restarted after module load (new active {0}/{1})".format(
- mgr_map['active_name'] , mgr_map['active_gid']))
+ mgr_map['active_name'], mgr_map['active_gid']))
return done
cls.wait_until_true(has_restarted, timeout=30)
def from_dict(cls, u_dict, roles):
return User(u_dict['username'], u_dict['password'], u_dict['name'],
u_dict['email'], {roles[r] for r in u_dict['roles']},
- u_dict['lastUpdate'])
+ u_dict['lastUpdate'], u_dict['enabled'])
class AccessControlDB(object):
self.assertEqual(str(ctx.exception),
"Cannot update system role 'read-only'")
- def test_create_user(self, username='admin', rolename=None):
+ def test_create_user(self, username='admin', rolename=None, enabled=True):
user = self.exec_cmd('ac-user-create', username=username,
rolename=rolename, password='admin',
name='{} User'.format(username),
- email='{}@user.com'.format(username))
+ email='{}@user.com'.format(username),
+ enabled=enabled)
pass_hash = password_hash('admin', user['password'])
self.assertDictEqual(user, {
'name': '{} User'.format(username),
'email': '{}@user.com'.format(username),
'roles': [rolename] if rolename else [],
- 'enabled': True
+ 'enabled': enabled
})
self.validate_persistent_user(username, [rolename] if rolename else [],
pass_hash, '{} User'.format(username),
'{}@user.com'.format(username),
- user['lastUpdate'], True)
+ user['lastUpdate'], enabled)
return user
+ def test_create_disabled_user(self):
+ self.test_create_user(enabled=False)
+
def test_create_user_with_role(self):
self.test_add_role_scope_perms()
self.test_create_user(rolename='test_role')