def __init__(self, name, metadata, wait_for=5.0, exception_handler=None):
self.name = name
if isinstance(metadata, list):
- self.metadata = dict([(e[1:-1], e) for e in metadata])
+ self.metadata = {e[1:-1]: e for e in metadata}
else:
self.metadata = metadata
self.wait_for = wait_for
)
df = mgr.get("df")
- pool_stats = dict([(p['id'], p['stats']) for p in df['pools']])
+ pool_stats = {p['id']: p['stats'] for p in df['pools']}
osdmap = mgr.get("osd_map")
- pools = dict([(p['pool'], p) for p in osdmap['pools']])
+ pools = {p['pool']: p for p in osdmap['pools']}
metadata_pool_id = mdsmap['metadata_pool']
data_pool_ids = mdsmap['data_pools']
def_value = param['default'] if 'default' in param else None
if param_name.startswith("is_"):
return "boolean"
- elif "size" in param_name:
+ if "size" in param_name:
return "integer"
- elif "count" in param_name:
+ if "count" in param_name:
return "integer"
- elif "num" in param_name:
+ if "num" in param_name:
return "integer"
- elif isinstance(def_value, bool):
+ if isinstance(def_value, bool):
return "boolean"
- elif isinstance(def_value, int):
+ if isinstance(def_value, int):
return "integer"
return "string"
# Because 'shec' is experimental it's not included
'plugins': config['osd_erasure_code_plugins'].split() + ['shec'],
'directory': config['erasure_code_dir'],
- 'devices': list(set([device['class'] for device in osd_map_crush['devices']])),
+ 'devices': list({device['class'] for device in osd_map_crush['devices']}),
'failure_domains': [domain['name'] for domain in osd_map_crush['types']],
'names': [name for name, _ in
mgr.get('osd_map').get('erasure_code_profiles', {}).items()]
'health': 'Unknown'
}
for _, pool_data in daemon['status'].items():
- if (health['health'] != 'error' and
- [k for k, v in pool_data.get('callouts', {}).items()
- if v['level'] == 'error']):
+ if (health['health'] != 'error'
+ and [k for k, v in pool_data.get('callouts', {}).items()
+ if v['level'] == 'error']):
health = {
'health_color': 'error',
'health': 'Error'
}
- elif (health['health'] != 'error' and
- [k for k, v in pool_data.get('callouts', {}).items()
- if v['level'] == 'warning']):
+ elif (health['health'] != 'error'
+ and [k for k, v in pool_data.get('callouts', {}).items()
+ if v['level'] == 'warning']):
health = {
'health_color': 'warning',
'health': 'Warning'
stats['image_local_count'] = pool_data.get('image_local_count', 0)
stats['image_remote_count'] = pool_data.get('image_remote_count', 0)
- if (stats.get('health_color', '') != 'error' and
- pool_data.get('image_error_count', 0) > 0):
+ if (stats.get('health_color', '') != 'error'
+ and pool_data.get('image_error_count', 0) > 0):
stats['health_color'] = 'error'
stats['health'] = 'Error'
- elif (stats.get('health_color', '') != 'error' and
- pool_data.get('image_warning_count', 0) > 0):
+ elif (stats.get('health_color', '') != 'error'
+ and pool_data.get('image_warning_count', 0) > 0):
stats['health_color'] = 'warning'
stats['health'] = 'Warning'
elif stats.get('health', None) is None:
def set(self, pool_name, mirror_mode=None):
def _edit(ioctx, mirror_mode=None):
if mirror_mode:
- mode_enum = dict([[x[1], x[0]] for x in
- self.MIRROR_MODES.items()]).get(mirror_mode, None)
+ mode_enum = {x[1]: x[0] for x in
+ self.MIRROR_MODES.items()}.get(mirror_mode, None)
if mode_enum is None:
raise rbd.Error('invalid mirror mode "{}"'.format(mirror_mode))
v = StrictVersion(cherrypy.__version__)
# It was fixed in 3.7.0. Exact lower bound version is probably earlier,
# but 3.5.0 is what this monkey patch is tested on.
- if v >= StrictVersion("3.5.0") and v < StrictVersion("3.7.0"):
+ if StrictVersion("3.5.0") <= v < StrictVersion("3.7.0"):
from cherrypy.wsgiserver.wsgiserver2 import HTTPConnection,\
CP_fileobject
res = handle_sso_command(cmd)
if res[0] != -errno.ENOSYS:
return res
- elif cmd['prefix'] == 'dashboard set-jwt-token-ttl':
+ if cmd['prefix'] == 'dashboard set-jwt-token-ttl':
self.set_module_option('jwt_token_ttl', str(cmd['seconds']))
return 0, 'JWT token TTL updated', ''
- elif cmd['prefix'] == 'dashboard get-jwt-token-ttl':
+ if cmd['prefix'] == 'dashboard get-jwt-token-ttl':
ttl = self.get_module_option('jwt_token_ttl', JwtManager.JWT_TOKEN_TTL)
return 0, str(ttl), ''
- elif cmd['prefix'] == 'dashboard create-self-signed-cert':
+ if cmd['prefix'] == 'dashboard create-self-signed-cert':
self.create_self_signed_cert()
return 0, 'Self-signed certificate created', ''
def get_updated_pool_stats(self):
df = self.get('df')
- pool_stats = dict([(p['id'], p['stats']) for p in df['pools']])
+ pool_stats = {p['id']: p['stats'] for p in df['pools']}
now = time.time()
for pool_id, stats in pool_stats.items():
for stat_name, stat_val in stats.items():
# this roles cannot be deleted nor updated
# admin role provides all permissions for all scopes
-ADMIN_ROLE = Role('administrator', 'Administrator', dict([
- (scope_name, Permission.all_permissions())
+ADMIN_ROLE = Role('administrator', 'Administrator', {
+ scope_name: Permission.all_permissions()
for scope_name in Scope.all_scopes()
-]))
+})
# read-only role provides read-only permission for all scopes
-READ_ONLY_ROLE = Role('read-only', 'Read-Only', dict([
- (scope_name, [_P.READ]) for scope_name in Scope.all_scopes()
+READ_ONLY_ROLE = Role('read-only', 'Read-Only', {
+ scope_name: [_P.READ] for scope_name in Scope.all_scopes()
if scope_name != Scope.DASHBOARD_SETTINGS
-]))
+})
# block manager role provides all permission for block related scopes
@classmethod
def from_dict(cls, u_dict, roles):
return User(u_dict['username'], u_dict['password'], u_dict['name'],
- u_dict['email'], set([roles[r] for r in u_dict['roles']]),
+ u_dict['email'], {roles[r] for r in u_dict['roles']},
u_dict['lastUpdate'])
def save(self):
with self.lock:
db = {
- 'users': dict([(un, u.to_dict()) for un, u in self.users.items()]),
- 'roles': dict([(rn, r.to_dict()) for rn, r in self.roles.items()]),
+ 'users': {un: u.to_dict() for un, u in self.users.items()},
+ 'roles': {rn: r.to_dict() for rn, r in self.roles.items()},
'version': self.version
}
mgr.set_store(self.accessdb_config_key(), json.dumps(db))
return db
db = json.loads(json_db)
- roles = dict([(rn, Role.from_dict(r))
- for rn, r in db.get('roles', {}).items()])
- users = dict([(un, User.from_dict(u, dict(roles, **SYSTEM_ROLES)))
- for un, u in db.get('users', {}).items()])
+ roles = {rn: Role.from_dict(r)
+ for rn, r in db.get('roles', {}).items()}
+ users = {un: User.from_dict(u, dict(roles, **SYSTEM_ROLES))
+ for un, u in db.get('users', {}).items()}
return cls(db['version'], users, roles)
data = mgr.get_counter(svc_type, svc_name, path)[path]
if not data:
return [(0, 0.0)]
- elif len(data) == 1:
+ if len(data) == 1:
return [(data[0][0], 0.0)]
return [(data2[0], differentiate(data1, data2)) for data1, data2 in pairwise(data)]
except RequestException as e:
if e.status_code == 404:
return False
- else:
- raise e
+
+ raise e
@RestClient.api_put('/{bucket_name}')
def create_bucket(self, bucket_name, request=None):
name=metadata['image_name'])
perf_key = "{}lock_acquired_time".format(perf_key_prefix)
lock_acquired_time = (mgr.get_counter(
- 'tcmu-runner', service_id, perf_key)[perf_key] or
- [[0, 0]])[-1][1] / 1000000000
+ 'tcmu-runner', service_id, perf_key)[perf_key]
+ or [[0, 0]])[-1][1] / 1000000000
if lock_acquired_time > image.get('optimized_since', 0):
image['optimized_daemon'] = hostname
image['optimized_since'] = lock_acquired_time
def _get_counter(_daemon_type, daemon_name, _stat):
if daemon_name == 'ceph-dev1:pool1/image1':
return mocked_get_counter1
- elif daemon_name == 'ceph-dev2:pool1/image1':
+ if daemon_name == 'ceph-dev2:pool1/image1':
return mocked_get_counter2
return Exception('invalid daemon name')
# pylint: disable=raising-bad-type
raise self.exception
return ViewCache.VALUE_OK, self.value
- elif self.value_when is not None:
+ if self.value_when is not None:
# We have some data, but it doesn't meet freshness requirements
return ViewCache.VALUE_STALE, self.value
# We have no data, not even stale data
func = func.__wrapped__
except AttributeError:
pass
+ # pylint: disable=deprecated-method
return _getargspec(func)