})
self.delete_user('admin2')
+ def test_lockout_user(self):
+ self._ceph_cmd(['dashboard', 'set-account-lockout-attempts', '3'])
+ for _ in range(3):
+ self._post("/api/auth", {'username': 'admin', 'password': 'inval'})
+ self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
+ self.assertStatus(400)
+ self.assertJsonBody({
+ "component": "auth",
+ "code": "invalid_credentials",
+ "detail": "Invalid credentials"
+ })
+ self._ceph_cmd(['dashboard', 'ac-user-enable', 'admin'])
+ self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
+ self.assertStatus(201)
+ data = self.jsonBody()
+ self.assertSchema(data, JObj(sub_elems={
+ 'token': JLeaf(str),
+ 'username': JLeaf(str),
+ 'permissions': JObj(sub_elems={}, allow_unknown=True),
+ 'sso': JLeaf(bool),
+ 'pwdExpirationDate': JLeaf(int, none=True),
+ 'pwdUpdateRequired': JLeaf(bool)
+ }, allow_unknown=False))
+ self._validate_jwt_token(data['token'], "admin", data['permissions'])
+
def test_logout(self):
self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
self.assertStatus(201)
import cherrypy
from .. import mgr
-from ..exceptions import DashboardException
+from ..exceptions import InvalidCredentialsError, UserDoesNotExist
from ..services.auth import AuthManager, JwtManager
+from ..settings import Settings
from . import ApiController, ControllerDoc, EndpointDoc, RESTController, allow_empty_body
logger = logging.getLogger('controllers.auth')
"""
Provide authenticates and returns JWT token.
"""
-
def create(self, username, password):
user_data = AuthManager.authenticate(username, password)
user_perms, pwd_expiration_date, pwd_update_required = None, None, None
- if user_data:
- user_perms = user_data.get('permissions')
- pwd_expiration_date = user_data.get('pwdExpirationDate', None)
- pwd_update_required = user_data.get('pwdUpdateRequired', False)
-
- if user_perms is not None:
- logger.debug('Login successful')
- token = JwtManager.gen_token(username)
- token = token.decode('utf-8')
- cherrypy.response.headers['Authorization'] = "Bearer: {}".format(token)
- return {
- 'token': token,
- 'username': username,
- 'permissions': user_perms,
- 'pwdExpirationDate': pwd_expiration_date,
- 'sso': mgr.SSO_DB.protocol == 'saml2',
- 'pwdUpdateRequired': pwd_update_required
- }
+ max_attempt = Settings.ACCOUNT_LOCKOUT_ATTEMPTS
+ if max_attempt == 0 or mgr.ACCESS_CTRL_DB.get_attempt(username) < max_attempt:
+ if user_data:
+ user_perms = user_data.get('permissions')
+ pwd_expiration_date = user_data.get('pwdExpirationDate', None)
+ pwd_update_required = user_data.get('pwdUpdateRequired', False)
- logger.debug('Login failed')
- raise DashboardException(msg='Invalid credentials',
- code='invalid_credentials',
- component='auth')
+ if user_perms is not None:
+ logger.info('Login successful: %s', username)
+ mgr.ACCESS_CTRL_DB.reset_attempt(username)
+ mgr.ACCESS_CTRL_DB.save()
+ token = JwtManager.gen_token(username)
+ token = token.decode('utf-8')
+ cherrypy.response.headers['Authorization'] = "Bearer: {}".format(token)
+ return {
+ 'token': token,
+ 'username': username,
+ 'permissions': user_perms,
+ 'pwdExpirationDate': pwd_expiration_date,
+ 'sso': mgr.SSO_DB.protocol == 'saml2',
+ 'pwdUpdateRequired': pwd_update_required
+ }
+ mgr.ACCESS_CTRL_DB.increment_attempt(username)
+ mgr.ACCESS_CTRL_DB.save()
+ else:
+ try:
+ user = mgr.ACCESS_CTRL_DB.get_user(username)
+ user.enabled = False
+ mgr.ACCESS_CTRL_DB.save()
+ logging.warning('Maximum number of unsuccessful log-in attempts '
+ '(%d) reached for '
+ 'username "%s" so the account was blocked. '
+ 'An administrator will need to re-enable the account',
+ max_attempt, username)
+ raise InvalidCredentialsError
+ except UserDoesNotExist:
+ raise InvalidCredentialsError
+ logger.info('Login failed: %s', username)
+ raise InvalidCredentialsError
@RESTController.Collection('POST')
@allow_empty_body
self.password = password
self.name = name
self.email = email
+ self.invalid_auth_attempt = 0
if roles is None:
self.roles = set()
else:
self.set_password_hash(password_hash(password))
def set_password_hash(self, hashed_password):
+ self.invalid_auth_attempt = 0
self.password = hashed_password
self.refresh_last_update()
self.refresh_pwd_expiration_date()
raise RoleDoesNotExist(name)
return self.roles[name]
+ def increment_attempt(self, username):
+ with self.lock:
+ if username in self.users:
+ self.users[username].invalid_auth_attempt += 1
+
+ def reset_attempt(self, username):
+ with self.lock:
+ if username in self.users:
+ self.users[username].invalid_auth_attempt = 0
+
+ def get_attempt(self, username):
+ with self.lock:
+ try:
+ return self.users[username].invalid_auth_attempt
+ except KeyError:
+ return 0
+
def delete_role(self, name):
with self.lock:
if name not in self.roles:
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.enabled = True
+ mgr.ACCESS_CTRL_DB.reset_attempt(username)
mgr.ACCESS_CTRL_DB.save()
return 0, json.dumps(user.to_dict()), ''