next-method-defined,
dict-items-not-iterating,
dict-keys-not-iterating,
- dict-values-not-iterating
+ dict-values-not-iterating,
+ missing-docstring,
+ invalid-name,
+ no-self-use,
+ too-few-public-methods,
+ no-member,
+ fixme
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# -*- coding: utf-8 -*-
-from mock import Mock
class BasePyOSDMap(object):
pass
+
class BasePyOSDMapIncremental(object):
pass
+
class BasePyCRUSH(object):
pass
+
class BaseMgrStandbyModule(object):
pass
+
class BaseMgrModule(object):
+ # pylint: disable=W0613
def __init__(self, py_modules_ptr, this_ptr):
self.config_key_map = {}
def _ceph_log(self, *args):
pass
-
-
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-import bcrypt
-import cherrypy
import time
import sys
-from ..tools import ApiController, AuthRequired, RESTController
+import bcrypt
+import cherrypy
+
+from ..tools import ApiController, RESTController
@ApiController('auth')
DEFAULT_SESSION_EXPIRE = 1200.0
def __init__(self):
+ # pylint: disable=E1101
self._mod = Auth._mgr_module_
self._log = self._mod.log
cherrypy.session[Auth.SESSION_KEY_TS] = now
self._log.debug("Login successful")
return {'username': username}
- else:
- cherrypy.response.status = 403
- self._log.debug("Login fail")
- return {'detail': 'Invalid credentials'}
+
+ cherrypy.response.status = 403
+ self._log.debug("Login fail")
+ return {'detail': 'Invalid credentials'}
def bulk_delete(self):
self._log.debug("Logout successful")
salt_password = bcrypt.gensalt()
if sys.version_info > (3, 0):
return bcrypt.hashpw(password, salt_password)
- else:
- return bcrypt.hashpw(password.encode('utf8'), salt_password)
+ return bcrypt.hashpw(password.encode('utf8'), salt_password)
@staticmethod
def check_auth():
'that resource')
now = time.time()
expires = float(module.get_localized_config(
- 'session-expire',
- Auth.DEFAULT_SESSION_EXPIRE))
+ 'session-expire', Auth.DEFAULT_SESSION_EXPIRE))
if expires > 0:
username_ts = cherrypy.session.get(Auth.SESSION_KEY_TS, None)
if username_ts and float(username_ts) < (now - expires):
@AuthRequired()
class Ping(object):
@cherrypy.expose
- def default(self, *args):
+ def default(self):
return "pong"
"""
from __future__ import absolute_import
-
+import errno
import os
+
import cherrypy
-from cherrypy import tools
from mgr_module import MgrModule
from .controllers.auth import Auth
# cherrypy likes to sys.exit on error. don't let it take us down too!
+# pylint: disable=W0613
def os_exit_noop(*args):
pass
+# pylint: disable=W0212
os._exit = os_exit_noop
}
]
- def __init__(self, *args, **kwargs):
- super(Module, self).__init__(*args, **kwargs)
-
def serve(self):
server_addr = self.get_localized_config('server_addr', '::')
server_port = self.get_localized_config('server_port', '8080')
'no server_addr configured; '
'try "ceph config-key put mgr/{}/{}/server_addr <ip>"'
.format(self.module_name, self.get_mgr_id()))
- self.log.info("server_addr: %s server_port: %s" % (server_addr,
- server_port))
+ self.log.info("server_addr: %s server_port: %s", server_addr,
+ server_port)
cherrypy.config.update({
- 'server.socket_host': server_addr,
- 'server.socket_port': int(server_port),
- })
+ 'server.socket_host': server_addr,
+ 'server.socket_port': int(server_port),
+ })
cherrypy.tools.autenticate = cherrypy.Tool('before_handler',
Auth.check_auth)
hashed_passwd = Auth.password_hash(cmd['password'])
self.set_localized_config('password', hashed_passwd)
return 0, 'Username and password updated', ''
- else:
- return (-errno.EINVAL, '', 'Command not found \'{0}\''.format(
- cmd['prefix']))
+
+ return (-errno.EINVAL, '', 'Command not found \'{0}\''
+ .format(cmd['prefix']))
class ApiRoot(object):
def __init__(self, mgrmod):
pluggy==0.6.0
portend==2.2
py==1.5.2
+pycodestyle==2.3.1
+pycparser==2.18
pylint==1.8.2
pytest==3.3.2
pytest-cov==2.5.1
# -*- coding: utf-8 -*-
-
+# pylint: disable=W0212
from __future__ import absolute_import
import json
cherrypy.tree.mount(Ping(), "/api/test",
config={'/': {'tools.autenticate.on': True}})
- module.set_localized_config('session-expire','2')
- module.set_localized_config('username','admin')
+ module.set_localized_config('session-expire', '2')
+ module.set_localized_config('username', 'admin')
module.set_localized_config('password', Auth.password_hash('admin'))
def test_login_valid(self):
self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
self.assertStatus('201 Created')
self.assertBody('{"username": "admin"}')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), 'admin')
def test_login_invalid(self):
sess_mock = RamSession()
self._post("/api/auth", {'username': 'admin', 'password': 'inval'})
self.assertStatus('403 Forbidden')
self.assertBody('{"detail": "Invalid credentials"}')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), None)
def test_logout(self):
sess_mock = RamSession()
with patch('cherrypy.session', sess_mock, create=True):
self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), 'admin')
self._delete("/api/auth")
self.assertStatus('204 No Content')
self.assertBody('')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), None)
def test_session_expire(self):
sess_mock = RamSession()
with patch('cherrypy.session', sess_mock, create=True):
self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
self.assertStatus('201 Created')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), 'admin')
self._post("/api/test/ping")
self.assertStatus('200 OK')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), 'admin')
time.sleep(3)
self._post("/api/test/ping")
self.assertStatus('401 Unauthorized')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), None)
def test_unauthorized(self):
sess_mock = RamSession()
with patch('cherrypy.session', sess_mock, create=True):
self._post("/api/test/ping")
self.assertStatus('401 Unauthorized')
- self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+ self.assertEqual(sess_mock.get(Auth.SESSION_KEY), None)
def test_echo_args(self):
self._post("/api/echo1", {'msg': 'Hello World'})
self.assertStatus('201 Created')
-
from .helper import ApiControllerTestCase
from ..tools import RESTController
-
+# pylint: disable=W0613
class FooResource(RESTController):
elems = []
def bulk_delete(self):
FooResource.elems = []
-
+# pylint: disable=C0102
class Root(object):
foo = FooResource()
# -*- coding: utf-8 -*-
+# pylint: disable=W0212
from __future__ import absolute_import
import importlib
is_element = len(vpath) > 0
(method_name, status_code) = self._method_mapping[
- (cherrypy.request.method, is_element)]
+ (cherrypy.request.method, is_element)]
method = getattr(self, method_name, None)
if not method:
self._not_implemented(is_element)
func._args_from_json_ = True
return func
+ # pylint: disable=W1505
@staticmethod
def _takes_json(func):
def inner(*args, **kwargs):
body = cherrypy.request.body.read(content_length)
if not body:
raise cherrypy.HTTPError(400, 'Empty body. Content-Length={}'
- .format(content_length))
+ .format(content_length))
try:
data = json.loads(body.decode('utf-8'))
except Exception as e:
raise cherrypy.HTTPError(400, 'Failed to decode JSON: {}'
- .format(str(e)))
+ .format(str(e)))
if hasattr(func, '_args_from_json_'):
- f_args = inspect.getargspec(func).args
+ if sys.version_info > (3, 0):
+ f_args = list(inspect.signature(func).parameters.keys())
+ else:
+ f_args = inspect.getargspec(func).args[1:]
n_args = []
for arg in args:
n_args.append(arg)
- for arg in f_args[1:]:
+ for arg in f_args:
if arg in data:
n_args.append(data[arg])
data.pop(arg)
kwargs.update(data)
return func(*n_args, **kwargs)
- else:
- return func(data, *args, **kwargs)
+
+ return func(data, *args, **kwargs)
return inner
@staticmethod
{envbindir}/py.test --cov=. --cov-report=term --cov-report=xml --junitxml=junit.xml tests/
[testenv:lint]
-deps=
- pylint
- pycodestyle
+deps=-r{toxinidir}/requirements.txt
commands=
- pylint --rcfile=.pylintrc --jobs=5 .
- pycodestyle --max-line-length=100 --exclude=ceph_module_mock.py,python2.7,tests,.tox,venv .
-
+ pylint --rcfile=.pylintrc --jobs=5 . module.py tools.py ceph_module_mock.py controllers tests
+ pycodestyle --max-line-length=100 --exclude=python2.7,.tox,venv .