positional arguments:
<conf_file> Config file to read
+ "-" indicates reading from stdin.
optional arguments:
-h, --help Show this help message and exit
gevent
httplib2
humanfriendly
+ lupa
ndg-httpsclient
netaddr
paramiko
openstack-user-data.txt
openstack.yaml
setup-openstack.sh
+teuthology.suite =
+ fragment-merge.lua
teuthology.task.install =
adjust-ulimits
daemon-helper
from teuthology.exceptions import ParseError
from teuthology.suite.build_matrix import \
build_matrix, generate_combinations, _get_matrix
-from teuthology.suite import util
+from teuthology.suite import util, merge
def main(args):
try:
mat, first, matlimit = _get_matrix(path, subset=subset, no_nested_subset=no_nested_subset)
configs = generate_combinations(path, mat, first, matlimit)
count = 0
+ total = len(configs)
suite = os.path.basename(path)
- config_list = util.filter_configs(configs,
- suite_name=suite,
- filter_in=filter_in,
- filter_out=filter_out,
- filter_all=filter_all,
- filter_fragments=filter_fragments)
- for c in config_list:
+ configs = merge.config_merge(configs,
+ suite_name=suite,
+ filter_in=filter_in,
+ filter_out=filter_out,
+ filter_all=filter_all,
+ filter_fragments=filter_fragments)
+ for c in configs:
if limit and count >= limit:
break
count += 1
print(" {}".format(util.strip_fragment_path(path)))
if show_matrix:
print(mat.tostr(1))
- print("# {}/{} {}".format(count, len(configs), path))
+ print("# {}/{} {}".format(count, total, path))
def get_combinations(suite_dir,
limit=0,
dirs = {}
max_dir_depth = 0
- configs = util.filter_configs(configs,
- suite_name=suite,
- filter_in=filter_in,
- filter_out=filter_out,
- filter_all=filter_all,
- filter_fragments=filter_fragments)
- for _, fragment_paths in configs:
+ configs = merge.config_merge(configs,
+ suite_name=suite,
+ filter_in=filter_in,
+ filter_out=filter_out,
+ filter_all=filter_all,
+ filter_fragments=filter_fragments)
+ for _, fragment_paths, __ in configs:
if limit > 0 and num_listed >= limit:
break
import yaml
import json
import re
+from sys import stdin
import pprint
import datetime
from types import MappingProxyType
"""
conf_dict = dict()
for conf_path in config_paths:
- if not os.path.exists(conf_path):
+ if conf_path == "-":
+ partial_dict = yaml.safe_load(stdin)
+ elif not os.path.exists(conf_path):
log.debug("The config path {0} does not exist, skipping.".format(conf_path))
continue
- with open(conf_path) as partial_file:
- partial_dict = yaml.safe_load(partial_file)
+ else:
+ with open(conf_path) as partial_file:
+ partial_dict = yaml.safe_load(partial_file)
try:
conf_dict = deep_merge(conf_dict, partial_dict)
except Exception:
--- /dev/null
+-- allow only some Lua (and lunatic) builtins for use by scripts
+-- This table also serves as the metatable for each sandboxed script
+-- environment (see new_script): __index points back at it, so scripts can
+-- read these names while any assignment still lands in the per-script
+-- environment table.
+local lua_allowlist = {
+  assert = assert,
+  error = error,
+  ipairs = ipairs,
+  next = next,
+  pairs = pairs,
+  tonumber = tonumber,
+  tostring = tostring,
+  -- py_* helpers are lupa's Python bridge, letting scripts construct and
+  -- manipulate real Python objects (dict/list/tuple) from Lua.
+  py_attrgetter = python.as_attrgetter,
+  py_dict = python.builtins.dict,
+  py_list = python.builtins.list,
+  py_tuple = python.builtins.tuple,
+  py_enumerate = python.enumerate,
+  py_iterex = python.iterex,
+  py_itemgetter = python.as_itemgetter,
+  math = math,
+}
+lua_allowlist.__index = lua_allowlist
+
+-- accept a fragment/config (or just return true from the script!)
+-- Yields true out of the script coroutine; the caller (see merge.py) treats
+-- the yielded/returned value as the script's verdict.
+local function accept()
+  coroutine.yield(true)
+end
+-- reject a fragment/config (or just return false from the script!)
+-- Yielding false aborts the rest of the script body as well.
+local function reject()
+  coroutine.yield(false)
+end
+-- this implements logic for filtering (via teuthology-suite CLI flags)
+-- Returns true when filter string f matches the job description or, when
+-- filter_fragments is enabled, any fragment basename; otherwise falls off
+-- the end and returns nil (falsy). _ENV is the sandboxed environment
+-- providing description, filter_fragments and base_frag_paths.
+local function matches(_ENV, f)
+  -- plain (non-pattern) substring search against the job description
+  if description:find(f, 1, true) then
+    return true
+  end
+  if filter_fragments then
+    for i,path in py_enumerate(base_frag_paths) do
+      -- NOTE(review): unlike the description check above, this is a Lua
+      -- pattern match (no plain flag), so magic characters in f are
+      -- interpreted as pattern syntax -- confirm the asymmetry is intended.
+      if path:find(f) then
+        return true
+      end
+    end
+  end
+end
+
+-- Apply the teuthology-suite CLI filters to the current job.
+-- Calls reject() -- which yields false and aborts the coroutine -- as soon
+-- as the job is ruled out; returning normally means the job survives.
+local function check_filters(_ENV)
+  if filter_all then
+    -- every filter_all entry must match
+    for i,f in py_enumerate(filter_all) do
+      if not matches(_ENV, f) then
+        reject()
+      end
+    end
+  end
+  if filter_in then
+    -- at least one filter_in entry must match, when any were supplied
+    local found, tried = false, false
+    for i,f in py_enumerate(filter_in) do
+      tried = true
+      if matches(_ENV, f) then
+        found = true
+        break
+      end
+    end
+    if tried and not found then
+      reject()
+    end
+  end
+  if filter_out then
+    -- any filter_out match drops the job
+    for i,f in py_enumerate(filter_out) do
+      if matches(_ENV, f) then
+        reject()
+      end
+    end
+  end
+end
+
+-- Compile a premerge/postmerge script into a sandboxed coroutine.
+-- Returns the environment table (which the caller populates with job state:
+-- description, yaml, filter settings, ...) and the wrapped coroutine f.
+-- Protocol: the first resume, done here via f(check_filters), runs the
+-- header's accept() (yielding true immediately); the caller's next call
+-- resumes into check_filters and then the user script body, so the value it
+-- gets back -- from accept()/reject() or the script's own return -- is the
+-- final verdict.
+function new_script(script, log, deep_merge, yaml_load)
+  -- create a restricted sandbox for the script:
+  local env = setmetatable({
+    accept = accept,
+    deep_merge = deep_merge,
+    log = log,
+    reject = reject,
+    yaml_load = yaml_load,
+  }, lua_allowlist)
+
+  -- avoid putting check_filters in _ENV
+  -- try to keep line numbers correct:
+  local header = [[do local check_filters = ...; accept(); check_filters(_ENV) end local function main() do ]]
+  local footer = [[ end return true end return main()]]
+  -- load() accepts a chunk-reader function; feeding the pieces through a
+  -- coroutine avoids concatenating header/script/footer into one string.
+  local function chunks()
+    coroutine.yield(header)
+    if #script > 0 then
+      coroutine.yield(script)
+    end
+    coroutine.yield(footer)
+  end
+
+  -- put the script in a coroutine so we can yield success/failure from
+  -- anywhere in the script, including in nested function calls.
+  local f, err = load(coroutine.wrap(chunks), 'teuthology', 't', env)
+  if f == nil then
+    error("failure to load script: "..err)
+  end
+  f = coroutine.wrap(f)
+  f(check_filters)
+  return env, f
+end
--- /dev/null
+import copy
+import logging
+import lupa
+import os
+from types import MappingProxyType
+import yaml
+
+from teuthology.suite.build_matrix import combine_path
+from teuthology.suite.util import strip_fragment_path
+from teuthology.misc import deep_merge
+
+log = logging.getLogger(__name__)
+
+# Read-only skeleton seeded into every job's YAML before any fragments are
+# merged in.  MappingProxyType guards the shared default against mutation;
+# the inner lists are still mutable, so deep_merge is expected to copy them
+# into the per-job dict rather than alias them -- TODO confirm against
+# teuthology.misc.deep_merge.
+TEUTHOLOGY_TEMPLATE = MappingProxyType({
+    "teuthology": {
+        "fragments_dropped": [],
+        "meta": MappingProxyType({}),
+        "postmerge": [],
+    }
+})
+
+# Single module-level Lua runtime; fragment-merge.lua (which defines the
+# global new_script used by config_merge) is loaded once at import time.
+L = lupa.LuaRuntime()
+FRAGMENT_MERGE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fragment-merge.lua")
+with open(FRAGMENT_MERGE) as f:
+    L.execute(f.read())
+
+def config_merge(configs, suite_name=None, **kwargs):
+    """
+    This procedure selects and merges YAML fragments for each job in the
+    configs array generated for the matrix of jobs.
+
+    The primary task here is to run premerge and postmerge scripts specified
+    with the YAML fragments as part of filtering out jobs or individual YAML
+    fragments. This is done with Lua scripting (via "lupa", a "lunatic"
+    derivative).
+
+    A premerge script looks like:
+
+    <foo.yaml>
+    teuthology:
+      premerge: |
+        if yaml.os_type == 'ubuntu' then reject() end
+    </foo.yaml>
+
+    This script runs prior to a YAML fragment merging into the complete YAML
+    specification for a job. The script has access to the complete YAML
+    description generated so far as part of merging earlier fragments
+    (remember: fragments are ordered lexicographically). In the above case, the
+    os_type is checked with the foo.yaml fragment dropped if the job is
+    configured to run on Ubuntu (note: this does not account for a job's
+    default os_type which is not yet known).
+
+    The postmerge scripts look like:
+
+    <bar.yaml>
+    teuthology:
+      postmerge:
+        - if yaml.os_type == "ubuntu" then reject() end
+    </bar.yaml>
+
+    This script is the same but has a different effect: if, after combining all
+    the YAML fragments for a job, the os_type is "ubuntu", then the entire job
+    is dropped (filtered out / rejected). postmerge scripts are also specified
+    as a list of strings in the teuthology.postmerge array. All of these
+    strings are concatenated and then executed as a single script. So,
+    postmerge scripts from multiple fragments are all combined. You may use
+    this to define variables, functions, or anything else you need.
+
+    Scripts have access to the entire yaml object and may do any desired advanced
+    checks. It is also possible to programmatically change the YAML definition:
+
+    <foo.yaml>
+    teuthology:
+      postmerge:
+        - |
+          local attr = py_attrgetter
+          local tasks = py_list()
+          for i = 1, 3 do
+            local task = py_dict(
+              exec = py_dict(py_list(
+                py_tuple("mon.a", py_list(
+                  "echo "..i
+                ))
+              ))
+            )
+            attr(tasks).append(task)
+          end
+          deep_merge(yaml.tasks, tasks)
+    </foo.yaml>
+
+    This will be as if the yaml file contained:
+
+    <foo.yaml>
+    tasks:
+      exec:
+        mon.a:
+          - echo 1
+      exec:
+        mon.a:
+          - echo 2
+      exec:
+        mon.a:
+          - echo 3
+    </foo.yaml>
+
+    Which will be merged normally (via deep_merge) after the script is run.
+
+    Scripts are well sandboxed with access to a small selection of the Lua
+    builtin libraries. There is also access to some python/lupa specific
+    functions which are prefixed with "py_". No I/O or other system functions
+    are permitted.
+
+    The teuthology-suite filtering options are now implemented via builtin
+    postmerge scripts. Logically, if a filter matches then reject will drop
+    the entire job (config) from the list.
+
+    :param configs: iterable of (description, fragment_paths) pairs, as
+                    produced by generate_combinations.
+    :param suite_name: when given, prefixed onto each description via
+                       combine_path.
+    :param kwargs: extra variables (e.g. filter_in/filter_out/filter_all/
+                   filter_fragments) injected into every script environment.
+    :returns: generator yielding (description, fragment_paths, merged_yaml)
+              triples for each surviving job.
+    """
+
+    # new_script is the global defined by fragment-merge.lua at import time
+    new_script = L.eval('new_script')
+    # cache per-fragment text and parsed YAML; fragments recur across jobs
+    yaml_cache = {}
+    for desc, paths in configs:
+        log.debug("merging config %s", desc)
+
+        if suite_name is not None:
+            desc = combine_path(suite_name, desc)
+
+        yaml_complete_obj = {}
+        deep_merge(yaml_complete_obj, TEUTHOLOGY_TEMPLATE)
+        for path in paths:
+            if path not in yaml_cache:
+                with open(path) as f:
+                    txt = f.read()
+                    yaml_cache[path] = (txt, yaml.safe_load(txt))
+
+            yaml_fragment_txt, yaml_fragment_obj = yaml_cache[path]
+            # empty fragment file: nothing to merge
+            if yaml_fragment_obj is None:
+                continue
+            # deepcopy so the pop()/merge below never mutate the cached parse
+            yaml_fragment_obj = copy.deepcopy(yaml_fragment_obj)
+            premerge = yaml_fragment_obj.get('teuthology', {}).pop('premerge', '')
+            if premerge:
+                log.debug("premerge script running:\n%s", premerge)
+                env, script = new_script(premerge, log, deep_merge, yaml.safe_load)
+                env['base_frag_paths'] = [strip_fragment_path(x) for x in paths]
+                env['description'] = desc
+                env['frag_paths'] = paths
+                env['suite_name'] = suite_name
+                env['yaml'] = yaml_complete_obj
+                env['yaml_fragment'] = yaml_fragment_obj
+                for k,v in kwargs.items():
+                    env[k] = v
+                # script() resumes the Lua coroutine: True accepts the
+                # fragment, False (via reject()) drops just this fragment
+                if not script():
+                    log.debug("skipping merge of fragment %s due to premerge filter", path)
+                    yaml_complete_obj['teuthology']['fragments_dropped'].append(path)
+                    continue
+            deep_merge(yaml_complete_obj, yaml_fragment_obj)
+
+        # postmerge scripts accumulated from all fragments run as one script
+        # against the fully merged job; the CLI filters also run here (see
+        # fragment-merge.lua), so a False verdict drops the entire job.
+        postmerge = yaml_complete_obj.get('teuthology', {}).get('postmerge', [])
+        postmerge = "\n".join(postmerge)
+        log.debug("postmerge script running:\n%s", postmerge)
+        env, script = new_script(postmerge, log, deep_merge, yaml.safe_load)
+        env['base_frag_paths'] = [strip_fragment_path(x) for x in paths]
+        env['description'] = desc
+        env['frag_paths'] = paths
+        env['suite_name'] = suite_name
+        env['yaml'] = yaml_complete_obj
+        for k,v in kwargs.items():
+            env[k] = v
+        if not script():
+            log.debug("skipping config %s due to postmerge filter", desc)
+            continue
+        yield desc, paths, yaml_complete_obj
import logging
import os
import pwd
+import yaml
import re
import time
-import yaml
from humanfriendly import format_timespan
from teuthology.repo_utils import build_git_url
from teuthology.suite import util
+from teuthology.suite.merge import config_merge
from teuthology.suite.build_matrix import build_matrix
from teuthology.suite.placeholder import substitute_placeholders, dict_templ
def collect_jobs(self, arch, configs, newest=False, limit=0):
jobs_to_schedule = []
jobs_missing_packages = []
- for description, fragment_paths in configs:
+ for description, fragment_paths, parsed_yaml in configs:
if limit > 0 and len(jobs_to_schedule) >= limit:
log.info(
'Stopped after {limit} jobs due to --limit={limit}'.format(
limit=limit))
break
- raw_yaml = '\n'.join([open(a, 'r').read() for a in fragment_paths])
-
- parsed_yaml = yaml.safe_load(raw_yaml)
os_type = parsed_yaml.get('os_type') or self.base_config.os_type
os_version = parsed_yaml.get('os_version') or self.base_config.os_version
exclude_arch = parsed_yaml.get('exclude_arch')
'--',
])
arg.extend(self.base_yaml_paths)
- arg.extend(fragment_paths)
+
+ parsed_yaml_txt = yaml.dump(parsed_yaml)
+ arg.append('-')
job = dict(
yaml=parsed_yaml,
desc=description,
sha1=self.base_config.sha1,
- args=arg
+ args=arg,
+ stdin=parsed_yaml_txt,
)
sha1 = self.base_config.sha1
dry_run=self.args.dry_run,
verbose=self.args.verbose,
log_prefix=log_prefix,
+ stdin=job['stdin'],
)
throttle = self.args.throttle
if not self.args.dry_run and throttle:
subset=self.args.subset,
no_nested_subset=self.args.no_nested_subset,
seed=self.args.seed)
- log.info('Suite %s in %s generated %d jobs (not yet filtered)' % (
- suite_name, suite_path, len(configs)))
+ generated = len(configs)
+ log.info(f'Suite {suite_name} in {suite_path} generated {generated} jobs (not yet filtered or merged)')
+ configs = config_merge(configs,
+ filter_in=self.args.filter_in,
+ filter_out=self.args.filter_out,
+ filter_all=self.args.filter_all,
+ filter_fragments=self.args.filter_fragments,
+ suite_name=suite_name)
if self.args.dry_run:
log.debug("Base job config:\n%s" % self.base_config)
'this run for {that_long}? (y/N):'
.format(
that_long=format_timespan(sleep_before_teardown),
- total=len(configs),
+ total=generated,
maximum=job_limit))
while True:
insane=(input(are_you_insane) or 'n').lower()
limit = self.args.newest
while backtrack <= limit:
jobs_missing_packages, jobs_to_schedule = \
- self.collect_jobs(arch,
- util.filter_configs(configs,
- filter_in=self.args.filter_in,
- filter_out=self.args.filter_out,
- filter_all=self.args.filter_all,
- filter_fragments=self.args.filter_fragments,
- suite_name=suite_name),
- self.args.newest, job_limit)
+ self.collect_jobs(arch, configs, self.args.newest, job_limit)
if jobs_missing_packages and self.args.newest:
new_sha1 = \
util.find_git_parent('ceph', self.base_config.sha1)
(suite_name, suite_path, count)
)
log.info('%d/%d jobs were filtered out.',
- (len(configs) - count),
- len(configs))
+ (generated - count),
+ generated)
if missing_count:
log.warning('Scheduled %d/%d jobs that are missing packages!',
missing_count, count)
import os
import pytest
import requests
-import yaml
import contextlib
+import yaml
from datetime import datetime
from mock import patch, call, ANY, DEFAULT
@patch('teuthology.suite.util.has_packages_for_distro')
@patch('teuthology.suite.util.get_package_versions')
@patch('teuthology.suite.util.get_install_task_flavor')
- @patch('teuthology.suite.run.open')
+ @patch('teuthology.suite.merge.open')
@patch('teuthology.suite.run.build_matrix')
@patch('teuthology.suite.util.git_ls_remote')
@patch('teuthology.suite.util.package_version_for_hash')
m_has_packages_for_distro.assert_has_calls(
[call('ceph_sha1', 'ubuntu', '14.04', 'default', {})],
)
- frags = (frag1_read_output, frag2_read_output)
+ y = {
+ 'teuthology': {
+ 'fragments_dropped': [],
+ 'meta': {},
+ 'postmerge': []
+ },
+ 'field1': 'val1',
+ 'field2': 'val2'
+ }
expected_job = dict(
- yaml=yaml.safe_load('\n'.join(frags)),
+ yaml=y,
sha1='ceph_sha1',
args=[
'--num',
'--description',
os.path.join(self.args.suite, build_matrix_desc),
'--',
- ANY,
- build_matrix_frags[0],
- build_matrix_frags[1],
+ ANY, # base config
+ '-'
],
+ stdin=yaml.dump(y),
desc=os.path.join(self.args.suite, build_matrix_desc),
)
@patch('teuthology.suite.util.has_packages_for_distro')
@patch('teuthology.suite.util.get_package_versions')
@patch('teuthology.suite.util.get_install_task_flavor')
- @patch('teuthology.suite.run.open', create=True)
+ @patch('teuthology.suite.run.config_merge')
@patch('teuthology.suite.run.build_matrix')
@patch('teuthology.suite.util.git_ls_remote')
@patch('teuthology.suite.util.package_version_for_hash')
m_package_version_for_hash,
m_git_ls_remote,
m_build_matrix,
- m_open,
+ m_config_merge,
m_get_install_task_flavor,
m_get_package_versions,
m_has_packages_for_distro,
(build_matrix_desc, build_matrix_frags),
]
m_build_matrix.return_value = build_matrix_output
- m_open.side_effect = [StringIO('field: val\n') for i in range(11)]
+ m_config_merge.return_value = [(a, b, {}) for a, b in build_matrix_output]
m_get_install_task_flavor.return_value = 'default'
m_get_package_versions.return_value = dict()
m_has_packages_for_distro.side_effect = [
@patch('teuthology.suite.util.has_packages_for_distro')
@patch('teuthology.suite.util.get_package_versions')
@patch('teuthology.suite.util.get_install_task_flavor')
- @patch('teuthology.suite.run.open', create=True)
+ @patch('teuthology.suite.run.config_merge')
@patch('teuthology.suite.run.build_matrix')
@patch('teuthology.suite.util.git_ls_remote')
@patch('teuthology.suite.util.package_version_for_hash')
m_package_version_for_hash,
m_git_ls_remote,
m_build_matrix,
- m_open,
+ m_config_merge,
m_get_install_task_flavor,
m_get_package_versions,
m_has_packages_for_distro,
(build_matrix_desc, build_matrix_frags),
]
m_build_matrix.return_value = build_matrix_output
- m_open.side_effect = [
- StringIO('field: val\n') for i in range(NUM_FAILS+1)
- ] + [
- contextlib.closing(BytesIO())
- ]
+ m_config_merge.return_value = [(a, b, {}) for a, b in build_matrix_output]
m_get_install_task_flavor.return_value = 'default'
m_get_package_versions.return_value = dict()
# NUM_FAILS, then success
import requests
import smtplib
import socket
-import subprocess
+from subprocess import Popen, PIPE, DEVNULL
import sys
from email.mime.text import MIMEText
from teuthology.orchestra.opsys import OS
from teuthology.packaging import get_builder_project
from teuthology.repo_utils import build_git_url
-from teuthology.suite.build_matrix import combine_path
from teuthology.task.install import get_flavor
log = logging.getLogger(__name__)
return bool(flavors.get(flavor, None))
-def teuthology_schedule(args, verbose, dry_run, log_prefix=''):
+def teuthology_schedule(args, verbose, dry_run, log_prefix='', stdin=None):
"""
Run teuthology-schedule to schedule individual jobs.
' '.join(printable_args),
))
if not dry_run or (dry_run and verbose > 1):
- subprocess.check_call(args=args)
-
+ astdin = DEVNULL if stdin is None else PIPE
+ p = Popen(args, stdin=astdin)
+ if stdin is not None:
+ p.communicate(input=stdin.encode('utf-8'))
+ else:
+ p.communicate()
def find_git_parent(project, sha1):
return sha1s[1]
else:
return None
-
-
-def filter_configs(configs, suite_name=None,
- filter_in=None,
- filter_out=None,
- filter_all=None,
- filter_fragments=True):
- """
- Returns a generator for pairs of description and fragment paths.
-
- Usage:
-
- configs = build_matrix(path, subset, seed)
- for description, fragments in filter_configs(configs):
- pass
- """
- for item in configs:
- fragment_paths = item[1]
- description = combine_path(suite_name, item[0]) \
- if suite_name else item[0]
- base_frag_paths = [strip_fragment_path(x)
- for x in fragment_paths]
- def matches(f):
- if f in description:
- return True
- if filter_fragments and \
- any(f in path for path in base_frag_paths):
- return True
- return False
- if filter_all:
- if not all(matches(f) for f in filter_all):
- continue
- if filter_in:
- if not any(matches(f) for f in filter_in):
- continue
- if filter_out:
- if any(matches(f) for f in filter_out):
- continue
- yield([description, fragment_paths])