%if 0%{?fedora} || 0%{?rhel} >= 9
Requires: python%{python3_pkgversion}-grpcio
Requires: python%{python3_pkgversion}-grpcio-tools
+Requires: python%{python3_pkgversion}-jmespath
%endif
%if 0%{?fedora} || 0%{?rhel} || 0%{?openEuler}
Requires: python%{python3_pkgversion}-cherrypy
python3-coverage <pkg.ceph.check>,
python3-dateutil <pkg.ceph.check>,
python3-grpcio <pkg.ceph.check>,
+ python3-jmespath (>=0.10) <pkg.ceph.check>,
python3-openssl <pkg.ceph.check>,
python3-prettytable <pkg.ceph.check>,
python3-requests <pkg.ceph.check>,
jsonpatch
grpcio==1.46.5
grpcio-tools==1.46.5
+jmespath
r_dict['scopes_permissions'])
@classmethod
- def map_to_system_roles(cls, roles) -> List['Role']:
+ def map_to_system_roles(cls, roles: List[str]) -> List['Role']:
matches = []
- for rn in SYSTEM_ROLES_NAMES:
+ for sys_role in ROLE_MAPPER:
for role in roles:
- if role in SYSTEM_ROLES_NAMES[rn]:
- matches.append(rn)
+ if role in ROLE_MAPPER[sys_role]:
+ matches.append(sys_role)
return matches
}
# static name-like roles list for role mapping
-SYSTEM_ROLES_NAMES = {
+ROLE_MAPPER = {
ADMIN_ROLE: [ADMIN_ROLE.name, 'admin'],
- READ_ONLY_ROLE: [READ_ONLY_ROLE.name, 'read', 'guest', 'monitor']
+ READ_ONLY_ROLE: [READ_ONLY_ROLE.name, 'read', 'guest', 'monitor'],
+ BLOCK_MGR_ROLE: [BLOCK_MGR_ROLE.name, 'block', 'rbd'],
+ RGW_MGR_ROLE: [RGW_MGR_ROLE.name, 'object', 'rgw'],
+ CLUSTER_MGR_ROLE: [CLUSTER_MGR_ROLE.name, 'cluster'],
+ POOL_MGR_ROLE: [POOL_MGR_ROLE.name, 'pool'],
+ CEPHFS_MGR_ROLE: [CEPHFS_MGR_ROLE.name, 'cephfs'],
+ GANESHA_MGR_ROLE: [GANESHA_MGR_ROLE.name, 'ganesha', 'nfs'],
+ SMB_MGR_ROLE: [SMB_MGR_ROLE.name, 'smb']
}
+
+import importlib
import json
+import logging
from typing import Dict, List
from urllib.parse import quote
from ...tools import prepare_url_prefix
from ..access_control import Role, User, UserAlreadyExists
+try:
+ jmespath = importlib.import_module("jmespath")
+except ModuleNotFoundError:
+ logging.error("Module 'jmespath' is not installed.")
+
+logger = logging.getLogger('services.oauth2')
+
class OAuth2(SSOAuth):
LOGIN_URL = 'auth/oauth2/login'
sso = True
class OAuth2Config(BaseAuth.Config):
- pass
+ roles_path: str
+
+ def __init__(self, roles_path=None):
+ self.roles_path = roles_path
+
+ def get_roles_path(self):
+ return self.roles_path
@staticmethod
def enabled():
return mgr.get_module_option('sso_oauth2')
- def to_dict(self) -> 'BaseAuth.Config':
- return self.OAuth2Config()
+ def to_dict(self) -> 'OAuth2Config':
+ return {
+ 'roles_path': self.roles_path
+ }
@classmethod
def from_dict(cls, s_dict: OAuth2Config) -> 'OAuth2':
- # pylint: disable=unused-argument
- return OAuth2()
+ try:
+ return OAuth2(s_dict['roles_path'])
+ except KeyError:
+ return OAuth2({})
@classmethod
def get_auth_name(cls):
@classmethod
def get_user_roles(cls):
- roles: List[Role] = []
+ roles: List[str] = []
user_roles: List[Role] = []
try:
jwt_payload = cherrypy.request.jwt_payload
except AttributeError:
- raise cherrypy.HTTPError()
-
- # check for client roes
- if 'resource_access' in jwt_payload:
- # Find the first value where the key is not 'account'
- roles = next((value['roles'] for key, value in jwt_payload['resource_access'].items()
- if key != "account"), user_roles)
- # check for global roles
- elif 'realm_access' in jwt_payload:
- roles = next((value['roles'] for _, value in jwt_payload['realm_access'].items()),
- user_roles)
+ raise cherrypy.HTTPError(401)
+
+ if jmespath and hasattr(mgr.SSO_DB.config, 'roles_path'):
+ logger.debug("Using 'roles_path' to fetch roles")
+ roles = jmespath.search(mgr.SSO_DB.config.roles_path, jwt_payload)
+ # e.g Keycloak
+ elif 'resource_access' in jwt_payload or 'realm_access' in jwt_payload:
+ logger.debug("Using 'resource_access' or 'realm_access' to fetch roles")
+ roles = jmespath.search(
+ "resource_access.*[?@!='account'].roles[] || realm_access.roles[]",
+ jwt_payload)
+ elif 'roles' in jwt_payload:
+ logger.debug("Using 'roles' to fetch roles")
+ roles = jwt_payload['roles']
+ if isinstance(roles, str):
+ roles = [roles]
else:
- raise cherrypy.HTTPError()
- user_roles = Role.map_to_system_roles(roles)
+ raise cherrypy.HTTPError(403)
+ user_roles = Role.map_to_system_roles(roles or [])
return user_roles
@classmethod
user = mgr.ACCESS_CTRL_DB.create_user(
jwt_payload['sub'], None, jwt_payload['name'], jwt_payload['email'])
except UserAlreadyExists:
+ logger.debug("User already exists")
user = mgr.ACCESS_CTRL_DB.get_user(jwt_payload['sub'])
user.set_roles(cls.get_user_roles())
# set user last update to token time issued
# pylint: disable=too-many-return-statements,too-many-branches
import errno
+import importlib
import json
import logging
import os
import threading
import warnings
-from typing import Dict
+from typing import Dict, Optional
from urllib import parse
from mgr_module import CLIWriteCommand, HandleCommandResult
logger = logging.getLogger('sso')
+try:
+ jmespath = importlib.import_module('jmespath')
+ JMESPathError = getattr(jmespath.exceptions, "JMESPathError")
+except ModuleNotFoundError:
+ logger.error("Module 'jmespath' is not installed.")
+
try:
from onelogin.saml2.errors import OneLogin_Saml2_Error as Saml2Error
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser as Saml2Parser
@CLIWriteCommand("dashboard sso enable oauth2")
-def enable_sso(_):
+def enable_sso(_, roles_path: Optional[str] = None):
mgr.SSO_DB.protocol = AuthType.OAUTH2
+ if jmespath and roles_path:
+ try:
+ jmespath.compile(roles_path)
+ mgr.SSO_DB.config.roles_path = roles_path
+ except (JMESPathError, SyntaxError):
+ return HandleCommandResult(stdout='Syntax invalid for "roles_path"')
mgr.SSO_DB.save()
mgr.set_module_option('sso_oauth2', True)
return HandleCommandResult(stdout='SSO is "enabled" with "OAuth2" protocol.')
types-requests
types-PyYAML
types-jwt
+ types-jmespath
commands =
mypy --config-file=../../mypy.ini \
-m alerts \