-"""
-usage: teuthology-dispatcher --help
- teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config COFNFIG --archive-dir DIR
- teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --tube TUBE
+import argparse
+import sys
-Start a dispatcher for the specified tube. Grab jobs from a beanstalk
-queue and run the teuthology tests they describe as subprocesses. The
-subprocess invoked is a teuthology-dispatcher command run in supervisor
-mode.
+import teuthology.dispatcher
-Supervisor mode: Supervise the job run described by its config. Reimage
-target machines and invoke teuthology command. Unlock the target machines
-at the end of the run.
-standard arguments:
- -h, --help show this help message and exit
- -v, --verbose be more verbose
- -t, --tube TUBE which beanstalk tube to read jobs from
- -l, --log-dir LOG_DIR path in which to store logs
- -a DIR, --archive-dir DIR path to archive results in
- --supervisor run dispatcher in job supervisor mode
- --bin-path BIN_PATH teuthology bin path
- --job-config CONFIG file descriptor of job's config file
- --exit-on-empty-queue if the queue is empty, exit
-"""
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Start a dispatcher for the specified tube. Grab jobs from a beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is teuthology-supervisor."
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ help="be more verbose",
+ )
+ parser.add_argument(
+ "-a",
+ "--archive-dir",
+ type=str,
+ help="path to archive results in",
+ )
+ parser.add_argument(
+ "-t",
+ "--tube",
+ type=str,
+ help="which beanstalk tube to read jobs from",
+ required=True,
+ )
+ parser.add_argument(
+ "-l",
+ "--log-dir",
+ type=str,
+ help="path in which to store the dispatcher log",
+ required=True,
+ )
+ parser.add_argument(
+ "--exit-on-empty-queue",
+ action="store_true",
+ help="if the queue is empty, exit",
+ )
+ return parser.parse_args(argv)
-import docopt
-import sys
-import teuthology.dispatcher
+def main():
+ sys.exit(teuthology.dispatcher.main(parse_args(sys.argv[1:])))
-def main():
- args = docopt.docopt(__doc__)
- sys.exit(teuthology.dispatcher.main(args))
+if __name__ == "__main__":
+ main()
--- /dev/null
+import argparse
+import sys
+
+import teuthology.dispatcher.supervisor
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Supervise and run a teuthology job; normally only run by the dispatcher",
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ help="be more verbose",
+ )
+ parser.add_argument(
+ "-a",
+ "--archive-dir",
+ type=str,
+ help="path in which to store the job's logfiles",
+ required=True,
+ )
+ parser.add_argument(
+ "--bin-path",
+ type=str,
+ help="teuthology bin path",
+ required=True,
+ )
+ parser.add_argument(
+ "--job-config",
+ type=str,
+ help="file descriptor of job's config file",
+ required=True,
+ )
+ return parser.parse_args(argv)
+
+
+def main():
+ sys.exit(teuthology.dispatcher.supervisor.main(parse_args(sys.argv[1:])))
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+from script import Script
+
+
+class TestDispatcher(Script):
+ script_name = 'teuthology-dispatcher'
--- /dev/null
+from script import Script
+
+
+class TestSupervisor(Script):
+ script_name = 'teuthology-supervisor'
+++ /dev/null
-from script import Script
-
-
-class TestWorker(Script):
- script_name = 'teuthology-worker'
+++ /dev/null
-import argparse
-
-import teuthology.worker
-
-
-def main():
- teuthology.worker.main(parse_args())
-
-
-def parse_args():
- parser = argparse.ArgumentParser(description="""
-Grab jobs from a beanstalk queue and run the teuthology tests they
-describe. One job is run at a time.
-""")
- parser.add_argument(
- '-v', '--verbose',
- action='store_true', default=None,
- help='be more verbose',
- )
- parser.add_argument(
- '--archive-dir',
- metavar='DIR',
- help='path under which to archive results',
- required=True,
- )
- parser.add_argument(
- '-l', '--log-dir',
- help='path in which to store logs',
- required=True,
- )
- parser.add_argument(
- '-t', '--tube',
- help='which beanstalk tube to read jobs from',
- required=True,
- )
-
- return parser.parse_args()
teuthology-wait = scripts.wait:main
teuthology-exporter = scripts.exporter:main
teuthology-node-cleanup = scripts.node_cleanup:main
+ teuthology-supervisor = scripts.supervisor:main
[options.extras_require]
manhole =
exporter,
report,
repo_utils,
- worker,
)
from teuthology.config import config as teuth_config
from teuthology.dispatcher import supervisor
-from teuthology.exceptions import SkipJob
+from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology import safepath
def main(args):
- # run dispatcher in job supervisor mode if --supervisor passed
- if args["--supervisor"]:
- return supervisor.main(args)
-
- verbose = args["--verbose"]
- tube = args["--tube"]
- log_dir = args["--log-dir"]
- archive_dir = args["--archive-dir"]
- exit_on_empty_queue = args["--exit-on-empty-queue"]
-
- if archive_dir is None:
- archive_dir = teuth_config.archive_base
+ archive_dir = args.archive_dir or teuth_config.archive_base
# Refuse to start more than one dispatcher per machine type
- procs = find_dispatcher_processes().get(tube)
+ procs = find_dispatcher_processes().get(args.tube)
if procs:
raise RuntimeError(
"There is already a teuthology-dispatcher process running:"
# setup logging for disoatcher in {log_dir}
loglevel = logging.INFO
- if verbose:
+ if args.verbose:
loglevel = logging.DEBUG
logging.getLogger().setLevel(loglevel)
log.setLevel(loglevel)
- log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
+ log_file_path = os.path.join(args.log_dir, f"dispatcher.{args.tube}.{os.getpid()}")
setup_log_file(log_file_path)
install_except_hook()
load_config(archive_dir=archive_dir)
connection = beanstalk.connect()
- beanstalk.watch_tube(connection, tube)
+ beanstalk.watch_tube(connection, args.tube)
result_proc = None
if teuth_config.teuthology_path is None:
job_procs.remove(proc)
job = connection.reserve(timeout=60)
if job is None:
- if exit_on_empty_queue and not job_procs:
+ if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue
keep_running = False
try:
- job_config, teuth_bin_path = worker.prep_job(
+ job_config, teuth_bin_path = prep_job(
job_config,
log_file_path,
archive_dir,
job_config = lock_machines(job_config)
run_args = [
- os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
- '--supervisor',
+ os.path.join(teuth_bin_path, 'teuthology-supervisor'),
'-v',
'--bin-path', teuth_bin_path,
'--archive-dir', archive_dir,
return procs
+def prep_job(job_config, log_file_path, archive_dir):
+ job_id = job_config['job_id']
+ safe_archive = safepath.munge(job_config['name'])
+ job_config['worker_log'] = log_file_path
+ archive_path_full = os.path.join(
+ archive_dir, safe_archive, str(job_id))
+ job_config['archive_path'] = archive_path_full
+
+ # If the teuthology branch was not specified, default to main and
+ # store that value.
+ teuthology_branch = job_config.get('teuthology_branch', 'main')
+ job_config['teuthology_branch'] = teuthology_branch
+ teuthology_sha1 = job_config.get('teuthology_sha1')
+ if not teuthology_sha1:
+ repo_url = repo_utils.build_git_url('teuthology', 'ceph')
+ try:
+ teuthology_sha1 = repo_utils.ls_remote(repo_url, teuthology_branch)
+ except Exception as exc:
+ log.exception(f"Could not get teuthology sha1 for branch {teuthology_branch}")
+ report.try_push_job_info(
+ job_config,
+ dict(status='dead', failure_reason=str(exc))
+ )
+ raise SkipJob()
+ if not teuthology_sha1:
+ reason = "Teuthology branch {} not found; marking job as dead".format(teuthology_branch)
+ log.error(reason)
+ report.try_push_job_info(
+ job_config,
+ dict(status='dead', failure_reason=reason)
+ )
+ raise SkipJob()
+ if teuth_config.teuthology_path is None:
+ log.info('Using teuthology sha1 %s', teuthology_sha1)
+
+ try:
+ if teuth_config.teuthology_path is not None:
+ teuth_path = teuth_config.teuthology_path
+ else:
+ teuth_path = repo_utils.fetch_teuthology(branch=teuthology_branch,
+ commit=teuthology_sha1)
+ # For the teuthology tasks, we look for suite_branch, and if we
+ # don't get that, we look for branch, and fall back to 'main'.
+ # last-in-suite jobs don't have suite_branch or branch set.
+ ceph_branch = job_config.get('branch', 'main')
+ suite_branch = job_config.get('suite_branch', ceph_branch)
+ suite_sha1 = job_config.get('suite_sha1')
+ suite_repo = job_config.get('suite_repo')
+ if suite_repo:
+ teuth_config.ceph_qa_suite_git_url = suite_repo
+ job_config['suite_path'] = os.path.normpath(os.path.join(
+ repo_utils.fetch_qa_suite(suite_branch, suite_sha1),
+ job_config.get('suite_relpath', ''),
+ ))
+ except (BranchNotFoundError, CommitNotFoundError) as exc:
+ log.exception("Requested version not found; marking job as dead")
+ report.try_push_job_info(
+ job_config,
+ dict(status='dead', failure_reason=str(exc))
+ )
+ raise SkipJob()
+ except MaxWhileTries as exc:
+ log.exception("Failed to fetch or bootstrap; marking job as dead")
+ report.try_push_job_info(
+ job_config,
+ dict(status='dead', failure_reason=str(exc))
+ )
+ raise SkipJob()
+
+ teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
+ if not os.path.isdir(teuth_bin_path):
+ raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
+ (teuthology_branch, teuth_bin_path))
+ return job_config, teuth_bin_path
+
+
def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
def main(args):
-
- verbose = args["--verbose"]
- archive_dir = args["--archive-dir"]
- teuth_bin_path = args["--bin-path"]
- config_file_path = args["--job-config"]
-
- with open(config_file_path, 'r') as config_file:
+ with open(args.job_config, 'r') as config_file:
job_config = yaml.safe_load(config_file)
loglevel = logging.INFO
- if verbose:
+ if args.verbose:
loglevel = logging.DEBUG
logging.getLogger().setLevel(loglevel)
log.setLevel(loglevel)
reimage(job_config)
else:
reimage(job_config)
- with open(config_file_path, 'w') as f:
+ with open(args.job_config, 'w') as f:
yaml.safe_dump(job_config, f, default_flow_style=False)
try:
with exporter.JobTime.labels(suite).time():
return run_job(
job_config,
- teuth_bin_path,
- archive_dir,
- verbose
+ args.bin_path,
+ args.archive_dir,
+ args.verbose
)
else:
return run_job(
job_config,
- teuth_bin_path,
- archive_dir,
- verbose
+ args.bin_path,
+ args.archive_dir,
+ args.verbose
)
except SkipJob:
return 0
--- /dev/null
+import datetime
+import os
+import pytest
+
+from unittest.mock import patch, Mock, MagicMock
+
+from teuthology import dispatcher
+from teuthology.config import FakeNamespace
+from teuthology.contextutil import MaxWhileTries
+
+
+class TestDispatcher(object):
+ @pytest.fixture(autouse=True)
+ def setup_method(self, tmp_path):
+ self.ctx = FakeNamespace()
+ self.ctx.verbose = True
+ self.ctx.archive_dir = str(tmp_path / "archive/dir")
+ self.ctx.log_dir = str(tmp_path / "log/dir")
+ self.ctx.tube = 'tube'
+
+ @patch("os.path.exists")
+ def test_restart_file_path_doesnt_exist(self, m_exists):
+ m_exists.return_value = False
+ result = dispatcher.sentinel(dispatcher.restart_file_path)
+ assert not result
+
+ @patch("os.path.getmtime")
+ @patch("os.path.exists")
+ def test_needs_restart(self, m_exists, m_getmtime):
+ m_exists.return_value = True
+ now = datetime.datetime.now(datetime.timezone.utc)
+ m_getmtime.return_value = (now + datetime.timedelta(days=1)).timestamp()
+ assert dispatcher.sentinel(dispatcher.restart_file_path)
+
+ @patch("os.path.getmtime")
+ @patch("os.path.exists")
+ def test_does_not_need_restart(self, m_exists, m_getmtime):
+ m_exists.return_value = True
+ now = datetime.datetime.now(datetime.timezone.utc)
+ m_getmtime.return_value = (now - datetime.timedelta(days=1)).timestamp()
+ assert not dispatcher.sentinel(dispatcher.restart_file_path)
+
+ @patch("teuthology.repo_utils.ls_remote")
+ @patch("os.path.isdir")
+ @patch("teuthology.repo_utils.fetch_teuthology")
+ @patch("teuthology.dispatcher.teuth_config")
+ @patch("teuthology.repo_utils.fetch_qa_suite")
+ def test_prep_job(self, m_fetch_qa_suite, m_teuth_config,
+ m_fetch_teuthology, m_isdir, m_ls_remote):
+ config = dict(
+ name="the_name",
+ job_id="1",
+ suite_sha1="suite_hash",
+ )
+ m_fetch_teuthology.return_value = '/teuth/path'
+ m_fetch_qa_suite.return_value = '/suite/path'
+ m_ls_remote.return_value = 'teuth_hash'
+ m_isdir.return_value = True
+ m_teuth_config.teuthology_path = None
+ got_config, teuth_bin_path = dispatcher.prep_job(
+ config,
+ self.ctx.log_dir,
+ self.ctx.archive_dir,
+ )
+ assert got_config['worker_log'] == self.ctx.log_dir
+ assert got_config['archive_path'] == os.path.join(
+ self.ctx.archive_dir,
+ config['name'],
+ config['job_id'],
+ )
+ assert got_config['teuthology_branch'] == 'main'
+ m_fetch_teuthology.assert_called_once_with(branch='main', commit='teuth_hash')
+ assert teuth_bin_path == '/teuth/path/virtualenv/bin'
+ m_fetch_qa_suite.assert_called_once_with('main', 'suite_hash')
+ assert got_config['suite_path'] == '/suite/path'
+
+ def build_fake_jobs(self, m_connection, m_job, job_bodies):
+ """
+ Given patched copies of:
+ beanstalkc.Connection
+ beanstalkc.Job
+ And a list of basic job bodies, return a list of mocked Job objects
+ """
+ # Make sure instantiating m_job returns a new object each time
+ jobs = []
+ job_id = 0
+ for job_body in job_bodies:
+ job_id += 1
+ job = MagicMock(conn=m_connection, jid=job_id, body=job_body)
+ job.jid = job_id
+ job.body = job_body
+ jobs.append(job)
+ return jobs
+
+ @patch("teuthology.dispatcher.find_dispatcher_processes")
+ @patch("teuthology.repo_utils.ls_remote")
+ @patch("teuthology.dispatcher.report.try_push_job_info")
+ @patch("teuthology.dispatcher.supervisor.run_job")
+ @patch("beanstalkc.Job", autospec=True)
+ @patch("teuthology.repo_utils.fetch_qa_suite")
+ @patch("teuthology.repo_utils.fetch_teuthology")
+ @patch("teuthology.dispatcher.beanstalk.watch_tube")
+ @patch("teuthology.dispatcher.beanstalk.connect")
+ @patch("os.path.isdir", return_value=True)
+ @patch("teuthology.dispatcher.setup_log_file")
+ def test_main_loop(
+ self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+ m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+ m_try_push_job_info, m_ls_remote, m_find_dispatcher_processes,
+ ):
+ m_find_dispatcher_processes.return_value = {}
+ m_connection = Mock()
+ jobs = self.build_fake_jobs(
+ m_connection,
+ m_job,
+ [
+ 'name: name\nfoo: bar',
+ 'name: name\nstop_worker: true',
+ ],
+ )
+ m_connection.reserve.side_effect = jobs
+ m_connect.return_value = m_connection
+ dispatcher.main(self.ctx)
+ # There should be one reserve call per item in the jobs list
+ expected_reserve_calls = [
+ dict(timeout=60) for i in range(len(jobs))
+ ]
+ got_reserve_calls = [
+ call[1] for call in m_connection.reserve.call_args_list
+ ]
+ assert got_reserve_calls == expected_reserve_calls
+ for job in jobs:
+ job.bury.assert_called_once_with()
+ job.delete.assert_called_once_with()
+
+ @patch("teuthology.dispatcher.find_dispatcher_processes")
+ @patch("teuthology.repo_utils.ls_remote")
+ @patch("teuthology.dispatcher.report.try_push_job_info")
+ @patch("teuthology.dispatcher.supervisor.run_job")
+ @patch("beanstalkc.Job", autospec=True)
+ @patch("teuthology.repo_utils.fetch_qa_suite")
+ @patch("teuthology.repo_utils.fetch_teuthology")
+ @patch("teuthology.dispatcher.beanstalk.watch_tube")
+ @patch("teuthology.dispatcher.beanstalk.connect")
+ @patch("os.path.isdir", return_value=True)
+ @patch("teuthology.dispatcher.setup_log_file")
+ def test_main_loop_13925(
+ self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+ m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+ m_try_push_job_info, m_ls_remote, m_find_dispatcher_processes,
+ ):
+ m_find_dispatcher_processes.return_value = {}
+ m_connection = Mock()
+ jobs = self.build_fake_jobs(
+ m_connection,
+ m_job,
+ [
+ 'name: name',
+ 'name: name\nstop_worker: true',
+ ],
+ )
+ m_connection.reserve.side_effect = jobs
+ m_connect.return_value = m_connection
+ m_fetch_qa_suite.side_effect = [
+ '/suite/path',
+ MaxWhileTries(),
+ MaxWhileTries(),
+ ]
+ dispatcher.main(self.ctx)
+ assert len(m_run_job.call_args_list) == 0
+ assert len(m_try_push_job_info.call_args_list) == len(jobs)
+ for i in range(len(jobs)):
+ push_call = m_try_push_job_info.call_args_list[i]
+ assert push_call[0][1]['status'] == 'dead'
--- /dev/null
+from subprocess import DEVNULL
+from unittest.mock import patch, Mock, MagicMock
+
+from teuthology.dispatcher import supervisor
+
+
+class TestSupervisor(object):
+ @patch("teuthology.dispatcher.supervisor.run_with_watchdog")
+ @patch("teuthology.dispatcher.supervisor.teuth_config")
+ @patch("subprocess.Popen")
+ @patch("os.environ")
+ @patch("os.mkdir")
+ @patch("yaml.safe_dump")
+ @patch("tempfile.NamedTemporaryFile")
+ def test_run_job_with_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+ m_environ, m_popen, m_t_config,
+ m_run_watchdog):
+ config = {
+ "suite_path": "suite/path",
+ "config": {"foo": "bar"},
+ "verbose": True,
+ "owner": "the_owner",
+ "archive_path": "archive/path",
+ "name": "the_name",
+ "description": "the_description",
+ "job_id": "1",
+ }
+ m_tmp = MagicMock()
+ temp_file = Mock()
+ temp_file.name = "the_name"
+ m_tmp.__enter__.return_value = temp_file
+ m_tempfile.return_value = m_tmp
+ m_p = Mock()
+ m_p.returncode = 0
+ m_popen.return_value = m_p
+ m_t_config.results_server = True
+ supervisor.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
+ m_run_watchdog.assert_called_with(m_p, config)
+ expected_args = [
+ 'teuth/bin/path/teuthology',
+ '-v',
+ '--owner', 'the_owner',
+ '--archive', 'archive/path',
+ '--name', 'the_name',
+ '--description',
+ 'the_description',
+ '--',
+ "archive/path/orig.config.yaml",
+ ]
+ m_popen.assert_called_with(args=expected_args, stderr=DEVNULL, stdout=DEVNULL)
+
+ @patch("time.sleep")
+ @patch("teuthology.dispatcher.supervisor.teuth_config")
+ @patch("subprocess.Popen")
+ @patch("os.environ")
+ @patch("os.mkdir")
+ @patch("yaml.safe_dump")
+ @patch("tempfile.NamedTemporaryFile")
+ def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
+ m_environ, m_popen, m_t_config,
+ m_sleep):
+ config = {
+ "suite_path": "suite/path",
+ "config": {"foo": "bar"},
+ "verbose": True,
+ "owner": "the_owner",
+ "archive_path": "archive/path",
+ "name": "the_name",
+ "description": "the_description",
+ "job_id": "1",
+ }
+ m_tmp = MagicMock()
+ temp_file = Mock()
+ temp_file.name = "the_name"
+ m_tmp.__enter__.return_value = temp_file
+ m_tempfile.return_value = m_tmp
+ env = dict(PYTHONPATH="python/path")
+ m_environ.copy.return_value = env
+ m_p = Mock()
+ m_p.returncode = 1
+ m_popen.return_value = m_p
+ m_t_config.results_server = False
+ supervisor.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
+
+ @patch("teuthology.dispatcher.supervisor.report.try_push_job_info")
+ @patch("time.sleep")
+ def test_run_with_watchdog_no_reporting(self, m_sleep, m_try_push):
+ config = {
+ "name": "the_name",
+ "job_id": "1",
+ "archive_path": "archive/path",
+ "teuthology_branch": "main"
+ }
+ process = Mock()
+ process.poll.return_value = "not None"
+ supervisor.run_with_watchdog(process, config)
+ m_try_push.assert_called_with(
+ dict(name=config["name"], job_id=config["job_id"]),
+ dict(status='dead')
+ )
+
+ @patch("subprocess.Popen")
+ @patch("time.sleep")
+ @patch("teuthology.dispatcher.supervisor.report.try_push_job_info")
+ def test_run_with_watchdog_with_reporting(self, m_tpji, m_sleep, m_popen):
+ config = {
+ "name": "the_name",
+ "job_id": "1",
+ "archive_path": "archive/path",
+ "teuthology_branch": "jewel"
+ }
+ process = Mock()
+ process.poll.return_value = "not None"
+ m_proc = Mock()
+ m_proc.poll.return_value = "not None"
+ m_popen.return_value = m_proc
+ supervisor.run_with_watchdog(process, config)