From 494b18f7bb49324bdddd540d5b26bf75a6b69d29 Mon Sep 17 00:00:00 2001
From: Zack Cerza
Date: Fri, 12 Jul 2024 15:08:05 -0600
Subject: [PATCH] re-add worker

---
 scripts/test/test_worker.py    |   5 ++
 scripts/worker.py              |  37 +++
 teuthology/test/test_worker.py | 133 +++++++++++++++++++++++++++++++++
 teuthology/worker.py           | 106 ++++++++++++++++++++++++++
 4 files changed, 281 insertions(+)
 create mode 100644 scripts/test/test_worker.py
 create mode 100644 scripts/worker.py
 create mode 100644 teuthology/test/test_worker.py
 create mode 100644 teuthology/worker.py

diff --git a/scripts/test/test_worker.py b/scripts/test/test_worker.py
new file mode 100644
index 0000000000..8e76c43a5c
--- /dev/null
+++ b/scripts/test/test_worker.py
@@ -0,0 +1,5 @@
+from script import Script
+
+
+class TestWorker(Script):
+    script_name = 'teuthology-worker'  # console-script name smoke-tested by the shared Script harness
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644
index 0000000000..a3e12c20d7
--- /dev/null
+++ b/scripts/worker.py
@@ -0,0 +1,37 @@
+import argparse
+
+import teuthology.worker
+
+
+def main():
+    teuthology.worker.main(parse_args())  # thin CLI shim: parse args, delegate to the library
+
+
+def parse_args():  # build the worker CLI parser and parse sys.argv
+    parser = argparse.ArgumentParser(description="""
+Grab jobs from a beanstalk queue and run the teuthology tests they
+describe. One job is run at a time.
+""")
+    parser.add_argument(
+        '-v', '--verbose',
+        action='store_true', default=None,
+        help='be more verbose',
+    )
+    parser.add_argument(
+        '--archive-dir',
+        metavar='DIR',
+        help='path under which to archive results',
+        required=True,
+    )
+    parser.add_argument(
+        '-l', '--log-dir',
+        help='path in which to store logs',
+        required=True,
+    )
+    parser.add_argument(
+        '-t', '--tube',
+        help='which beanstalk tube to read jobs from',
+        required=True,
+    )
+
+    return parser.parse_args()
diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
new file mode 100644
index 0000000000..f200faf66a
--- /dev/null
+++ b/teuthology/test/test_worker.py
@@ -0,0 +1,133 @@
+from unittest.mock import patch, Mock, MagicMock
+from datetime import datetime, timedelta
+
+from teuthology import worker
+
+from teuthology.contextutil import MaxWhileTries
+
+
+class TestWorker(object):
+    def setup_method(self):
+        self.ctx = Mock()  # fake argparse namespace shared by every test
+        self.ctx.verbose = True
+        self.ctx.archive_dir = '/archive/dir'
+        self.ctx.log_dir = '/log/dir'
+        self.ctx.tube = 'tube'
+
+    @patch("os.path.exists")
+    def test_restart_file_path_doesnt_exist(self, m_exists):
+        m_exists.return_value = False
+        result = worker.sentinel(worker.restart_file_path)
+        assert not result
+
+    @patch("os.path.getmtime")
+    @patch("os.path.exists")
+    @patch("teuthology.dispatcher.datetime")
+    def test_needs_restart(self, m_datetime, m_exists, m_getmtime):
+        m_exists.return_value = True
+        m_datetime.utcfromtimestamp.return_value = datetime.utcnow() + timedelta(days=1)
+        result = worker.sentinel(worker.restart_file_path)
+        assert result
+
+    @patch("os.path.getmtime")
+    @patch("os.path.exists")
+    @patch("teuthology.dispatcher.datetime")  # was teuthology.worker.datetime; sentinel() lives in teuthology.dispatcher, so patch it there
+    def test_does_not_need_restart(self, m_datetime, m_exists, m_getmtime):
+        m_exists.return_value = True
+        m_datetime.utcfromtimestamp.return_value = datetime.utcnow() - timedelta(days=1)
+        result = worker.sentinel(worker.restart_file_path)
+        assert not result
+
+    def build_fake_jobs(self, m_connection, m_job, job_bodies):
+        """
+        Given patched copies of:
+            beanstalkc.Connection
+            beanstalkc.Job
+        And a list of basic job bodies, return a list of mocked Job objects
+        """
+        # Make sure instantiating m_job returns a new object each time
+        jobs = []
+        job_id = 0
+        for job_body in job_bodies:
+            job_id += 1
+            job = MagicMock(conn=m_connection, jid=job_id, body=job_body)
+            job.jid = job_id  # NOTE(review): redundant with the MagicMock kwargs above
+            job.body = job_body
+            jobs.append(job)
+        return jobs
+
+    @patch("teuthology.worker.run_job")
+    @patch("teuthology.worker.prep_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.repo_utils.fetch_qa_suite")
+    @patch("teuthology.repo_utils.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'foo: bar',
+                'stop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_prep_job.return_value = (dict(), '/bin/path')
+        worker.main(self.ctx)  # 'stop_worker: true' in the last job body ends the loop
+        # There should be one reserve call per item in the jobs list
+        expected_reserve_calls = [
+            dict(timeout=60) for i in range(len(jobs))
+        ]
+        got_reserve_calls = [
+            call[1] for call in m_connection.reserve.call_args_list
+        ]
+        assert got_reserve_calls == expected_reserve_calls
+        for job in jobs:
+            job.bury.assert_called_once_with()
+            job.delete.assert_called_once_with()
+
+    @patch("teuthology.repo_utils.ls_remote")
+    @patch("teuthology.dispatcher.supervisor.report.try_push_job_info")
+    @patch("teuthology.worker.run_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.repo_utils.fetch_qa_suite")
+    @patch("teuthology.repo_utils.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop_13925(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+        m_try_push_job_info, m_ls_remote,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'name: name',
+                'name: name\nstop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_fetch_qa_suite.side_effect = [
+            '/suite/path',
+            MaxWhileTries(),
+            MaxWhileTries(),
+        ]
+        worker.main(self.ctx)  # fetch failures should mark jobs dead, not crash the loop
+        assert len(m_run_job.call_args_list) == 0
+        assert len(m_try_push_job_info.call_args_list) == len(jobs)
+        for i in range(len(jobs)):
+            push_call = m_try_push_job_info.call_args_list[i]
+            assert push_call[0][1]['status'] == 'dead'
diff --git a/teuthology/worker.py b/teuthology/worker.py
new file mode 100644
index 0000000000..b61d482193
--- /dev/null
+++ b/teuthology/worker.py
@@ -0,0 +1,106 @@
+import datetime
+import logging
+import os
+import yaml
+
+from teuthology import (
+    # non-modules
+    setup_log_file,
+    install_except_hook,
+    # modules
+    beanstalk,
+    repo_utils,
+)
+from teuthology.config import config as teuth_config
+from teuthology.config import set_config_attr
+from teuthology.dispatcher import (
+    load_config,
+    prep_job,
+    restart,
+    restart_file_path,
+    sentinel,
+    stop,
+    stop_file_path,
+)
+from teuthology.dispatcher.supervisor import run_job
+from teuthology.exceptions import SkipJob
+
+log = logging.getLogger(__name__)
+start_time = datetime.datetime.now(datetime.timezone.utc)  # NOTE(review): set at import; unused in this file -- confirm external users
+
+def main(ctx):  # worker entry point; ctx is the argparse namespace built by scripts/worker.py
+    loglevel = logging.INFO
+    if ctx.verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+
+    log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
+        pid=os.getpid(), tube=ctx.tube,))
+    setup_log_file(log_file_path)
+
+    install_except_hook()
+
+    load_config(ctx.archive_dir)
+
+    set_config_attr(ctx)
+
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, ctx.tube)
+    result_proc = None  # NOTE(review): never reassigned below; the poll() branch looks vestigial -- confirm
+
+    if teuth_config.teuthology_path is None:
+        repo_utils.fetch_teuthology('main')
+    repo_utils.fetch_qa_suite('main')
+
+    keep_running = True
+    while keep_running:
+        # Check to see if we have a teuthology-results process hanging around
+        # and if so, read its return code so that it can exit.
+        if result_proc is not None and result_proc.poll() is not None:
+            log.debug("teuthology-results exited with code: %s",
+                      result_proc.returncode)
+            result_proc = None
+
+        if sentinel(restart_file_path):
+            restart()
+        elif sentinel(stop_file_path):
+            stop()
+
+        load_config()  # NOTE(review): no archive_dir here, unlike the startup call -- confirm intended
+
+        job = connection.reserve(timeout=60)
+        if job is None:
+            continue
+
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_id = job.jid
+        log.info('Reserved job %d', job_id)
+        log.info('Config is: %s', job.body)
+        job_config = yaml.safe_load(job.body)
+        job_config['job_id'] = str(job_id)
+
+        if job_config.get('stop_worker'):
+            keep_running = False
+
+        try:
+            job_config, teuth_bin_path = prep_job(
+                job_config,
+                log_file_path,
+                ctx.archive_dir,
+            )
+            run_job(
+                job_config,
+                teuth_bin_path,
+                ctx.archive_dir,
+                ctx.verbose,
+            )
+        except SkipJob:  # skipped jobs stay buried and are not deleted -- presumably intentional
+            continue
+
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
-- 
2.39.5