From e9e9898d5e420c175bbce1d5dab0ce3ab15c36a9 Mon Sep 17 00:00:00 2001 From: Loic Dachary Date: Sun, 29 Nov 2015 11:40:36 +0100 Subject: [PATCH] suite: --wait to block until the suite finishes After all jobs in the suite are scheduled, wait for it to complete by checking the result server every five minutes. The polling delay is large but almost all suites take significantly longer than five minutes. The list of remaining jobs is displayed every five minutes in verbose mode. The suite is considered stale if the list of unfinished jobs does not change within config.max_job_time seconds (plus 30 minutes to avoid border cases). It can happen when a worker dies, the machine running the worker reboots etc. http://tracker.ceph.com/issues/13884 Fixes: 13884 Signed-off-by: Loic Dachary --- scripts/suite.py | 1 + teuthology/suite.py | 56 ++++++++++++++++++++++++++++++++++- teuthology/test/test_suite.py | 49 ++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 1 deletion(-) diff --git a/scripts/suite.py b/scripts/suite.py index 4b9b71270d..071de8eb50 100644 --- a/scripts/suite.py +++ b/scripts/suite.py @@ -28,6 +28,7 @@ Standard arguments: Optional extra job yaml to include -s , --suite The suite to schedule + --wait Block until the suite is finished -c , --ceph The ceph branch to run against [default: master] -k , --kernel diff --git a/teuthology/suite.py b/teuthology/suite.py index ac12dffac4..21f2f76571 100644 --- a/teuthology/suite.py +++ b/teuthology/suite.py @@ -14,6 +14,7 @@ import smtplib import socket import sys from time import sleep +from time import time import yaml import math from email.mime.text import MIMEText @@ -26,6 +27,8 @@ from .config import config, JobConfig from .exceptions import BranchNotFoundError, ScheduleFailError from .misc import deep_merge, get_results_url from .repo_utils import fetch_qa_suite, fetch_teuthology +from .report import ResultsReporter +from .results import UNFINISHED_STATUSES from .task.install import get_flavor log = logging.getLogger(__name__) @@ -121,7 +124,58 @@ def main(args): throttle=throttle, ) os.remove(base_yaml_path) - + if not dry_run and args['--wait']: + return wait(name, config.max_job_time, + None) + +WAIT_MAX_JOB_TIME = 30 * 60 +WAIT_PAUSE = 5 * 60 + +class WaitException(Exception): + pass + +def wait(name, max_job_time, upload_url): + stale_job = max_job_time + WAIT_MAX_JOB_TIME + reporter = ResultsReporter() + past_unfinished_jobs = [] + progress = time() + log.info("waiting for the suite to complete") + log.debug("the list of unfinished jobs will be displayed " + "every " + str(WAIT_PAUSE / 60) + " minutes") + exit_code = 0 + while True: + jobs = reporter.get_jobs(name, fields=['job_id', 'status']) + unfinished_jobs = [] + for job in jobs: + if job['status'] in UNFINISHED_STATUSES: + unfinished_jobs.append(job) + elif job['status'] != 'pass': + exit_code = 1 + if len(unfinished_jobs) == 0: + log.info("wait is done") + break + if (len(past_unfinished_jobs) == len(unfinished_jobs) and + time() - progress > stale_job): + raise WaitException( + "no progress since " + str(config.max_job_time) + + " + " + str(WAIT_PAUSE) + " seconds") + if len(past_unfinished_jobs) != len(unfinished_jobs): + past_unfinished_jobs = unfinished_jobs + progress = time() + sleep(WAIT_PAUSE) + job_ids = [ job['job_id'] for job in unfinished_jobs ] + log.debug('wait for jobs ' + str(job_ids)) + jobs = reporter.get_jobs(name, fields=['job_id', 'status', + 'description', 'log_href']) + # dead, fail, pass : show fail/dead jobs first + jobs = sorted(jobs, lambda a, b: cmp(a['status'], b['status'])) + for job in jobs: + if upload_url: + url = os.path.join(upload_url, name, job['job_id']) + else: + url = job['log_href'] + log.info(job['status'] + " " + url + " " + job['description']) + return exit_code def make_run_name(suite, ceph_branch, kernel_branch, kernel_flavor, machine_type, user=None, timestamp=None): diff --git a/teuthology/test/test_suite.py b/teuthology/test/test_suite.py index e1ecd4ce77..6c32606d7a 100644 --- a/teuthology/test/test_suite.py +++ b/teuthology/test/test_suite.py @@ -758,6 +758,55 @@ def test_git_branch_exists(m_check_output): m_check_output.return_value = 'HHH branch' assert True == suite.git_branch_exists('ceph', 'master') +@patch.object(suite.ResultsReporter, 'get_jobs') +def test_wait_success(m_get_jobs, caplog): + results = [ + [{'status': 'queued', 'job_id': '2'}], + [], + ] + final = [ + {'status': 'pass', 'job_id': '1', + 'description': 'DESC1', 'log_href': 'http://URL1'}, + {'status': 'fail', 'job_id': '2', + 'description': 'DESC2', 'log_href': 'http://URL2'}, + {'status': 'pass', 'job_id': '3', + 'description': 'DESC3', 'log_href': 'http://URL3'}, + ] + def get_jobs(name, **kwargs): + if kwargs['fields'] == ['job_id', 'status']: + return in_progress.pop(0) + else: + return final + m_get_jobs.side_effect = get_jobs + suite.WAIT_PAUSE = 1 + + in_progress = deepcopy(results) + assert 0 == suite.wait('name', 1, 'http://UPLOAD_URL') + assert m_get_jobs.called_with('name', fields=['job_id', 'status']) + assert 0 == len(in_progress) + assert 'fail http://UPLOAD_URL/name/2' in caplog.text() + + in_progress = deepcopy(results) + in_progress = deepcopy(results) + assert 0 == suite.wait('name', 1, None) + assert m_get_jobs.called_with('name', fields=['job_id', 'status']) + assert 0 == len(in_progress) + assert 'fail http://URL2' in caplog.text() + +@patch.object(suite.ResultsReporter, 'get_jobs') +def test_wait_fails(m_get_jobs): + results = [] + results.append([{'status': 'queued', 'job_id': '2'}]) + results.append([{'status': 'queued', 'job_id': '2'}]) + results.append([{'status': 'queued', 'job_id': '2'}]) + def get_jobs(name, **kwargs): + return results.pop(0) + m_get_jobs.side_effect = get_jobs + suite.WAIT_PAUSE = 1 + suite.WAIT_MAX_JOB_TIME = 1 + with pytest.raises(suite.WaitException) as error: + suite.wait('name', 1, None) + assert 'abc' in str(error) class TestSuiteMain(object): -- 2.39.5