From: Zack Cerza
Date: Thu, 19 May 2016 16:22:37 +0000 (-0600)
Subject: Move job preparation to new prep_job method
X-Git-Tag: 1.1.0~609^2~4
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b80f646ff99e13297c066ab874e561ad029293a2;p=teuthology.git

Move job preparation to new prep_job method

Signed-off-by: Zack Cerza
---

diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
index 01f4e30e80..4f6d4241b2 100644
--- a/teuthology/test/test_worker.py
+++ b/teuthology/test/test_worker.py
@@ -1,3 +1,4 @@
+import os
 import subprocess
 
 from mock import patch, Mock, MagicMock
@@ -49,10 +50,12 @@ class TestWorker(object):
     @patch("teuthology.worker.teuth_config")
     @patch("subprocess.Popen")
     @patch("os.environ")
+    @patch("os.mkdir")
     @patch("yaml.safe_dump")
     @patch("tempfile.NamedTemporaryFile")
-    def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_environ,
-                                   m_popen, m_t_config, m_run_watchdog):
+    def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+                                   m_environ, m_popen, m_t_config,
+                                   m_run_watchdog):
         config = {
             "suite_path": "suite/path",
             "config": {"foo": "bar"},
@@ -60,7 +63,8 @@ class TestWorker(object):
             "owner": "the_owner",
             "archive_path": "archive/path",
             "name": "the_name",
-            "description": "the_description"
+            "description": "the_description",
+            "job_id": "1",
         }
         m_tmp = MagicMock()
         temp_file = Mock()
@@ -73,7 +77,7 @@ class TestWorker(object):
         m_p.returncode = 0
         m_popen.return_value = m_p
         m_t_config.results_server = True
-        worker.run_job(config, "teuth/bin/path", verbose=False)
+        worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
         m_run_watchdog.assert_called_with(m_p, config)
         expected_args = [
             'teuth/bin/path/teuthology',
@@ -95,10 +99,12 @@ class TestWorker(object):
     @patch("teuthology.worker.teuth_config")
     @patch("subprocess.Popen")
     @patch("os.environ")
+    @patch("os.mkdir")
     @patch("yaml.safe_dump")
     @patch("tempfile.NamedTemporaryFile")
-    def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_environ,
-                                 m_popen, m_t_config, m_symlink_log, m_sleep):
+    def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+                                 m_environ, m_popen, m_t_config, m_symlink_log,
+                                 m_sleep):
         config = {
             "suite_path": "suite/path",
             "config": {"foo": "bar"},
@@ -107,7 +113,8 @@ class TestWorker(object):
             "archive_path": "archive/path",
             "name": "the_name",
             "description": "the_description",
-            "worker_log": "worker/log.log"
+            "worker_log": "worker/log.log",
+            "job_id": "1",
         }
         m_tmp = MagicMock()
         temp_file = Mock()
@@ -120,7 +127,7 @@ class TestWorker(object):
         m_p.returncode = 1
         m_popen.return_value = m_p
         m_t_config.results_server = False
-        worker.run_job(config, "teuth/bin/path", verbose=False)
+        worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
         m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
 
     @patch("teuthology.worker.report.try_push_job_info")
@@ -168,3 +175,34 @@ class TestWorker(object):
             stdout=subprocess.PIPE, stderr=subprocess.STDOUT
         )
 
+
+    @patch("os.path.isdir")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.fetch_qa_suite")
+    def test_prep_job(self, m_fetch_qa_suite,
+                      m_fetch_teuthology, m_isdir):
+        config = dict(
+            name="the_name",
+            job_id="1",
+        )
+        archive_dir = '/archive/dir'
+        log_file_path = '/worker/log'
+        m_fetch_teuthology.return_value = '/teuth/path'
+        m_fetch_qa_suite.return_value = '/suite/path'
+        m_isdir.return_value = True
+        got_config, teuth_bin_path = worker.prep_job(
+            config,
+            log_file_path,
+            archive_dir,
+        )
+        assert got_config['worker_log'] == log_file_path
+        assert got_config['archive_path'] == os.path.join(
+            archive_dir,
+            config['name'],
+            config['job_id'],
+        )
+        assert got_config['teuthology_branch'] == 'master'
+        assert m_fetch_teuthology.called_once_with_args(branch='master')
+        assert teuth_bin_path == '/teuth/path/virtualenv/bin'
+        assert m_fetch_qa_suite.called_once_with_args(branch='master')
+        assert got_config['suite_path'] == '/suite/path'
diff --git a/teuthology/worker.py b/teuthology/worker.py
index b390b36e79..a6982049ba 100644
--- a/teuthology/worker.py
+++ b/teuthology/worker.py
@@ -14,7 +14,7 @@ from . import report
 from . import safepath
 from .config import config as teuth_config
 from .config import set_config_attr
-from .exceptions import BranchNotFoundError
+from .exceptions import BranchNotFoundError, SkipJob
 from .kill import kill_job
 from .repo_utils import fetch_qa_suite, fetch_teuthology
 
@@ -110,128 +110,92 @@ def main(ctx):
         if job_config.get('stop_worker'):
             keep_running = False
 
-        safe_archive = safepath.munge(job_config['name'])
-        job_config['worker_log'] = log_file_path
-        archive_path_full = os.path.join(
-            ctx.archive_dir, safe_archive, str(job_id))
-        job_config['archive_path'] = archive_path_full
-
-        # If the teuthology branch was not specified, default to master and
-        # store that value.
-        teuthology_branch = job_config.get('teuthology_branch', 'master')
-        job_config['teuthology_branch'] = teuthology_branch
-
         try:
-            if teuth_config.teuthology_path is not None:
-                teuth_path = teuth_config.teuthology_path
-            else:
-                teuth_path = fetch_teuthology(branch=teuthology_branch)
-            # For the teuthology tasks, we look for suite_branch, and if we
-            # don't get that, we look for branch, and fall back to 'master'.
-            # last-in-suite jobs don't have suite_branch or branch set.
-            ceph_branch = job_config.get('branch', 'master')
-            suite_branch = job_config.get('suite_branch', ceph_branch)
-            job_config['suite_path'] = fetch_qa_suite(suite_branch)
-        except BranchNotFoundError as exc:
-            log.exception("Branch not found; marking job as dead")
-            report.try_push_job_info(
+            job_config, teuth_bin_path = prep_job(
+                job_config,
+                log_file_path,
+                ctx.archive_dir,
+            )
+            run_job(
                 job_config,
-                dict(status='dead', failure_reason=str(exc))
+                teuth_bin_path,
+                ctx.archive_dir,
+                ctx.verbose,
             )
+        except SkipJob:
             continue
 
-        teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
-        if not os.path.isdir(teuth_bin_path):
-            raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
-                               (teuthology_branch, teuth_bin_path))
-
-        if job_config.get('last_in_suite'):
-            if teuth_config.results_server:
-                report.try_delete_jobs(job_config['name'],
-                                       job_config['job_id'])
-            log.info('Generating results email for %s', job_config['name'])
-            args = [
-                os.path.join(teuth_bin_path, 'teuthology-results'),
-                '--timeout',
-                str(job_config.get('results_timeout',
-                                   teuth_config.results_timeout)),
-                '--email',
-                job_config['email'],
-                '--archive-dir',
-                os.path.join(ctx.archive_dir, safe_archive),
-                '--name',
-                job_config['name'],
-            ]
-            # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
-            # make sure that it will continue to run if this worker process
-            # dies (e.g. because of a restart)
-            result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
-            log.info("teuthology-results PID: %s", result_proc.pid)
-        else:
-            log.info('Creating archive dir %s', archive_path_full)
-            safepath.makedirs(ctx.archive_dir, safe_archive)
-            log.info('Running job %d', job_id)
-            run_job(job_config, teuth_bin_path, ctx.verbose)
         job.delete()
 
 
-def run_with_watchdog(process, job_config):
-    job_start_time = datetime.utcnow()
-
-    # Only push the information that's relevant to the watchdog, to save db
-    # load
-    job_info = dict(
-        name=job_config['name'],
-        job_id=job_config['job_id'],
-    )
-
-    # Sleep once outside of the loop to avoid double-posting jobs
-    time.sleep(teuth_config.watchdog_interval)
-    symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
-    while process.poll() is None:
-        # Kill jobs that have been running longer than the global max
-        run_time = datetime.utcnow() - job_start_time
-        total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
-        if total_seconds > teuth_config.max_job_time:
-            log.warning("Job ran longer than {max}s. Killing...".format(
-                max=teuth_config.max_job_time))
-            kill_job(job_info['name'], job_info['job_id'],
-                     teuth_config.archive_base)
+def prep_job(job_config, log_file_path, archive_dir):
+    job_id = job_config['job_id']
+    safe_archive = safepath.munge(job_config['name'])
+    job_config['worker_log'] = log_file_path
+    archive_path_full = os.path.join(
+        archive_dir, safe_archive, str(job_id))
+    job_config['archive_path'] = archive_path_full
 
-        # calling this without a status just updates the jobs updated time
-        report.try_push_job_info(job_info)
-        time.sleep(teuth_config.watchdog_interval)
-
-    # The job finished. Let's make sure paddles knows.
-    branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
-    if job_config.get('teuthology_branch') in branches_sans_reporting:
-        # The job ran with a teuthology branch that may not have the reporting
-        # feature. Let's call teuthology-report (which will be from the master
-        # branch) to report the job manually.
-        cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
-            run_name=job_info['name'],
-            job_id=job_info['job_id'])
-        try:
-            log.info("Executing %s" % cmd)
-            report_proc = subprocess.Popen(cmd, shell=True,
-                                           stdout=subprocess.PIPE,
-                                           stderr=subprocess.STDOUT)
-            while report_proc.poll() is None:
-                for line in report_proc.stdout.readlines():
-                    log.info(line.strip())
-                time.sleep(1)
-            log.info("Reported results via the teuthology-report command")
-        except Exception:
-            log.exception("teuthology-report failed")
-    else:
-        # Let's make sure that paddles knows the job is finished. We don't know
-        # the status, but if it was a pass or fail it will have already been
-        # reported to paddles. In that case paddles ignores the 'dead' status.
-        # If the job was killed, paddles will use the 'dead' status.
-        report.try_push_job_info(job_info, dict(status='dead'))
+    # If the teuthology branch was not specified, default to master and
+    # store that value.
+    teuthology_branch = job_config.get('teuthology_branch', 'master')
+    job_config['teuthology_branch'] = teuthology_branch
+    try:
+        if teuth_config.teuthology_path is not None:
+            teuth_path = teuth_config.teuthology_path
+        else:
+            teuth_path = fetch_teuthology(branch=teuthology_branch)
+        # For the teuthology tasks, we look for suite_branch, and if we
+        # don't get that, we look for branch, and fall back to 'master'.
+        # last-in-suite jobs don't have suite_branch or branch set.
+        ceph_branch = job_config.get('branch', 'master')
+        suite_branch = job_config.get('suite_branch', ceph_branch)
+        job_config['suite_path'] = fetch_qa_suite(suite_branch)
+    except BranchNotFoundError as exc:
+        log.exception("Branch not found; marking job as dead")
+        report.try_push_job_info(
+            job_config,
+            dict(status='dead', failure_reason=str(exc))
+        )
+        raise SkipJob()
+
+    teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
+    if not os.path.isdir(teuth_bin_path):
+        raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
+                           (teuthology_branch, teuth_bin_path))
+    return job_config, teuth_bin_path
+
+
+def run_job(job_config, teuth_bin_path, archive_dir, verbose):
+    safe_archive = safepath.munge(job_config['name'])
+    if job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            report.try_delete_jobs(job_config['name'], job_config['job_id'])
+        log.info('Generating results email for %s', job_config['name'])
+        args = [
+            os.path.join(teuth_bin_path, 'teuthology-results'),
+            '--timeout',
+            str(job_config.get('results_timeout',
+                               teuth_config.results_timeout)),
+            '--email',
+            job_config['email'],
+            '--archive-dir',
+            os.path.join(archive_dir, safe_archive),
+            '--name',
+            job_config['name'],
+        ]
+        # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
+        # make sure that it will continue to run if this worker process
+        # dies (e.g. because of a restart)
+        result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
+        log.info("teuthology-results PID: %s", result_proc.pid)
+        return
+
+    log.info('Creating archive dir %s', job_config['archive_path'])
+    safepath.makedirs(archive_dir, safe_archive)
+    log.info('Running job %s', job_config['job_id'])
 
-def run_job(job_config, teuth_bin_path, verbose):
     suite_path = job_config['suite_path']
     arg = [
         os.path.join(teuth_bin_path, 'teuthology'),
@@ -297,6 +261,62 @@ def run_job(job_config, teuth_bin_path, verbose):
         log.info('Success!')
 
 
+def run_with_watchdog(process, job_config):
+    job_start_time = datetime.utcnow()
+
+    # Only push the information that's relevant to the watchdog, to save db
+    # load
+    job_info = dict(
+        name=job_config['name'],
+        job_id=job_config['job_id'],
+    )
+
+    # Sleep once outside of the loop to avoid double-posting jobs
+    time.sleep(teuth_config.watchdog_interval)
+    symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
+    while process.poll() is None:
+        # Kill jobs that have been running longer than the global max
+        run_time = datetime.utcnow() - job_start_time
+        total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
+        if total_seconds > teuth_config.max_job_time:
+            log.warning("Job ran longer than {max}s. Killing...".format(
+                max=teuth_config.max_job_time))
+            kill_job(job_info['name'], job_info['job_id'],
+                     teuth_config.archive_base)
+
+        # calling this without a status just updates the jobs updated time
+        report.try_push_job_info(job_info)
+        time.sleep(teuth_config.watchdog_interval)
+
+    # The job finished. Let's make sure paddles knows.
+    branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
+    if job_config.get('teuthology_branch') in branches_sans_reporting:
+        # The job ran with a teuthology branch that may not have the reporting
+        # feature. Let's call teuthology-report (which will be from the master
+        # branch) to report the job manually.
+        cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
+            run_name=job_info['name'],
+            job_id=job_info['job_id'])
+        try:
+            log.info("Executing %s" % cmd)
+            report_proc = subprocess.Popen(cmd, shell=True,
+                                           stdout=subprocess.PIPE,
+                                           stderr=subprocess.STDOUT)
+            while report_proc.poll() is None:
+                for line in report_proc.stdout.readlines():
+                    log.info(line.strip())
+                time.sleep(1)
+            log.info("Reported results via the teuthology-report command")
+        except Exception:
+            log.exception("teuthology-report failed")
+    else:
+        # Let's make sure that paddles knows the job is finished. We don't know
+        # the status, but if it was a pass or fail it will have already been
+        # reported to paddles. In that case paddles ignores the 'dead' status.
+        # If the job was killed, paddles will use the 'dead' status.
+        report.try_push_job_info(job_info, dict(status='dead'))
+
+
 def symlink_worker_log(worker_log_path, archive_dir):
     try:
         log.debug("Worker log: %s", worker_log_path)
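Note (not part of the patch above): the worker.py hunks import SkipJob from teuthology.exceptions, but the change to that file is not shown in this diff. A minimal, illustrative sketch of the control flow it enables is below; SkipJob's exact definition, and the prep_job_stub/worker_loop names, are assumptions made here for illustration, not the project's real API.

# Illustrative sketch only -- mirrors the new main() hunk: prep the job,
# run it, and skip jobs that prep_job() already reported as dead.
class SkipJob(Exception):
    """Assumed to be a plain Exception subclass used purely as a signal."""


def prep_job_stub(job_config):
    # Stand-in for teuthology.worker.prep_job: either return the enriched
    # config plus a bin path, or raise SkipJob after reporting the failure.
    if job_config.get('teuthology_branch') == 'no-such-branch':
        raise SkipJob()
    return job_config, '/teuth/path/virtualenv/bin'


def worker_loop(jobs):
    for job_config in jobs:
        try:
            job_config, teuth_bin_path = prep_job_stub(job_config)
        except SkipJob:
            continue
        print('would run job', job_config['job_id'], 'from', teuth_bin_path)


if __name__ == '__main__':
    worker_loop([
        {'job_id': '1', 'teuthology_branch': 'master'},
        {'job_id': '2', 'teuthology_branch': 'no-such-branch'},
    ])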