-import beanstalkc
import os
from unittest.mock import patch, Mock, MagicMock
assert m_fetch_qa_suite.called_once_with_args(branch='main')
assert got_config['suite_path'] == '/suite/path'
- def build_fake_jobs(self, m_connection, m_job, job_bodies):
+ def build_fake_jobs(self, job_bodies):
"""
- Given patched copies of:
- beanstalkc.Connection
- beanstalkc.Job
And a list of basic job bodies, return a list of mocked Job objects
"""
- # Make sure instantiating m_job returns a new object each time
- m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job)
jobs = []
job_id = 0
for job_body in job_bodies:
job_id += 1
- job = m_job(conn=m_connection, jid=job_id, body=job_body)
- job.jid = job_id
- job.body = job_body
+ job = {}
+ job['job_id'] = job_id
+ job['body'] = job_body
jobs.append(job)
return jobs
- @patch("teuthology.worker.run_job")
- @patch("teuthology.worker.prep_job")
- @patch("beanstalkc.Job", autospec=True)
- @patch("teuthology.worker.fetch_qa_suite")
- @patch("teuthology.worker.fetch_teuthology")
- @patch("teuthology.worker.beanstalk.watch_tube")
- @patch("teuthology.worker.beanstalk.connect")
- @patch("os.path.isdir", return_value=True)
- @patch("teuthology.worker.setup_log_file")
- def test_main_loop(
- self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
- m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
- ):
- m_connection = Mock()
- jobs = self.build_fake_jobs(
- m_connection,
- m_job,
- [
- 'foo: bar',
- 'stop_worker: true',
- ],
- )
- m_connection.reserve.side_effect = jobs
- m_connect.return_value = m_connection
- m_prep_job.return_value = (dict(), '/bin/path')
- worker.main(self.ctx)
- # There should be one reserve call per item in the jobs list
- expected_reserve_calls = [
- dict(timeout=60) for i in range(len(jobs))
- ]
- got_reserve_calls = [
- call[1] for call in m_connection.reserve.call_args_list
- ]
- assert got_reserve_calls == expected_reserve_calls
- for job in jobs:
- job.bury.assert_called_once_with()
- job.delete.assert_called_once_with()
- @patch("teuthology.worker.report.try_push_job_info")
- @patch("teuthology.worker.run_job")
- @patch("beanstalkc.Job", autospec=True)
- @patch("teuthology.worker.fetch_qa_suite")
- @patch("teuthology.worker.fetch_teuthology")
- @patch("teuthology.worker.beanstalk.watch_tube")
- @patch("teuthology.worker.beanstalk.connect")
- @patch("os.path.isdir", return_value=True)
@patch("teuthology.worker.setup_log_file")
+ @patch("os.path.isdir", return_value=True)
+ @patch("teuthology.worker.fetch_teuthology")
+ @patch("teuthology.worker.fetch_qa_suite")
+ @patch("teuthology.worker.run_job")
+ @patch("teuthology.worker.report.try_push_job_info")
+ @patch("teuthology.worker.report.get_queued_job")
+ @patch("teuthology.worker.clean_config")
def test_main_loop_13925(
- self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
- m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
- m_try_push_job_info,
+ self, m_setup_log_file, m_isdir,
+ m_fetch_teuthology, m_fetch_qa_suite, m_run_job,
+ m_try_push_job_info, m_get_queued_job, m_clean_config
):
- m_connection = Mock()
- jobs = self.build_fake_jobs(
- m_connection,
- m_job,
- [
- 'name: name',
- 'name: name\nstop_worker: true',
- ],
- )
- m_connection.reserve.side_effect = jobs
- m_connect.return_value = m_connection
m_fetch_qa_suite.side_effect = [
'/suite/path',
MaxWhileTries(),
MaxWhileTries(),
]
+ job = {
+ 'job_id': '1',
+ 'description': 'DESC',
+ 'email': 'EMAIL',
+ 'first_in_suite': False,
+ 'last_in_suite': True,
+ 'machine_type': 'test_queue',
+ 'name': 'NAME',
+ 'owner': 'OWNER',
+ 'priority': 99,
+ 'results_timeout': '6',
+ 'verbose': False,
+ 'stop_worker': True
+ }
+ m_get_queued_job.return_value = job
+ m_clean_config.return_value = job
+
+ mock_prep_job_patcher = patch('teuthology.worker.prep_job')
+ mock_prep_job = mock_prep_job_patcher.start()
+ mock_prep_job.return_value = (dict(), '/teuth/bin/path')
+
worker.main(self.ctx)
- assert len(m_run_job.call_args_list) == 0
- assert len(m_try_push_job_info.call_args_list) == len(jobs)
- for i in range(len(jobs)):
- push_call = m_try_push_job_info.call_args_list[i]
- assert push_call[0][1]['status'] == 'dead'
+ mock_prep_job_patcher.stop()
+ assert len(m_run_job.call_args_list) == 1
+ assert len(m_try_push_job_info.call_args_list) == 1
+ assert m_try_push_job_info.called_once_with(job, dict(status='running'))
+