git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
re-add worker (worker-supervisor-testing)
author Zack Cerza <zack@redhat.com>
Fri, 12 Jul 2024 21:08:05 +0000 (15:08 -0600)
committer Zack Cerza <zack@redhat.com>
Wed, 17 Jul 2024 15:57:31 +0000 (09:57 -0600)
scripts/test/test_worker.py [new file with mode: 0644]
scripts/worker.py [new file with mode: 0644]
teuthology/test/test_worker.py [new file with mode: 0644]
teuthology/worker.py [new file with mode: 0644]

diff --git a/scripts/test/test_worker.py b/scripts/test/test_worker.py
new file mode 100644 (file)
index 0000000..8e76c43
--- /dev/null
@@ -0,0 +1,5 @@
+from script import Script
+
+
+class TestWorker(Script):
+    script_name = 'teuthology-worker'
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644 (file)
index 0000000..a3e12c2
--- /dev/null
@@ -0,0 +1,37 @@
+import argparse
+
+import teuthology.worker
+
+
+def main():
+    teuthology.worker.main(parse_args())
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="""
+Grab jobs from a beanstalk queue and run the teuthology tests they
+describe. One job is run at a time.
+""")
+    parser.add_argument(
+        '-v', '--verbose',
+        action='store_true', default=None,
+        help='be more verbose',
+    )
+    parser.add_argument(
+        '--archive-dir',
+        metavar='DIR',
+        help='path under which to archive results',
+        required=True,
+    )
+    parser.add_argument(
+        '-l', '--log-dir',
+        help='path in which to store logs',
+        required=True,
+    )
+    parser.add_argument(
+        '-t', '--tube',
+        help='which beanstalk tube to read jobs from',
+        required=True,
+    )
+
+    return parser.parse_args()
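
For reference, a minimal sketch (not part of the commit) of the Namespace that parse_args() hands to teuthology.worker.main(); the field values are the same placeholders used by teuthology/test/test_worker.py below:

    import argparse

    # Hypothetical ctx object; parse_args() above produces the same fields.
    ctx = argparse.Namespace(
        verbose=True,                # -v / --verbose
        archive_dir='/archive/dir',  # --archive-dir DIR (required)
        log_dir='/log/dir',          # -l / --log-dir (required)
        tube='tube',                 # -t / --tube (required)
    )
    # teuthology.worker.main(ctx) would then read jobs from the 'tube'
    # beanstalk tube and archive results under ctx.archive_dir.
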
diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
new file mode 100644 (file)
index 0000000..f200faf
--- /dev/null
@@ -0,0 +1,133 @@
+from unittest.mock import patch, Mock, MagicMock
+from datetime import datetime, timedelta
+
+from teuthology import worker
+
+from teuthology.contextutil import MaxWhileTries
+
+
+class TestWorker(object):
+    def setup_method(self):
+        self.ctx = Mock()
+        self.ctx.verbose = True
+        self.ctx.archive_dir = '/archive/dir'
+        self.ctx.log_dir = '/log/dir'
+        self.ctx.tube = 'tube'
+
+    @patch("os.path.exists")
+    def test_restart_file_path_doesnt_exist(self, m_exists):
+        m_exists.return_value = False
+        result = worker.sentinel(worker.restart_file_path)
+        assert not result
+
+    @patch("os.path.getmtime")
+    @patch("os.path.exists")
+    @patch("teuthology.dispatcher.datetime")
+    def test_needs_restart(self, m_datetime, m_exists, m_getmtime):
+        m_exists.return_value = True
+        m_datetime.utcfromtimestamp.return_value = datetime.utcnow() + timedelta(days=1)
+        result = worker.sentinel(worker.restart_file_path)
+        assert result
+
+    @patch("os.path.getmtime")
+    @patch("os.path.exists")
+    @patch("teuthology.worker.datetime")
+    def test_does_not_need_restart(self, m_datetime, m_exists, getmtime):
+        m_exists.return_value = True
+        m_datetime.utcfromtimestamp.return_value = datetime.utcnow() - timedelta(days=1)
+        result = worker.sentinel(worker.restart_file_path)
+        assert not result
+
+    def build_fake_jobs(self, m_connection, m_job, job_bodies):
+        """
+        Given patched copies of:
+            beanstalkc.Connection
+            beanstalkc.Job
+        And a list of basic job bodies, return a list of mocked Job objects
+        """
+        # Build a distinct mocked Job object for each job body
+        jobs = []
+        job_id = 0
+        for job_body in job_bodies:
+            job_id += 1
+            job = MagicMock(conn=m_connection, jid=job_id, body=job_body)
+            job.jid = job_id
+            job.body = job_body
+            jobs.append(job)
+        return jobs
+
+    @patch("teuthology.worker.run_job")
+    @patch("teuthology.worker.prep_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.repo_utils.fetch_qa_suite")
+    @patch("teuthology.repo_utils.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'foo: bar',
+                'stop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_prep_job.return_value = (dict(), '/bin/path')
+        worker.main(self.ctx)
+        # There should be one reserve call per item in the jobs list
+        expected_reserve_calls = [
+            dict(timeout=60) for i in range(len(jobs))
+        ]
+        got_reserve_calls = [
+            call[1] for call in m_connection.reserve.call_args_list
+        ]
+        assert got_reserve_calls == expected_reserve_calls
+        for job in jobs:
+            job.bury.assert_called_once_with()
+            job.delete.assert_called_once_with()
+
+    @patch("teuthology.repo_utils.ls_remote")
+    @patch("teuthology.dispatcher.supervisor.report.try_push_job_info")
+    @patch("teuthology.worker.run_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.repo_utils.fetch_qa_suite")
+    @patch("teuthology.repo_utils.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop_13925(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+        m_try_push_job_info, m_ls_remote,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'name: name',
+                'name: name\nstop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_fetch_qa_suite.side_effect = [
+            '/suite/path',
+            MaxWhileTries(),
+            MaxWhileTries(),
+        ]
+        worker.main(self.ctx)
+        assert len(m_run_job.call_args_list) == 0
+        assert len(m_try_push_job_info.call_args_list) == len(jobs)
+        for i in range(len(jobs)):
+            push_call = m_try_push_job_info.call_args_list[i]
+            assert push_call[0][1]['status'] == 'dead'
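
The build_fake_jobs() helper above returns one MagicMock per job body, with incrementing jid values; a short illustration (an assumption, not part of the commit) of the shape it produces:

    from unittest.mock import Mock
    from teuthology.test.test_worker import TestWorker

    t = TestWorker()
    jobs = t.build_fake_jobs(Mock(), Mock(), ['foo: bar', 'stop_worker: true'])
    assert [j.jid for j in jobs] == [1, 2]
    assert jobs[1].body == 'stop_worker: true'
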
diff --git a/teuthology/worker.py b/teuthology/worker.py
new file mode 100644 (file)
index 0000000..b61d482
--- /dev/null
@@ -0,0 +1,106 @@
+import datetime
+import logging
+import os
+import yaml
+
+from teuthology import (
+    # non-modules
+    setup_log_file,
+    install_except_hook,
+    # modules
+    beanstalk,
+    repo_utils,
+)
+from teuthology.config import config as teuth_config
+from teuthology.config import set_config_attr
+from teuthology.dispatcher import (
+    load_config,
+    prep_job,
+    restart,
+    restart_file_path,
+    sentinel,
+    stop,
+    stop_file_path,
+)
+from teuthology.dispatcher.supervisor import run_job
+from teuthology.exceptions import SkipJob
+
+log = logging.getLogger(__name__)
+start_time = datetime.datetime.now(datetime.timezone.utc)
+
+def main(ctx):
+    loglevel = logging.INFO
+    if ctx.verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+
+    log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
+        pid=os.getpid(), tube=ctx.tube,))
+    setup_log_file(log_file_path)
+
+    install_except_hook()
+
+    load_config(ctx.archive_dir)
+
+    set_config_attr(ctx)
+
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, ctx.tube)
+    result_proc = None
+
+    if teuth_config.teuthology_path is None:
+        repo_utils.fetch_teuthology('main')
+    repo_utils.fetch_qa_suite('main')
+
+    keep_running = True
+    while keep_running:
+        # Check to see if we have a teuthology-results process hanging around
+        # and if so, read its return code so that it can exit.
+        if result_proc is not None and result_proc.poll() is not None:
+            log.debug("teuthology-results exited with code: %s",
+                      result_proc.returncode)
+            result_proc = None
+
+        if sentinel(restart_file_path):
+            restart()
+        elif sentinel(stop_file_path):
+            stop()
+
+        load_config()
+
+        job = connection.reserve(timeout=60)
+        if job is None:
+            continue
+
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_id = job.jid
+        log.info('Reserved job %d', job_id)
+        log.info('Config is: %s', job.body)
+        job_config = yaml.safe_load(job.body)
+        job_config['job_id'] = str(job_id)
+
+        if job_config.get('stop_worker'):
+            keep_running = False
+
+        try:
+            job_config, teuth_bin_path = prep_job(
+                job_config,
+                log_file_path,
+                ctx.archive_dir,
+            )
+            run_job(
+                job_config,
+                teuth_bin_path,
+                ctx.archive_dir,
+                ctx.verbose,
+            )
+        except SkipJob:
+            continue
+
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
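
worker.py imports sentinel(), restart_file_path, and stop_file_path from teuthology.dispatcher rather than defining them. A rough sketch of what such a sentinel check could look like, assuming only the behaviour the tests above exercise (os.path.exists, os.path.getmtime, datetime.utcfromtimestamp); the real implementation lives in teuthology/dispatcher and may differ:

    import datetime
    import os

    start_time = datetime.datetime.utcnow()

    def sentinel(path):
        # Hypothetical sketch: True when the sentinel file was touched after
        # this process started, i.e. a restart/stop has been requested.
        if not os.path.exists(path):
            return False
        file_mtime = datetime.datetime.utcfromtimestamp(os.path.getmtime(path))
        return file_mtime > start_time
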