]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Move job preparation to new prep_job method
authorZack Cerza <zack@redhat.com>
Thu, 19 May 2016 16:22:37 +0000 (10:22 -0600)
committerZack Cerza <zack@redhat.com>
Tue, 24 May 2016 23:58:10 +0000 (17:58 -0600)
Signed-off-by: Zack Cerza <zack@redhat.com>
teuthology/test/test_worker.py
teuthology/worker.py

index 01f4e30e805e0bd98ea92abf4ea43b708eb43952..4f6d4241b249c2e60257e2920d10fc4ffd04f954 100644 (file)
@@ -1,3 +1,4 @@
+import os
 import subprocess
 
 from mock import patch, Mock, MagicMock
@@ -49,10 +50,12 @@ class TestWorker(object):
     @patch("teuthology.worker.teuth_config")
     @patch("subprocess.Popen")
     @patch("os.environ")
+    @patch("os.mkdir")
     @patch("yaml.safe_dump")
     @patch("tempfile.NamedTemporaryFile")
-    def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_environ,
-                                   m_popen, m_t_config, m_run_watchdog):
+    def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+                                   m_environ, m_popen, m_t_config,
+                                   m_run_watchdog):
         config = {
             "suite_path": "suite/path",
             "config": {"foo": "bar"},
@@ -60,7 +63,8 @@ class TestWorker(object):
             "owner": "the_owner",
             "archive_path": "archive/path",
             "name": "the_name",
-            "description": "the_description"
+            "description": "the_description",
+            "job_id": "1",
         }
         m_tmp = MagicMock()
         temp_file = Mock()
@@ -73,7 +77,7 @@ class TestWorker(object):
         m_p.returncode = 0
         m_popen.return_value = m_p
         m_t_config.results_server = True
-        worker.run_job(config, "teuth/bin/path", verbose=False)
+        worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
         m_run_watchdog.assert_called_with(m_p, config)
         expected_args = [
             'teuth/bin/path/teuthology',
@@ -95,10 +99,12 @@ class TestWorker(object):
     @patch("teuthology.worker.teuth_config")
     @patch("subprocess.Popen")
     @patch("os.environ")
+    @patch("os.mkdir")
     @patch("yaml.safe_dump")
     @patch("tempfile.NamedTemporaryFile")
-    def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_environ,
-                                 m_popen, m_t_config, m_symlink_log, m_sleep):
+    def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+                                 m_environ, m_popen, m_t_config, m_symlink_log,
+                                 m_sleep):
         config = {
             "suite_path": "suite/path",
             "config": {"foo": "bar"},
@@ -107,7 +113,8 @@ class TestWorker(object):
             "archive_path": "archive/path",
             "name": "the_name",
             "description": "the_description",
-            "worker_log": "worker/log.log"
+            "worker_log": "worker/log.log",
+            "job_id": "1",
         }
         m_tmp = MagicMock()
         temp_file = Mock()
@@ -120,7 +127,7 @@ class TestWorker(object):
         m_p.returncode = 1
         m_popen.return_value = m_p
         m_t_config.results_server = False
-        worker.run_job(config, "teuth/bin/path", verbose=False)
+        worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
         m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
 
     @patch("teuthology.worker.report.try_push_job_info")
@@ -168,3 +175,34 @@ class TestWorker(object):
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT
         )
+
+    @patch("os.path.isdir")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.fetch_qa_suite")
+    def test_prep_job(self, m_fetch_qa_suite,
+                      m_fetch_teuthology, m_isdir):
+        config = dict(
+            name="the_name",
+            job_id="1",
+        )
+        archive_dir = '/archive/dir'
+        log_file_path = '/worker/log'
+        m_fetch_teuthology.return_value = '/teuth/path'
+        m_fetch_qa_suite.return_value = '/suite/path'
+        m_isdir.return_value = True
+        got_config, teuth_bin_path = worker.prep_job(
+            config,
+            log_file_path,
+            archive_dir,
+        )
+        assert got_config['worker_log'] == log_file_path
+        assert got_config['archive_path'] == os.path.join(
+            archive_dir,
+            config['name'],
+            config['job_id'],
+        )
+        assert got_config['teuthology_branch'] == 'master'
+        assert m_fetch_teuthology.called_once_with_args(branch='master')
+        assert teuth_bin_path == '/teuth/path/virtualenv/bin'
+        assert m_fetch_qa_suite.called_once_with_args(branch='master')
+        assert got_config['suite_path'] == '/suite/path'
index b390b36e79fabfdf2a98fd021885cdabea109bbe..a6982049baecb28b36fff31bba30ffaa873380b5 100644 (file)
@@ -14,7 +14,7 @@ from . import report
 from . import safepath
 from .config import config as teuth_config
 from .config import set_config_attr
-from .exceptions import BranchNotFoundError
+from .exceptions import BranchNotFoundError, SkipJob
 from .kill import kill_job
 from .repo_utils import fetch_qa_suite, fetch_teuthology
 
@@ -110,128 +110,92 @@ def main(ctx):
         if job_config.get('stop_worker'):
             keep_running = False
 
-        safe_archive = safepath.munge(job_config['name'])
-        job_config['worker_log'] = log_file_path
-        archive_path_full = os.path.join(
-            ctx.archive_dir, safe_archive, str(job_id))
-        job_config['archive_path'] = archive_path_full
-
-        # If the teuthology branch was not specified, default to master and
-        # store that value.
-        teuthology_branch = job_config.get('teuthology_branch', 'master')
-        job_config['teuthology_branch'] = teuthology_branch
-
         try:
-            if teuth_config.teuthology_path is not None:
-                teuth_path = teuth_config.teuthology_path
-            else:
-                teuth_path = fetch_teuthology(branch=teuthology_branch)
-            # For the teuthology tasks, we look for suite_branch, and if we
-            # don't get that, we look for branch, and fall back to 'master'.
-            # last-in-suite jobs don't have suite_branch or branch set.
-            ceph_branch = job_config.get('branch', 'master')
-            suite_branch = job_config.get('suite_branch', ceph_branch)
-            job_config['suite_path'] = fetch_qa_suite(suite_branch)
-        except BranchNotFoundError as exc:
-            log.exception("Branch not found; marking job as dead")
-            report.try_push_job_info(
+            job_config, teuth_bin_path = prep_job(
+                job_config,
+                log_file_path,
+                ctx.archive_dir,
+            )
+            run_job(
                 job_config,
-                dict(status='dead', failure_reason=str(exc))
+                teuth_bin_path,
+                ctx.archive_dir,
+                ctx.verbose,
             )
+        except SkipJob:
             continue
 
-        teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
-        if not os.path.isdir(teuth_bin_path):
-            raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
-                               (teuthology_branch, teuth_bin_path))
-
-        if job_config.get('last_in_suite'):
-            if teuth_config.results_server:
-                report.try_delete_jobs(job_config['name'],
-                                       job_config['job_id'])
-            log.info('Generating results email for %s', job_config['name'])
-            args = [
-                os.path.join(teuth_bin_path, 'teuthology-results'),
-                '--timeout',
-                str(job_config.get('results_timeout',
-                                   teuth_config.results_timeout)),
-                '--email',
-                job_config['email'],
-                '--archive-dir',
-                os.path.join(ctx.archive_dir, safe_archive),
-                '--name',
-                job_config['name'],
-            ]
-            # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
-            # make sure that it will continue to run if this worker process
-            # dies (e.g. because of a restart)
-            result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
-            log.info("teuthology-results PID: %s", result_proc.pid)
-        else:
-            log.info('Creating archive dir %s', archive_path_full)
-            safepath.makedirs(ctx.archive_dir, safe_archive)
-            log.info('Running job %d', job_id)
-            run_job(job_config, teuth_bin_path, ctx.verbose)
         job.delete()
 
 
-def run_with_watchdog(process, job_config):
-    job_start_time = datetime.utcnow()
-
-    # Only push the information that's relevant to the watchdog, to save db
-    # load
-    job_info = dict(
-        name=job_config['name'],
-        job_id=job_config['job_id'],
-    )
-
-    # Sleep once outside of the loop to avoid double-posting jobs
-    time.sleep(teuth_config.watchdog_interval)
-    symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
-    while process.poll() is None:
-        # Kill jobs that have been running longer than the global max
-        run_time = datetime.utcnow() - job_start_time
-        total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
-        if total_seconds > teuth_config.max_job_time:
-            log.warning("Job ran longer than {max}s. Killing...".format(
-                max=teuth_config.max_job_time))
-            kill_job(job_info['name'], job_info['job_id'],
-                     teuth_config.archive_base)
+def prep_job(job_config, log_file_path, archive_dir):
+    job_id = job_config['job_id']
+    safe_archive = safepath.munge(job_config['name'])
+    job_config['worker_log'] = log_file_path
+    archive_path_full = os.path.join(
+        archive_dir, safe_archive, str(job_id))
+    job_config['archive_path'] = archive_path_full
 
-        # calling this without a status just updates the jobs updated time
-        report.try_push_job_info(job_info)
-        time.sleep(teuth_config.watchdog_interval)
-
-    # The job finished. Let's make sure paddles knows.
-    branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
-    if job_config.get('teuthology_branch') in branches_sans_reporting:
-        # The job ran with a teuthology branch that may not have the reporting
-        # feature. Let's call teuthology-report (which will be from the master
-        # branch) to report the job manually.
-        cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
-            run_name=job_info['name'],
-            job_id=job_info['job_id'])
-        try:
-            log.info("Executing %s" % cmd)
-            report_proc = subprocess.Popen(cmd, shell=True,
-                                           stdout=subprocess.PIPE,
-                                           stderr=subprocess.STDOUT)
-            while report_proc.poll() is None:
-                for line in report_proc.stdout.readlines():
-                    log.info(line.strip())
-                time.sleep(1)
-            log.info("Reported results via the teuthology-report command")
-        except Exception:
-            log.exception("teuthology-report failed")
-    else:
-        # Let's make sure that paddles knows the job is finished. We don't know
-        # the status, but if it was a pass or fail it will have already been
-        # reported to paddles. In that case paddles ignores the 'dead' status.
-        # If the job was killed, paddles will use the 'dead' status.
-        report.try_push_job_info(job_info, dict(status='dead'))
+    # If the teuthology branch was not specified, default to master and
+    # store that value.
+    teuthology_branch = job_config.get('teuthology_branch', 'master')
+    job_config['teuthology_branch'] = teuthology_branch
 
+    try:
+        if teuth_config.teuthology_path is not None:
+            teuth_path = teuth_config.teuthology_path
+        else:
+            teuth_path = fetch_teuthology(branch=teuthology_branch)
+        # For the teuthology tasks, we look for suite_branch, and if we
+        # don't get that, we look for branch, and fall back to 'master'.
+        # last-in-suite jobs don't have suite_branch or branch set.
+        ceph_branch = job_config.get('branch', 'master')
+        suite_branch = job_config.get('suite_branch', ceph_branch)
+        job_config['suite_path'] = fetch_qa_suite(suite_branch)
+    except BranchNotFoundError as exc:
+        log.exception("Branch not found; marking job as dead")
+        report.try_push_job_info(
+            job_config,
+            dict(status='dead', failure_reason=str(exc))
+        )
+        raise SkipJob()
+
+    teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
+    if not os.path.isdir(teuth_bin_path):
+        raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
+                           (teuthology_branch, teuth_bin_path))
+    return job_config, teuth_bin_path
+
+
+def run_job(job_config, teuth_bin_path, archive_dir, verbose):
+    safe_archive = safepath.munge(job_config['name'])
+    if job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            report.try_delete_jobs(job_config['name'], job_config['job_id'])
+        log.info('Generating results email for %s', job_config['name'])
+        args = [
+            os.path.join(teuth_bin_path, 'teuthology-results'),
+            '--timeout',
+            str(job_config.get('results_timeout',
+                               teuth_config.results_timeout)),
+            '--email',
+            job_config['email'],
+            '--archive-dir',
+            os.path.join(archive_dir, safe_archive),
+            '--name',
+            job_config['name'],
+        ]
+        # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
+        # make sure that it will continue to run if this worker process
+        # dies (e.g. because of a restart)
+        result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
+        log.info("teuthology-results PID: %s", result_proc.pid)
+        return
+
+    log.info('Creating archive dir %s', job_config['archive_path'])
+    safepath.makedirs(archive_dir, safe_archive)
+    log.info('Running job %s', job_config['job_id'])
 
-def run_job(job_config, teuth_bin_path, verbose):
     suite_path = job_config['suite_path']
     arg = [
         os.path.join(teuth_bin_path, 'teuthology'),
@@ -297,6 +261,62 @@ def run_job(job_config, teuth_bin_path, verbose):
             log.info('Success!')
 
 
+def run_with_watchdog(process, job_config):
+    job_start_time = datetime.utcnow()
+
+    # Only push the information that's relevant to the watchdog, to save db
+    # load
+    job_info = dict(
+        name=job_config['name'],
+        job_id=job_config['job_id'],
+    )
+
+    # Sleep once outside of the loop to avoid double-posting jobs
+    time.sleep(teuth_config.watchdog_interval)
+    symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
+    while process.poll() is None:
+        # Kill jobs that have been running longer than the global max
+        run_time = datetime.utcnow() - job_start_time
+        total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
+        if total_seconds > teuth_config.max_job_time:
+            log.warning("Job ran longer than {max}s. Killing...".format(
+                max=teuth_config.max_job_time))
+            kill_job(job_info['name'], job_info['job_id'],
+                     teuth_config.archive_base)
+
+        # calling this without a status just updates the jobs updated time
+        report.try_push_job_info(job_info)
+        time.sleep(teuth_config.watchdog_interval)
+
+    # The job finished. Let's make sure paddles knows.
+    branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
+    if job_config.get('teuthology_branch') in branches_sans_reporting:
+        # The job ran with a teuthology branch that may not have the reporting
+        # feature. Let's call teuthology-report (which will be from the master
+        # branch) to report the job manually.
+        cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
+            run_name=job_info['name'],
+            job_id=job_info['job_id'])
+        try:
+            log.info("Executing %s" % cmd)
+            report_proc = subprocess.Popen(cmd, shell=True,
+                                           stdout=subprocess.PIPE,
+                                           stderr=subprocess.STDOUT)
+            while report_proc.poll() is None:
+                for line in report_proc.stdout.readlines():
+                    log.info(line.strip())
+                time.sleep(1)
+            log.info("Reported results via the teuthology-report command")
+        except Exception:
+            log.exception("teuthology-report failed")
+    else:
+        # Let's make sure that paddles knows the job is finished. We don't know
+        # the status, but if it was a pass or fail it will have already been
+        # reported to paddles. In that case paddles ignores the 'dead' status.
+        # If the job was killed, paddles will use the 'dead' status.
+        report.try_push_job_info(job_info, dict(status='dead'))
+
+
 def symlink_worker_log(worker_log_path, archive_dir):
     try:
         log.debug("Worker log: %s", worker_log_path)