Once the assertions were corrected, these tests started failing.
Fortunately, the broken assertions didn't seem to be masking any actual bugs.
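
For context, a minimal sketch of the patterns being removed (the names below
are illustrative only, not taken from the test suite): calling a nonexistent
check such as called_with() on a Mock simply auto-creates a child mock and
returns another truthy mock, so wrapping it in assert can never fail; likewise
"assert expr, msg" only tests expr and treats the second operand as the
failure message. Typos such as assert_called_once_with_args behaved the same
way on the older mock releases these tests ran against, though current mock
rejects assert-prefixed attribute typos with an AttributeError.

    from unittest.mock import Mock

    m = Mock()
    m.run(args=['real'])

    # 'called_with' is not a Mock assertion method: attribute access simply
    # auto-creates a child mock, and the call returns another (truthy) mock,
    # so this can never fail, whatever arguments are given.
    assert m.run.called_with(args=['wrong'])

    # 'assert expr, msg' only evaluates expr; the list here is merely the
    # failure message, so this passes whenever got is non-empty.
    got = [1, 2]
    assert got, [1 == 2]

    # The corrected forms actually verify the call and the value:
    m.run.assert_called_once_with(args=['real'])
    assert got == [1, 2]
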
Signed-off-by: Zack Cerza <zack@redhat.com>
(r2, ['baz']),
],
)
- r1.run.assert_called_once_with(args=['test'])
- r2.run.assert_called_once_with(args=['test'])
got = c.run(args=['test'])
+ r1.run.assert_called_once_with(args=['test'], wait=True)
+ r2.run.assert_called_once_with(args=['test'], wait=True)
assert len(got) == 2
assert got == [ret1, ret2]
# check identity not equality
rem._runner = m_run
result = rem.run(args=args)
m_transport.getpeername.assert_called_once_with()
- m_run.assert_called_once_with(args=args)
+ m_run_call_kwargs = m_run.call_args_list[0][1]
+ assert m_run_call_kwargs['args'] == args
assert result is proc
assert result.remote is rem
stdout.seek(0)
proc = RemoteProcess(
client=self.m_ssh,
- args='fakey',
+ args=args,
)
proc._stdout_buf = Mock()
proc._stdout_buf.channel = Mock()
m_run.return_value = proc
r = remote.Remote(name='jdoe@xyzzy.example.com', ssh=self.m_ssh)
r._runner = m_run
- m_transport.getpeername.assert_called_once_with()
- proc._stdout_buf.channel.recv_exit_status.assert_called_once_with()
- m_run.assert_called_once_with(
- client=self.m_ssh,
- args=args,
- stdout=BytesIO(),
- name=r.shortname,
- )
assert r.arch == 'test_arch'
+ assert len(m_run.call_args_list) == 1
+ m_run_call_kwargs = m_run.call_args_list[0][1]
+ assert m_run_call_kwargs['client'] == self.m_ssh
+ assert m_run_call_kwargs['name'] == r.shortname
+ assert m_run_call_kwargs['args'] == ' '.join(args)
def test_host_key(self):
m_key = MagicMock()
self.mocks['m_Remote_machine_type'].return_value = 'type1'
obj = self.klass('name.fqdn', 'type', '1.0')
host_id = 99
+ task_id = 1234
with patch.multiple(
'teuthology.provision.fog.FOG',
get_host_data=DEFAULT,
_fix_hostname=DEFAULT,
) as local_mocks:
local_mocks['get_host_data'].return_value = dict(id=host_id)
+ local_mocks['schedule_deploy_task'].return_value = task_id
if not success:
local_mocks['wait_for_deploy_task'].side_effect = RuntimeError
with raises(RuntimeError):
local_mocks['get_host_data'].assert_called_once_with()
local_mocks['set_image'].assert_called_once_with(host_id)
local_mocks['schedule_deploy_task'].assert_called_once_with(host_id)
- local_mocks['wait_for_deploy_task'].assert_called_once_with()
+ local_mocks['wait_for_deploy_task'].assert_called_once_with(task_id)
if success:
local_mocks['_wait_for_ready'].assert_called_once_with()
local_mocks['_fix_hostname'].assert_called_once_with()
local_mocks['get_image_data'].return_value = dict(id='13')
obj.set_image(host_id)
local_mocks['do_request'].assert_called_once_with(
- '/host/999', 'put', '{"imageID": "13"}',
+ '/host/999', method='PUT', data='{"imageID": 13}',
)
def test_schedule_deploy_task(self):
local_mocks['get_deploy_tasks'].return_value = host_tasks
obj = self.klass('name.fqdn', 'type', '1.0')
result = obj.schedule_deploy_task(host_id)
- local_mocks['get_deploy_tasks'].assert_called_once_with()
+ assert len(local_mocks['get_deploy_tasks'].call_args_list) == 2
assert len(self.mocks['m_requests_Session_send'].call_args_list) == 3
assert result == task_id
in_progress = deepcopy(results)
assert 0 == suite.wait('name', 1, 'http://UPLOAD_URL')
- assert m_get_jobs.called_with('name', fields=['job_id', 'status'])
+ m_get_jobs.assert_any_call('name', fields=['job_id', 'status'])
assert 0 == len(in_progress)
assert 'fail http://UPLOAD_URL/name/2' in caplog.text
+ m_get_jobs.reset_mock()
in_progress = deepcopy(results)
assert 0 == suite.wait('name', 1, None)
- assert m_get_jobs.called_with('name', fields=['job_id', 'status'])
+ m_get_jobs.assert_any_call('name', fields=['job_id', 'status'])
assert 0 == len(in_progress)
assert 'fail http://URL2' in caplog.text
from textwrap import dedent
from mock import patch, MagicMock
-from unittest import TestCase
from teuthology.suite import build_matrix
from teuthology.suite.merge import config_merge
log = logging.getLogger(__name__)
-class TestMerge(TestCase):
+class TestMerge:
patchpoints = [
'os.path.exists',
'os.listdir',
]
def setup_method(self):
- log.debug("setUp")
self.mocks = dict()
self.patchers = dict()
for ppoint in self.__class__.patchpoints:
self.start_patchers(fake_fs)
try:
result = build_matrix.build_matrix('d0_0')
- self.assertEqual(len(result), 1)
+ assert 1 == len(result)
configs = list(config_merge(result))
- self.assertEqual(len(configs), 1)
+ assert 1 == len(configs)
desc, frags, yaml = configs[0]
- self.assertIn("top", yaml)
- self.assertNotIn("foo", yaml)
+ assert "top" in yaml
+ assert "foo" not in yaml
finally:
self.stop_patchers()
self.start_patchers(fake_fs)
try:
result = build_matrix.build_matrix('d0_0')
- self.assertEqual(len(result), 2)
+ assert 2 == len(result)
configs = list(config_merge(result))
- self.assertEqual(len(configs), 1)
+ assert 1 == len(configs)
desc, frags, yaml = configs[0]
- self.assertIn("top", yaml)
- self.assertIn("baz", yaml)
- self.assertNotIn("foo", yaml)
+ assert "top" in yaml
+ assert "baz" in yaml
+ assert "foo" not in yaml
finally:
self.stop_patchers()
self.start_patchers(fake_fs)
try:
result = build_matrix.build_matrix('d0_0')
- self.assertEqual(len(result), 2)
+ assert 2 == len(result)
configs = list(config_merge(result))
- self.assertEqual(len(configs), 1)
+ assert 1 == len(configs)
desc, frags, yaml = configs[0]
- self.assertIn("top", yaml)
- self.assertIn("baz", yaml)
- self.assertNotIn("foo", yaml)
+ assert "top" in yaml
+ assert "baz" in yaml
+ assert "foo" not in yaml
finally:
self.stop_patchers()
self.start_patchers(fake_fs)
try:
result = build_matrix.build_matrix('d0_0')
- self.assertEqual(len(result), 1)
+ assert 1 == len(result)
configs = list(config_merge(result))
- self.assertEqual(len(configs), 1)
+ assert 1 == len(configs)
desc, frags, yaml = configs[0]
- self.assertIn("test", yaml)
- self.assertDictEqual(yaml["test"], {})
+ assert "test" in yaml
+ assert {} == yaml["test"]
finally:
self.stop_patchers()
teuthology:
postmerge:
- |
+ log.debug(_ENV)
log.debug("_ENV contains:")
for k,v in pairs(_ENV) do
log.debug("_ENV['%s'] = %s", tostring(k), tostring(v))
self.start_patchers(fake_fs)
try:
result = build_matrix.build_matrix('d0_0')
- self.assertEqual(len(result), 1)
+ assert 1 == len(result)
configs = list(config_merge(result))
- self.assertEqual(len(configs), 1)
+ assert 1 == len(configs)
finally:
self.stop_patchers()
def test_begin(self, m_pconsole):
with self.klass(self.ctx, self.task_config) as task:
assert len(task.processes) == len(self.ctx.cluster.remotes)
+ expected_log_paths = []
for remote in task.cluster.remotes.keys():
- dest_path = os.path.join(
- self.ctx.archive, '%s.log' % remote.shortname)
- remote.console.spawn_sol_log.assert_called_once_with(
- dest_path=dest_path)
+ expected_log_paths.append(
+ os.path.join(self.ctx.archive, 'console_logs', '%s.log' % remote.shortname)
+ )
+ assert len(m_pconsole().spawn_sol_log.call_args_list) == len(task.cluster.remotes)
+ got_log_paths = [c[0][0] for c in m_pconsole().spawn_sol_log.call_args_list]
+ assert got_log_paths == expected_log_paths
@patch('teuthology.orchestra.console.PhysicalConsole')
def test_end(self, m_pconsole):
- with self.klass(self.ctx, self.task_config) as task:
+ m_proc = m_pconsole().spawn_sol_log.return_value
+ m_proc.poll.return_value = None
+ with self.klass(self.ctx, self.task_config):
pass
- for proc in task.processes.values():
- proc.terminate.assert_called_once_with()
- proc.kill.assert_called_once_with()
+ assert len(m_proc.terminate.call_args_list) == len(self.ctx.cluster.remotes)
+ assert len(m_proc.kill.call_args_list) == len(self.ctx.cluster.remotes)
-import beanstalkc
import os
from unittest.mock import patch, Mock, MagicMock
config = dict(
name="the_name",
job_id="1",
+ suite_sha1="suite_hash",
)
archive_dir = '/archive/dir'
log_file_path = '/worker/log'
m_fetch_teuthology.return_value = '/teuth/path'
m_fetch_qa_suite.return_value = '/suite/path'
+ m_ls_remote.return_value = 'teuth_hash'
m_isdir.return_value = True
m_teuth_config.teuthology_path = None
got_config, teuth_bin_path = worker.prep_job(
config['job_id'],
)
assert got_config['teuthology_branch'] == 'main'
- m_fetch_teuthology.assert_called_once_with_args(branch='main')
+ m_fetch_teuthology.assert_called_once_with(branch='main', commit='teuth_hash')
assert teuth_bin_path == '/teuth/path/virtualenv/bin'
- m_fetch_qa_suite.assert_called_once_with_args(branch='main')
+ m_fetch_qa_suite.assert_called_once_with('main', 'suite_hash')
assert got_config['suite_path'] == '/suite/path'
def build_fake_jobs(self, m_connection, m_job, job_bodies):
And a list of basic job bodies, return a list of mocked Job objects
"""
- # Make sure instantiating m_job returns a new object each time
- m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job)
jobs = []
job_id = 0
for job_body in job_bodies:
job_id += 1
- job = m_job(conn=m_connection, jid=job_id, body=job_body)
+ job = MagicMock(conn=m_connection, jid=job_id, body=job_body)
job.jid = job_id
job.body = job_body
jobs.append(job)