from .. import worker
+from ..contextutil import MaxWhileTries
+
class TestWorker(object):
def setup(self):
for job in jobs:
job.bury.assert_called_once_with()
job.delete.assert_called_once_with()
+
+ @patch("teuthology.worker.report.try_push_job_info")
+ @patch("teuthology.worker.run_job")
+ @patch("beanstalkc.Job", autospec=True)
+ @patch("teuthology.worker.fetch_qa_suite")
+ @patch("teuthology.worker.fetch_teuthology")
+ @patch("teuthology.worker.beanstalk.watch_tube")
+ @patch("teuthology.worker.beanstalk.connect")
+ @patch("os.path.isdir", return_value=True)
+ @patch("teuthology.worker.setup_log_file")
+ def test_main_loop_13925(
+ self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+ m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+ m_try_push_job_info,
+ ):
+ m_connection = Mock()
+ jobs = self.build_fake_jobs(
+ m_connection,
+ m_job,
+ [
+ 'name: name',
+ 'name: name\nstop_worker: true',
+ ],
+ )
+ m_connection.reserve.side_effect = jobs
+ m_connect.return_value = m_connection
+ m_fetch_qa_suite.side_effect = [
+ '/suite/path',
+ MaxWhileTries(),
+ MaxWhileTries(),
+ ]
+ worker.main(self.ctx)
+ assert len(m_run_job.call_args_list) == 0
+ assert len(m_try_push_job_info.call_args_list) == len(jobs)
+ for i in range(len(jobs)):
+ push_call = m_try_push_job_info.call_args_list[i]
+ assert push_call[0][1]['status'] == 'dead'