def plural(count: int) -> str:
return 'daemon' if count == 1 else 'daemons'
- daemon_count = "only" if number_of_running_daemons == 1 else number_of_running_daemons
left_count = "no" if num_daemons_left == 0 else num_daemons_left
out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
Create a new monitor on the given host.
"""
assert self.TYPE == daemon_spec.daemon_type
- name, host, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+ name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
# get mon. key
ret, keyring, err = self.mgr.check_mon_command({
})
try:
j = json.loads(out)
- except Exception as e:
+ except Exception:
raise OrchestratorError('failed to parse quorum status')
mons = [m['name'] for m in j['monmap']['mons']]
Create a new manager instance on a host.
"""
assert self.TYPE == daemon_spec.daemon_type
- mgr_id, host = daemon_spec.daemon_id, daemon_spec.host
+ mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
# get mgr. key
ret, keyring, err = self.mgr.check_mon_command({
# If this is the case then the dashboard port opened will be only the used
# as default.
ports = []
- config_ports = ''
ret, mgr_services, err = self.mgr.check_mon_command({
'prefix': 'mgr services',
})
def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
assert self.TYPE == daemon_spec.daemon_type
- mds_id, host = daemon_spec.daemon_id, daemon_spec.host
+ mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
# get mgr. key
ret, keyring, err = self.mgr.check_mon_command({
def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
assert self.TYPE == daemon_spec.daemon_type
- rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
+ rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
keyring = self.get_keyring(rgw_id)
try:
j = json.loads(out)
return j.get('realms', [])
- except Exception as e:
+ except Exception:
raise OrchestratorError('failed to parse realm info')
def create_realm() -> None:
'realm', 'create',
'--rgw-realm=%s' % spec.rgw_realm,
'--default']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # noqa: F841
self.mgr.log.info('created realm: %s' % spec.rgw_realm)
def get_zonegroups() -> List[str]:
try:
j = json.loads(out)
return j.get('zonegroups', [])
- except Exception as e:
+ except Exception:
raise OrchestratorError('failed to parse zonegroup info')
def create_zonegroup() -> None:
'zonegroup', 'create',
'--rgw-zonegroup=default',
'--master', '--default']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # noqa: F841
self.mgr.log.info('created zonegroup: default')
def create_zonegroup_if_required() -> None:
try:
j = json.loads(out)
return j.get('zones', [])
- except Exception as e:
+ except Exception:
raise OrchestratorError('failed to parse zone info')
def create_zone() -> None:
'--rgw-zonegroup=default',
'--rgw-zone=%s' % spec.rgw_zone,
'--master', '--default']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # noqa: F841
self.mgr.log.info('created zone: %s' % spec.rgw_zone)
changes = False
'period', 'update',
'--rgw-realm=%s' % spec.rgw_realm,
'--commit']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # noqa: F841
self.mgr.log.info('updated period')
def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
assert self.TYPE == daemon_spec.daemon_type
- daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+ daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
@mock.patch('cephadm.services.osd.OSD.stop_draining')
def test_stop(self, stop_draining_mock, osd_obj):
- ret = osd_obj.stop()
+ osd_obj.stop()
assert osd_obj.started is False
assert osd_obj.stopped is True
stop_draining_mock.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop")
def test_is_ok_to_stop(self, _, osd_obj):
- ret = osd_obj.is_ok_to_stop
+ osd_obj.is_ok_to_stop
osd_obj.rm_util.ok_to_stop.assert_called_once()
@pytest.mark.parametrize(
@mock.patch("cephadm.services.osd.RemoveUtil.safe_to_destroy")
def test_safe_to_destroy(self, _, osd_obj):
- ret = osd_obj.safe_to_destroy()
+ osd_obj.safe_to_destroy()
osd_obj.rm_util.safe_to_destroy.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.set_osd_flag")
def test_down(self, _, osd_obj):
- ret = osd_obj.down()
+ osd_obj.down()
osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'down')
@mock.patch("cephadm.services.osd.RemoveUtil.destroy_osd")
def test_destroy_osd(self, _, osd_obj):
- ret = osd_obj.destroy()
+ osd_obj.destroy()
osd_obj.rm_util.destroy_osd.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.purge_osd")
def test_purge(self, _, osd_obj):
- ret = osd_obj.purge()
+ osd_obj.purge()
osd_obj.rm_util.purge_osd.assert_called_once()
@mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count")
def test_pg_count(self, _, osd_obj):
- ret = osd_obj.get_pg_count()
+ osd_obj.get_pg_count()
osd_obj.rm_util.get_pg_count.assert_called_once()
def test_drain_status_human_not_started(self, osd_obj):