@ApiController('/cephfs', Scope.CEPHFS)
class CephFS(RESTController):
- def __init__(self): #pragma: no cover
+ def __init__(self): # pragma: no cover
super(CephFS, self).__init__()
# Stateful instances of CephFSClients, hold cached results. Key to
info['name'],
"mds_server.handle_client_request")
else:
- activity = 0.0 #pragma: no cover
+ activity = 0.0 # pragma: no cover
self._append_mds_metadata(mds_versions, info['name'])
rank_table.append(
# indepdendent of whether it's a kernel or userspace
# client, so that the javascript doesn't have to grok that.
for client in clients:
- if "ceph_version" in client['client_metadata']: #pragma: no cover - no complexity there
+ if "ceph_version" in client['client_metadata']: # pragma: no cover - no complexity
client['type'] = "userspace"
client['version'] = client['client_metadata']['ceph_version']
client['hostname'] = client['client_metadata']['hostname']
- elif "kernel_version" in client['client_metadata']: #pragma: no cover - no complexity there
+ elif "kernel_version" in client['client_metadata']: # pragma: no cover - no complexity
client['type'] = "kernel"
client['version'] = client['client_metadata']['kernel_version']
client['hostname'] = client['client_metadata']['hostname']
- else: #pragma: no cover - no complexity there
+ else: # pragma: no cover - no complexity there
client['type'] = "unknown"
client['version'] = ""
client['hostname'] = ""
"""
try:
return self._get_root_directory(self._cephfs_instance(fs_id))
- except (cephfs.PermissionError, cephfs.ObjectNotFound): #pragma: no cover - the handling is too obvious
+ except (cephfs.PermissionError, cephfs.ObjectNotFound): # pragma: no cover
return None
def _get_root_directory(self, cfs):
try:
cfs = self._cephfs_instance(fs_id)
paths = cfs.ls_dir(path, depth)
- except (cephfs.PermissionError, cephfs.ObjectNotFound): #pragma: no cover - the handling is too obvious
+ except (cephfs.PermissionError, cephfs.ObjectNotFound): # pragma: no cover
paths = []
return paths
paths = cfs.ls_dir(path, depth)
if path == os.sep:
paths = [self._get_root_directory(cfs)] + paths
- except (cephfs.PermissionError, cephfs.ObjectNotFound): #pragma: no cover - the handling is too obvious
+ except (cephfs.PermissionError, cephfs.ObjectNotFound): # pragma: no cover
paths = []
return paths
@raise_if_no_orchestrator
@handle_orchestrator_error('host')
@host_task('create', {'hostname': '{hostname}'})
- def create(self, hostname): #pragma: no cover - requires realtime env
+ def create(self, hostname): # pragma: no cover - requires realtime env
orch_client = OrchClient.instance()
self._check_orchestrator_host_op(orch_client, hostname, True)
orch_client.hosts.add(hostname)
@raise_if_no_orchestrator
@handle_orchestrator_error('host')
@host_task('delete', {'hostname': '{hostname}'})
- def delete(self, hostname): #pragma: no cover - requires realtime env
+ def delete(self, hostname): # pragma: no cover - requires realtime env
orch_client = OrchClient.instance()
self._check_orchestrator_host_op(orch_client, hostname, False)
orch_client.hosts.remove(hostname)
- def _check_orchestrator_host_op(self, orch_client, hostname, add_host=True): #pragma: no cover - requires realtime env
+ def _check_orchestrator_host_op(self, orch_client, hostname, add_host=True): # pragma:no cover
"""Check if we can adding or removing a host with orchestrator
:param orch_client: Orchestrator client
ganesha_conf = GaneshaConf.instance(cluster_id)
if not ganesha_conf.has_export(export_id):
- raise cherrypy.HTTPError(404) #pragma: no cover - the handling is too obvious
+ raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
if fsal['name'] not in Ganesha.fsals_available():
raise NFSException("Cannot make modifications to this export. "
ganesha_conf = GaneshaConf.instance(cluster_id)
if not ganesha_conf.has_export(export_id):
- raise cherrypy.HTTPError(404) #pragma: no cover - the handling is too obvious
+ raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
export = ganesha_conf.remove_export(export_id)
if reload_daemons:
ganesha_conf.reload_daemons(export.daemons)
return Ganesha.fsals_available()
@Endpoint('GET', '/lsdir')
- def lsdir(self, root_dir=None, depth=1): #pragma: no cover
+ def lsdir(self, root_dir=None, depth=1): # pragma: no cover
if root_dir is None:
root_dir = "/"
depth = int(depth)
def inner(self, *args, **kwargs):
orch = OrchClient.instance()
if not orch.available():
- raise DashboardException(code='orchestrator_status_unavailable', #pragma: no cover
+ raise DashboardException(code='orchestrator_status_unavailable', # pragma: no cover
msg='Orchestrator is unavailable',
component='orchestrator',
http_status_code=503)
@raise_if_no_orchestrator
@handle_orchestrator_error('osd')
@orchestrator_task('identify_device', ['{hostname}', '{device}'])
- def identify_device(self, hostname, device, duration): #pragma: no cover - requires realtime env
+ def identify_device(self, hostname, device, duration): # pragma: no cover
# type: (str, str, int) -> None
"""
Identify a device by switching on the device light for N seconds.
for inventory_host in inventory_hosts:
host_osds = device_osd_map.get(inventory_host['name'])
for device in inventory_host['devices']:
- if host_osds: #pragma: no cover
+ if host_osds: # pragma: no cover
dev_name = os.path.basename(device['path'])
device['osd_ids'] = sorted(host_osds.get(dev_name, []))
else:
from ..tools import str_to_bool
try:
from typing import Dict, List, Any, Union # noqa: F401 pylint: disable=unused-import
-except ImportError: #pragma: no cover
- pass # For typing only #pragma: no cover
+except ImportError: # pragma: no cover
+ pass # For typing only
logger = logging.getLogger('controllers.osd')
osd['stats_history'] = {}
osd_spec = str(osd_id)
if 'osd' not in osd:
- continue #pragma: no cover - simple early continue
+ continue # pragma: no cover - simple early continue
for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
prop = stat.split('.')[1]
rates = CephService.get_rates('osd', osd_spec, stat)
try:
histogram = CephService.send_command(
'osd', srv_spec=svc_id, prefix='perf histogram dump')
- except SendCommandError as e: #pragma: no cover - the handling is too obvious
- if 'osd down' in str(e): #pragma: no cover - no complexity there
+ except SendCommandError as e: # pragma: no cover - the handling is too obvious
+ if 'osd down' in str(e): # pragma: no cover - no complexity there
histogram = str(e)
- else: #pragma: no cover - no complexity there
+ else: # pragma: no cover - no complexity there
raise
return {
'histogram': histogram,
}
- def set(self, svc_id, device_class): #pragma: no cover
+ def set(self, svc_id, device_class): # pragma: no cover
old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
ids=[svc_id])
old_device_class = old_device_class[0]['device_class']
@raise_if_no_orchestrator
@handle_orchestrator_error('osd')
@osd_task('delete', {'svc_id': '{svc_id}'})
- def delete(self, svc_id, force=None): #pragma: no cover - requires realtime env
+ def delete(self, svc_id, force=None): # pragma: no cover - requires realtime env
orch = OrchClient.instance()
if not force:
logger.info('Check for removing osd.%s...', svc_id)
try:
from typing import List
-except ImportError: #pragma: no cover
- pass # Just for type checking #pragma: no cover
+except ImportError: # pragma: no cover
+ pass # Just for type checking
logger = logging.getLogger('controllers.rgw')
try:
instance = RgwClient.admin_instance()
# Check if the service is online.
- if not instance.is_service_online(): #pragma: no cover - no complexity there
+ if not instance.is_service_online(): # pragma: no cover - no complexity there
msg = 'Failed to connect to the Object Gateway\'s Admin Ops API.'
raise RequestException(msg)
# Ensure the API user ID is known by the RGW.
instance.userid)
raise RequestException(msg)
# Ensure the system flag is set for the API user ID.
- if not instance.is_system_user(): #pragma: no cover - no complexity there
+ if not instance.is_system_user(): # pragma: no cover - no complexity there
msg = 'The system flag is not set for user "{}".'.format(
instance.userid)
raise RequestException(msg)
lock_retention_period_days,
lock_retention_period_years)
return result
- except RequestException as e: #pragma: no cover - handling is too obvious
+ except RequestException as e: # pragma: no cover - handling is too obvious
raise DashboardException(e, http_status_code=500, component='rgw')
def set(self, bucket, bucket_id, uid, versioning_state=None,
'Object Gateway'.format(uid))
# Finally redirect request to the RGW proxy.
return self.proxy('DELETE', 'user', {'uid': uid}, json_response=False)
- except (DashboardException, RequestException) as e: #pragma: no cover - handling is too obvious
+ except (DashboardException, RequestException) as e: # pragma: no cover
raise DashboardException(e, component='rgw')
# pylint: disable=redefined-builtin
try:
yield result
- except AttributeError: #pragma: no cover - handling is too obvious
- raise cherrypy.NotFound(result) #pragma: no cover - handling is too obvious
+ except AttributeError: # pragma: no cover - handling is too obvious
+ raise cherrypy.NotFound(result) # pragma: no cover - handling is too obvious
@staticmethod
def _to_native(setting):
settings.
:rtype: dict
"""
- return { #pragma: no cover - no complexity there
+ return { # pragma: no cover - no complexity there
'user_pwd_expiration_span':
SettingsModule.USER_PWD_EXPIRATION_SPAN,
'user_pwd_expiration_warning_1':
def _rbd_mirroring(self):
try:
_, data = get_daemons_and_pools()
- except ViewCacheNoDataException: #pragma:no cover
- return {} # pragma: no cover
+ except ViewCacheNoDataException: # pragma: no cover
+ return {} # pragma: no cover
daemons = data.get('daemons', [])
pools = data.get('pools', {})
warnings = 0
errors = 0
for daemon in daemons:
- if daemon['health_color'] == 'error': #pragma:no cover
+ if daemon['health_color'] == 'error': # pragma: no cover
errors += 1
- elif daemon['health_color'] == 'warning': #pragma:no cover
+ elif daemon['health_color'] == 'warning': # pragma: no cover
warnings += 1
for _, pool in pools.items():
- if pool['health_color'] == 'error': #pragma:no cover
+ if pool['health_color'] == 'error': # pragma: no cover
errors += 1
- elif pool['health_color'] == 'warning': #pragma:no cover
+ elif pool['health_color'] == 'warning': # pragma: no cover
warnings += 1
return {'warnings': warnings, 'errors': errors}
- def _task_permissions(self, name): #pragma:no cover
+ def _task_permissions(self, name): # pragma: no cover
result = True
if name == 'pool/create':
result = self._has_permissions(Permission.CREATE, Scope.POOL)
if 'COVERAGE_ENABLED' in os.environ:
import coverage
- __cov = coverage.Coverage(config_file="{}/.coveragerc".format(os.path.dirname(__file__)),
- data_suffix=True)
+ __cov = coverage.Coverage(config_file="{}/.coveragerc".format
+ (os.path.dirname(__file__)),data_suffix=True)
__cov.start()
cherrypy.engine.subscribe('after_request', __cov.save)