name = cmd['name'] if 'name' in cmd else None
email = cmd['email'] if 'email' in cmd else None
try:
- user = ACCESS_CTRL_DB.create_user(username, password, name, email)
role = ACCESS_CTRL_DB.get_role(rolename) if rolename else None
- except UserAlreadyExists as ex:
- return -errno.EEXIST, '', str(ex)
except RoleDoesNotExist as ex:
if rolename not in SYSTEM_ROLES:
return -errno.ENOENT, '', str(ex)
role = SYSTEM_ROLES[rolename]
+ try:
+ user = ACCESS_CTRL_DB.create_user(username, password, name, email)
+ except UserAlreadyExists as ex:
+ return -errno.EEXIST, '', str(ex)
+
if role:
user.set_roles([role])
ACCESS_CTRL_DB.save()
db = json.loads(db_json)
return db
+ # The DB is written to persistent storage the first time it is saved.
+ # However, should an operation fail due to <reasons>, we may end up in
+ # a state where we have a completely empty CONFIG_KEY_DICT (our mock
+ # equivalent to the persistent state). While this works for most of the
+ # tests in this class, that would prevent us from testing things like
+ # "run a command that is expected to fail, and then ensure nothing
+ # happened", because we'd be asserting in `load_persistent_db()` due to
+ # the map being empty.
+ #
+ # This function will therefore force state to be written to our mock
+ # persistent state. We could have added this extra step to
+ # `load_persistent_db()` directly, but that would conflict with the
+ # upgrade tests. This way, we can selectively enforce this requirement
+ # where we believe it to be necessary; generically speaking, this should
+ # not be needed unless we're testing very specific behaviors.
+ #
+ def setup_and_load_persistent_db(self):
+ from ..services.access_control import ACCESS_CTRL_DB
+ ACCESS_CTRL_DB.save()
+ self.load_persistent_db()
+
def validate_persistent_role(self, rolename, scopes_permissions,
description=None):
db = self.load_persistent_db()
self.assertEqual(ctx.exception.retcode, -errno.EEXIST)
self.assertEqual(str(ctx.exception), "User 'admin' already exists")
+ def test_create_users_with_dne_role(self):
+ # one time call to setup our persistent db
+ self.setup_and_load_persistent_db()
+
+ # create a user with a role that does not exist; expect a failure
+ try:
+ self.exec_cmd('ac-user-create', username='foo',
+ rolename='dne_role', password='foopass',
+ name='foo User', email='foo@user.com')
+ except CmdException as e:
+ self.assertEqual(e.retcode, -errno.ENOENT)
+
+ db = self.load_persistent_db()
+ if 'users' in db:
+ self.assertNotIn('foo', db['users'])
+
+ # We could just finish our test here, given we ensured that the user
+ # with a non-existent role is not in persistent storage. However,
+ # we're going to test the database's consistency, making sure that
+ # side-effects are not written to persistent storage once we commit
+ # an unrelated operation. To ensure this, we'll issue another
+ # operation that is sharing the same code path, and will check whether
+ # the next operation commits dirty state.
+
+ # create a role (this will be 'test_role')
+ self.test_create_role()
+ self.exec_cmd('ac-user-create', username='bar',
+ rolename='test_role', password='barpass',
+ name='bar User', email='bar@user.com')
+
+ # validate db:
+ # user 'foo' should not exist
+ # user 'bar' should exist and have role 'test_role'
+ self.validate_persistent_user('bar', ['test_role'])
+
+ db = self.load_persistent_db()
+ self.assertIn('users', db)
+ self.assertNotIn('foo', db['users'])
+
def test_delete_nonexistent_user(self):
with self.assertRaises(CmdException) as ctx:
self.exec_cmd('ac-user-delete', username='admin')