Refactor some variables and method names to fulfill the Python naming convention.
Signed-off-by: Volker Theile <vtheile@suse.com>
from ..plugins import PLUGIN_MANAGER
-def EndpointDoc(description="", group="", parameters=None, responses=None):
+def EndpointDoc(description="", group="", parameters=None, responses=None): # noqa: N802
if not isinstance(description, str):
raise Exception("%s has been called with a description that is not a string: %s"
% (EndpointDoc.__name__, description))
secure=secure)
-def Endpoint(method=None, path=None, path_params=None, query_params=None,
+def Endpoint(method=None, path=None, path_params=None, query_params=None, # noqa: N802
json_response=True, proxy=False, xml=False):
if method is None:
return _wrapper
-def Proxy(path=None):
+def Proxy(path=None): # noqa: N802
if path is None:
path = ""
elif path == "/":
return wrapper
@staticmethod
- def Resource(method=None, path=None, status=None, query_params=None):
+ def Resource(method=None, path=None, status=None, query_params=None): # noqa: N802
if not method:
method = 'GET'
return _wrapper
@staticmethod
- def Collection(method=None, path=None, status=None, query_params=None):
+ def Collection(method=None, path=None, status=None, query_params=None): # noqa: N802
if not method:
method = 'GET'
func._security_permissions = list(set(permissions))
-def ReadPermission(func):
+def ReadPermission(func): # noqa: N802
_set_func_permissions(func, Permission.READ)
return func
-def CreatePermission(func):
+def CreatePermission(func): # noqa: N802
_set_func_permissions(func, Permission.CREATE)
return func
-def DeletePermission(func):
+def DeletePermission(func): # noqa: N802
_set_func_permissions(func, Permission.DELETE)
return func
-def UpdatePermission(func):
+def UpdatePermission(func): # noqa: N802
_set_func_permissions(func, Permission.UPDATE)
return func
result = {}
mds_names = self._get_mds_names(fs_id)
- def __to_second(point):
+ def _to_second(point):
return (point[0] // 1000000000, point[1])
for mds_name in mds_names:
data = mgr.get_counter("mds", mds_name, counter)
if data is not None:
result[mds_name][counter] = list(
- map(__to_second, data[counter]))
+ map(_to_second, data[counter]))
else:
result[mds_name][counter] = []
self._updateable_at_runtime([name])
# Update config option
- availSections = ['global', 'mon', 'mgr', 'osd', 'mds', 'client']
+ avail_sections = ['global', 'mon', 'mgr', 'osd', 'mds', 'client']
- for section in availSections:
+ for section in avail_sections:
for entry in value:
if not entry['value']:
break
if endpoint.is_api or all_endpoints:
list_of_ctrl.add(endpoint.ctrl)
- TAG_MAP = {}
+ tag_map = {}
for ctrl in list_of_ctrl:
tag_name = ctrl.__name__
tag_descr = ""
if ctrl.doc_info['tag']:
tag_name = ctrl.doc_info['tag']
tag_descr = ctrl.doc_info['tag_descr']
- if tag_name not in TAG_MAP or not TAG_MAP[tag_name]:
- TAG_MAP[tag_name] = tag_descr
+ if tag_name not in tag_map or not tag_map[tag_name]:
+ tag_map[tag_name] = tag_descr
tags = [{'name': k, 'description': v if v else "*No description available*"}
- for k, v in TAG_MAP.items()]
+ for k, v in tag_map.items()]
tags.sort(key=lambda e: e['name'])
return tags
return parameters
@classmethod
- def _gen_paths(cls, all_endpoints, baseUrl):
- METHOD_ORDER = ['get', 'post', 'put', 'delete']
+ def _gen_paths(cls, all_endpoints, base_url):
+ method_order = ['get', 'post', 'put', 'delete']
paths = {}
for path, endpoints in sorted(list(ENDPOINT_MAP.items()),
key=lambda p: p[0]):
skip = False
endpoint_list = sorted(endpoints, key=lambda e:
- METHOD_ORDER.index(e.method.lower()))
+ method_order.index(e.method.lower()))
for endpoint in endpoint_list:
if not endpoint.is_api and not all_endpoints:
skip = True
methods[method.lower()]['security'] = [{'jwt': []}]
if not skip:
- paths[path[len(baseUrl):]] = methods
+ paths[path[len(base_url):]] = methods
return paths
class Logging(BaseController):
@Endpoint('POST', path='js-error')
- def jsError(self, url, message, stack=None):
+ def jsError(self, url, message, stack=None): # noqa: N802
logger.error('frontend error (%s): %s\n %s\n', url, message, stack)
# pylint: disable=not-callable
-def NfsTask(name, metadata, wait_for):
+def NfsTask(name, metadata, wait_for): # noqa: N802
def composed_decorator(func):
return Task("nfs/{}".format(name), metadata, wait_for,
partial(serialize_dashboard_exception,
# pylint: disable=not-callable
-def RbdTask(name, metadata, wait_for):
+def RbdTask(name, metadata, wait_for): # noqa: N802
def composed_decorator(func):
func = handle_rados_error('pool')(func)
func = handle_rbd_error()(func)
journaling depends on exclusive-lock
fast-diff depends on object-map
"""
- ORDER = ['exclusive-lock', 'journaling', 'object-map', 'fast-diff']
+ ORDER = ['exclusive-lock', 'journaling', 'object-map', 'fast-diff'] # noqa: N806
def key_func(feat):
try:
# pylint: disable=not-callable
-def RbdMirroringTask(name, metadata, wait_for):
+def RbdMirroringTask(name, metadata, wait_for): # noqa: N802
def composed_decorator(func):
func = handle_rbd_mirror_error()(func)
return Task("rbd/mirroring/{}".format(name), metadata, wait_for,
from mgr_module import CLICommand, Option
from . import PLUGIN_MANAGER as PM
-from . import interfaces as I # noqa: E741
+from . import interfaces as I # noqa: E741,N812
from .ttl_cache import ttl_cache
from ..controllers.rbd import Rbd, RbdSnapshot, RbdTrash
@ApiController('/feature_toggles')
class FeatureTogglesEndpoint(RESTController):
- def list(_): # pylint: disable=no-self-argument
+ def list(_): # pylint: disable=no-self-argument # noqa: N805
return {
# pylint: disable=protected-access
feature.value: self._is_feature_enabled(feature)
class User(object):
def __init__(self, username, password, name=None, email=None, roles=None,
- lastUpdate=None, enabled=True):
+ last_update=None, enabled=True):
self.username = username
self.password = password
self.name = name
self.roles = set()
else:
self.roles = roles
- if lastUpdate is None:
- self.refreshLastUpdate()
+ if last_update is None:
+ self.refresh_last_update()
else:
- self.lastUpdate = lastUpdate
+ self.last_update = last_update
self._enabled = enabled
- def refreshLastUpdate(self):
- self.lastUpdate = int(time.time())
+ def refresh_last_update(self):
+ self.last_update = int(time.time())
@property
def enabled(self):
@enabled.setter
def enabled(self, value):
self._enabled = value
- self.refreshLastUpdate()
+ self.refresh_last_update()
def set_password(self, password):
self.set_password_hash(password_hash(password))
def set_password_hash(self, hashed_password):
self.password = hashed_password
- self.refreshLastUpdate()
+ self.refresh_last_update()
def compare_password(self, password):
"""
def set_roles(self, roles):
self.roles = set(roles)
- self.refreshLastUpdate()
+ self.refresh_last_update()
def add_roles(self, roles):
self.roles = self.roles.union(set(roles))
- self.refreshLastUpdate()
+ self.refresh_last_update()
def del_roles(self, roles):
for role in roles:
if role not in self.roles:
raise RoleNotInUser(role.name, self.username)
self.roles.difference_update(set(roles))
- self.refreshLastUpdate()
+ self.refresh_last_update()
def authorize(self, scope, permissions):
for role in self.roles:
'roles': sorted([r.name for r in self.roles]),
'name': self.name,
'email': self.email,
- 'lastUpdate': self.lastUpdate,
+ 'lastUpdate': self.last_update,
'enabled': self.enabled
}
return
for _, user in self.users.items():
if role in user.roles:
- user.refreshLastUpdate()
+ user.refresh_last_update()
def save(self):
with self.lock:
dtoken = JwtManager.decode_token(token)
if not JwtManager.is_blacklisted(dtoken['jti']):
user = AuthManager.get_user(dtoken['username'])
- if user.lastUpdate <= dtoken['iat']:
+ if user.last_update <= dtoken['iat']:
return user
- logger.debug("AMT: user info changed after token was issued, iat=%s lastUpdate=%s",
- dtoken['iat'], user.lastUpdate)
+ logger.debug("AMT: user info changed after token was issued, iat=%s last_update=%s",
+ dtoken['iat'], user.last_update)
else:
logger.debug('AMT: Token is black-listed')
except jwt.exceptions.ExpiredSignatureError:
import re
import ipaddress
from distutils.util import strtobool
-import xml.etree.ElementTree as ET
+import xml.etree.ElementTree as ET # noqa: N814
import six
from ..awsauth import S3Auth
from ..settings import Settings, Options
logger.info("task finished immediately")
return
- res = self.jsonBody()
+ res = self.json_body()
self.assertIsInstance(res, dict)
self.assertIn('name', res)
self.assertIn('metadata', res)
self.task_metadata)
time.sleep(1)
self.tc._get('/api/task?name={}'.format(self.task_name))
- res = self.tc.jsonBody()
+ res = self.tc.json_body()
for task in res['finished_tasks']:
if task['metadata'] == self.task_metadata:
# task finished
def _task_put(self, url, data=None, timeout=60):
self._task_request('PUT', url, data, timeout)
- def jsonBody(self):
+ def json_body(self):
body_str = self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body
return json.loads(body_str)
- def assertJsonBody(self, data, msg=None):
+ def assertJsonBody(self, data, msg=None): # noqa: N802
"""Fail if value != self.body."""
- json_body = self.jsonBody()
+ json_body = self.json_body()
if data != json_body:
if msg is None:
msg = 'expected body:\n%r\n\nactual body:\n%r' % (
data, json_body)
self._handlewebError(msg)
- def assertInJsonBody(self, data, msg=None):
- json_body = self.jsonBody()
+ def assertInJsonBody(self, data, msg=None): # noqa: N802
+ json_body = self.json_body()
if data not in json_body:
if msg is None:
msg = 'expected %r to be in %r' % (data, json_body)
self.assertNotIn(rolename, db['roles'])
def validate_persistent_user(self, username, roles, password=None,
- name=None, email=None, lastUpdate=None,
+ name=None, email=None, last_update=None,
enabled=True):
db = self.load_persistent_db()
self.assertIn('users', db)
self.assertEqual(db['users'][username]['name'], name)
if email:
self.assertEqual(db['users'][username]['email'], email)
- if lastUpdate:
- self.assertEqual(db['users'][username]['lastUpdate'], lastUpdate)
+ if last_update:
+ self.assertEqual(db['users'][username]['lastUpdate'], last_update)
self.assertEqual(db['users'][username]['enabled'], enabled)
def validate_persistent_no_user(self, username):
)
self._get('/foo/wait_task_exception')
- while self.jsonBody():
+ while self.json_body():
time.sleep(0.5)
self._get('/foo/wait_task_exception')
def test_internal_server_error(self):
self._get('/foo/internal_server_error')
self.assertStatus(500)
- self.assertIn('unexpected condition', self.jsonBody()['detail'])
+ self.assertIn('unexpected condition', self.json_body()['detail'])
def test_404(self):
self._get('/foonot_found')
self.assertStatus(404)
- self.assertIn('detail', self.jsonBody())
+ self.assertIn('detail', self.json_body())
with self._mock_osd_list(osd_stat_ids=osds_actual, osdmap_tree_node_ids=osds_leftover,
osdmap_ids=osds_actual):
self._get('/api/osd')
- self.assertEqual(len(self.jsonBody()), 2, 'It should display two OSDs without failure')
+ self.assertEqual(len(self.json_body()), 2, 'It should display two OSDs without failure')
self.assertStatus(200)
self._get('/api/prometheus/notifications?from=' + next_to_last['id'])
forelast = PrometheusReceiver.notifications[1]
last = PrometheusReceiver.notifications[2]
- self.assertEqual(self.jsonBody(), [forelast, last])
+ self.assertEqual(self.json_body(), [forelast, last])
@mock.patch('dashboard.controllers.rbd_mirroring.rbd')
def test_default(self, rbd_mock): # pylint: disable=W0613
self._get('/test/api/block/mirroring/summary')
- result = self.jsonBody()
+ result = self.json_body()
self.assertStatus(200)
self.assertEqual(result['status'], 0)
for k in ['daemons', 'pools', 'image_error', 'image_syncing', 'image_ready']:
self._get('/test/api/summary')
self.assertStatus(200)
- summary = self.jsonBody()['rbd_mirroring']
+ summary = self.json_body()['rbd_mirroring']
self.assertEqual(summary, {'errors': 0, 'warnings': 1})
def test_settings_list(self):
self._get('/api/settings')
- data = self.jsonBody()
+ data = self.json_body()
self.assertTrue(len(data) > 0)
self.assertStatus(200)
self.assertIn('default', data[0].keys())
self.assertInJsonBody('type')
self.assertInJsonBody('name')
self.assertInJsonBody('value')
- self.assertEqual(self.jsonBody()['value'], 'foo')
+ self.assertEqual(self.json_body()['value'], 'foo')
def test_bulk_set(self):
self._put('/api/settings', {
self._get('/api/settings/grafana-api-username')
self.assertStatus(200)
- body = self.jsonBody()
+ body = self.json_body()
self.assertEqual(body['value'], 'foo')
self._get('/api/settings/grafana-api-username')
self.assertStatus(200)
- self.assertEqual(self.jsonBody()['value'], 'foo')
+ self.assertEqual(self.json_body()['value'], 'foo')
self._get('/api/settings/grafana-api-host')
self.assertStatus(200)
- self.assertEqual(self.jsonBody()['value'], 'somehost')
+ self.assertEqual(self.json_body()['value'], 'somehost')
TaskManager.FINISHED_TASK_SIZE = 10
TaskManager.FINISHED_TASK_TTL = 60.0
- def assertTaskResult(self, result):
+ def assertTaskResult(self, result): # noqa: N802
self.assertEqual(result,
{'args': ['dummy arg'], 'kwargs': {'dummy': 'arg'}})
def test_not_implemented(self):
self._put("/foo")
self.assertStatus(404)
- body = self.jsonBody()
+ body = self.json_body()
self.assertIsInstance(body, dict)
assert body['detail'] == "The path '/foo' was not found."
assert '404' in body['status']
raise Exception("n_types param is neither a string nor a list")
for ev_type in n_types:
listeners = cls._listeners[ev_type]
- toRemove = None
+ to_remove = None
for pr, fn in listeners:
if fn == func:
- toRemove = (pr, fn)
+ to_remove = (pr, fn)
break
- if toRemove:
- listeners.discard(toRemove)
+ if to_remove:
+ listeners.discard(to_remove)
logger.debug("NQ: function %s was deregistered for events "
"of type %s", func, ev_type)