HostSpec, OrchestratorError, DaemonDescriptionStatus, OrchestratorEvent
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
- with_cephadm_module, with_service, _deploy_cephadm_binary, make_daemons_running
+ with_cephadm_module, with_service, _deploy_cephadm_binary, make_daemons_running, async_side_effect
from cephadm.module import CephadmOrchestrator
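Every hunk below follows from one change: `CephadmServe._run_cephadm` (and, further down, the `SSHManager` helpers) became coroutines, so a mock configured with a bare `return_value` tuple hands callers something that cannot be awaited. The newly imported `async_side_effect` wraps a canned result in a coroutine function so each call to the mock yields an awaitable. A minimal sketch of the fixtures helper, assuming it simply echoes a fixed result (the actual definition lives in the test fixtures):

def async_side_effect(result):
    # Each invocation of the mock produces a fresh coroutine that
    # resolves to the fixed `result`.
    async def side_effect(*args, **kwargs):
        return result
    return side_effect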
"""
if ceph_volume_lvm_list:
_run_cephadm.side_effect = ceph_volume_lvm_list
else:
- def _ceph_volume_list(s, host, entity, cmd, **kwargs):
+ async def _ceph_volume_list(s, host, entity, cmd, **kwargs):
logging.info(f'ceph-volume cmd: {cmd}')
if 'raw' in cmd:
return json.dumps({
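Note the contrast within the hunk above: when a test needs per-call behaviour (here, branching on 'raw' in cmd), the helper itself becomes `async def` and is assigned to `side_effect` directly. `mock` calls it on each invocation and hands the resulting coroutine back for the production code to await; `async_side_effect` is only the shorthand for fixed, call-independent results.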
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
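To see why the old pattern fails once the target is awaited, a standalone illustration (the mock name and call arguments here are hypothetical, not the ceph harness):

import asyncio
from unittest import mock

def async_side_effect(result):
    async def side_effect(*args, **kwargs):
        return result
    return side_effect

run = mock.MagicMock()

run.return_value = ('{}', '', 0)
# await run(...) would now raise:
# TypeError: object tuple can't be used in 'await' expression

run.side_effect = async_side_effect(('{}', '', 0))

async def caller():
    # The mock now returns a coroutine, so awaiting it works.
    out, err, code = await run('host', 'mon.a', ['deploy'])
    assert (out, err, code) == ('{}', '', 0)

asyncio.run(caller())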
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
spec = DriveGroupSpec(
cephadm_module.cache.update_host_devices('test', inventory.devices)
- _run_cephadm.return_value = (['{}'], '', 0)
+ _run_cephadm.side_effect = async_side_effect((['{}'], '', 0))
assert CephadmServe(cephadm_module)._apply_all_services() is False
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
spec = DriveGroupSpec(
cephadm_module.cache.update_host_devices('test', inventory.devices)
- _run_cephadm.return_value = (['{}'], '', 0)
+ _run_cephadm.side_effect = async_side_effect((['{}'], '', 0))
assert CephadmServe(cephadm_module)._apply_all_services() is False
disks_found = [
'[{"data": "/dev/vdb", "data_size": "50.00 GB", "encryption": "None"}, {"data": "/dev/vdc", "data_size": "50.00 GB", "encryption": "None"}]']
d_to_cv.return_value = 'foo'
- _run_cv_cmd.return_value = (disks_found, '', 0)
+ _run_cv_cmd.side_effect = async_side_effect((disks_found, '', 0))
preview = cephadm_module.osd_service.generate_previews([dg], 'test')
for osd in preview:
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'host1'):
with with_host(cephadm_module, 'host2'):
with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription
@mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
@mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
def test_daemon_add_fail(self, _run_cephadm, entity, success, spec, cephadm_module):
- _run_cephadm.return_value = '{}', '', 0
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, spec):
_run_cephadm.side_effect = OrchestratorError('fail')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_daemon_place_fail_health_warning(self, _run_cephadm, cephadm_module):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
_run_cephadm.side_effect = OrchestratorError('fail')
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
CephadmServe(cephadm_module)._apply_all_services()
ps = PlacementSpec(hosts=['fail'], count=1)
@mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
get_foreign_ceph_option.side_effect = KeyError
)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
- _run_cephadm.return_value = '{}', '', 0
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')])
on_off = 'on' if on_bool else 'off'
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
- _run_cephadm.return_value = '{}', '', 0
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
cephadm_module.set_store('blink_device_light_cmd', 'echo hello')
c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')])
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
- _run_cephadm.return_value = '{}', '', 0
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'mgr0'):
cephadm_module.set_store('mgr0/blink_device_light_cmd',
'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'')
@mock.patch("cephadm.module.HostCache.get_hosts")
def test_maintenance_enter_success(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
hostname = 'host1'
- _run_cephadm.return_value = [''], ['something\nsuccess - systemd target xxx disabled'], 0
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nsuccess - systemd target xxx disabled'], 0))
_host_ok.return_value = 0, 'it is okay'
_get_daemon_types.return_value = ['crash']
_hosts.return_value = [hostname, 'other_host']
@mock.patch("cephadm.module.HostCache.get_hosts")
def test_maintenance_enter_failure(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
hostname = 'host1'
- _run_cephadm.return_value = [''], ['something\nfailed - disable the target'], 0
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nfailed - disable the target'], 0))
_host_ok.return_value = 0, 'it is okay'
_get_daemon_types.return_value = ['crash']
_hosts.return_value = [hostname, 'other_host']
@mock.patch("cephadm.module.HostCache.get_hosts")
def test_maintenance_exit_success(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator):
hostname = 'host1'
- _run_cephadm.return_value = [''], [
- 'something\nsuccess - systemd target xxx enabled and started'], 0
+ _run_cephadm.side_effect = async_side_effect(([''], [
+ 'something\nsuccess - systemd target xxx enabled and started'], 0))
_get_daemon_types.return_value = ['crash']
_hosts.return_value = [hostname, 'other_host']
cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance'))
@mock.patch("cephadm.module.HostCache.get_hosts")
def test_maintenance_exit_failure(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator):
hostname = 'host1'
- _run_cephadm.return_value = [''], ['something\nfailed - unable to enable the target'], 0
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nfailed - unable to enable the target'], 0))
_get_daemon_types.return_value = ['crash']
_hosts.return_value = [hostname, 'other_host']
cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance'))
@mock.patch("cephadm.ssh.SSHManager._check_execute_command")
@mock.patch("cephadm.ssh.SSHManager._write_remote_file")
def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module):
- _write_file.return_value = None
- check_execute_command.return_value = ''
- execute_command.return_value = '{}', '', 0
- remote_connection.return_value = mock.Mock()
+ _write_file.side_effect = async_side_effect(None)
+ check_execute_command.side_effect = async_side_effect('')
+ execute_command.side_effect = async_side_effect(('{}', '', 0))
+ remote_connection.side_effect = async_side_effect(mock.Mock())
assert cephadm_module.manage_etc_ceph_ceph_conf is False
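The `test_etc_ceph` hunk applies the same treatment across the SSH layer: every helper the code awaits gets its result delivered through `async_side_effect`, even when that result is `None` (`_write_file`) or a `mock.Mock()` (`remote_connection`), because the caller awaits unconditionally.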
assert cephadm_module.get_module_option('registry_username') == username
assert cephadm_module.get_module_option('registry_password') == password
- _run_cephadm.return_value = '{}', '', 0
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
# test successful login with valid args
code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
check_registry_credentials('json-url', 'json-user', 'json-pass')
# test bad login where args are valid but login command fails
- _run_cephadm.return_value = '{}', 'error', 1
+ _run_cephadm.side_effect = async_side_effect(('{}', 'error', 1))
code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
assert err == 'Host test failed to login to fail-url as fail-user with given password'
check_registry_credentials('json-url', 'json-user', 'json-pass')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch
Traceback (most recent call last):
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_osd_activate_datadevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test', refresh_hosts=False):
with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1):
pass
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_osd_activate_datadevice_fail(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test', refresh_hosts=False):
cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
'osds': [
'type': 'data'
}]
}
- _run_cephadm.reset_mock(return_value=True)
+ _run_cephadm.reset_mock(return_value=True, side_effect=True)
- def _r_c(*args, **kwargs):
+ async def _r_c(*args, **kwargs):
if 'ceph-volume' in args:
return (json.dumps(ceph_volume_lvm_list), '', 0)
else:
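One subtlety in `test_osd_activate_datadevice_fail`: since these mocks are now configured through `side_effect` rather than `return_value`, `reset_mock(return_value=True)` alone no longer restores a clean slate; the extra `side_effect=True` flag (accepted by `unittest.mock.Mock.reset_mock` since Python 3.6) clears the previously installed coroutine before the async `_r_c` is swapped in.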
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_osd_activate_datadevice_dbdevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test', refresh_hosts=False):
- def _ceph_volume_list(s, host, entity, cmd, **kwargs):
+ async def _ceph_volume_list(s, host, entity, cmd, **kwargs):
logging.info(f'ceph-volume cmd: {cmd}')
if 'raw' in cmd:
return json.dumps({