command: str,
args: List[str],
no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
image: Optional[str] = "",
) -> Any:
try:
out, err, code = self._run_cephadm(
- host, entity, command, args, no_fsid=no_fsid, image=image)
+ host, entity, command, args, no_fsid=no_fsid, error_ok=error_ok, image=image)
if code:
raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
except Exception as e:
'--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
_run_cephadm.assert_any_call(
- 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False)
_run_cephadm.assert_any_call(
- 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False)
+ 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
error_ok=True, stdin='{"config": "", "keyring": ""}')
_run_cephadm.assert_any_call(
- 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False)
_run_cephadm.assert_any_call(
- 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False)
+ 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")