From: Zack Cerza Date: Fri, 27 May 2016 15:43:48 +0000 (-0600) Subject: Refactor parts of suite into new Run class X-Git-Tag: 1.1.0~603^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8194e14d3fec90251700ff635c92709b83a4a15e;p=teuthology.git Refactor parts of suite into new Run class Including smaller, more purposeful methods. This is for readability, testability and to facilitate making future changes. Signed-off-by: Zack Cerza --- diff --git a/teuthology/suite.py b/teuthology/suite.py index b96f8f5f6..668381dfe 100644 --- a/teuthology/suite.py +++ b/teuthology/suite.py @@ -13,8 +13,7 @@ import subprocess import smtplib import socket import sys -from time import sleep -from time import time +import time import yaml from email.mime.text import MIMEText from tempfile import NamedTemporaryFile @@ -22,8 +21,9 @@ from tempfile import NamedTemporaryFile import teuthology import matrix from . import lock -from .config import config, JobConfig -from .exceptions import BranchNotFoundError, CommitNotFoundError, ScheduleFailError +from .config import config, JobConfig, YamlConfig +from .exceptions import (BranchNotFoundError, CommitNotFoundError, + ScheduleFailError) from .misc import deep_merge, get_results_url from .orchestra.opsys import OS from .packaging import GitbuilderProject @@ -35,43 +35,46 @@ from .task.install import get_flavor log = logging.getLogger(__name__) +def process_args(args): + conf = YamlConfig() + rename_args = { + 'ceph': 'ceph_branch', + 'sha1': 'ceph_sha1', + 'kernel': 'kernel_branch', + # FIXME: ceph flavor and kernel flavor are separate things + 'flavor': 'kernel_flavor', + '': 'base_yaml_paths', + 'filter': 'filter_in', + } + for (key, value) in args.iteritems(): + # Translate --foo-bar to foo_bar + key = key.lstrip('--').replace('-', '_') + # Rename the key if necessary + key = rename_args.get(key) or key + if key == 'suite': + value = value.replace('/', ':') + elif key in ('limit', 'priority', 'num'): + value = int(value) + conf[key] = value + return conf + + def main(args): - verbose = args['--verbose'] - if verbose: + fn = process_args(args) + if fn.verbose: teuthology.log.setLevel(logging.DEBUG) - dry_run = args['--dry-run'] - - base_yaml_paths = args[''] - suite = args['--suite'].replace('/', ':') - ceph_branch = args['--ceph'] - ceph_sha1 = args['--sha1'] - kernel_branch = args['--kernel'] - kernel_flavor = args['--flavor'] - teuthology_branch = args['--teuthology-branch'] - machine_type = args['--machine-type'] - if not machine_type or machine_type == 'None': + + if not fn.machine_type or fn.machine_type == 'None': schedule_fail("Must specify a machine_type") - elif 'multi' in machine_type: + elif 'multi' in fn.machine_type: schedule_fail("'multi' is not a valid machine_type. " + "Maybe you want 'plana,mira,burnupi' or similar") - distro = args['--distro'] - suite_branch = args['--suite-branch'] - suite_dir = args['--suite-dir'] - - limit = int(args['--limit']) - priority = int(args['--priority']) - num = int(args['--num']) - owner = args['--owner'] - email = args['--email'] - if email: - config.results_email = email + + if fn.email: + config.results_email = fn.email if args['--archive-upload']: config.archive_upload = args['--archive-upload'] log.info('Will upload archives to ' + args['--archive-upload']) - timeout = args['--timeout'] - filter_in = args['--filter'] - filter_out = args['--filter-out'] - throttle = args['--throttle'] subset = None if args['--subset']: @@ -79,71 +82,439 @@ def main(args): subset = tuple(map(int, args['--subset'].split('/'))) log.info('Passed subset=%s/%s' % (str(subset[0]), str(subset[1]))) - name = make_run_name(suite, ceph_branch, kernel_branch, kernel_flavor, - machine_type) - - job_config = create_initial_config(suite, suite_branch, ceph_branch, - ceph_sha1, teuthology_branch, - kernel_branch, kernel_flavor, distro, - machine_type, name) - - if suite_dir: - suite_repo_path = suite_dir - else: - suite_repo_path = fetch_repos(job_config.suite_branch, test_name=name) + run = Run(fn) + job_config = run.base_config + name = run.name job_config.name = name - job_config.priority = priority + job_config.priority = fn.priority if config.results_email: job_config.email = config.results_email - if owner: - job_config.owner = owner + if fn.owner: + job_config.owner = fn.owner - if dry_run: + if fn.dry_run: log.debug("Base job config:\n%s" % job_config) - # Interpret any relative paths as being relative to ceph-qa-suite (absolute - # paths are unchanged by this) - base_yaml_paths = [os.path.join(suite_repo_path, b) for b in - base_yaml_paths] - with NamedTemporaryFile(prefix='schedule_suite_', delete=False) as base_yaml: base_yaml.write(str(job_config)) base_yaml_path = base_yaml.name - base_yaml_paths.insert(0, base_yaml_path) - prepare_and_schedule(job_config=job_config, - suite_repo_path=suite_repo_path, - base_yaml_paths=base_yaml_paths, - limit=limit, - num=num, - timeout=timeout, - dry_run=dry_run, - verbose=verbose, - filter_in=filter_in, - filter_out=filter_out, - subset=subset, - throttle=throttle, - ) + run.base_yaml_paths.insert(0, base_yaml_path) + run.prepare_and_schedule() os.remove(base_yaml_path) - if not dry_run and args['--wait']: + if not fn.dry_run and args['--wait']: return wait(name, config.max_job_time, args['--archive-upload-url']) -WAIT_MAX_JOB_TIME = 30 * 60 -WAIT_PAUSE = 5 * 60 + +class Run(object): + WAIT_MAX_JOB_TIME = 30 * 60 + WAIT_PAUSE = 5 * 60 + __slots__ = ( + 'args', 'name', 'base_config', 'suite_repo_path', 'base_yaml_paths', + 'base_args', + ) + + def __init__(self, args): + """ + args must be a config.YamlConfig object + """ + self.args = args + self.name = self.make_run_name() + self.base_config = self.create_initial_config() + + if self.args.suite_dir: + self.suite_repo_path = self.args.suite_dir + else: + self.suite_repo_path = fetch_repos(self.base_config.suite_branch, + test_name=self.name) + + # Interpret any relative paths as being relative to ceph-qa-suite + # (absolute paths are unchanged by this) + self.base_yaml_paths = [os.path.join(self.suite_repo_path, b) for b in + self.args.base_yaml_paths] + + def make_run_name(self): + """ + Generate a run name. A run name looks like: + teuthology-2014-06-23_19:00:37-rados-dumpling-testing-basic-plana + """ + user = self.args.user or pwd.getpwuid(os.getuid()).pw_name + # We assume timestamp is a datetime.datetime object + timestamp = self.args.timestamp or \ + datetime.now().strftime('%Y-%m-%d_%H:%M:%S') + + worker = get_worker(self.args.machine_type) + return '-'.join( + [ + user, str(timestamp), self.args.suite, self.args.ceph_branch, + self.args.kernel_branch or '-', self.args.kernel_flavor, worker + ] + ) + + def create_initial_config(self): + """ + Put together the config file used as the basis for each job in the run. + Grabs hashes for the latest ceph, kernel and teuthology versions in the + branches specified and specifies them so we know exactly what we're + testing. + + :returns: A JobConfig object + """ + kernel_dict = self.choose_kernel() + ceph_hash = self.choose_ceph_hash() + # We don't store ceph_version because we don't use it yet outside of + # logging. + self.choose_ceph_version(ceph_hash) + teuthology_branch = self.choose_teuthology_branch() + suite_branch = self.choose_suite_branch() + suite_hash = self.choose_suite_hash(suite_branch) + + config_input = dict( + suite=self.args.suite, + suite_branch=suite_branch, + suite_hash=suite_hash, + ceph_branch=self.args.ceph_branch, + ceph_hash=ceph_hash, + teuthology_branch=teuthology_branch, + machine_type=self.args.machine_type, + distro=self.args.distro, + archive_upload=config.archive_upload, + archive_upload_key=config.archive_upload_key, + ) + conf_dict = substitute_placeholders(dict_templ, config_input) + conf_dict.update(kernel_dict) + job_config = JobConfig.from_dict(conf_dict) + return job_config + + def choose_kernel(self): + # Put together a stanza specifying the kernel hash + if self.args.kernel_branch == 'distro': + kernel_hash = 'distro' + # Skip the stanza if no -k given + elif self.args.kernel_branch is None: + kernel_hash = None + else: + kernel_hash = get_gitbuilder_hash( + 'kernel', self.args.kernel_branch, self.args.kernel_flavor, + self.args.machine_type, self.args.distro, + ) + if not kernel_hash: + schedule_fail( + message="Kernel branch '{branch}' not found".format( + branch=self.args.kernel_branch), name=self.name + ) + if kernel_hash: + log.info("kernel sha1: {hash}".format(hash=kernel_hash)) + kernel_dict = dict(kernel=dict(kdb=True, sha1=kernel_hash)) + if kernel_hash is not 'distro': + kernel_dict['kernel']['flavor'] = self.args.kernel_flavor + else: + kernel_dict = dict() + return kernel_dict + + def choose_ceph_hash(self): + """ + Get the ceph hash: if --sha1/-S is supplied, use it if it is valid, and + just keep the ceph_branch around. Otherwise use the current git branch + tip. + """ + + if self.args.ceph_sha1: + ceph_hash = git_validate_sha1('ceph', self.args.ceph_sha1) + if not ceph_hash: + exc = CommitNotFoundError(self.args.ceph_sha1, 'ceph.git') + schedule_fail(message=str(exc), name=self.name) + log.info("ceph sha1 explicitly supplied") + + elif self.args.ceph_branch: + ceph_hash = git_ls_remote('ceph', self.args.ceph_branch) + if not ceph_hash: + exc = BranchNotFoundError(self.args.ceph_branch, 'ceph.git') + schedule_fail(message=str(exc), name=self.name) + + log.info("ceph sha1: {hash}".format(hash=ceph_hash)) + return ceph_hash + + def choose_ceph_version(self, ceph_hash): + if config.suite_verify_ceph_hash: + # Get the ceph package version + ceph_version = package_version_for_hash( + ceph_hash, self.args.kernel_flavor, self.args.distro, + self.args.machine_type, + ) + if not ceph_version: + schedule_fail( + "Packages for ceph hash '{ver}' not found".format( + ver=ceph_hash), self.name) + log.info("ceph version: {ver}".format(ver=ceph_version)) + return ceph_version + else: + log.info('skipping ceph package verification') + + def choose_teuthology_branch(self): + teuthology_branch = self.args.teuthology_branch + if teuthology_branch and teuthology_branch != 'master': + if not git_branch_exists('teuthology', teuthology_branch): + exc = BranchNotFoundError(teuthology_branch, 'teuthology.git') + schedule_fail(message=str(exc), name=self.name) + elif not teuthology_branch: + # Decide what branch of teuthology to use + if git_branch_exists('teuthology', self.args.ceph_branch): + teuthology_branch = self.args.ceph_branch + else: + log.info( + "branch {0} not in teuthology.git; will use master for" + " teuthology".format(self.args.ceph_branch)) + teuthology_branch = 'master' + log.info("teuthology branch: %s", teuthology_branch) + return teuthology_branch + + def choose_suite_branch(self): + suite_branch = self.args.suite_branch + ceph_branch = self.args.ceph_branch + if suite_branch and suite_branch != 'master': + if not git_branch_exists('ceph-qa-suite', suite_branch): + exc = BranchNotFoundError(suite_branch, 'ceph-qa-suite.git') + schedule_fail(message=str(exc), name=self.name) + elif not suite_branch: + # Decide what branch of ceph-qa-suite to use + if git_branch_exists('ceph-qa-suite', ceph_branch): + suite_branch = ceph_branch + else: + log.info( + "branch {0} not in ceph-qa-suite.git; will use master for" + " ceph-qa-suite".format(ceph_branch)) + suite_branch = 'master' + return suite_branch + + def choose_suite_hash(self, suite_branch): + suite_hash = git_ls_remote('ceph-qa-suite', suite_branch) + if not suite_hash: + exc = BranchNotFoundError(suite_branch, 'ceph-qa-suite.git') + schedule_fail(message=str(exc), name=self.name) + log.info("ceph-qa-suite branch: %s %s", suite_branch, suite_hash) + + def build_base_args(self): + base_args = [ + '--name', self.name, + '--num', str(self.args.num), + '--worker', get_worker(self.args.machine_type), + ] + if self.args.dry_run: + base_args.append('--dry-run') + if self.args.priority is not None: + base_args.extend(['--priority', str(self.args.priority)]) + if self.args.verbose: + base_args.append('-v') + if self.args.owner: + base_args.extend(['--owner', self.args.owner]) + return base_args + + def prepare_and_schedule(self): + """ + Puts together some "base arguments" with which to execute + teuthology-schedule for each job, then passes them and other parameters + to schedule_suite(). Finally, schedules a "last-in-suite" job that + sends an email to the specified address (if one is configured). + """ + self.base_args = self.build_base_args() + + # Make sure the yaml paths are actually valid + for yaml_path in self.base_yaml_paths: + full_yaml_path = os.path.join(self.suite_repo_path, yaml_path) + if not os.path.exists(full_yaml_path): + raise IOError("File not found: " + full_yaml_path) + + num_jobs = self.schedule_suite() + + if self.base_config.email and num_jobs: + arg = copy.deepcopy(self.base_args) + arg.append('--last-in-suite') + arg.extend(['--email', self.base_config.email]) + if self.args.timeout: + arg.extend(['--timeout', self.args.timeout]) + teuthology_schedule( + args=arg, + dry_run=self.args.dry_run, + verbose=self.args.verbose, + log_prefix="Results email: ", + ) + results_url = get_results_url(self.base_config.name) + if results_url: + log.info("Test results viewable at %s", results_url) + + def schedule_suite(self): + """ + Schedule the suite-run. Returns the number of jobs scheduled. + """ + name = self.name + arch = get_arch(self.base_config.machine_type) + suite_name = self.base_config.suite + suite_path = os.path.join( + self.suite_repo_path, 'suites', + self.base_config.suite.replace(':', '/')) + log.debug('Suite %s in %s' % (suite_name, suite_path)) + configs = [ + (combine_path(suite_name, item[0]), item[1]) for item in + build_matrix(suite_path, subset=self.args.subset) + ] + log.info('Suite %s in %s generated %d jobs (not yet filtered)' % ( + suite_name, suite_path, len(configs))) + + # used as a local cache for package versions from gitbuilder + package_versions = dict() + jobs_to_schedule = [] + jobs_missing_packages = [] + for description, fragment_paths in configs: + base_frag_paths = [strip_fragment_path(x) for x in fragment_paths] + limit = self.args.limit + if limit > 0 and len(jobs_to_schedule) >= limit: + log.info( + 'Stopped after {limit} jobs due to --limit={limit}'.format( + limit=limit)) + break + # Break apart the filter parameter (one string) into comma + # separated components to be used in searches. + filter_in = self.args.filter_in + if filter_in: + filter_list = [x.strip() for x in filter_in.split(',')] + if not any([x in description for x in filter_list]): + all_filt = [] + for filt_samp in filter_list: + all_filt.extend( + [x.find(filt_samp) < 0 for x in base_frag_paths] + ) + if all(all_filt): + continue + filter_out = self.args.filter_out + if filter_out: + filter_list = [x.strip() for x in filter_out.split(',')] + if any([x in description for x in filter_list]): + continue + all_filt_val = False + for filt_samp in filter_list: + flist = [filt_samp in x for x in base_frag_paths] + if any(flist): + all_filt_val = True + continue + if all_filt_val: + continue + + raw_yaml = '\n'.join([file(a, 'r').read() for a in fragment_paths]) + + parsed_yaml = yaml.load(raw_yaml) + os_type = parsed_yaml.get('os_type') or self.base_config.os_type + exclude_arch = parsed_yaml.get('exclude_arch') + exclude_os_type = parsed_yaml.get('exclude_os_type') + + if exclude_arch and exclude_arch == arch: + log.info('Skipping due to excluded_arch: %s facets %s', + exclude_arch, description) + continue + if exclude_os_type and exclude_os_type == os_type: + log.info('Skipping due to excluded_os_type: %s facets %s', + exclude_os_type, description) + continue + + arg = copy.deepcopy(self.base_args) + arg.extend([ + '--description', description, + '--', + ]) + arg.extend(self.base_yaml_paths) + arg.extend(fragment_paths) + + job = dict( + yaml=parsed_yaml, + desc=description, + sha1=self.base_config.sha1, + args=arg + ) + + if config.suite_verify_ceph_hash: + full_job_config = dict() + deep_merge(full_job_config, self.base_config.to_dict()) + deep_merge(full_job_config, parsed_yaml) + flavor = get_install_task_flavor(full_job_config) + sha1 = self.base_config.sha1 + # Get package versions for this sha1, os_type and flavor. If + # we've already retrieved them in a previous loop, they'll be + # present in package_versions and gitbuilder will not be asked + # again for them. + package_versions = get_package_versions( + sha1, + os_type, + flavor, + package_versions + ) + if not has_packages_for_distro(sha1, os_type, flavor, + package_versions): + m = "Packages for os_type '{os}', flavor {flavor} and " + \ + "ceph hash '{ver}' not found" + log.error(m.format(os=os_type, flavor=flavor, ver=sha1)) + jobs_missing_packages.append(job) + + jobs_to_schedule.append(job) + + for job in jobs_to_schedule: + log.info( + 'Scheduling %s', job['desc'] + ) + + log_prefix = '' + if job in jobs_missing_packages: + log_prefix = "Missing Packages: " + if (not self.args.dry_run and not + config.suite_allow_missing_packages): + schedule_fail( + "At least one job needs packages that don't exist for " + "hash {sha1}.".format(sha1=self.base_config.sha1), + name, + ) + teuthology_schedule( + args=job['args'], + dry_run=self.args.dry_run, + verbose=self.args.verbose, + log_prefix=log_prefix, + ) + throttle = self.args.throttle + if not self.args.dry_run and throttle: + log.info("pause between jobs : --throttle " + str(throttle)) + time.sleep(int(throttle)) + + count = len(jobs_to_schedule) + missing_count = len(jobs_missing_packages) + log.info( + 'Suite %s in %s scheduled %d jobs.' % + (suite_name, suite_path, count) + ) + log.info('%d/%d jobs were filtered out.', + (len(configs) - count), + len(configs)) + if missing_count: + log.warn('Scheduled %d/%d jobs that are missing packages!', + missing_count, count) + return count + + +class Job(object): + pass + class WaitException(Exception): pass + def wait(name, max_job_time, upload_url): - stale_job = max_job_time + WAIT_MAX_JOB_TIME + stale_job = max_job_time + Run.WAIT_MAX_JOB_TIME reporter = ResultsReporter() past_unfinished_jobs = [] - progress = time() + progress = time.time() log.info("waiting for the suite to complete") log.debug("the list of unfinished jobs will be displayed " - "every " + str(WAIT_PAUSE / 60) + " minutes") + "every " + str(Run.WAIT_PAUSE / 60) + " minutes") exit_code = 0 while True: jobs = reporter.get_jobs(name, fields=['job_id', 'status']) @@ -157,15 +528,15 @@ def wait(name, max_job_time, upload_url): log.info("wait is done") break if (len(past_unfinished_jobs) == len(unfinished_jobs) and - time() - progress > stale_job): + time.time() - progress > stale_job): raise WaitException( "no progress since " + str(config.max_job_time) + - " + " + str(WAIT_PAUSE) + " seconds") + " + " + str(Run.WAIT_PAUSE) + " seconds") if len(past_unfinished_jobs) != len(unfinished_jobs): past_unfinished_jobs = unfinished_jobs - progress = time() - sleep(WAIT_PAUSE) - job_ids = [ job['job_id'] for job in unfinished_jobs ] + progress = time.time() + time.sleep(Run.WAIT_PAUSE) + job_ids = [job['job_id'] for job in unfinished_jobs] log.debug('wait for jobs ' + str(job_ids)) jobs = reporter.get_jobs(name, fields=['job_id', 'status', 'description', 'log_href']) @@ -179,24 +550,6 @@ def wait(name, max_job_time, upload_url): log.info(job['status'] + " " + url + " " + job['description']) return exit_code -def make_run_name(suite, ceph_branch, kernel_branch, kernel_flavor, - machine_type, user=None, timestamp=None): - """ - Generate a run name based on the parameters. A run name looks like: - teuthology-2014-06-23_19:00:37-rados-dumpling-testing-basic-plana - """ - if not user: - user = pwd.getpwuid(os.getuid()).pw_name - # We assume timestamp is a datetime.datetime object - if not timestamp: - timestamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S') - - worker = get_worker(machine_type) - return '-'.join( - [user, str(timestamp), suite, ceph_branch, - kernel_branch or '-', kernel_flavor, worker] - ) - def fetch_repos(branch, test_name): """ @@ -224,186 +577,6 @@ def fetch_repos(branch, test_name): return suite_repo_path -def create_initial_config(suite, suite_branch, ceph_branch, ceph_sha1, - teuthology_branch, kernel_branch, kernel_flavor, - distro, machine_type, name=None): - """ - Put together the config file used as the basis for each job in the run. - Grabs hashes for the latest ceph, kernel and teuthology versions in the - branches specified and specifies them so we know exactly what we're - testing. - - :returns: A JobConfig object - """ - # Put together a stanza specifying the kernel hash - if kernel_branch == 'distro': - kernel_hash = 'distro' - # Skip the stanza if no -k given - elif kernel_branch is None: - kernel_hash = None - else: - kernel_hash = get_gitbuilder_hash('kernel', kernel_branch, - kernel_flavor, machine_type, distro) - if not kernel_hash: - schedule_fail(message="Kernel branch '{branch}' not found".format( - branch=kernel_branch), name=name) - if kernel_hash: - log.info("kernel sha1: {hash}".format(hash=kernel_hash)) - kernel_dict = dict(kernel=dict(kdb=True, sha1=kernel_hash)) - if kernel_hash is not 'distro': - kernel_dict['kernel']['flavor'] = kernel_flavor - else: - kernel_dict = dict() - - # Get the ceph hash: if --sha1/-S is supplied, use it if - # it is valid, and just keep the ceph_branch around. - # Otherwise use the current git branch tip. - - if ceph_sha1: - ceph_hash = git_validate_sha1('ceph', ceph_sha1) - if not ceph_hash: - exc = CommitNotFoundError(ceph_sha1, 'ceph.git') - schedule_fail(message=str(exc), name=name) - log.info("ceph sha1 explicitly supplied") - - elif ceph_branch: - ceph_hash = git_ls_remote('ceph', ceph_branch) - if not ceph_hash: - exc = BranchNotFoundError(ceph_branch, 'ceph.git') - schedule_fail(message=str(exc), name=name) - - log.info("ceph sha1: {hash}".format(hash=ceph_hash)) - - if config.suite_verify_ceph_hash: - # Get the ceph package version - ceph_version = package_version_for_hash(ceph_hash, kernel_flavor, - distro, machine_type) - if not ceph_version: - schedule_fail("Packages for ceph hash '{ver}' not found".format( - ver=ceph_hash), name) - log.info("ceph version: {ver}".format(ver=ceph_version)) - else: - log.info('skipping ceph package verification') - - if teuthology_branch and teuthology_branch != 'master': - if not git_branch_exists('teuthology', teuthology_branch): - exc = BranchNotFoundError(teuthology_branch, 'teuthology.git') - schedule_fail(message=str(exc), name=name) - elif not teuthology_branch: - # Decide what branch of teuthology to use - if git_branch_exists('teuthology', ceph_branch): - teuthology_branch = ceph_branch - else: - log.info("branch {0} not in teuthology.git; will use master for" - " teuthology".format(ceph_branch)) - teuthology_branch = 'master' - log.info("teuthology branch: %s", teuthology_branch) - - if suite_branch and suite_branch != 'master': - if not git_branch_exists('ceph-qa-suite', suite_branch): - exc = BranchNotFoundError(suite_branch, 'ceph-qa-suite.git') - schedule_fail(message=str(exc), name=name) - elif not suite_branch: - # Decide what branch of ceph-qa-suite to use - if git_branch_exists('ceph-qa-suite', ceph_branch): - suite_branch = ceph_branch - else: - log.info("branch {0} not in ceph-qa-suite.git; will use master for" - " ceph-qa-suite".format(ceph_branch)) - suite_branch = 'master' - suite_hash = git_ls_remote('ceph-qa-suite', suite_branch) - if not suite_hash: - exc = BranchNotFoundError(suite_branch, 'ceph-qa-suite.git') - schedule_fail(message=str(exc), name=name) - log.info("ceph-qa-suite branch: %s %s", suite_branch, suite_hash) - - config_input = dict( - suite=suite, - suite_branch=suite_branch, - suite_hash=suite_hash, - ceph_branch=ceph_branch, - ceph_hash=ceph_hash, - teuthology_branch=teuthology_branch, - machine_type=machine_type, - distro=distro, - archive_upload=config.archive_upload, - archive_upload_key=config.archive_upload_key, - ) - conf_dict = substitute_placeholders(dict_templ, config_input) - conf_dict.update(kernel_dict) - job_config = JobConfig.from_dict(conf_dict) - return job_config - - -def prepare_and_schedule(job_config, suite_repo_path, base_yaml_paths, limit, - num, timeout, dry_run, verbose, - filter_in, - filter_out, - subset, - throttle): - """ - Puts together some "base arguments" with which to execute - teuthology-schedule for each job, then passes them and other parameters to - schedule_suite(). Finally, schedules a "last-in-suite" job that sends an - email to the specified address (if one is configured). - """ - arch = get_arch(job_config.machine_type) - - base_args = [ - '--name', job_config.name, - '--num', str(num), - '--worker', get_worker(job_config.machine_type), - ] - if dry_run: - base_args.append('--dry-run') - if job_config.priority is not None: - base_args.extend(['--priority', str(job_config.priority)]) - if verbose: - base_args.append('-v') - if job_config.owner: - base_args.extend(['--owner', job_config.owner]) - - suite_path = os.path.join(suite_repo_path, 'suites', - job_config.suite.replace(':', '/')) - - # Make sure the yaml paths are actually valid - for yaml_path in base_yaml_paths: - full_yaml_path = os.path.join(suite_repo_path, yaml_path) - if not os.path.exists(full_yaml_path): - raise IOError("File not found: " + full_yaml_path) - - num_jobs = schedule_suite( - job_config=job_config, - path=suite_path, - base_yamls=base_yaml_paths, - base_args=base_args, - arch=arch, - limit=limit, - dry_run=dry_run, - verbose=verbose, - filter_in=filter_in, - filter_out=filter_out, - subset=subset, - throttle=throttle - ) - - if job_config.email and num_jobs: - arg = copy.deepcopy(base_args) - arg.append('--last-in-suite') - arg.extend(['--email', job_config.email]) - if timeout: - arg.extend(['--timeout', timeout]) - teuthology_schedule( - args=arg, - dry_run=dry_run, - verbose=verbose, - log_prefix="Results email: ", - ) - results_url = get_results_url(job_config.name) - if results_url: - log.info("Test results viewable at %s", results_url) - - def schedule_fail(message, name=''): """ If an email address has been specified anywhere, send an alert there. Then @@ -570,7 +743,7 @@ def git_validate_sha1(project, sha1, project_owner='ceph'): elif '/git.ceph.com/' in url: # kinda specific to knowing git.ceph.com is gitweb url = ('http://git.ceph.com/?p=%s.git;a=blob_plain;f=.gitignore;hb=%s' - % (project, sha1)) + % (project, sha1)) else: raise RuntimeError( 'git_validate_sha1: how do I check %s for a sha1?' % url @@ -595,11 +768,13 @@ def build_git_url(project, project_owner='ceph'): url_templ = re.sub('\.git$', '', base) return url_templ.format(project_owner=project_owner, project=project) + def git_branch_exists(project, branch, project_owner='ceph'): """ Query the git repository to check the existence of a project's branch """ - return git_ls_remote(project, branch, project_owner) != None + return git_ls_remote(project, branch, project_owner) is not None + def get_branch_info(project, branch, project_owner='ceph'): """ @@ -622,6 +797,7 @@ def get_branch_info(project, branch, project_owner='ceph'): if resp.ok: return resp.json() + def strip_fragment_path(original_path): """ Given a path, remove the text before '/suites/'. Part of the fix for @@ -633,155 +809,6 @@ def strip_fragment_path(original_path): return original_path[scan_start + len(scan_after):] return original_path -def schedule_suite(job_config, - path, - base_yamls, - base_args, - arch, - limit=0, - dry_run=True, - verbose=1, - filter_in=None, - filter_out=None, - subset=None, - throttle=None, - ): - """ - schedule one suite. - returns number of jobs scheduled - """ - suite_name = job_config.suite - name = job_config.name - log.debug('Suite %s in %s' % (suite_name, path)) - configs = [(combine_path(suite_name, item[0]), item[1]) for item in - build_matrix(path, subset=subset)] - log.info('Suite %s in %s generated %d jobs (not yet filtered)' % ( - suite_name, path, len(configs))) - - # used as a local cache for package versions from gitbuilder - package_versions = dict() - jobs_to_schedule = [] - jobs_missing_packages = [] - for description, fragment_paths in configs: - base_frag_paths = [strip_fragment_path(x) for x in fragment_paths] - if limit > 0 and len(jobs_to_schedule) >= limit: - log.info( - 'Stopped after {limit} jobs due to --limit={limit}'.format( - limit=limit)) - break - # Break apart the filter parameter (one string) into comma separated - # components to be used in searches. - if filter_in: - filter_list = [x.strip() for x in filter_in.split(',')] - if not any([x in description for x in filter_list]): - all_filt = [] - for filt_samp in filter_list: - all_filt.extend([x.find(filt_samp) < 0 for x in base_frag_paths]) - if all(all_filt): - continue - if filter_out: - filter_list = [x.strip() for x in filter_out.split(',')] - if any([x in description for x in filter_list]): - continue - all_filt_val = False - for filt_samp in filter_list: - flist = [filt_samp in x for x in base_frag_paths] - if any(flist): - all_filt_val = True - continue - if all_filt_val: - continue - - raw_yaml = '\n'.join([file(a, 'r').read() for a in fragment_paths]) - - parsed_yaml = yaml.load(raw_yaml) - os_type = parsed_yaml.get('os_type') or job_config.os_type - exclude_arch = parsed_yaml.get('exclude_arch') - exclude_os_type = parsed_yaml.get('exclude_os_type') - - if exclude_arch and exclude_arch == arch: - log.info('Skipping due to excluded_arch: %s facets %s', - exclude_arch, description) - continue - if exclude_os_type and exclude_os_type == os_type: - log.info('Skipping due to excluded_os_type: %s facets %s', - exclude_os_type, description) - continue - - arg = copy.deepcopy(base_args) - arg.extend([ - '--description', description, - '--', - ]) - arg.extend(base_yamls) - arg.extend(fragment_paths) - - job = dict( - yaml=parsed_yaml, - desc=description, - sha1=job_config.sha1, - args=arg - ) - - if config.suite_verify_ceph_hash: - full_job_config = dict() - deep_merge(full_job_config, job_config.to_dict()) - deep_merge(full_job_config, parsed_yaml) - flavor = get_install_task_flavor(full_job_config) - sha1 = job_config.sha1 - # Get package versions for this sha1, os_type and flavor. If we've - # already retrieved them in a previous loop, they'll be present in - # package_versions and gitbuilder will not be asked again for them. - package_versions = get_package_versions( - sha1, - os_type, - flavor, - package_versions - ) - if not has_packages_for_distro(sha1, os_type, flavor, - package_versions): - m = "Packages for os_type '{os}', flavor {flavor} and " + \ - "ceph hash '{ver}' not found" - log.error(m.format(os=os_type, flavor=flavor, ver=sha1)) - jobs_missing_packages.append(job) - - jobs_to_schedule.append(job) - - for job in jobs_to_schedule: - log.info( - 'Scheduling %s', job['desc'] - ) - - log_prefix = '' - if job in jobs_missing_packages: - log_prefix = "Missing Packages: " - if not dry_run and not config.suite_allow_missing_packages: - schedule_fail( - "At least one job needs packages that don't exist for hash" - " {sha1}.".format(sha1=job_config.sha1), - name, - ) - teuthology_schedule( - args=job['args'], - dry_run=dry_run, - verbose=verbose, - log_prefix=log_prefix, - ) - if not dry_run and throttle: - log.info("pause between jobs : --throttle " + str(throttle)) - sleep(int(throttle)) - - count = len(jobs_to_schedule) - missing_count = len(jobs_missing_packages) - log.info('Suite %s in %s scheduled %d jobs.' % (suite_name, path, count)) - log.info('%d/%d jobs were filtered out.', - (len(configs) - count), - len(configs)) - if missing_count: - log.warn('Scheduled %d/%d jobs that are missing packages!', - missing_count, count) - return count - def teuthology_schedule(args, verbose, dry_run, log_prefix=''): """ @@ -981,6 +1008,7 @@ def generate_combinations(path, mat, generate_from, generate_to): matrix.generate_paths(path, output, combine_path))) return ret + def build_matrix(path, subset=None): """ Return a list of items descibed by path such that if the list of @@ -1020,6 +1048,7 @@ def build_matrix(path, subset=None): mat, first, matlimit = _get_matrix(path, subset) return generate_combinations(path, mat, first, matlimit) + def _get_matrix(path, subset=None): mat = None first = None @@ -1038,6 +1067,7 @@ def _get_matrix(path, subset=None): matlimit = mat.size() return mat, first, matlimit + def _build_matrix(path, mincyclicity=0, item=''): if not os.path.exists(path): raise IOError('%s does not exist' % path) @@ -1077,8 +1107,8 @@ def _build_matrix(path, mincyclicity=0, item=''): mat = matrix.Product(item, submats) if mat and mat.cyclicity() < mincyclicity: mat = matrix.Cycle( - (mincyclicity + mat.cyclicity() - 1) / mat.cyclicity(), - mat) + (mincyclicity + mat.cyclicity() - 1) / mat.cyclicity(), mat + ) return mat else: # list items diff --git a/teuthology/test/test_suite.py b/teuthology/test/test_suite.py index b9d9b382c..003546532 100644 --- a/teuthology/test/test_suite.py +++ b/teuthology/test/test_suite.py @@ -6,7 +6,7 @@ from mock import patch, Mock, DEFAULT, MagicMock from fake_fs import make_fake_fstools from teuthology import suite from scripts.suite import main -from teuthology.config import config +from teuthology.config import config, YamlConfig from teuthology.orchestra.opsys import OS import os @@ -16,6 +16,40 @@ import time import random import requests # to mock a Response + +def get_fake_time_and_sleep(): + m_time = Mock() + m_time.return_value = time.time() + + def m_time_side_effect(): + # Fake the slow passage of time + m_time.return_value += 0.1 + return m_time.return_value + m_time.side_effect = m_time_side_effect + + def f_sleep(seconds): + m_time.return_value += seconds + m_sleep = Mock(wraps=f_sleep) + return m_time, m_sleep + + +def setup_module(): + global m_time + global m_sleep + m_time, m_sleep = get_fake_time_and_sleep() + global patcher_time_sleep + patcher_time_sleep = patch.multiple( + 'teuthology.suite.time', + time=m_time, + sleep=m_sleep, + ) + patcher_time_sleep.start() + + +def teardown_module(): + patcher_time_sleep.stop() + + @pytest.fixture def git_repository(request): d = tempfile.mkdtemp() @@ -33,24 +67,8 @@ def git_repository(request): request.addfinalizer(fin) return d -class TestSuiteOffline(object): - def test_name_timestamp_passed(self): - stamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S') - name = suite.make_run_name('suite', 'ceph', 'kernel', 'flavor', - 'mtype', timestamp=stamp) - assert str(stamp) in name - - def test_name_timestamp_not_passed(self): - stamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S') - name = suite.make_run_name('suite', 'ceph', 'kernel', 'flavor', - 'mtype') - assert str(stamp) in name - - def test_name_user(self): - name = suite.make_run_name('suite', 'ceph', 'kernel', 'flavor', - 'mtype', user='USER') - assert name.startswith('USER-') +class TestSuiteOffline(object): def test_substitute_placeholders(self): suite_hash = 'suite_hash' input_dict = dict( @@ -176,6 +194,7 @@ class TestSuiteOffline(object): assert None == suite.git_ls_remote('ceph', 'nobranch') assert suite.git_ls_remote('ceph', 'master') is not None + class TestFlavor(object): def test_get_install_task_flavor_bare(self): config = dict( @@ -227,6 +246,41 @@ class TestFlavor(object): ) assert suite.get_install_task_flavor(config) == 'notcmalloc' + +class TestRun(object): + klass = suite.Run + + def setup(self): + self.args_dict = dict( + suite='suite', + suite_branch='suite_branch', + ceph_branch='ceph_branch', + ceph_sha1='ceph_sha1', + teuthology_branch='teuthology_branch', + kernel_branch=None, + kernel_flavor='kernel_flavor', + distro='ubuntu', + machine_type='machine_type', + base_yaml_paths=list(), + ) + self.args = suite.YamlConfig.from_dict(self.args_dict) + + @patch('teuthology.suite.fetch_repos') + def test_name(self, m_fetch_repos): + stamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S') + with patch.object(suite.Run, 'create_initial_config', + return_value=suite.JobConfig()): + name = suite.Run(self.args).name + assert str(stamp) in name + + @patch('teuthology.suite.fetch_repos') + def test_name_user(self, m_fetch_repos): + self.args.user = 'USER' + with patch.object(suite.Run, 'create_initial_config', + return_value=suite.JobConfig()): + name = suite.Run(self.args).name + assert name.startswith('USER-') + @patch('teuthology.suite.git_branch_exists') @patch('teuthology.suite.package_version_for_hash') @patch('teuthology.suite.git_ls_remote') @@ -245,13 +299,12 @@ class TestFlavor(object): ] m_package_version_for_hash.return_value = 'a_version' m_git_branch_exists.return_value = True + self.args.ceph_branch = 'ceph_sha1' + self.args.ceph_sha1 = None with pytest.raises(suite.ScheduleFailError): - suite.create_initial_config( - 'suite', 'suite_branch', 'ceph_hash', None, - 'teuth_branch', None, 'kernel_flavor', 'ubuntu', - 'machine_type', - ) + self.klass(self.args) + @patch('teuthology.suite.fetch_repos') @patch('requests.head') @patch('teuthology.suite.git_branch_exists') @patch('teuthology.suite.package_version_for_hash') @@ -262,6 +315,7 @@ class TestFlavor(object): m_package_version_for_hash, m_git_branch_exists, m_requests_head, + m_fetch_repos, ): config.gitbuilder_host = 'example.com' m_package_version_for_hash.return_value = 'ceph_hash' @@ -272,12 +326,9 @@ class TestFlavor(object): m_requests_head.return_value = resp # only one call to git_ls_remote in this case m_git_ls_remote.return_value = "suite_branch" - result = suite.create_initial_config( - 'suite', 'suite_branch', 'ceph_branch', 'ceph_hash', - 'teuth_branch', None, 'kernel_flavor', 'ubuntu', 'machine_type', - ) - assert result.sha1 == 'ceph_hash' - assert result.branch == 'ceph_branch' + run = self.klass(self.args) + assert run.base_config.sha1 == 'ceph_sha1' + assert run.base_config.branch == 'ceph_branch' @patch('requests.head') @patch('teuthology.suite.git_branch_exists') @@ -295,12 +346,10 @@ class TestFlavor(object): resp.reason = 'Not Found' resp.status_code = 404 m_requests_head.return_value = resp + self.args.ceph_sha1 = 'ceph_hash_dne' with pytest.raises(suite.ScheduleFailError): - suite.create_initial_config( - 'suite', 'suite_branch', 'ceph_branch', 'ceph_hash_dne', - 'teuth_branch', None, 'kernel_flavor', 'ubuntu', - 'machine_type', - ) + self.klass(self.args) + class TestMissingPackages(object): """ @@ -920,21 +969,6 @@ def test_git_branch_exists(m_check_output): assert True == suite.git_branch_exists('ceph', 'master') -def get_fake_time_and_sleep(): - m_time = Mock() - m_time.return_value = time.time() - - def m_time_side_effect(): - # Fake the slow passage of time - m_time.return_value += 0.1 - return m_time.return_value - m_time.side_effect = m_time_side_effect - - def m_sleep(seconds): - m_time.return_value += seconds - return m_time, m_sleep - - @patch.object(suite.ResultsReporter, 'get_jobs') def test_wait_success(m_get_jobs, caplog): results = [ @@ -955,7 +989,7 @@ def test_wait_success(m_get_jobs, caplog): else: return final m_get_jobs.side_effect = get_jobs - suite.WAIT_PAUSE = 1 + suite.Run.WAIT_PAUSE = 1 in_progress = deepcopy(results) assert 0 == suite.wait('name', 1, 'http://UPLOAD_URL') @@ -965,12 +999,7 @@ def test_wait_success(m_get_jobs, caplog): in_progress = deepcopy(results) in_progress = deepcopy(results) - m_time, m_sleep = get_fake_time_and_sleep() - with patch.multiple(suite, - time=m_time, - sleep=m_sleep, - ): - assert 0 == suite.wait('name', 1, None) + assert 0 == suite.wait('name', 1, None) assert m_get_jobs.called_with('name', fields=['job_id', 'status']) assert 0 == len(in_progress) assert 'fail http://URL2' in caplog.text() @@ -984,27 +1013,22 @@ def test_wait_fails(m_get_jobs): def get_jobs(name, **kwargs): return results.pop(0) m_get_jobs.side_effect = get_jobs - suite.WAIT_PAUSE = 1 - suite.WAIT_MAX_JOB_TIME = 1 - m_time, m_sleep = get_fake_time_and_sleep() - with patch.multiple(suite, - time=m_time, - sleep=m_sleep, - ): - with pytest.raises(suite.WaitException) as error: - suite.wait('name', 1, None) - assert 'abc' in str(error) + suite.Run.WAIT_PAUSE = 1 + suite.Run.WAIT_MAX_JOB_TIME = 1 + with pytest.raises(suite.WaitException) as error: + suite.wait('name', 1, None) + assert 'abc' in str(error) -class TestSuiteMain(object): +class TestSuiteMain(object): def test_main(self): suite_name = 'SUITE' throttle = '3' machine_type = 'burnupi' - def prepare_and_schedule(**kwargs): - assert kwargs['job_config']['suite'] == suite_name - assert kwargs['throttle'] == throttle + def prepare_and_schedule(obj): + assert obj.base_config.suite == suite_name + assert obj.args.throttle == throttle def fake_str(*args, **kwargs): return 'fake' @@ -1015,15 +1039,19 @@ class TestSuiteMain(object): with patch.multiple( suite, fetch_repos=DEFAULT, - prepare_and_schedule=prepare_and_schedule, package_version_for_hash=fake_str, git_branch_exists=fake_bool, git_ls_remote=fake_str, ): - main(['--suite', suite_name, - '--throttle', throttle, - '--machine-type', machine_type, - ]) + with patch.multiple( + suite.Run, + prepare_and_schedule=prepare_and_schedule, + ): + main([ + '--suite', suite_name, + '--throttle', throttle, + '--machine-type', machine_type, + ]) def test_schedule_suite(self): suite_name = 'noop' @@ -1034,7 +1062,6 @@ class TestSuiteMain(object): suite, fetch_repos=DEFAULT, teuthology_schedule=DEFAULT, - sleep=DEFAULT, get_arch=lambda x: 'x86_64', git_ls_remote=lambda *args: '12345', package_version_for_hash=DEFAULT, @@ -1045,18 +1072,16 @@ class TestSuiteMain(object): '--suite-dir', 'teuthology/test', '--throttle', throttle, '--machine-type', machine_type]) - m['sleep'].assert_called_with(int(throttle)) + m_sleep.assert_called_with(int(throttle)) def test_schedule_suite_noverify(self): suite_name = 'noop' throttle = '3' machine_type = 'burnupi' - with patch.multiple( suite, fetch_repos=DEFAULT, teuthology_schedule=DEFAULT, - sleep=DEFAULT, get_arch=lambda x: 'x86_64', get_gitbuilder_hash=DEFAULT, git_ls_remote=lambda *args: '1234', @@ -1067,5 +1092,5 @@ class TestSuiteMain(object): '--suite-dir', 'teuthology/test', '--throttle', throttle, '--machine-type', machine_type]) - m['sleep'].assert_called_with(int(throttle)) + m_sleep.assert_called_with(int(throttle)) m['get_gitbuilder_hash'].assert_not_called()