+import os
import subprocess
from mock import patch, Mock, MagicMock
@patch("teuthology.worker.teuth_config")
@patch("subprocess.Popen")
@patch("os.environ")
+ @patch("os.mkdir")
@patch("yaml.safe_dump")
@patch("tempfile.NamedTemporaryFile")
- def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_environ,
- m_popen, m_t_config, m_run_watchdog):
+ def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+ m_environ, m_popen, m_t_config,
+ m_run_watchdog):
config = {
"suite_path": "suite/path",
"config": {"foo": "bar"},
"owner": "the_owner",
"archive_path": "archive/path",
"name": "the_name",
- "description": "the_description"
+ "description": "the_description",
+ "job_id": "1",
}
m_tmp = MagicMock()
temp_file = Mock()
m_p.returncode = 0
m_popen.return_value = m_p
m_t_config.results_server = True
- worker.run_job(config, "teuth/bin/path", verbose=False)
+ worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
m_run_watchdog.assert_called_with(m_p, config)
expected_args = [
'teuth/bin/path/teuthology',
@patch("teuthology.worker.teuth_config")
@patch("subprocess.Popen")
@patch("os.environ")
+ @patch("os.mkdir")
@patch("yaml.safe_dump")
@patch("tempfile.NamedTemporaryFile")
- def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_environ,
- m_popen, m_t_config, m_symlink_log, m_sleep):
+ def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+ m_environ, m_popen, m_t_config, m_symlink_log,
+ m_sleep):
config = {
"suite_path": "suite/path",
"config": {"foo": "bar"},
"archive_path": "archive/path",
"name": "the_name",
"description": "the_description",
- "worker_log": "worker/log.log"
+ "worker_log": "worker/log.log",
+ "job_id": "1",
}
m_tmp = MagicMock()
temp_file = Mock()
m_p.returncode = 1
m_popen.return_value = m_p
m_t_config.results_server = False
- worker.run_job(config, "teuth/bin/path", verbose=False)
+ worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
@patch("teuthology.worker.report.try_push_job_info")
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
+
+    @patch("os.path.isdir")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.fetch_qa_suite")
+    def test_prep_job(self, m_fetch_qa_suite,
+                      m_fetch_teuthology, m_isdir):
+        """
+        prep_job() must record the worker log and archive path in the job
+        config, default the teuthology branch to 'master', fetch teuthology
+        and the qa suite for that branch, and return the virtualenv bin
+        directory of the fetched teuthology checkout.
+        """
+        config = dict(
+            name="the_name",
+            job_id="1",
+        )
+        archive_dir = '/archive/dir'
+        log_file_path = '/worker/log'
+        m_fetch_teuthology.return_value = '/teuth/path'
+        m_fetch_qa_suite.return_value = '/suite/path'
+        # Pretend the checkout is bootstrapped so prep_job() does not raise
+        m_isdir.return_value = True
+        got_config, teuth_bin_path = worker.prep_job(
+            config,
+            log_file_path,
+            archive_dir,
+        )
+        assert got_config['worker_log'] == log_file_path
+        assert got_config['archive_path'] == os.path.join(
+            archive_dir,
+            config['name'],
+            config['job_id'],
+        )
+        assert got_config['teuthology_branch'] == 'master'
+        # Use the real Mock assertion helpers.  An arbitrary attribute such
+        # as 'called_once_with_args' is auto-created by Mock and is always
+        # truthy, so asserting on it can never fail.
+        # prep_job() passes the branch to fetch_teuthology by keyword ...
+        m_fetch_teuthology.assert_called_once_with(branch='master')
+        assert teuth_bin_path == '/teuth/path/virtualenv/bin'
+        # ... and to fetch_qa_suite positionally.
+        m_fetch_qa_suite.assert_called_once_with('master')
+        assert got_config['suite_path'] == '/suite/path'
from . import safepath
from .config import config as teuth_config
from .config import set_config_attr
-from .exceptions import BranchNotFoundError
+from .exceptions import BranchNotFoundError, SkipJob
from .kill import kill_job
from .repo_utils import fetch_qa_suite, fetch_teuthology
if job_config.get('stop_worker'):
keep_running = False
- safe_archive = safepath.munge(job_config['name'])
- job_config['worker_log'] = log_file_path
- archive_path_full = os.path.join(
- ctx.archive_dir, safe_archive, str(job_id))
- job_config['archive_path'] = archive_path_full
-
- # If the teuthology branch was not specified, default to master and
- # store that value.
- teuthology_branch = job_config.get('teuthology_branch', 'master')
- job_config['teuthology_branch'] = teuthology_branch
-
try:
- if teuth_config.teuthology_path is not None:
- teuth_path = teuth_config.teuthology_path
- else:
- teuth_path = fetch_teuthology(branch=teuthology_branch)
- # For the teuthology tasks, we look for suite_branch, and if we
- # don't get that, we look for branch, and fall back to 'master'.
- # last-in-suite jobs don't have suite_branch or branch set.
- ceph_branch = job_config.get('branch', 'master')
- suite_branch = job_config.get('suite_branch', ceph_branch)
- job_config['suite_path'] = fetch_qa_suite(suite_branch)
- except BranchNotFoundError as exc:
- log.exception("Branch not found; marking job as dead")
- report.try_push_job_info(
+ job_config, teuth_bin_path = prep_job(
+ job_config,
+ log_file_path,
+ ctx.archive_dir,
+ )
+ run_job(
job_config,
- dict(status='dead', failure_reason=str(exc))
+ teuth_bin_path,
+ ctx.archive_dir,
+ ctx.verbose,
)
+ except SkipJob:
continue
- teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
- if not os.path.isdir(teuth_bin_path):
- raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
- (teuthology_branch, teuth_bin_path))
-
- if job_config.get('last_in_suite'):
- if teuth_config.results_server:
- report.try_delete_jobs(job_config['name'],
- job_config['job_id'])
- log.info('Generating results email for %s', job_config['name'])
- args = [
- os.path.join(teuth_bin_path, 'teuthology-results'),
- '--timeout',
- str(job_config.get('results_timeout',
- teuth_config.results_timeout)),
- '--email',
- job_config['email'],
- '--archive-dir',
- os.path.join(ctx.archive_dir, safe_archive),
- '--name',
- job_config['name'],
- ]
- # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
- # make sure that it will continue to run if this worker process
- # dies (e.g. because of a restart)
- result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
- log.info("teuthology-results PID: %s", result_proc.pid)
- else:
- log.info('Creating archive dir %s', archive_path_full)
- safepath.makedirs(ctx.archive_dir, safe_archive)
- log.info('Running job %d', job_id)
- run_job(job_config, teuth_bin_path, ctx.verbose)
job.delete()
-def run_with_watchdog(process, job_config):
- job_start_time = datetime.utcnow()
-
- # Only push the information that's relevant to the watchdog, to save db
- # load
- job_info = dict(
- name=job_config['name'],
- job_id=job_config['job_id'],
- )
-
- # Sleep once outside of the loop to avoid double-posting jobs
- time.sleep(teuth_config.watchdog_interval)
- symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
- while process.poll() is None:
- # Kill jobs that have been running longer than the global max
- run_time = datetime.utcnow() - job_start_time
- total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
- if total_seconds > teuth_config.max_job_time:
- log.warning("Job ran longer than {max}s. Killing...".format(
- max=teuth_config.max_job_time))
- kill_job(job_info['name'], job_info['job_id'],
- teuth_config.archive_base)
+def prep_job(job_config, log_file_path, archive_dir):
+ """
+ Fill in the parts of a job's config that the worker is responsible for
+ and locate the teuthology executables to run it with.
+
+ The following keys are written into job_config: 'worker_log',
+ 'archive_path' (archive_dir/<munged job name>/<job_id>),
+ 'teuthology_branch' (defaults to 'master') and 'suite_path'.
+
+ :param job_config: The job's config dict; it is modified in place and
+ also returned
+ :param log_file_path: Path stored as the job's 'worker_log'
+ :param archive_dir: Base directory under which the job's archive path
+ is built
+ :returns: A tuple of (job_config, teuth_bin_path), where
+ teuth_bin_path is <teuthology path>/virtualenv/bin
+ :raises SkipJob: If a branch could not be found; the job is reported
+ as dead before raising
+ :raises RuntimeError: If teuth_bin_path is not a directory
+ """
+ job_id = job_config['job_id']
+ safe_archive = safepath.munge(job_config['name'])
+ job_config['worker_log'] = log_file_path
+ archive_path_full = os.path.join(
+ archive_dir, safe_archive, str(job_id))
+ job_config['archive_path'] = archive_path_full
- # calling this without a status just updates the jobs updated time
- report.try_push_job_info(job_info)
- time.sleep(teuth_config.watchdog_interval)
-
- # The job finished. Let's make sure paddles knows.
- branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
- if job_config.get('teuthology_branch') in branches_sans_reporting:
- # The job ran with a teuthology branch that may not have the reporting
- # feature. Let's call teuthology-report (which will be from the master
- # branch) to report the job manually.
- cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
- run_name=job_info['name'],
- job_id=job_info['job_id'])
- try:
- log.info("Executing %s" % cmd)
- report_proc = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- while report_proc.poll() is None:
- for line in report_proc.stdout.readlines():
- log.info(line.strip())
- time.sleep(1)
- log.info("Reported results via the teuthology-report command")
- except Exception:
- log.exception("teuthology-report failed")
- else:
- # Let's make sure that paddles knows the job is finished. We don't know
- # the status, but if it was a pass or fail it will have already been
- # reported to paddles. In that case paddles ignores the 'dead' status.
- # If the job was killed, paddles will use the 'dead' status.
- report.try_push_job_info(job_info, dict(status='dead'))
+ # If the teuthology branch was not specified, default to master and
+ # store that value.
+ teuthology_branch = job_config.get('teuthology_branch', 'master')
+ job_config['teuthology_branch'] = teuthology_branch
+ try:
+ # A configured teuthology_path takes precedence over fetching a branch
+ if teuth_config.teuthology_path is not None:
+ teuth_path = teuth_config.teuthology_path
+ else:
+ teuth_path = fetch_teuthology(branch=teuthology_branch)
+ # For the teuthology tasks, we look for suite_branch, and if we
+ # don't get that, we look for branch, and fall back to 'master'.
+ # last-in-suite jobs don't have suite_branch or branch set.
+ ceph_branch = job_config.get('branch', 'master')
+ suite_branch = job_config.get('suite_branch', ceph_branch)
+ job_config['suite_path'] = fetch_qa_suite(suite_branch)
+ except BranchNotFoundError as exc:
+ log.exception("Branch not found; marking job as dead")
+ report.try_push_job_info(
+ job_config,
+ dict(status='dead', failure_reason=str(exc))
+ )
+ # Signal the caller to move on to the next job
+ raise SkipJob()
+
+ # The executables are expected in the checkout's virtualenv; refuse to
+ # continue if that directory does not exist.
+ teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
+ if not os.path.isdir(teuth_bin_path):
+ raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
+ (teuthology_branch, teuth_bin_path))
+ return job_config, teuth_bin_path
+
+
+def run_job(job_config, teuth_bin_path, archive_dir, verbose):
+ safe_archive = safepath.munge(job_config['name'])
+ if job_config.get('last_in_suite'):
+ if teuth_config.results_server:
+ report.try_delete_jobs(job_config['name'], job_config['job_id'])
+ log.info('Generating results email for %s', job_config['name'])
+ args = [
+ os.path.join(teuth_bin_path, 'teuthology-results'),
+ '--timeout',
+ str(job_config.get('results_timeout',
+ teuth_config.results_timeout)),
+ '--email',
+ job_config['email'],
+ '--archive-dir',
+ os.path.join(archive_dir, safe_archive),
+ '--name',
+ job_config['name'],
+ ]
+ # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
+ # make sure that it will continue to run if this worker process
+ # dies (e.g. because of a restart)
+ result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
+ log.info("teuthology-results PID: %s", result_proc.pid)
+ return
+
+ log.info('Creating archive dir %s', job_config['archive_path'])
+ safepath.makedirs(archive_dir, safe_archive)
+ log.info('Running job %s', job_config['job_id'])
-def run_job(job_config, teuth_bin_path, verbose):
suite_path = job_config['suite_path']
arg = [
os.path.join(teuth_bin_path, 'teuthology'),
log.info('Success!')
+def run_with_watchdog(process, job_config):
+    """
+    Watch a running job process until it exits.
+
+    While the process is alive, the job's record is refreshed once per
+    teuth_config.watchdog_interval, and the job is killed when it has run
+    for longer than teuth_config.max_job_time seconds.  Once the process
+    has exited, the job is reported one final time.
+
+    :param process: The job's process object; only poll() is used
+    :param job_config: The job's config dict; 'name', 'job_id',
+                       'worker_log' and 'archive_path' are read, and
+                       'teuthology_branch' if present
+    """
+    started_at = datetime.utcnow()
+
+    # Keep the payload down to what the watchdog needs, to save db load
+    job_info = {
+        'name': job_config['name'],
+        'job_id': job_config['job_id'],
+    }
+
+    # The first sleep happens before the loop to avoid double-posting jobs
+    time.sleep(teuth_config.watchdog_interval)
+    symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
+    while process.poll() is None:
+        # Enforce the global maximum run time
+        elapsed = datetime.utcnow() - started_at
+        elapsed_seconds = elapsed.seconds + elapsed.days * 24 * 60 * 60
+        if elapsed_seconds > teuth_config.max_job_time:
+            log.warning("Job ran longer than {max}s. Killing...".format(
+                max=teuth_config.max_job_time))
+            kill_job(job_info['name'], job_info['job_id'],
+                     teuth_config.archive_base)
+
+        # Pushing without a status just updates the job's updated time
+        report.try_push_job_info(job_info)
+        time.sleep(teuth_config.watchdog_interval)
+
+    # The job finished. Let's make sure paddles knows.
+    legacy_branches = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
+    if job_config.get('teuthology_branch') not in legacy_branches:
+        # We don't know the final status here, but a pass or fail will
+        # already have been reported to paddles, in which case paddles
+        # ignores the 'dead' status.  If the job was killed, paddles will
+        # use the 'dead' status.
+        report.try_push_job_info(job_info, dict(status='dead'))
+        return
+
+    # The job ran with a teuthology branch that may not have the reporting
+    # feature, so call teuthology-report (which will be from the master
+    # branch) to report the job manually.
+    cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
+        run_name=job_info['name'],
+        job_id=job_info['job_id'])
+    try:
+        log.info("Executing %s" % cmd)
+        reporter = subprocess.Popen(cmd, shell=True,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.STDOUT)
+        while reporter.poll() is None:
+            for output_line in reporter.stdout.readlines():
+                log.info(output_line.strip())
+            time.sleep(1)
+        log.info("Reported results via the teuthology-report command")
+    except Exception:
+        log.exception("teuthology-report failed")
+
+
def symlink_worker_log(worker_log_path, archive_dir):
try:
log.debug("Worker log: %s", worker_log_path)