]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
teuthology: add lua based fragment merge scripting
authorPatrick Donnelly <pdonnell@redhat.com>
Tue, 28 Jun 2022 20:29:40 +0000 (16:29 -0400)
committerPatrick Donnelly <pdonnell@redhat.com>
Tue, 19 Jul 2022 01:29:27 +0000 (21:29 -0400)
As part of this change, there is a new generator design for producing
job configs. YAML fragments are memoized and merged manually to avoid
expensive and unnecessary parsing of the merged fragments. This provides
for a dramatic speedup in processing matrices with large numbers of
jobs. For rados suite with --subset 1/1000, this branch is 5x faster
(77s vs. 15s). (Note: the difference shrinks when there are fewer
jobs or larger subsets are used due to cycling and the matrix generation
dominating runtime.)

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
scripts/schedule.py
setup.cfg
teuthology/describe_tests.py
teuthology/misc.py
teuthology/suite/fragment-merge.lua [new file with mode: 0644]
teuthology/suite/merge.py [new file with mode: 0644]
teuthology/suite/run.py
teuthology/suite/test/test_run_.py
teuthology/suite/util.py

index 59a2cee298426c891b15feb928fe15d10d5c2308..58f7a46249dfbf0e291824f6420a9fb0cb0abcfd 100644 (file)
@@ -12,6 +12,7 @@ Schedule ceph integration tests
 
 positional arguments:
   <conf_file>                          Config file to read
+                                       "-" indicates read stdin.
 
 optional arguments:
   -h, --help                           Show this help message and exit
index 05737a641d8e3940cf3f136b43b05609455f4494..f5de27bf925e76f0d6e75483008fa2a01fe84a57 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -43,6 +43,7 @@ install_requires =
     gevent
     httplib2
     humanfriendly
+    lupa
     ndg-httpsclient
     netaddr
     paramiko
@@ -119,6 +120,8 @@ teuthology.openstack =
     openstack-user-data.txt
     openstack.yaml
     setup-openstack.sh
+teuthology.suite =
+    fragment-merge.lua
 teuthology.task.install =
     adjust-ulimits
     daemon-helper
index 0f5db6786f9ca3634a5a87526ab1224a5f6a1c45..3ea7d71b6c004fdb272cd1c25bc4eeabcf2d9d79 100644 (file)
@@ -13,7 +13,7 @@ from distutils.util import strtobool
 from teuthology.exceptions import ParseError
 from teuthology.suite.build_matrix import \
         build_matrix, generate_combinations, _get_matrix
-from teuthology.suite import util
+from teuthology.suite import util, merge
 
 def main(args):
     try:
@@ -130,14 +130,15 @@ def output_summary(path, limit=0,
     mat, first, matlimit = _get_matrix(path, subset=subset, no_nested_subset=no_nested_subset)
     configs = generate_combinations(path, mat, first, matlimit)
     count = 0
+    total = len(configs)
     suite = os.path.basename(path)
-    config_list = util.filter_configs(configs,
-                                      suite_name=suite,
-                                      filter_in=filter_in,
-                                      filter_out=filter_out,
-                                      filter_all=filter_all,
-                                      filter_fragments=filter_fragments)
-    for c in config_list:
+    configs = merge.config_merge(configs,
+                                 suite_name=suite,
+                                 filter_in=filter_in,
+                                 filter_out=filter_out,
+                                 filter_all=filter_all,
+                                 filter_fragments=filter_fragments)
+    for c in configs:
         if limit and count >= limit:
             break
         count += 1
@@ -148,7 +149,7 @@ def output_summary(path, limit=0,
                     print("    {}".format(util.strip_fragment_path(path)))
     if show_matrix:
        print(mat.tostr(1))
-    print("# {}/{} {}".format(count, len(configs), path))
+    print("# {}/{} {}".format(count, total, path))
 
 def get_combinations(suite_dir,
                      limit=0,
@@ -179,13 +180,13 @@ def get_combinations(suite_dir,
     dirs = {}
     max_dir_depth = 0
 
-    configs = util.filter_configs(configs,
-                                  suite_name=suite,
-                                  filter_in=filter_in,
-                                  filter_out=filter_out,
-                                  filter_all=filter_all,
-                                  filter_fragments=filter_fragments)
-    for _, fragment_paths in configs:
+    configs = merge.config_merge(configs,
+                                 suite_name=suite,
+                                 filter_in=filter_in,
+                                 filter_out=filter_out,
+                                 filter_all=filter_all,
+                                 filter_fragments=filter_fragments)
+    for _, fragment_paths, __ in configs:
         if limit > 0 and num_listed >= limit:
             break
 
index 9fc344b67451d15147eb8834c652574c8f38fa5c..9f748f39d2dffc44950cb4110ef4b7dae481eb42 100644 (file)
@@ -15,6 +15,7 @@ import time
 import yaml
 import json
 import re
+from sys import stdin
 import pprint
 import datetime
 from types import MappingProxyType
@@ -127,11 +128,14 @@ def merge_configs(config_paths):
     """
     conf_dict = dict()
     for conf_path in config_paths:
-        if not os.path.exists(conf_path):
+        if conf_path == "-":
+            partial_dict = yaml.safe_load(stdin)
+        elif not os.path.exists(conf_path):
             log.debug("The config path {0} does not exist, skipping.".format(conf_path))
             continue
-        with open(conf_path) as partial_file:
-            partial_dict = yaml.safe_load(partial_file)
+        else:
+            with open(conf_path) as partial_file:
+                partial_dict = yaml.safe_load(partial_file)
         try:
             conf_dict = deep_merge(conf_dict, partial_dict)
         except Exception:
diff --git a/teuthology/suite/fragment-merge.lua b/teuthology/suite/fragment-merge.lua
new file mode 100644 (file)
index 0000000..856bded
--- /dev/null
@@ -0,0 +1,104 @@
+-- allow only some Lua (and lunatic) builtins for use by scripts
+local lua_allowlist = {
+  assert = assert,
+  error = error,
+  ipairs = ipairs,
+  next = next,
+  pairs = pairs,
+  tonumber = tonumber,
+  tostring = tostring,
+  py_attrgetter = python.as_attrgetter,
+  py_dict = python.builtins.dict,
+  py_list = python.builtins.list,
+  py_tuple = python.builtins.tuple,
+  py_enumerate = python.enumerate,
+  py_iterex = python.iterex,
+  py_itemgetter = python.as_itemgetter,
+  math = math,
+}
+lua_allowlist.__index = lua_allowlist
+
+-- accept a fragment/config (or just return true from the script!)
+local function accept()
+  coroutine.yield(true)
+end
+-- reject a fragment/config (or just return false from the script!)
+local function reject()
+  coroutine.yield(false)
+end
+-- this implements logic for filtering (via teuthology-suite CLI flags)
+local function matches(_ENV, f)
+  if description:find(f, 1, true) then
+    return true
+  end
+  if filter_fragments then
+    for i,path in py_enumerate(base_frag_paths) do
+      if path:find(f) then
+        return true
+      end
+    end
+  end
+end
+
+local function check_filters(_ENV)
+  if filter_all then
+    for i,f in py_enumerate(filter_all) do
+      if not matches(_ENV, f) then
+        reject()
+      end
+    end
+  end
+  if filter_in then
+    local found, tried = false, false
+    for i,f in py_enumerate(filter_in) do
+      tried = true
+      if matches(_ENV, f) then
+        found = true
+        break
+      end
+    end
+    if tried and not found then
+      reject()
+    end
+  end
+  if filter_out then
+    for i,f in py_enumerate(filter_out) do
+      if matches(_ENV, f) then
+        reject()
+      end
+    end
+  end
+end
+
+function new_script(script, log, deep_merge, yaml_load)
+  -- create a restricted sandbox for the script:
+  local env = setmetatable({
+    accept = accept,
+    deep_merge = deep_merge,
+    log = log,
+    reject = reject,
+    yaml_load = yaml_load,
+  }, lua_allowlist)
+
+  -- avoid putting check_filters in _ENV
+  -- try to keep line numbers correct:
+  local header = [[do local check_filters = ...; accept(); check_filters(_ENV) end local function main() do ]]
+  local footer = [[ end return true end return main()]]
+  local function chunks()
+    coroutine.yield(header)
+    if #script > 0 then
+      coroutine.yield(script)
+    end
+    coroutine.yield(footer)
+  end
+
+  -- put the script in a coroutine so we can yield success/failure from
+  -- anywhere in the script, including in nested function calls.
+  local f, err = load(coroutine.wrap(chunks), 'teuthology', 't', env)
+  if f == nil then
+    error("failure to load script: "..err)
+  end
+  f = coroutine.wrap(f)
+  f(check_filters)
+  return env, f
+end
diff --git a/teuthology/suite/merge.py b/teuthology/suite/merge.py
new file mode 100644 (file)
index 0000000..647fe6e
--- /dev/null
@@ -0,0 +1,170 @@
+import copy
+import logging
+import lupa
+import os
+from types import MappingProxyType
+import yaml
+
+from teuthology.suite.build_matrix import combine_path
+from teuthology.suite.util import strip_fragment_path
+from teuthology.misc import deep_merge
+
+log = logging.getLogger(__name__)
+
+TEUTHOLOGY_TEMPLATE = MappingProxyType({
+  "teuthology": {
+    "fragments_dropped": [],
+    "meta": MappingProxyType({}),
+    "postmerge": [],
+  }
+})
+
+L = lupa.LuaRuntime()
+FRAGMENT_MERGE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fragment-merge.lua")
+with open(FRAGMENT_MERGE) as f:
+    L.execute(f.read())
+
+def config_merge(configs, suite_name=None, **kwargs):
+    """
+    This procedure selects and merges YAML fragments for each job in the
+    configs array generated for the matrix of jobs.
+
+    The primary task here is to run premerge and postmerge scripts specified
+    with the YAML fragments as part of filtering out jobs or individual YAML
+    fragments. This is done with Lua scripting (via "lupa", a "lunatic"
+    derivative).
+
+    A premerge script looks like:
+
+    <foo.yaml>
+    teuthology:
+      premerge: |
+                if yaml.os_type == 'ubuntu' then reject() end
+    </foo.yaml>
+
+    This script runs prior to a YAML fragment merging into the complete YAML
+    specification for a job.  The script has access to the complete YAML
+    description generated so far as part of merging earlier fragments
+    (remember: fragments are ordered lexicographically). In the above case, the
+    os_type is checked with the foo.yaml fragment dropped if the job is
+    configured to run on Ubuntu (note: this does not account for a jobs'
+    default os_type which is not yet known).
+
+    The postmerge scripts look like:
+
+    <bar.yaml>
+    teuthology:
+      postmerge:
+        - if yaml.os_type == "ubuntu" then reject() end
+    </bar.yaml>
+
+    This script is the same but has a different effect: if, after combining all
+    the YAML fragments for a job, the os_type is "ubuntu", then the entire job
+    is dropped (filtered out / rejected). postmerge scripts are also specified
+    as a list of strings in the teuthology.postmerge array. All of these
+    strings are concatenated and then executed as a single script. So,
+    postmerge scripts from multiple fragments are all combined. You may use
+    this to define variables, functions, or anything else you need.
+
+    Scripts have access to the entire yaml object and may do any desired advanced
+    checks. It is also possible to programatically change the YAML definition:
+
+    <foo.yaml>
+    teuthology:
+      postmerge:
+        - |
+          local attr = py_attrgetter
+          local tasks = py_list()
+          for i = 1, 3 do
+            local task = py_dict(
+              exec = py_dict(py_list(
+                py_tuple("mon.a", py_list(
+                  "echo "..i
+                )
+              ))
+            )
+            attr(tasks).append(task)
+          end
+          deep_merge(yaml.tasks, tasks)
+    </foo.yaml>
+
+    This will be as if the yaml file contained:
+
+    <foo.yaml>
+    tasks:
+      exec:
+        mon.a:
+          - echo 1
+      exec:
+        mon.a:
+          - echo 2
+      exec:
+        mon.a:
+          - echo 3
+    </foo.yaml>
+
+    Which will be merged normally (via deep_merge) after the script is run.
+
+    Scripts are well sandboxed with access to a small selection of the Lua
+    builtin libraries. There is also access to some python/lupa specific
+    functions which are prefixed with "py_". No I/O or other system functions
+    permitted.
+
+    The teuthology-suite filtering options are now implemented via builtin
+    postmerge scripts. Logically, if a filter matches then reject will drop
+    the entire job (config) from the list.
+    """
+
+    new_script = L.eval('new_script')
+    yaml_cache = {}
+    for desc, paths in configs:
+        log.debug("merging config %s", desc)
+
+        if suite_name is not None:
+            desc = combine_path(suite_name, desc)
+
+        yaml_complete_obj = {}
+        deep_merge(yaml_complete_obj, TEUTHOLOGY_TEMPLATE)
+        for path in paths:
+            if path not in yaml_cache:
+                with open(path) as f:
+                    txt = f.read()
+                    yaml_cache[path] = (txt, yaml.safe_load(txt))
+
+            yaml_fragment_txt, yaml_fragment_obj = yaml_cache[path]
+            if yaml_fragment_obj is None:
+                continue
+            yaml_fragment_obj = copy.deepcopy(yaml_fragment_obj)
+            premerge = yaml_fragment_obj.get('teuthology', {}).pop('premerge', '')
+            if premerge:
+                log.debug("premerge script running:\n%s", premerge)
+                env, script = new_script(premerge, log, deep_merge, yaml.safe_load)
+                env['base_frag_paths'] = [strip_fragment_path(x) for x in paths]
+                env['description'] = desc
+                env['frag_paths'] = paths
+                env['suite_name'] = suite_name
+                env['yaml'] = yaml_complete_obj
+                env['yaml_fragment'] = yaml_fragment_obj
+                for k,v in kwargs.items():
+                    env[k] = v
+                if not script():
+                    log.debug("skipping merge of fragment %s due to premerge filter", path)
+                    yaml_complete_obj['teuthology']['fragments_dropped'].append(path)
+                    continue
+            deep_merge(yaml_complete_obj, yaml_fragment_obj)
+
+        postmerge = yaml_complete_obj.get('teuthology', {}).get('postmerge', [])
+        postmerge = "\n".join(postmerge)
+        log.debug("postmerge script running:\n%s", postmerge)
+        env, script = new_script(postmerge, log, deep_merge, yaml.safe_load)
+        env['base_frag_paths'] = [strip_fragment_path(x) for x in paths]
+        env['description'] = desc
+        env['frag_paths'] = paths
+        env['suite_name'] = suite_name
+        env['yaml'] = yaml_complete_obj
+        for k,v in kwargs.items():
+            env[k] = v
+        if not script():
+            log.debug("skipping config %s due to postmerge filter", desc)
+            continue
+        yield desc, paths, yaml_complete_obj
index 0b063e74630a52f948d5cce391d886ff62c740e2..560cbb239acf8b7942d72d4103ca38486dd60d53 100644 (file)
@@ -2,9 +2,9 @@ import copy
 import logging
 import os
 import pwd
+import yaml
 import re
 import time
-import yaml
 
 from humanfriendly import format_timespan
 
@@ -22,6 +22,7 @@ from teuthology.orchestra.opsys import OS
 from teuthology.repo_utils import build_git_url
 
 from teuthology.suite import util
+from teuthology.suite.merge import config_merge
 from teuthology.suite.build_matrix import build_matrix
 from teuthology.suite.placeholder import substitute_placeholders, dict_templ
 
@@ -421,16 +422,13 @@ class Run(object):
     def collect_jobs(self, arch, configs, newest=False, limit=0):
         jobs_to_schedule = []
         jobs_missing_packages = []
-        for description, fragment_paths in configs:
+        for description, fragment_paths, parsed_yaml in configs:
             if limit > 0 and len(jobs_to_schedule) >= limit:
                 log.info(
                     'Stopped after {limit} jobs due to --limit={limit}'.format(
                         limit=limit))
                 break
 
-            raw_yaml = '\n'.join([open(a, 'r').read() for a in fragment_paths])
-
-            parsed_yaml = yaml.safe_load(raw_yaml)
             os_type = parsed_yaml.get('os_type') or self.base_config.os_type
             os_version = parsed_yaml.get('os_version') or self.base_config.os_version
             exclude_arch = parsed_yaml.get('exclude_arch')
@@ -452,13 +450,16 @@ class Run(object):
                 '--',
             ])
             arg.extend(self.base_yaml_paths)
-            arg.extend(fragment_paths)
+
+            parsed_yaml_txt = yaml.dump(parsed_yaml)
+            arg.append('-')
 
             job = dict(
                 yaml=parsed_yaml,
                 desc=description,
                 sha1=self.base_config.sha1,
-                args=arg
+                args=arg,
+                stdin=parsed_yaml_txt,
             )
 
             sha1 = self.base_config.sha1
@@ -519,6 +520,7 @@ class Run(object):
                 dry_run=self.args.dry_run,
                 verbose=self.args.verbose,
                 log_prefix=log_prefix,
+                stdin=job['stdin'],
             )
             throttle = self.args.throttle
             if not self.args.dry_run and throttle:
@@ -579,8 +581,14 @@ Note: If you still want to go ahead, use --job-threshold 0'''
                                subset=self.args.subset,
                                no_nested_subset=self.args.no_nested_subset,
                                seed=self.args.seed)
-        log.info('Suite %s in %s generated %d jobs (not yet filtered)' % (
-            suite_name, suite_path, len(configs)))
+        generated = len(configs)
+        log.info(f'Suite {suite_name} in {suite_path} generated {generated} jobs (not yet filtered or merged)')
+        configs = config_merge(configs,
+            filter_in=self.args.filter_in,
+            filter_out=self.args.filter_out,
+            filter_all=self.args.filter_all,
+            filter_fragments=self.args.filter_fragments,
+            suite_name=suite_name)
 
         if self.args.dry_run:
             log.debug("Base job config:\n%s" % self.base_config)
@@ -617,7 +625,7 @@ Note: If you still want to go ahead, use --job-threshold 0'''
                     'this run for {that_long}? (y/N):'
                     .format(
                         that_long=format_timespan(sleep_before_teardown),
-                        total=len(configs),
+                        total=generated,
                         maximum=job_limit))
                 while True:
                     insane=(input(are_you_insane) or 'n').lower()
@@ -632,14 +640,7 @@ Note: If you still want to go ahead, use --job-threshold 0'''
         limit = self.args.newest
         while backtrack <= limit:
             jobs_missing_packages, jobs_to_schedule = \
-                self.collect_jobs(arch,
-                    util.filter_configs(configs,
-                        filter_in=self.args.filter_in,
-                        filter_out=self.args.filter_out,
-                        filter_all=self.args.filter_all,
-                        filter_fragments=self.args.filter_fragments,
-                        suite_name=suite_name),
-                                  self.args.newest, job_limit)
+                self.collect_jobs(arch, configs, self.args.newest, job_limit)
             if jobs_missing_packages and self.args.newest:
                 new_sha1 = \
                     util.find_git_parent('ceph', self.base_config.sha1)
@@ -690,8 +691,8 @@ Note: If you still want to go ahead, use --job-threshold 0'''
             (suite_name, suite_path, count)
         )
         log.info('%d/%d jobs were filtered out.',
-                 (len(configs) - count),
-                 len(configs))
+                 (generated - count),
+                 generated)
         if missing_count:
             log.warning('Scheduled %d/%d jobs that are missing packages!',
                      missing_count, count)
index 48b2866e524fba237bc538955a54575908a0e381..413fa2acede1a3c5c4055d3f27160082ce0e32e7 100644 (file)
@@ -1,8 +1,8 @@
 import os
 import pytest
 import requests
-import yaml
 import contextlib
+import yaml
 
 from datetime import datetime
 from mock import patch, call, ANY, DEFAULT
@@ -210,7 +210,7 @@ class TestScheduleSuite(object):
     @patch('teuthology.suite.util.has_packages_for_distro')
     @patch('teuthology.suite.util.get_package_versions')
     @patch('teuthology.suite.util.get_install_task_flavor')
-    @patch('teuthology.suite.run.open')
+    @patch('teuthology.suite.merge.open')
     @patch('teuthology.suite.run.build_matrix')
     @patch('teuthology.suite.util.git_ls_remote')
     @patch('teuthology.suite.util.package_version_for_hash')
@@ -262,9 +262,17 @@ class TestScheduleSuite(object):
         m_has_packages_for_distro.assert_has_calls(
             [call('ceph_sha1', 'ubuntu', '14.04', 'default', {})],
         )
-        frags = (frag1_read_output, frag2_read_output)
+        y = {
+          'teuthology': {
+            'fragments_dropped': [],
+            'meta': {},
+            'postmerge': []
+          },
+          'field1': 'val1',
+          'field2': 'val2'
+        }
         expected_job = dict(
-            yaml=yaml.safe_load('\n'.join(frags)),
+            yaml=y,
             sha1='ceph_sha1',
             args=[
                 '--num',
@@ -272,10 +280,10 @@ class TestScheduleSuite(object):
                 '--description',
                 os.path.join(self.args.suite, build_matrix_desc),
                 '--',
-                ANY,
-                build_matrix_frags[0],
-                build_matrix_frags[1],
+                ANY, # base config
+                '-'
             ],
+            stdin=yaml.dump(y),
             desc=os.path.join(self.args.suite, build_matrix_desc),
         )
 
@@ -289,7 +297,7 @@ class TestScheduleSuite(object):
     @patch('teuthology.suite.util.has_packages_for_distro')
     @patch('teuthology.suite.util.get_package_versions')
     @patch('teuthology.suite.util.get_install_task_flavor')
-    @patch('teuthology.suite.run.open', create=True)
+    @patch('teuthology.suite.run.config_merge')
     @patch('teuthology.suite.run.build_matrix')
     @patch('teuthology.suite.util.git_ls_remote')
     @patch('teuthology.suite.util.package_version_for_hash')
@@ -302,7 +310,7 @@ class TestScheduleSuite(object):
         m_package_version_for_hash,
         m_git_ls_remote,
         m_build_matrix,
-        m_open,
+        m_config_merge,
         m_get_install_task_flavor,
         m_get_package_versions,
         m_has_packages_for_distro,
@@ -319,7 +327,7 @@ class TestScheduleSuite(object):
             (build_matrix_desc, build_matrix_frags),
         ]
         m_build_matrix.return_value = build_matrix_output
-        m_open.side_effect = [StringIO('field: val\n') for i in range(11)]
+        m_config_merge.return_value = [(a, b, {}) for a, b in build_matrix_output]
         m_get_install_task_flavor.return_value = 'default'
         m_get_package_versions.return_value = dict()
         m_has_packages_for_distro.side_effect = [
@@ -344,7 +352,7 @@ class TestScheduleSuite(object):
     @patch('teuthology.suite.util.has_packages_for_distro')
     @patch('teuthology.suite.util.get_package_versions')
     @patch('teuthology.suite.util.get_install_task_flavor')
-    @patch('teuthology.suite.run.open', create=True)
+    @patch('teuthology.suite.run.config_merge')
     @patch('teuthology.suite.run.build_matrix')
     @patch('teuthology.suite.util.git_ls_remote')
     @patch('teuthology.suite.util.package_version_for_hash')
@@ -357,7 +365,7 @@ class TestScheduleSuite(object):
         m_package_version_for_hash,
         m_git_ls_remote,
         m_build_matrix,
-        m_open,
+        m_config_merge,
         m_get_install_task_flavor,
         m_get_package_versions,
         m_has_packages_for_distro,
@@ -378,11 +386,7 @@ class TestScheduleSuite(object):
             (build_matrix_desc, build_matrix_frags),
         ]
         m_build_matrix.return_value = build_matrix_output
-        m_open.side_effect = [
-            StringIO('field: val\n') for i in range(NUM_FAILS+1)
-        ] + [
-            contextlib.closing(BytesIO())
-        ] 
+        m_config_merge.return_value = [(a, b, {}) for a, b in build_matrix_output]
         m_get_install_task_flavor.return_value = 'default'
         m_get_package_versions.return_value = dict()
         # NUM_FAILS, then success
index 67ac32410945016174f6e6c364aaedb429458c4e..1ce9bd38624313fdc7e79adb06de2d255063dcdd 100644 (file)
@@ -4,7 +4,7 @@ import os
 import requests
 import smtplib
 import socket
-import subprocess
+from subprocess import Popen, PIPE, DEVNULL
 import sys
 
 from email.mime.text import MIMEText
@@ -20,7 +20,6 @@ from teuthology.repo_utils import fetch_qa_suite, fetch_teuthology
 from teuthology.orchestra.opsys import OS
 from teuthology.packaging import get_builder_project
 from teuthology.repo_utils import build_git_url
-from teuthology.suite.build_matrix import combine_path
 from teuthology.task.install import get_flavor
 
 log = logging.getLogger(__name__)
@@ -428,7 +427,7 @@ def has_packages_for_distro(sha1, os_type, os_version, flavor,
     return bool(flavors.get(flavor, None))
 
 
-def teuthology_schedule(args, verbose, dry_run, log_prefix=''):
+def teuthology_schedule(args, verbose, dry_run, log_prefix='', stdin=None):
     """
     Run teuthology-schedule to schedule individual jobs.
 
@@ -456,8 +455,12 @@ def teuthology_schedule(args, verbose, dry_run, log_prefix=''):
             ' '.join(printable_args),
         ))
     if not dry_run or (dry_run and verbose > 1):
-        subprocess.check_call(args=args)
-
+        astdin = DEVNULL if stdin is None else PIPE
+        p = Popen(args, stdin=astdin)
+        if stdin is not None:
+            p.communicate(input=stdin.encode('utf-8'))
+        else:
+            p.communicate()
 
 def find_git_parent(project, sha1):
 
@@ -493,42 +496,3 @@ def find_git_parent(project, sha1):
         return sha1s[1]
     else:
         return None
-
-
-def filter_configs(configs, suite_name=None,
-                            filter_in=None,
-                            filter_out=None,
-                            filter_all=None,
-                            filter_fragments=True):
-    """
-    Returns a generator for pairs of description and fragment paths.
-
-    Usage:
-
-        configs = build_matrix(path, subset, seed)
-        for description, fragments in filter_configs(configs):
-            pass
-    """
-    for item in configs:
-        fragment_paths = item[1]
-        description = combine_path(suite_name, item[0]) \
-                                        if suite_name else item[0]
-        base_frag_paths = [strip_fragment_path(x)
-                                        for x in fragment_paths]
-        def matches(f):
-            if f in description:
-                return True
-            if filter_fragments and \
-                    any(f in path for path in base_frag_paths):
-                return True
-            return False
-        if filter_all:
-            if not all(matches(f) for f in filter_all):
-                continue
-        if filter_in:
-            if not any(matches(f) for f in filter_in):
-                continue
-        if filter_out:
-            if any(matches(f) for f in filter_out):
-                continue
-        yield([description, fragment_paths])