(r2, ['baz']),
],
)
- assert r1.run.called_once_with(args=['test'])
- assert r2.run.called_once_with(args=['test'])
+ r1.run.assert_called_once_with(args=['test'])
+ r2.run.assert_called_once_with(args=['test'])
got = c.run(args=['test'])
assert len(got) == 2
- assert got, [ret1 == ret2]
+ assert got == [ret1, ret2]
rem = remote.Remote(name='jdoe@xyzzy.example.com', ssh=self.m_ssh)
rem._runner = m_run
result = rem.run(args=args)
- assert m_transport.getpeername.called_once_with()
- assert m_run.called_once_with(args=args)
+ m_transport.getpeername.assert_called_once_with()
+ m_run.assert_called_once_with(args=args)
assert result is proc
assert result.remote is rem
m_run.return_value = proc
r = remote.Remote(name='jdoe@xyzzy.example.com', ssh=self.m_ssh)
r._runner = m_run
- assert m_transport.getpeername.called_once_with()
- assert proc._stdout_buf.channel.recv_exit_status.called_once_with()
- assert m_run.called_once_with(
+ m_transport.getpeername.assert_called_once_with()
+ proc._stdout_buf.channel.recv_exit_status.assert_called_once_with()
+ m_run.assert_called_once_with(
client=self.m_ssh,
args=args,
stdout=BytesIO(),
obj.provider.driver.create_volume.side_effect = Exception
with patch.object(obj, '_destroy_volumes'):
assert obj.create() is False
- assert obj._destroy_volumes.called_once_with()
+ obj._destroy_volumes.assert_called_once_with()
def test_update_dns(self):
config.nsupdate_url = 'nsupdate_url'
assert result is True
else:
for node in nodes:
- assert node.destroy.called_once_with()
+ node.destroy.assert_called_once_with()
_volume_matrix = (
'count, size, should_succeed',
obj.create()
else:
obj.create()
- assert local_mocks['get_host_data'].called_once_with()
- assert local_mocks['set_image'].called_once_with(host_id)
- assert local_mocks['schedule_deploy_task']\
- .called_once_with(host_id)
- assert local_mocks['wait_for_deploy_task'].called_once_with()
+ local_mocks['get_host_data'].assert_called_once_with()
+ local_mocks['set_image'].assert_called_once_with(host_id)
+ local_mocks['schedule_deploy_task'].assert_called_once_with(host_id)
+ local_mocks['wait_for_deploy_task'].assert_called_once_with()
if success:
- assert local_mocks['_wait_for_ready'].called_once_with()
- assert local_mocks['_fix_hostname'].called_once_with()
+ local_mocks['_wait_for_ready'].assert_called_once_with()
+ local_mocks['_fix_hostname'].assert_called_once_with()
else:
assert len(local_mocks['cancel_deploy_task'].call_args_list) == 1
- assert self.mocks['m_Remote_console']\
- .return_value.power_off.called_once_with()
- assert self.mocks['m_Remote_console']\
- .return_value.power_on.called_once_with()
+ self.mocks['m_Remote_console'].return_value.power_off.assert_called_once_with()
+ self.mocks['m_Remote_console'].return_value.power_on.assert_called_once_with()
def test_do_request(self):
obj = self.klass('name.fqdn', 'type', '1.0')
) as local_mocks:
local_mocks['get_image_data'].return_value = dict(id='13')
obj.set_image(host_id)
- assert local_mocks['do_request'].called_once_with(
+ local_mocks['do_request'].assert_called_once_with(
'/host/999', 'put', '{"imageID": "13"}',
)
local_mocks['get_deploy_tasks'].return_value = host_tasks
obj = self.klass('name.fqdn', 'type', '1.0')
result = obj.schedule_deploy_task(host_id)
- assert local_mocks['get_deploy_tasks'].called_once_with()
+ local_mocks['get_deploy_tasks'].assert_called_once_with()
assert len(self.mocks['m_requests_Session_send'].call_args_list) == 3
assert result == task_id
do_request=DEFAULT,
) as local_mocks:
obj.cancel_deploy_task(10)
- assert local_mocks['do_request'].called_once_with(
+ local_mocks['do_request'].assert_called_once_with(
'/task/cancel',
method='DELETE',
data='{"id": 10}',
task.inventory = 'fake'
with patch.object(ansible.shutil, 'rmtree') as m_rmtree:
task.teardown()
- assert m_rmtree.called_once_with('fake')
+ m_rmtree.assert_called_once_with('fake')
def test_teardown_playbook(self):
self.task_config.update(dict(
task.playbook_file.name = 'fake'
with patch.object(ansible.os, 'remove') as m_remove:
task.teardown()
- assert m_remove.called_once_with('fake')
+ m_remove.assert_called_once_with('fake')
def test_teardown_cleanup_with_vars(self):
self.task_config.update(dict(
for remote in task.cluster.remotes.keys():
dest_path = os.path.join(
self.ctx.archive, '%s.log' % remote.shortname)
- assert remote.console.spawn_sol_log.called_once_with(
- dest_path=dest_path)
+ remote.console.spawn_sol_log.assert_called_once_with(
+ dest_path=dest_path)
@patch('teuthology.orchestra.console.PhysicalConsole')
def test_end(self, m_pconsole):
with self.klass(self.ctx, self.task_config) as task:
pass
for proc in task.processes.values():
- assert proc.terminate.called_once_with()
- assert proc.kill.called_once_with()
+ proc.terminate.assert_called_once_with()
+ proc.kill.assert_called_once_with()
config['job_id'],
)
assert got_config['teuthology_branch'] == 'main'
- assert m_fetch_teuthology.called_once_with_args(branch='main')
+ m_fetch_teuthology.assert_called_once_with(branch='main')
assert teuth_bin_path == '/teuth/path/virtualenv/bin'
- assert m_fetch_qa_suite.called_once_with_args(branch='main')
+ m_fetch_qa_suite.assert_called_once_with(branch='main')
assert got_config['suite_path'] == '/suite/path'
def build_fake_jobs(self, m_connection, m_job, job_bodies):
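
All of the hunks above apply the same fix: unittest.mock's Mock auto-creates any
attribute you access, so the old "assert m.called_once_with(...)" form only asserts
that a freshly created child Mock is truthy and therefore always passes, while
"m.assert_called_once_with(...)" performs the real verification. A minimal sketch
of the difference (the mock name and call below are illustrative, not taken from
the patch):

    from unittest.mock import Mock

    m = Mock()
    m.run(args=['test'])

    # Silent no-op: Mock auto-creates a 'called_once_with' child mock and returns
    # it when called; the returned Mock is truthy, so this passes even with
    # arguments that were never used.
    assert m.run.called_once_with(args=['never-passed'])

    # Real assertion: raises AssertionError unless run() was called exactly once
    # with exactly these arguments.
    m.run.assert_called_once_with(args=['test'])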