From 3350def8419607a1c97099cd489a14263af283a5 Mon Sep 17 00:00:00 2001
From: Zack Cerza
Date: Tue, 30 Jul 2024 17:53:40 -0600
Subject: [PATCH] Remove worker.py

Originally, #1960 was intended to do this, but in some review
back-and-forth the commit removing the file was dropped. Let's actually
remove the file.

Signed-off-by: Zack Cerza
---
 teuthology/test/test_worker.py | 306 ---------------------------
 teuthology/worker.py           | 372 ---------------------------------
 2 files changed, 678 deletions(-)
 delete mode 100644 teuthology/test/test_worker.py
 delete mode 100644 teuthology/worker.py

diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
deleted file mode 100644
index 4e4e2f5512..0000000000
--- a/teuthology/test/test_worker.py
+++ /dev/null
@@ -1,306 +0,0 @@
-import datetime
-import os
-
-from unittest.mock import patch, Mock, MagicMock
-
-from teuthology import worker
-
-from teuthology.contextutil import MaxWhileTries
-
-
-class TestWorker(object):
-    def setup_method(self):
-        self.ctx = Mock()
-        self.ctx.verbose = True
-        self.ctx.archive_dir = '/archive/dir'
-        self.ctx.log_dir = '/log/dir'
-        self.ctx.tube = 'tube'
-
-    @patch("os.path.exists")
-    def test_restart_file_path_doesnt_exist(self, m_exists):
-        m_exists.return_value = False
-        result = worker.sentinel(worker.restart_file_path)
-        assert not result
-
-    @patch("os.path.getmtime")
-    @patch("os.path.exists")
-    def test_needs_restart(self, m_exists, m_getmtime):
-        m_exists.return_value = True
-        now = datetime.datetime.now(datetime.timezone.utc)
-        m_getmtime.return_value = (now + datetime.timedelta(days=1)).timestamp()
-        assert worker.sentinel(worker.restart_file_path)
-
-    @patch("os.path.getmtime")
-    @patch("os.path.exists")
-    def test_does_not_need_restart(self, m_exists, m_getmtime):
-        m_exists.return_value = True
-        now = datetime.datetime.now(datetime.timezone.utc)
-        m_getmtime.return_value = (now - datetime.timedelta(days=1)).timestamp()
-        assert not worker.sentinel(worker.restart_file_path)
-
-    @patch("os.symlink")
-    def test_symlink_success(self, m_symlink):
-        worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
-        m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log")
-
-    @patch("teuthology.worker.log")
-    @patch("os.symlink")
-    def test_symlink_failure(self, m_symlink, m_log):
-        m_symlink.side_effect = IOError
-        worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
-        # actually logs the exception
-        assert m_log.exception.called
-
-    @patch("teuthology.worker.run_with_watchdog")
-    @patch("teuthology.worker.teuth_config")
-    @patch("subprocess.Popen")
-    @patch("os.environ")
-    @patch("os.mkdir")
-    @patch("yaml.safe_dump")
-    @patch("tempfile.NamedTemporaryFile")
-    def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
-                                   m_environ, m_popen, m_t_config,
-                                   m_run_watchdog):
-        config = {
-            "suite_path": "suite/path",
-            "config": {"foo": "bar"},
-            "verbose": True,
-            "owner": "the_owner",
-            "archive_path": "archive/path",
-            "name": "the_name",
-            "description": "the_description",
-            "job_id": "1",
-        }
-        m_tmp = MagicMock()
-        temp_file = Mock()
-        temp_file.name = "the_name"
-        m_tmp.__enter__.return_value = temp_file
-        m_tempfile.return_value = m_tmp
-        env = dict(PYTHONPATH="python/path")
-        m_environ.copy.return_value = env
-        m_p = Mock()
-        m_p.returncode = 0
-        m_popen.return_value = m_p
-        m_t_config.results_server = True
-        worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
-        m_run_watchdog.assert_called_with(m_p, config)
-        expected_args = [
-            'teuth/bin/path/teuthology',
-            '-v',
-            '--lock',
-            '--block',
-            '--owner', 'the_owner',
-            '--archive', 'archive/path',
-            '--name', 'the_name',
-            '--description',
-            'the_description',
-            '--',
-            "the_name"
-        ]
-        m_popen.assert_called_with(args=expected_args, env=env)
-
-    @patch("time.sleep")
-    @patch("teuthology.worker.symlink_worker_log")
-    @patch("teuthology.worker.teuth_config")
-    @patch("subprocess.Popen")
-    @patch("os.environ")
-    @patch("os.mkdir")
-    @patch("yaml.safe_dump")
-    @patch("tempfile.NamedTemporaryFile")
-    def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
-                                 m_environ, m_popen, m_t_config, m_symlink_log,
-                                 m_sleep):
-        config = {
-            "suite_path": "suite/path",
-            "config": {"foo": "bar"},
-            "verbose": True,
-            "owner": "the_owner",
-            "archive_path": "archive/path",
-            "name": "the_name",
-            "description": "the_description",
-            "worker_log": "worker/log.log",
-            "job_id": "1",
-        }
-        m_tmp = MagicMock()
-        temp_file = Mock()
-        temp_file.name = "the_name"
-        m_tmp.__enter__.return_value = temp_file
-        m_tempfile.return_value = m_tmp
-        env = dict(PYTHONPATH="python/path")
-        m_environ.copy.return_value = env
-        m_p = Mock()
-        m_p.returncode = 1
-        m_popen.return_value = m_p
-        m_t_config.results_server = False
-        worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
-
-    @patch("teuthology.worker.report.try_push_job_info")
-    @patch("teuthology.worker.symlink_worker_log")
-    @patch("time.sleep")
-    def test_run_with_watchdog_no_reporting(self, m_sleep, m_symlink_log, m_try_push):
-        config = {
-            "name": "the_name",
-            "job_id": "1",
-            "worker_log": "worker_log",
-            "archive_path": "archive/path",
-            "teuthology_branch": "main"
-        }
-        process = Mock()
-        process.poll.return_value = "not None"
-        worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
-        m_try_push.assert_called_with(
-            dict(name=config["name"], job_id=config["job_id"]),
-            dict(status='dead')
-        )
-
-    @patch("subprocess.Popen")
-    @patch("teuthology.worker.symlink_worker_log")
-    @patch("time.sleep")
-    @patch("teuthology.worker.report.try_push_job_info")
-    def test_run_with_watchdog_with_reporting(self, m_tpji, m_sleep, m_symlink_log, m_popen):
-        config = {
-            "name": "the_name",
-            "job_id": "1",
-            "worker_log": "worker_log",
-            "archive_path": "archive/path",
-            "teuthology_branch": "jewel"
-        }
-        process = Mock()
-        process.poll.return_value = "not None"
-        m_proc = Mock()
-        m_proc.poll.return_value = "not None"
-        m_popen.return_value = m_proc
-        worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
-
-    @patch("teuthology.repo_utils.ls_remote")
-    @patch("os.path.isdir")
-    @patch("teuthology.repo_utils.fetch_teuthology")
-    @patch("teuthology.worker.teuth_config")
-    @patch("teuthology.repo_utils.fetch_qa_suite")
-    def test_prep_job(self, m_fetch_qa_suite, m_teuth_config,
-                      m_fetch_teuthology, m_isdir, m_ls_remote):
-        config = dict(
-            name="the_name",
-            job_id="1",
-            suite_sha1="suite_hash",
-        )
-        archive_dir = '/archive/dir'
-        log_file_path = '/worker/log'
-        m_fetch_teuthology.return_value = '/teuth/path'
-        m_fetch_qa_suite.return_value = '/suite/path'
-        m_ls_remote.return_value = 'teuth_hash'
-        m_isdir.return_value = True
-        m_teuth_config.teuthology_path = None
-        got_config, teuth_bin_path = worker.prep_job(
-            config,
-            log_file_path,
-            archive_dir,
-        )
-        assert got_config['worker_log'] == log_file_path
-        assert got_config['archive_path'] == os.path.join(
-            archive_dir,
-            config['name'],
-            config['job_id'],
-        )
-        assert got_config['teuthology_branch'] == 'main'
-        m_fetch_teuthology.assert_called_once_with(branch='main', commit='teuth_hash')
-        assert teuth_bin_path == '/teuth/path/virtualenv/bin'
-        m_fetch_qa_suite.assert_called_once_with('main', 'suite_hash')
-        assert got_config['suite_path'] == '/suite/path'
-
-    def build_fake_jobs(self, m_connection, m_job, job_bodies):
-        """
-        Given patched copies of:
-            beanstalkc.Connection
-            beanstalkc.Job
-        And a list of basic job bodies, return a list of mocked Job objects
-        """
-        # Make sure instantiating m_job returns a new object each time
-        jobs = []
-        job_id = 0
-        for job_body in job_bodies:
-            job_id += 1
-            job = MagicMock(conn=m_connection, jid=job_id, body=job_body)
-            job.jid = job_id
-            job.body = job_body
-            jobs.append(job)
-        return jobs
-
-    @patch("teuthology.worker.run_job")
-    @patch("teuthology.worker.prep_job")
-    @patch("beanstalkc.Job", autospec=True)
-    @patch("teuthology.repo_utils.fetch_qa_suite")
-    @patch("teuthology.repo_utils.fetch_teuthology")
-    @patch("teuthology.worker.beanstalk.watch_tube")
-    @patch("teuthology.worker.beanstalk.connect")
-    @patch("os.path.isdir", return_value=True)
-    @patch("teuthology.worker.setup_log_file")
-    def test_main_loop(
-        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
-        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
-    ):
-        m_connection = Mock()
-        jobs = self.build_fake_jobs(
-            m_connection,
-            m_job,
-            [
-                'foo: bar',
-                'stop_worker: true',
-            ],
-        )
-        m_connection.reserve.side_effect = jobs
-        m_connect.return_value = m_connection
-        m_prep_job.return_value = (dict(), '/bin/path')
-        worker.main(self.ctx)
-        # There should be one reserve call per item in the jobs list
-        expected_reserve_calls = [
-            dict(timeout=60) for i in range(len(jobs))
-        ]
-        got_reserve_calls = [
-            call[1] for call in m_connection.reserve.call_args_list
-        ]
-        assert got_reserve_calls == expected_reserve_calls
-        for job in jobs:
-            job.bury.assert_called_once_with()
-            job.delete.assert_called_once_with()
-
-    @patch("teuthology.repo_utils.ls_remote")
-    @patch("teuthology.worker.report.try_push_job_info")
-    @patch("teuthology.worker.run_job")
-    @patch("beanstalkc.Job", autospec=True)
-    @patch("teuthology.repo_utils.fetch_qa_suite")
-    @patch("teuthology.repo_utils.fetch_teuthology")
-    @patch("teuthology.worker.beanstalk.watch_tube")
-    @patch("teuthology.worker.beanstalk.connect")
-    @patch("os.path.isdir", return_value=True)
-    @patch("teuthology.worker.setup_log_file")
-    def test_main_loop_13925(
-        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
-        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
-        m_try_push_job_info, m_ls_remote,
-    ):
-        m_connection = Mock()
-        jobs = self.build_fake_jobs(
-            m_connection,
-            m_job,
-            [
-                'name: name',
-                'name: name\nstop_worker: true',
-            ],
-        )
-        m_connection.reserve.side_effect = jobs
-        m_connect.return_value = m_connection
-        m_fetch_qa_suite.side_effect = [
-            '/suite/path',
-            MaxWhileTries(),
-            MaxWhileTries(),
-        ]
-        worker.main(self.ctx)
-        assert len(m_run_job.call_args_list) == 0
-        assert len(m_try_push_job_info.call_args_list) == len(jobs)
-        for i in range(len(jobs)):
-            push_call = m_try_push_job_info.call_args_list[i]
-            assert push_call[0][1]['status'] == 'dead'
diff --git a/teuthology/worker.py b/teuthology/worker.py
deleted file mode 100644
index b11f887f46..0000000000
--- a/teuthology/worker.py
+++ /dev/null
@@ -1,372 +0,0 @@
-import datetime
-import logging
-import os
-import subprocess
-import sys
-import tempfile
-import time
-import yaml
-
-from teuthology import (
-    # non-modules
-    setup_log_file,
-    install_except_hook,
-    # modules
-    beanstalk,
-    kill,
-    report,
-    repo_utils,
-    safepath,
-)
-from teuthology.config import config as teuth_config
-from teuthology.config import set_config_attr
-from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
-
-log = logging.getLogger(__name__)
-start_time = datetime.datetime.now(datetime.timezone.utc)
-restart_file_path = '/tmp/teuthology-restart-workers'
-stop_file_path = '/tmp/teuthology-stop-workers'
-
-
-def sentinel(path):
-    if not os.path.exists(path):
-        return False
-    file_mtime = datetime.datetime.fromtimestamp(
-        os.path.getmtime(path),
-        datetime.timezone.utc,
-    )
-    if file_mtime > start_time:
-        return True
-    else:
-        return False
-
-
-def restart():
-    log.info('Restarting...')
-    args = sys.argv[:]
-    args.insert(0, sys.executable)
-    os.execv(sys.executable, args)
-
-
-def stop():
-    log.info('Stopping...')
-    sys.exit(0)
-
-
-def load_config(ctx=None):
-    teuth_config.load()
-    if ctx is not None:
-        if not os.path.isdir(ctx.archive_dir):
-            sys.exit("{prog}: archive directory must exist: {path}".format(
-                prog=os.path.basename(sys.argv[0]),
-                path=ctx.archive_dir,
-            ))
-        else:
-            teuth_config.archive_base = ctx.archive_dir
-
-
-def main(ctx):
-    loglevel = logging.INFO
-    if ctx.verbose:
-        loglevel = logging.DEBUG
-    log.setLevel(loglevel)
-
-    log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
-        pid=os.getpid(), tube=ctx.tube,))
-    setup_log_file(log_file_path)
-
-    install_except_hook()
-
-    load_config(ctx=ctx)
-
-    set_config_attr(ctx)
-
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, ctx.tube)
-    result_proc = None
-
-    if teuth_config.teuthology_path is None:
-        repo_utils.fetch_teuthology('main')
-    repo_utils.fetch_qa_suite('main')
-
-    keep_running = True
-    while keep_running:
-        # Check to see if we have a teuthology-results process hanging around
-        # and if so, read its return code so that it can exit.
-        if result_proc is not None and result_proc.poll() is not None:
-            log.debug("teuthology-results exited with code: %s",
-                      result_proc.returncode)
-            result_proc = None
-
-        if sentinel(restart_file_path):
-            restart()
-        elif sentinel(stop_file_path):
-            stop()
-
-        load_config()
-
-        job = connection.reserve(timeout=60)
-        if job is None:
-            continue
-
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
-
-        if job_config.get('stop_worker'):
-            keep_running = False
-
-        try:
-            job_config, teuth_bin_path = prep_job(
-                job_config,
-                log_file_path,
-                ctx.archive_dir,
-            )
-            run_job(
-                job_config,
-                teuth_bin_path,
-                ctx.archive_dir,
-                ctx.verbose,
-            )
-        except SkipJob:
-            continue
-
-        # This try/except block is to keep the worker from dying when
-        # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
-
-
-def prep_job(job_config, log_file_path, archive_dir):
-    job_id = job_config['job_id']
-    safe_archive = safepath.munge(job_config['name'])
-    job_config['worker_log'] = log_file_path
-    archive_path_full = os.path.join(
-        archive_dir, safe_archive, str(job_id))
-    job_config['archive_path'] = archive_path_full
-
-    # If the teuthology branch was not specified, default to main and
-    # store that value.
-    teuthology_branch = job_config.get('teuthology_branch', 'main')
-    job_config['teuthology_branch'] = teuthology_branch
-    teuthology_sha1 = job_config.get('teuthology_sha1')
-    if not teuthology_sha1:
-        repo_url = repo_utils.build_git_url('teuthology', 'ceph')
-        try:
-            teuthology_sha1 = repo_utils.ls_remote(repo_url, teuthology_branch)
-        except Exception as exc:
-            log.exception(f"Could not get teuthology sha1 for branch {teuthology_branch}")
-            report.try_push_job_info(
-                job_config,
-                dict(status='dead', failure_reason=str(exc))
-            )
-            raise SkipJob()
-        if not teuthology_sha1:
-            reason = "Teuthology branch {} not found; marking job as dead".format(teuthology_branch)
-            log.error(reason)
-            report.try_push_job_info(
-                job_config,
-                dict(status='dead', failure_reason=reason)
-            )
-            raise SkipJob()
-        if teuth_config.teuthology_path is None:
-            log.info('Using teuthology sha1 %s', teuthology_sha1)
-
-    try:
-        if teuth_config.teuthology_path is not None:
-            teuth_path = teuth_config.teuthology_path
-        else:
-            teuth_path = repo_utils.fetch_teuthology(branch=teuthology_branch,
-                                                     commit=teuthology_sha1)
-        # For the teuthology tasks, we look for suite_branch, and if we
-        # don't get that, we look for branch, and fall back to 'main'.
-        # last-in-suite jobs don't have suite_branch or branch set.
-        ceph_branch = job_config.get('branch', 'main')
-        suite_branch = job_config.get('suite_branch', ceph_branch)
-        suite_sha1 = job_config.get('suite_sha1')
-        suite_repo = job_config.get('suite_repo')
-        if suite_repo:
-            teuth_config.ceph_qa_suite_git_url = suite_repo
-        job_config['suite_path'] = os.path.normpath(os.path.join(
-            repo_utils.fetch_qa_suite(suite_branch, suite_sha1),
-            job_config.get('suite_relpath', ''),
-        ))
-    except (BranchNotFoundError, CommitNotFoundError) as exc:
-        log.exception("Requested version not found; marking job as dead")
-        report.try_push_job_info(
-            job_config,
-            dict(status='dead', failure_reason=str(exc))
-        )
-        raise SkipJob()
-    except MaxWhileTries as exc:
-        log.exception("Failed to fetch or bootstrap; marking job as dead")
-        report.try_push_job_info(
-            job_config,
-            dict(status='dead', failure_reason=str(exc))
-        )
-        raise SkipJob()
-
-    teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
-    if not os.path.isdir(teuth_bin_path):
-        raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
-                           (teuthology_branch, teuth_bin_path))
-    return job_config, teuth_bin_path
-
-
-def run_job(job_config, teuth_bin_path, archive_dir, verbose):
-    safe_archive = safepath.munge(job_config['name'])
-    if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
-        if teuth_config.results_server:
-            try:
-                report.try_delete_jobs(job_config['name'], job_config['job_id'])
-            except Exception as e:
-                log.warning("Unable to delete job %s, exception occurred: %s",
-                            job_config['job_id'], e)
-        suite_archive_dir = os.path.join(archive_dir, safe_archive)
-        safepath.makedirs('/', suite_archive_dir)
-        args = [
-            os.path.join(teuth_bin_path, 'teuthology-results'),
-            '--archive-dir', suite_archive_dir,
-            '--name', job_config['name'],
-        ]
-        if job_config.get('first_in_suite'):
-            log.info('Generating memo for %s', job_config['name'])
-            if job_config.get('seed'):
-                args.extend(['--seed', job_config['seed']])
-            if job_config.get('subset'):
-                args.extend(['--subset', job_config['subset']])
-            if job_config.get('no_nested_subset'):
-                args.extend(['--no-nested-subset'])
-        else:
-            log.info('Generating results for %s', job_config['name'])
-            timeout = job_config.get('results_timeout',
-                                     teuth_config.results_timeout)
-            args.extend(['--timeout', str(timeout)])
-            if job_config.get('email'):
-                args.extend(['--email', job_config['email']])
-        # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
-        # make sure that it will continue to run if this worker process
-        # dies (e.g. because of a restart)
-        result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
-        log.info("teuthology-results PID: %s", result_proc.pid)
-        return
-
-    log.info('Creating archive dir %s', job_config['archive_path'])
-    safepath.makedirs('/', job_config['archive_path'])
-    log.info('Running job %s', job_config['job_id'])
-
-    suite_path = job_config['suite_path']
-    arg = [
-        os.path.join(teuth_bin_path, 'teuthology'),
-    ]
-    # The following is for compatibility with older schedulers, from before we
-    # started merging the contents of job_config['config'] into job_config
-    # itself.
-    if 'config' in job_config:
-        inner_config = job_config.pop('config')
-        if not isinstance(inner_config, dict):
-            log.warning("run_job: job_config['config'] isn't a dict, it's a %s",
-                        str(type(inner_config)))
-        else:
-            job_config.update(inner_config)
-
-    if verbose or job_config['verbose']:
-        arg.append('-v')
-
-    arg.extend([
-        '--lock',
-        '--block',
-        '--owner', job_config['owner'],
-        '--archive', job_config['archive_path'],
-        '--name', job_config['name'],
-    ])
-    if job_config['description'] is not None:
-        arg.extend(['--description', job_config['description']])
-    arg.append('--')
-
-    with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
-                                     suffix='.tmp', mode='w+t') as tmp:
-        yaml.safe_dump(data=job_config, stream=tmp)
-        tmp.flush()
-        arg.append(tmp.name)
-        env = os.environ.copy()
-        python_path = env.get('PYTHONPATH', '')
-        python_path = ':'.join([suite_path, python_path]).strip(':')
-        env['PYTHONPATH'] = python_path
-        log.debug("Running: %s" % ' '.join(arg))
-        p = subprocess.Popen(args=arg, env=env)
-        log.info("Job archive: %s", job_config['archive_path'])
-        log.info("Job PID: %s", str(p.pid))
-
-        if teuth_config.results_server:
-            log.info("Running with watchdog")
-            try:
-                run_with_watchdog(p, job_config)
-            except Exception:
-                log.exception("run_with_watchdog had an unhandled exception")
-                raise
-        else:
-            log.info("Running without watchdog")
-            # This sleep() is to give the child time to start up and create the
-            # archive dir.
-            time.sleep(5)
-            symlink_worker_log(job_config['worker_log'],
-                               job_config['archive_path'])
-            p.wait()
-
-        if p.returncode != 0:
-            log.error('Child exited with code %d', p.returncode)
-        else:
-            log.info('Success!')
-
-
-def run_with_watchdog(process, job_config):
-    job_start_time = datetime.datetime.now(datetime.timezone.utc)
-
-    # Only push the information that's relevant to the watchdog, to save db
-    # load
-    job_info = dict(
-        name=job_config['name'],
-        job_id=job_config['job_id'],
-    )
-
-    # Sleep once outside of the loop to avoid double-posting jobs
-    time.sleep(teuth_config.watchdog_interval)
-    symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
-    while process.poll() is None:
-        # Kill jobs that have been running longer than the global max
-        run_time = datetime.datetime.now(datetime.timezone.utc) - job_start_time
-        total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
-        if total_seconds > teuth_config.max_job_time:
-            log.warning("Job ran longer than {max}s. Killing...".format(
-                max=teuth_config.max_job_time))
-            kill.kill_job(job_info['name'], job_info['job_id'],
-                          teuth_config.archive_base, job_config['owner'])
-
-        # calling this without a status just updates the jobs updated time
-        report.try_push_job_info(job_info)
-        time.sleep(teuth_config.watchdog_interval)
-
-    # we no longer support testing theses old branches
-    assert(job_config.get('teuthology_branch') not in ('argonaut', 'bobtail',
-                                                       'cuttlefish', 'dumpling'))
-
-    # Let's make sure that paddles knows the job is finished. We don't know
-    # the status, but if it was a pass or fail it will have already been
-    # reported to paddles. In that case paddles ignores the 'dead' status.
-    # If the job was killed, paddles will use the 'dead' status.
-    report.try_push_job_info(job_info, dict(status='dead'))
-
-
-def symlink_worker_log(worker_log_path, archive_dir):
-    try:
-        log.debug("Worker log: %s", worker_log_path)
-        os.symlink(worker_log_path, os.path.join(archive_dir, 'worker.log'))
-    except Exception:
-        log.exception("Failed to symlink worker log")
-- 
2.39.5