+++ /dev/null
-import datetime
-import os
-
-from unittest.mock import patch, Mock, MagicMock
-
-from teuthology import worker
-
-from teuthology.contextutil import MaxWhileTries
-
-
-class TestWorker(object):
-    def setup_method(self):
-        self.ctx = Mock()
-        self.ctx.verbose = True
-        self.ctx.archive_dir = '/archive/dir'
-        self.ctx.log_dir = '/log/dir'
-        self.ctx.tube = 'tube'
-
- @patch("os.path.exists")
- def test_restart_file_path_doesnt_exist(self, m_exists):
- m_exists.return_value = False
- result = worker.sentinel(worker.restart_file_path)
- assert not result
-
- @patch("os.path.getmtime")
- @patch("os.path.exists")
- def test_needs_restart(self, m_exists, m_getmtime):
- m_exists.return_value = True
- now = datetime.datetime.now(datetime.timezone.utc)
- m_getmtime.return_value = (now + datetime.timedelta(days=1)).timestamp()
- assert worker.sentinel(worker.restart_file_path)
-
- @patch("os.path.getmtime")
- @patch("os.path.exists")
- def test_does_not_need_restart(self, m_exists, m_getmtime):
- m_exists.return_value = True
- now = datetime.datetime.now(datetime.timezone.utc)
- m_getmtime.return_value = (now - datetime.timedelta(days=1)).timestamp()
- assert not worker.sentinel(worker.restart_file_path)
-
- @patch("os.symlink")
- def test_symlink_success(self, m_symlink):
- worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
- m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log")
-
- @patch("teuthology.worker.log")
- @patch("os.symlink")
- def test_symlink_failure(self, m_symlink, m_log):
- m_symlink.side_effect = IOError
- worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
- # actually logs the exception
- assert m_log.exception.called
-
- @patch("teuthology.worker.run_with_watchdog")
- @patch("teuthology.worker.teuth_config")
- @patch("subprocess.Popen")
- @patch("os.environ")
- @patch("os.mkdir")
- @patch("yaml.safe_dump")
- @patch("tempfile.NamedTemporaryFile")
- def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
- m_environ, m_popen, m_t_config,
- m_run_watchdog):
- config = {
- "suite_path": "suite/path",
- "config": {"foo": "bar"},
- "verbose": True,
- "owner": "the_owner",
- "archive_path": "archive/path",
- "name": "the_name",
- "description": "the_description",
- "job_id": "1",
- }
- m_tmp = MagicMock()
- temp_file = Mock()
- temp_file.name = "the_name"
- m_tmp.__enter__.return_value = temp_file
- m_tempfile.return_value = m_tmp
- env = dict(PYTHONPATH="python/path")
- m_environ.copy.return_value = env
- m_p = Mock()
- m_p.returncode = 0
- m_popen.return_value = m_p
- m_t_config.results_server = True
- worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
- m_run_watchdog.assert_called_with(m_p, config)
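-        # '-v' is expected below because config["verbose"] is True, even
-        # though run_job itself was called with verbose=False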
-        expected_args = [
-            'teuth/bin/path/teuthology',
-            '-v',
-            '--lock',
-            '--block',
-            '--owner', 'the_owner',
-            '--archive', 'archive/path',
-            '--name', 'the_name',
-            '--description', 'the_description',
-            '--',
-            'the_name',
-        ]
-        m_popen.assert_called_with(args=expected_args, env=env)
-
- @patch("time.sleep")
- @patch("teuthology.worker.symlink_worker_log")
- @patch("teuthology.worker.teuth_config")
- @patch("subprocess.Popen")
- @patch("os.environ")
- @patch("os.mkdir")
- @patch("yaml.safe_dump")
- @patch("tempfile.NamedTemporaryFile")
- def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
- m_environ, m_popen, m_t_config, m_symlink_log,
- m_sleep):
- config = {
- "suite_path": "suite/path",
- "config": {"foo": "bar"},
- "verbose": True,
- "owner": "the_owner",
- "archive_path": "archive/path",
- "name": "the_name",
- "description": "the_description",
- "worker_log": "worker/log.log",
- "job_id": "1",
- }
- m_tmp = MagicMock()
- temp_file = Mock()
- temp_file.name = "the_name"
- m_tmp.__enter__.return_value = temp_file
- m_tempfile.return_value = m_tmp
- env = dict(PYTHONPATH="python/path")
- m_environ.copy.return_value = env
- m_p = Mock()
- m_p.returncode = 1
- m_popen.return_value = m_p
- m_t_config.results_server = False
- worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
- m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
-
- @patch("teuthology.worker.report.try_push_job_info")
- @patch("teuthology.worker.symlink_worker_log")
- @patch("time.sleep")
- def test_run_with_watchdog_no_reporting(self, m_sleep, m_symlink_log, m_try_push):
- config = {
- "name": "the_name",
- "job_id": "1",
- "worker_log": "worker_log",
- "archive_path": "archive/path",
- "teuthology_branch": "main"
- }
- process = Mock()
- process.poll.return_value = "not None"
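-        # poll() returning non-None makes run_with_watchdog skip its loop,
-        # so only the final 'dead' status push is exercised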
-        worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
-        m_try_push.assert_called_with(
-            dict(name=config["name"], job_id=config["job_id"]),
-            dict(status='dead'),
-        )
-
- @patch("subprocess.Popen")
- @patch("teuthology.worker.symlink_worker_log")
- @patch("time.sleep")
- @patch("teuthology.worker.report.try_push_job_info")
- def test_run_with_watchdog_with_reporting(self, m_tpji, m_sleep, m_symlink_log, m_popen):
- config = {
- "name": "the_name",
- "job_id": "1",
- "worker_log": "worker_log",
- "archive_path": "archive/path",
- "teuthology_branch": "jewel"
- }
- process = Mock()
- process.poll.return_value = "not None"
- m_proc = Mock()
- m_proc.poll.return_value = "not None"
- m_popen.return_value = m_proc
- worker.run_with_watchdog(process, config)
- m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
-
- @patch("teuthology.repo_utils.ls_remote")
- @patch("os.path.isdir")
- @patch("teuthology.repo_utils.fetch_teuthology")
- @patch("teuthology.worker.teuth_config")
- @patch("teuthology.repo_utils.fetch_qa_suite")
- def test_prep_job(self, m_fetch_qa_suite, m_teuth_config,
- m_fetch_teuthology, m_isdir, m_ls_remote):
- config = dict(
- name="the_name",
- job_id="1",
- suite_sha1="suite_hash",
- )
- archive_dir = '/archive/dir'
- log_file_path = '/worker/log'
- m_fetch_teuthology.return_value = '/teuth/path'
- m_fetch_qa_suite.return_value = '/suite/path'
- m_ls_remote.return_value = 'teuth_hash'
- m_isdir.return_value = True
- m_teuth_config.teuthology_path = None
- got_config, teuth_bin_path = worker.prep_job(
- config,
- log_file_path,
- archive_dir,
- )
- assert got_config['worker_log'] == log_file_path
- assert got_config['archive_path'] == os.path.join(
- archive_dir,
- config['name'],
- config['job_id'],
- )
- assert got_config['teuthology_branch'] == 'main'
- m_fetch_teuthology.assert_called_once_with(branch='main', commit='teuth_hash')
- assert teuth_bin_path == '/teuth/path/virtualenv/bin'
- m_fetch_qa_suite.assert_called_once_with('main', 'suite_hash')
- assert got_config['suite_path'] == '/suite/path'
-
-    def build_fake_jobs(self, m_connection, m_job, job_bodies):
-        """
-        Given patched copies of beanstalkc.Connection and beanstalkc.Job,
-        and a list of basic job bodies, return a list of mocked Job objects.
-        """
-        # Build each fake job as a fresh MagicMock (passing jid and body as
-        # kwargs sets them as attributes); m_job is accepted only to match
-        # the patched beanstalkc.Job signature
-        jobs = []
-        job_id = 0
-        for job_body in job_bodies:
-            job_id += 1
-            jobs.append(MagicMock(conn=m_connection, jid=job_id, body=job_body))
-        return jobs
-
- @patch("teuthology.worker.run_job")
- @patch("teuthology.worker.prep_job")
- @patch("beanstalkc.Job", autospec=True)
- @patch("teuthology.repo_utils.fetch_qa_suite")
- @patch("teuthology.repo_utils.fetch_teuthology")
- @patch("teuthology.worker.beanstalk.watch_tube")
- @patch("teuthology.worker.beanstalk.connect")
- @patch("os.path.isdir", return_value=True)
- @patch("teuthology.worker.setup_log_file")
- def test_main_loop(
- self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
- m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
- ):
- m_connection = Mock()
- jobs = self.build_fake_jobs(
- m_connection,
- m_job,
- [
- 'foo: bar',
- 'stop_worker: true',
- ],
- )
- m_connection.reserve.side_effect = jobs
- m_connect.return_value = m_connection
- m_prep_job.return_value = (dict(), '/bin/path')
- worker.main(self.ctx)
- # There should be one reserve call per item in the jobs list
- expected_reserve_calls = [
- dict(timeout=60) for i in range(len(jobs))
- ]
- got_reserve_calls = [
- call[1] for call in m_connection.reserve.call_args_list
- ]
- assert got_reserve_calls == expected_reserve_calls
- for job in jobs:
- job.bury.assert_called_once_with()
- job.delete.assert_called_once_with()
-
- @patch("teuthology.repo_utils.ls_remote")
- @patch("teuthology.worker.report.try_push_job_info")
- @patch("teuthology.worker.run_job")
- @patch("beanstalkc.Job", autospec=True)
- @patch("teuthology.repo_utils.fetch_qa_suite")
- @patch("teuthology.repo_utils.fetch_teuthology")
- @patch("teuthology.worker.beanstalk.watch_tube")
- @patch("teuthology.worker.beanstalk.connect")
- @patch("os.path.isdir", return_value=True)
- @patch("teuthology.worker.setup_log_file")
- def test_main_loop_13925(
- self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
- m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
- m_try_push_job_info, m_ls_remote,
- ):
- m_connection = Mock()
- jobs = self.build_fake_jobs(
- m_connection,
- m_job,
- [
- 'name: name',
- 'name: name\nstop_worker: true',
- ],
- )
- m_connection.reserve.side_effect = jobs
- m_connect.return_value = m_connection
- m_fetch_qa_suite.side_effect = [
- '/suite/path',
- MaxWhileTries(),
- MaxWhileTries(),
- ]
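-        # fetch_qa_suite raising MaxWhileTries should cause prep_job to
-        # mark each job dead instead of crashing the worker's main loop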
-        worker.main(self.ctx)
-        assert len(m_run_job.call_args_list) == 0
-        assert len(m_try_push_job_info.call_args_list) == len(jobs)
-        for i in range(len(jobs)):
-            push_call = m_try_push_job_info.call_args_list[i]
-            assert push_call[0][1]['status'] == 'dead'
+++ /dev/null
-import datetime
-import logging
-import os
-import subprocess
-import sys
-import tempfile
-import time
-import yaml
-
-from teuthology import (
- # non-modules
- setup_log_file,
- install_except_hook,
- # modules
- beanstalk,
- kill,
- report,
- repo_utils,
- safepath,
-)
-from teuthology.config import config as teuth_config
-from teuthology.config import set_config_attr
-from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
-
-log = logging.getLogger(__name__)
-start_time = datetime.datetime.now(datetime.timezone.utc)
-restart_file_path = '/tmp/teuthology-restart-workers'
-stop_file_path = '/tmp/teuthology-stop-workers'
-
-
-def sentinel(path):
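-    """Return True if the file at ``path`` was modified after this worker
-    started; restart_file_path and stop_file_path act as the sentinels."""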
- if not os.path.exists(path):
- return False
- file_mtime = datetime.datetime.fromtimestamp(
- os.path.getmtime(path),
- datetime.timezone.utc,
- )
-    return file_mtime > start_time
-
-
-def restart():
- log.info('Restarting...')
- args = sys.argv[:]
- args.insert(0, sys.executable)
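-    # execv replaces the current process image and does not return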
- os.execv(sys.executable, args)
-
-
-def stop():
- log.info('Stopping...')
- sys.exit(0)
-
-
-def load_config(ctx=None):
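-    """Reload the teuthology config; when a ctx is given, require that
-    ctx.archive_dir exists and use it as the archive base."""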
- teuth_config.load()
- if ctx is not None:
- if not os.path.isdir(ctx.archive_dir):
- sys.exit("{prog}: archive directory must exist: {path}".format(
- prog=os.path.basename(sys.argv[0]),
- path=ctx.archive_dir,
- ))
- else:
- teuth_config.archive_base = ctx.archive_dir
-
-
-def main(ctx):
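-    """Worker entry point: set up logging, connect to beanstalkd, watch
-    ctx.tube, and run jobs from it until told to stop or restart."""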
- loglevel = logging.INFO
- if ctx.verbose:
- loglevel = logging.DEBUG
- log.setLevel(loglevel)
-
- log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
- pid=os.getpid(), tube=ctx.tube,))
- setup_log_file(log_file_path)
-
- install_except_hook()
-
- load_config(ctx=ctx)
-
- set_config_attr(ctx)
-
- connection = beanstalk.connect()
- beanstalk.watch_tube(connection, ctx.tube)
- result_proc = None
-
- if teuth_config.teuthology_path is None:
- repo_utils.fetch_teuthology('main')
- repo_utils.fetch_qa_suite('main')
-
- keep_running = True
- while keep_running:
- # Check to see if we have a teuthology-results process hanging around
- # and if so, read its return code so that it can exit.
- if result_proc is not None and result_proc.poll() is not None:
- log.debug("teuthology-results exited with code: %s",
- result_proc.returncode)
- result_proc = None
-
- if sentinel(restart_file_path):
- restart()
- elif sentinel(stop_file_path):
- stop()
-
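-        # Reload the config so changes take effect without a restart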
- load_config()
-
- job = connection.reserve(timeout=60)
- if job is None:
- continue
-
- # bury the job so it won't be re-run if it fails
- job.bury()
- job_id = job.jid
- log.info('Reserved job %d', job_id)
- log.info('Config is: %s', job.body)
- job_config = yaml.safe_load(job.body)
- job_config['job_id'] = str(job_id)
-
- if job_config.get('stop_worker'):
- keep_running = False
-
- try:
- job_config, teuth_bin_path = prep_job(
- job_config,
- log_file_path,
- ctx.archive_dir,
- )
- run_job(
- job_config,
- teuth_bin_path,
- ctx.archive_dir,
- ctx.verbose,
- )
- except SkipJob:
- continue
-
- # This try/except block is to keep the worker from dying when
- # beanstalkc throws a SocketError
- try:
- job.delete()
- except Exception:
- log.exception("Saw exception while trying to delete job")
-
-
-def prep_job(job_config, log_file_path, archive_dir):
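-    """Prepare a reserved job: fill in worker_log, archive_path,
-    teuthology_branch and suite_path, fetching the teuthology and
-    qa-suite repos as needed, and return (job_config, teuth_bin_path).
-    Raises SkipJob if the job should be marked dead and skipped."""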
- job_id = job_config['job_id']
- safe_archive = safepath.munge(job_config['name'])
- job_config['worker_log'] = log_file_path
- archive_path_full = os.path.join(
- archive_dir, safe_archive, str(job_id))
- job_config['archive_path'] = archive_path_full
-
- # If the teuthology branch was not specified, default to main and
- # store that value.
- teuthology_branch = job_config.get('teuthology_branch', 'main')
- job_config['teuthology_branch'] = teuthology_branch
- teuthology_sha1 = job_config.get('teuthology_sha1')
- if not teuthology_sha1:
- repo_url = repo_utils.build_git_url('teuthology', 'ceph')
- try:
- teuthology_sha1 = repo_utils.ls_remote(repo_url, teuthology_branch)
- except Exception as exc:
- log.exception(f"Could not get teuthology sha1 for branch {teuthology_branch}")
- report.try_push_job_info(
- job_config,
- dict(status='dead', failure_reason=str(exc))
- )
- raise SkipJob()
- if not teuthology_sha1:
- reason = "Teuthology branch {} not found; marking job as dead".format(teuthology_branch)
- log.error(reason)
- report.try_push_job_info(
- job_config,
- dict(status='dead', failure_reason=reason)
- )
- raise SkipJob()
- if teuth_config.teuthology_path is None:
- log.info('Using teuthology sha1 %s', teuthology_sha1)
-
- try:
- if teuth_config.teuthology_path is not None:
- teuth_path = teuth_config.teuthology_path
- else:
- teuth_path = repo_utils.fetch_teuthology(branch=teuthology_branch,
- commit=teuthology_sha1)
- # For the teuthology tasks, we look for suite_branch, and if we
- # don't get that, we look for branch, and fall back to 'main'.
- # last-in-suite jobs don't have suite_branch or branch set.
- ceph_branch = job_config.get('branch', 'main')
- suite_branch = job_config.get('suite_branch', ceph_branch)
- suite_sha1 = job_config.get('suite_sha1')
- suite_repo = job_config.get('suite_repo')
- if suite_repo:
- teuth_config.ceph_qa_suite_git_url = suite_repo
- job_config['suite_path'] = os.path.normpath(os.path.join(
- repo_utils.fetch_qa_suite(suite_branch, suite_sha1),
- job_config.get('suite_relpath', ''),
- ))
- except (BranchNotFoundError, CommitNotFoundError) as exc:
- log.exception("Requested version not found; marking job as dead")
- report.try_push_job_info(
- job_config,
- dict(status='dead', failure_reason=str(exc))
- )
- raise SkipJob()
- except MaxWhileTries as exc:
- log.exception("Failed to fetch or bootstrap; marking job as dead")
- report.try_push_job_info(
- job_config,
- dict(status='dead', failure_reason=str(exc))
- )
- raise SkipJob()
-
- teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
- if not os.path.isdir(teuth_bin_path):
- raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
- (teuthology_branch, teuth_bin_path))
- return job_config, teuth_bin_path
-
-
-def run_job(job_config, teuth_bin_path, archive_dir, verbose):
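-    """Run a single job. first_in_suite/last_in_suite jobs only spawn
-    teuthology-results; all other jobs have their config dumped to a
-    temp file and passed to the teuthology command."""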
- safe_archive = safepath.munge(job_config['name'])
- if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
- if teuth_config.results_server:
- try:
- report.try_delete_jobs(job_config['name'], job_config['job_id'])
- except Exception as e:
- log.warning("Unable to delete job %s, exception occurred: %s",
- job_config['job_id'], e)
- suite_archive_dir = os.path.join(archive_dir, safe_archive)
- safepath.makedirs('/', suite_archive_dir)
- args = [
- os.path.join(teuth_bin_path, 'teuthology-results'),
- '--archive-dir', suite_archive_dir,
- '--name', job_config['name'],
- ]
- if job_config.get('first_in_suite'):
- log.info('Generating memo for %s', job_config['name'])
- if job_config.get('seed'):
- args.extend(['--seed', job_config['seed']])
- if job_config.get('subset'):
- args.extend(['--subset', job_config['subset']])
- if job_config.get('no_nested_subset'):
-            args.append('--no-nested-subset')
- else:
- log.info('Generating results for %s', job_config['name'])
- timeout = job_config.get('results_timeout',
- teuth_config.results_timeout)
- args.extend(['--timeout', str(timeout)])
- if job_config.get('email'):
- args.extend(['--email', job_config['email']])
- # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
- # make sure that it will continue to run if this worker process
- # dies (e.g. because of a restart)
- result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
- log.info("teuthology-results PID: %s", result_proc.pid)
- return
-
- log.info('Creating archive dir %s', job_config['archive_path'])
- safepath.makedirs('/', job_config['archive_path'])
- log.info('Running job %s', job_config['job_id'])
-
- suite_path = job_config['suite_path']
- arg = [
- os.path.join(teuth_bin_path, 'teuthology'),
- ]
- # The following is for compatibility with older schedulers, from before we
- # started merging the contents of job_config['config'] into job_config
- # itself.
- if 'config' in job_config:
- inner_config = job_config.pop('config')
- if not isinstance(inner_config, dict):
- log.warning("run_job: job_config['config'] isn't a dict, it's a %s",
- str(type(inner_config)))
- else:
- job_config.update(inner_config)
-
- if verbose or job_config['verbose']:
- arg.append('-v')
-
- arg.extend([
- '--lock',
- '--block',
- '--owner', job_config['owner'],
- '--archive', job_config['archive_path'],
- '--name', job_config['name'],
- ])
- if job_config['description'] is not None:
- arg.extend(['--description', job_config['description']])
- arg.append('--')
-
- with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
- suffix='.tmp', mode='w+t') as tmp:
- yaml.safe_dump(data=job_config, stream=tmp)
- tmp.flush()
- arg.append(tmp.name)
- env = os.environ.copy()
- python_path = env.get('PYTHONPATH', '')
- python_path = ':'.join([suite_path, python_path]).strip(':')
- env['PYTHONPATH'] = python_path
- log.debug("Running: %s" % ' '.join(arg))
- p = subprocess.Popen(args=arg, env=env)
- log.info("Job archive: %s", job_config['archive_path'])
- log.info("Job PID: %s", str(p.pid))
-
- if teuth_config.results_server:
- log.info("Running with watchdog")
- try:
- run_with_watchdog(p, job_config)
- except Exception:
- log.exception("run_with_watchdog had an unhandled exception")
- raise
- else:
- log.info("Running without watchdog")
- # This sleep() is to give the child time to start up and create the
- # archive dir.
- time.sleep(5)
- symlink_worker_log(job_config['worker_log'],
- job_config['archive_path'])
- p.wait()
-
- if p.returncode != 0:
- log.error('Child exited with code %d', p.returncode)
- else:
- log.info('Success!')
-
-
-def run_with_watchdog(process, job_config):
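-    """Monitor a running job: kill it if it exceeds max_job_time, bump
-    its updated time in paddles on each tick, and push a final 'dead'
-    status once the process exits."""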
- job_start_time = datetime.datetime.now(datetime.timezone.utc)
-
- # Only push the information that's relevant to the watchdog, to save db
- # load
- job_info = dict(
- name=job_config['name'],
- job_id=job_config['job_id'],
- )
-
- # Sleep once outside of the loop to avoid double-posting jobs
- time.sleep(teuth_config.watchdog_interval)
- symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
- while process.poll() is None:
- # Kill jobs that have been running longer than the global max
- run_time = datetime.datetime.now(datetime.timezone.utc) - job_start_time
-        total_seconds = run_time.total_seconds()
- if total_seconds > teuth_config.max_job_time:
- log.warning("Job ran longer than {max}s. Killing...".format(
- max=teuth_config.max_job_time))
- kill.kill_job(job_info['name'], job_info['job_id'],
- teuth_config.archive_base, job_config['owner'])
-
- # calling this without a status just updates the jobs updated time
- report.try_push_job_info(job_info)
- time.sleep(teuth_config.watchdog_interval)
-
-    # we no longer support testing these old branches
-    assert job_config.get('teuthology_branch') not in (
-        'argonaut', 'bobtail', 'cuttlefish', 'dumpling')
-
- # Let's make sure that paddles knows the job is finished. We don't know
- # the status, but if it was a pass or fail it will have already been
- # reported to paddles. In that case paddles ignores the 'dead' status.
- # If the job was killed, paddles will use the 'dead' status.
- report.try_push_job_info(job_info, dict(status='dead'))
-
-
-def symlink_worker_log(worker_log_path, archive_dir):
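-    """Symlink the worker's log into the job's archive dir as worker.log,
-    logging (but not raising) any failure."""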
- try:
- log.debug("Worker log: %s", worker_log_path)
- os.symlink(worker_log_path, os.path.join(archive_dir, 'worker.log'))
- except Exception:
- log.exception("Failed to symlink worker log")