import argparse
+import pytest
+import subprocess
from unittest.mock import Mock, patch
-from teuthology.orchestra import cluster
-from teuthology.config import config
-from teuthology import misc
-import subprocess
-import pytest
+from teuthology import misc
+from teuthology.config import config
+from teuthology.orchestra import cluster
+from teuthology.orchestra.remote import Remote
class FakeRemote(object):
def test_sh_fail(caplog):
with pytest.raises(subprocess.CalledProcessError) as excinfo:
- misc.sh("/bin/echo -n AB ; /bin/echo C ; exit 111", 2) == "ABC\n"
+ misc.sh("/bin/echo -n AB ; /bin/echo C ; exit 111", 2)
assert excinfo.value.returncode == 111
for record in caplog.records:
if record.levelname == 'ERROR':
'ABC\n' == record.message)
def test_sh_progress(caplog):
- misc.sh("echo AB ; sleep 5 ; /bin/echo C", 2) == "ABC\n"
+ assert misc.sh("echo AB ; sleep 0.1 ; /bin/echo C", 2) == "AB\nC\n"
records = caplog.records
assert ':sh: ' in records[0].message
assert 'AB' == records[1].message
assert 'C' == records[2].message
- #
- # With a sleep 5 between the first and the second message,
- # there must be at least 2 seconds between the log record
- # of the first message and the log record of the second one
- #
- assert (records[2].created - records[1].created) > 2
+ assert records[2].created > records[1].created
def test_wait_until_osds_up():
ctx = argparse.Namespace()
ctx.daemons = Mock()
ctx.daemons.iter_daemons_of_role.return_value = list()
- remote = FakeRemote()
-
- def s(self, **kwargs):
- return 'IGNORED\n{"osds":[{"state":["up"]}]}'
-
- remote.sh = s
+ remote = Mock(spec=Remote)
+ remote.sh.return_value = 'IGNORED\n{"osds":[{"state":["up"]}]}'
ctx.cluster = cluster.Cluster(
remotes=[
(remote, ['osd.0', 'client.1'])
)
with patch.multiple(
misc,
- get_testdir=lambda ctx: "TESTDIR",
+ get_testdir=lambda _: "TESTDIR",
):
misc.wait_until_osds_up(ctx, ctx.cluster, remote)