Fix pylint issues.
Refactor JwtManager check to avoid code duplication (found by pylint
when --jobs=1).
Fix issue with DashboardException.code property, using abs() on
potentially None attribute.
Disables Doctests in services/rbd.py that are actually integration tests
(they check the value of rbd.RBD_FEATURES_NAME_MAPPING). Ideally, these
kind of tests should be explicitly executed in an integration testing
stage, rather that unit-testing. Disabled tests have been prepended with
@DISABLEDOCTEST token.
Fixes: https://tracker.ceph.com/issues/40487
Signed-off-by: Ernesto Puerta <epuertat@redhat.com>
if not isinstance(description, str):
raise Exception("%s has been called with a description that is not a string: %s"
% (EndpointDoc.__name__, description))
- elif not isinstance(group, str):
+ if not isinstance(group, str):
raise Exception("%s has been called with a groupname that is not a string: %s"
% (EndpointDoc.__name__, group))
- elif parameters and not isinstance(parameters, dict):
+ if parameters and not isinstance(parameters, dict):
raise Exception("%s has been called with parameters that is not a dict: %s"
% (EndpointDoc.__name__, parameters))
- elif responses and not isinstance(responses, dict):
+ if responses and not isinstance(responses, dict):
raise Exception("%s has been called with responses that is not a dict: %s"
% (EndpointDoc.__name__, responses))
from __future__ import absolute_import
import cherrypy
-import jwt
from . import ApiController, RESTController
from .. import logger, mgr
from ..exceptions import DashboardException
from ..services.auth import AuthManager, JwtManager
-from ..services.access_control import UserDoesNotExist
@ApiController('/auth', secure=False)
@RESTController.Collection('POST')
def check(self, token):
if token:
- try:
- token = JwtManager.decode_token(token)
- if not JwtManager.is_blacklisted(token['jti']):
- user = AuthManager.get_user(token['username'])
- if user.lastUpdate <= token['iat']:
- return {
- 'username': user.username,
- 'permissions': user.permissions_dict(),
- 'sso': mgr.SSO_DB.protocol == 'saml2'
- }
-
- logger.debug("AMT: user info changed after token was"
- " issued, iat=%s lastUpdate=%s",
- token['iat'], user.lastUpdate)
- else:
- logger.debug('AMT: Token is black-listed')
- except jwt.exceptions.ExpiredSignatureError:
- logger.debug("AMT: Token has expired")
- except jwt.exceptions.InvalidTokenError:
- logger.debug("AMT: Failed to decode token")
- except UserDoesNotExist:
- logger.debug("AMT: Invalid token: user %s does not exist",
- token['username'])
+ user = JwtManager.get_user(token)
+ if user:
+ return {
+ 'username': user.username,
+ 'permissions': user.permissions_dict(),
+ 'sso': mgr.SSO_DB.protocol == 'saml2'
+ }
return {
- 'login_url': self._get_login_url()
+ 'login_url': self._get_login_url(),
}
from . import ApiController, Endpoint, Task, BaseController, ReadPermission, \
RESTController
+from .rbd import _rbd_call
+
from .. import logger, mgr
from ..security import Scope
from ..services.ceph_service import CephService
return composed_decorator
-def _rbd_call(pool_name, func, *args, **kwargs):
- with mgr.rados.open_ioctx(pool_name) as ioctx:
- func(ioctx, *args, **kwargs)
-
-
@ViewCache()
def get_daemons_and_pools(): # pylint: disable=R0915
def get_daemons():
token = token.decode('utf-8')
logger.debug("JWT Token: %s", token)
raise cherrypy.HTTPRedirect("{}/#/login?access_token={}".format(url_prefix, token))
- else:
- return {
- 'is_authenticated': auth.is_authenticated(),
- 'errors': errors,
- 'reason': auth.get_last_error_reason()
- }
+
+ return {
+ 'is_authenticated': auth.is_authenticated(),
+ 'errors': errors,
+ 'reason': auth.get_last_error_reason()
+ }
@Endpoint(xml=True)
def metadata(self):
def code(self):
if self._code:
return str(self._code)
- return str(abs(self.errno))
+ return str(abs(self.errno)) if self.errno is not None else 'Error'
# access control module exceptions
PLUGIN_MANAGER = DashboardPluginManager("ceph-mgr.dashboard")
# Load all interfaces and their hooks
-from . import interfaces
+from . import interfaces # pylint: disable=wrong-import-position,cyclic-import
STATUS = 'status'
+# pylint: disable=too-many-ancestors
@PM.add_plugin
class FeatureToggles(I.CanMgr, I.CanLog, I.Setupable, I.HasOptions,
I.HasCommands, I.FilterRequest.BeforeHandler,
@PM.add_hook
def setup(self):
+ # pylint: disable=attribute-defined-outside-init
self.Controller2Feature = {
controller: feature
for feature, controllers in Feature2Controller.items()
@ApiController('/feature_toggles')
class FeatureTogglesEndpoint(RESTController):
- def list(_):
+ def list(_): # pylint: disable=no-self-argument
return {
+ # pylint: disable=protected-access
feature.value: self._is_feature_enabled(feature)
for feature in Features
}
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from . import PLUGIN_MANAGER as PM, Interface
+from . import PLUGIN_MANAGER as PM, Interface # pylint: disable=cyclic-import
class CanMgr(Interface):
Placeholder for plugin setup, right after server start.
CanMgr.mgr and CanLog.log are initialized by then.
"""
- pass
@PM.add_interface
class HasOptions(Interface):
@PM.add_abcspec
- def get_options(self): pass
+ def get_options(self):
+ pass
@PM.add_interface
class HasCommands(Interface):
@PM.add_abcspec
- def register_commands(self): pass
+ def register_commands(self):
+ pass
@PM.add_interface
class HasControllers(Interface):
@PM.add_abcspec
- def get_controllers(self): pass
+ def get_controllers(self):
+ pass
-class FilterRequest:
+class FilterRequest(object):
@PM.add_interface
class BeforeHandler(Interface):
@PM.add_abcspec
- def filter_request_before_handler(self, request): pass
+ def filter_request_before_handler(self, request):
+ pass
cache = OrderedDict()
stats = [0, 0]
rlock = RLock()
- setattr(
- function,
- 'cache_info',
- lambda:
- "hits={}, misses={}, maxsize={}, currsize={}".format(
- stats[0], stats[1], maxsize, len(cache)))
+ setattr(function, 'cache_info', lambda:
+ "hits={}, misses={}, maxsize={}, currsize={}".format(
+ stats[0], stats[1], maxsize, len(cache)))
@wraps(function)
def wrapper(*args, **kwargs):
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
-"""
-
+"""\
"""
CAVEAT:
This is a minimal implementation of python-pluggy (based on 0.8.0 interface:
def add_hookspecs(self, module_or_class):
""" Dummy method"""
- pass
- def register(self, plugin, name=None):
+ def register(self, plugin, name=None): # pylint: disable=unused-argument
for attr in dir(plugin):
if self.parse_hookimpl_opts(plugin, attr) is not None:
+ # pylint: disable=protected-access
self.hook._add_hookimpl(attr, getattr(plugin, attr))
cache = OrderedDict()
stats = [0, 0, 0]
rlock = RLock()
- setattr(
- function,
- 'cache_info',
- lambda:
- "hits={}, misses={}, expired={}, maxsize={}, currsize={}".format(
- stats[0], stats[1], stats[2], maxsize, len(cache)))
+ setattr(function, 'cache_info', lambda:
+ "hits={}, misses={}, expired={}, maxsize={}, currsize={}".format(
+ stats[0], stats[1], stats[2], maxsize, len(cache)))
@wraps(function)
def wrapper(*args, **kwargs):
return None
@classmethod
- def set_user(cls, token):
- cls.LOCAL_USER.username = token['username']
+ def set_user(cls, username):
+ cls.LOCAL_USER.username = username
@classmethod
def reset_user(cls):
- cls.set_user({'username': None, 'permissions': None})
+ cls.set_user(None)
@classmethod
def get_username(cls):
return getattr(cls.LOCAL_USER, 'username', None)
+ @classmethod
+ def get_user(cls, token):
+ try:
+ dtoken = JwtManager.decode_token(token)
+ if not JwtManager.is_blacklisted(dtoken['jti']):
+ user = AuthManager.get_user(dtoken['username'])
+ if user.lastUpdate <= dtoken['iat']:
+ return user
+ logger.debug("AMT: user info changed after token was issued, iat=%s lastUpdate=%s",
+ dtoken['iat'], user.lastUpdate)
+ else:
+ logger.debug('AMT: Token is black-listed')
+ except jwt.exceptions.ExpiredSignatureError:
+ logger.debug("AMT: Token has expired")
+ except jwt.exceptions.InvalidTokenError:
+ logger.debug("AMT: Failed to decode token")
+ except UserDoesNotExist:
+ logger.debug("AMT: Invalid token: user %s does not exist", dtoken['username'])
+ return None
+
@classmethod
def blacklist_token(cls, token):
token = jwt.decode(token, verify=False)
token = JwtManager.get_token_from_header()
logger.debug("AMT: token: %s", token)
if token:
- try:
- token = JwtManager.decode_token(token)
- if not JwtManager.is_blacklisted(token['jti']):
- user = AuthManager.get_user(token['username'])
- if user.lastUpdate <= token['iat']:
- self._check_authorization(token)
- return
-
- logger.debug("AMT: user info changed after token was"
- " issued, iat=%s lastUpdate=%s",
- token['iat'], user.lastUpdate)
- else:
- logger.debug('AMT: Token is black-listed')
- except jwt.exceptions.ExpiredSignatureError:
- logger.debug("AMT: Token has expired")
- except jwt.exceptions.InvalidTokenError:
- logger.debug("AMT: Failed to decode token")
- except UserDoesNotExist:
- logger.debug("AMT: Invalid token: user %s does not exist",
- token['username'])
-
+ user = JwtManager.get_user(token)
+ if user:
+ self._check_authorization(user.username)
+ return
logger.debug('AMT: Unauthorized access to %s',
cherrypy.url(relative='server'))
raise cherrypy.HTTPError(401, 'You are not authorized to access '
'that resource')
- def _check_authorization(self, token):
+ def _check_authorization(self, username):
logger.debug("AMT: checking authorization...")
- username = token['username']
+ username = username
handler = cherrypy.request.handler.callable
controller = handler.__self__
sec_scope = getattr(controller, '_security_scope', None)
sec_perms = getattr(handler, '_security_permissions', None)
- JwtManager.set_user(token)
+ JwtManager.set_user(username)
if not sec_scope:
# controller does not define any authorization restrictions
kwargs)
logger.error(msg)
raise SendCommandError(outs, prefix, argdict, r)
- else:
- try:
- return json.loads(outb)
- except Exception: # pylint: disable=broad-except
- return outb
+
+ try:
+ return json.loads(outb)
+ except Exception: # pylint: disable=broad-except
+ return outb
@classmethod
def get_rates(cls, svc_type, svc_name, path):
raise NFSException('Cannot create bucket "{}" as it already '
'exists, and belongs to other user.'
.format(path))
- else:
- raise exp
+ raise exp
if not exists:
logger.info('Creating new RGW bucket "%s" for user "%s"', path,
self.rgw_user_id)
"""
Formats the bitmask:
- >>> format_bitmask(45)
+ @DISABLEDOCTEST: >>> format_bitmask(45)
['deep-flatten', 'exclusive-lock', 'layering', 'object-map']
"""
names = [val for key, val in RBD_FEATURES_NAME_MAPPING.items()
"""
Converts the features list to bitmask:
- >>> format_features(['deep-flatten', 'exclusive-lock', 'layering', 'object-map'])
+ @DISABLEDOCTEST: >>> format_features(['deep-flatten', 'exclusive-lock',
+ 'layering', 'object-map'])
45
- >>> format_features(None) is None
+ @DISABLEDOCTEST: >>> format_features(None) is None
True
- >>> format_features('deep-flatten, exclusive-lock')
+ @DISABLEDOCTEST: >>> format_features('deep-flatten, exclusive-lock')
32
"""
if isinstance(features, six.string_types):
elif method == 'DELETE':
self.status = '204 No Content'
return
+
+ if 'status' in thread.res_task['exception']:
+ self.status = thread.res_task['exception']['status']
else:
- if 'status' in thread.res_task['exception']:
- self.status = thread.res_task['exception']['status']
- else:
- self.status = 500
- self.body = json.dumps(thread.res_task['exception'])
- return
+ self.status = 500
+ self.body = json.dumps(thread.res_task['exception'])
+ return
def _task_post(self, url, data=None, timeout=60):
self._task_request('POST', url, data, timeout)