"""
usage: teuthology --help
teuthology --version
- teuthology [options] <config>...
+ teuthology [options] [--] <config>...
Run ceph integration tests
import os
import yaml
import logging
+import collections
def init_logging():
log = init_logging()
-class YamlConfig(object):
+class YamlConfig(collections.MutableMapping):
"""
A configuration object populated by parsing a yaml file, with optional
default values.
"""
return str(self)
+    def get(self, key, default=None):
+        """
+        Act like dict.get(): look up the key first in self._conf, then in
+        self._defaults.
+
+        :param key:     The key to fetch
+        :param default: The value to return when the key is found in neither
+                        self._conf nor self._defaults (defaults to None)
+        :returns:       The stored value, or *default* if the key is not found
+        """
+        # __getattr__ already falls back from _conf to _defaults and yields
+        # None for a missing key. Compare against None rather than truthiness
+        # so legitimately-falsy stored values (0, '', False, []) are returned
+        # instead of being masked by the default — matching dict.get().
+        result = self.__getattr__(key)
+        if result is None:
+            return default
+        return result
+
def __str__(self):
return yaml.safe_dump(self._conf, default_flow_style=False).strip()
+    def __repr__(self):
+        # Reuse the YAML rendering that __str__ produces.
+        return str(self)
+
def __getitem__(self, name):
- return self._conf.__getitem__(name)
+ return self.__getattr__(name)
def __getattr__(self, name):
return self._conf.get(name, self._defaults.get(name))
+    def __contains__(self, name):
+        # Membership reflects only explicitly-set configuration (self._conf),
+        # not the defaults — consistent with __iter__ and __len__.
+        return name in self._conf
+
def __setattr__(self, name, value):
if name.endswith('_conf') or name in ('yaml_path'):
object.__setattr__(self, name, value)
def __delattr__(self, name):
del self._conf[name]
+    def __len__(self):
+        # Size of the explicitly-set configuration only.
+        return len(self._conf)
+
+    def __iter__(self):
+        # Iterate over the explicitly-set keys only.
+        return iter(self._conf)
+
+    def __setitem__(self, name, value):
+        # Item assignment writes straight through to the backing dict.
+        self._conf[name] = value
+
+    def __delitem__(self, name):
+        # Deletion only touches explicit config; defaults are untouched.
+        del self._conf[name]
+
class TeuthologyConfig(YamlConfig):
"""
class FakeNamespace(YamlConfig):
- """ This class is meant to behave like a argparse Namespace with an attached
- teuthology config at the teuthology_config property. It mimics the old
- way of doing things with argparse and teuthology.misc.read_config.
+ """
+    This class is meant to behave like an argparse Namespace with an attached
+ teuthology config at the teuthology_config property. It mimics the old
+ way of doing things with argparse and teuthology.misc.read_config.
- We'll use this as a stop-gap as we refactor commands but allow the tasks
- to still be passed a single namespace object for the time being.
+ We'll use this as a stop-gap as we refactor commands but allow the tasks
+ to still be passed a single namespace object for the time being.
"""
def __init__(self, config_dict=None, yaml_path=None):
if not yaml_path:
self._conf = self._clean_config(config_dict)
def _clean_config(self, config_dict):
- """ Makes sure that the keys of config_dict are able to be used. For example
- the "--" prefix of a docopt dict isn't valid and won't populate correctly.
+ """
+ Makes sure that the keys of config_dict are able to be used. For example
+ the "--" prefix of a docopt dict isn't valid and won't populate correctly.
"""
result = dict()
for key, value in config_dict.iteritems():
"""
conf_dict = dict()
for conf_path in config_paths:
+ if not os.path.exists(conf_path):
+ log.debug("The config path {0} does not exist, skipping.".format(conf_path))
+ continue
with file(conf_path) as partial_file:
partial_dict = yaml.safe_load(partial_file)
try:
from .run_tasks import run_tasks
from .repo_utils import fetch_qa_suite
from .results import email_results
-from .config import JobConfig, FakeNamespace
+from .config import FakeNamespace
log = logging.getLogger(__name__)
def setup_config(config_paths):
- """ Takes a list of config yaml files and combines them
- into a single dictionary. Processes / validates the dictionary and then
- returns a JobConfig instance.
+ """
+ Takes a list of config yaml files and combines them
+ into a single dictionary. Processes / validates the dictionary and then
+ returns a JobConfig instance.
"""
config = merge_configs(config_paths)
'%d targets are needed for all roles but found %d listed.' % (
roles, targets)
- return JobConfig.from_dict(config)
+ return config
def get_machine_type(machine_type, config):
- """ If no machine_type is given, find the appropriate machine_type
- from the given config.
+ """
+ If no machine_type is given, find the appropriate machine_type
+ from the given config.
"""
if machine_type is None:
fallback_default = config.get('machine_type', 'plana')
def validate_tasks(config):
- """ Ensures that config tasks do not include 'kernal'.
+ """
+ Ensures that config tasks do not include 'kernel'.
- If there is not tasks, return an empty list.
+    If there are no tasks, return an empty list.
"""
if 'tasks' not in config:
log.warning('No tasks specified. Continuing anyway...')
'of the tasks list')
assert 'kernel' not in task, msg
+ return config["tasks"]
+
def get_initial_tasks(lock, config, machine_type):
init_tasks = []
sys.exit(1)
-def main(ctx):
- set_up_logging(ctx["--verbose"], ctx["--archive"])
+def main(args):
+ set_up_logging(args["--verbose"], args["--archive"])
- if ctx["--owner"] is None:
- ctx["--owner"] = get_user()
+ if args["--owner"] is None:
+ args["--owner"] = get_user()
- ctx["<config>"] = setup_config(ctx["<config>"])
+ args["<config>"] = setup_config(args["<config>"])
- write_initial_metadata(ctx["--archive"], ctx["<config>"], ctx["--name"], ctx["--description"], ctx["--owner"])
- report.try_push_job_info(ctx["<config>"], dict(status='running'))
+ write_initial_metadata(args["--archive"], args["<config>"], args["--name"], args["--description"], args["--owner"])
+ report.try_push_job_info(args["<config>"], dict(status='running'))
- machine_type = get_machine_type(ctx["--machine-type"])
+ machine_type = get_machine_type(args["--machine-type"], args["<config>"])
- if ctx["--block"]:
- assert ctx["--lock"], \
+ if args["--block"]:
+ assert args["--lock"], \
'the --block option is only supported with the --lock option'
- log.debug('\n '.join(['Config:', ] + yaml.safe_dump(
- ctx["<config>"], default_flow_style=False).splitlines()))
+ log.debug('\n '.join(['Config:', ] + yaml.safe_dump(args["<config>"], default_flow_style=False).splitlines()))
- ctx["summary"] = get_summary(ctx["--owner"], ctx["--description"])
+ args["summary"] = get_summary(args["--owner"], args["--description"])
- ctx["<config>"]["tasks"] = validate_tasks(ctx["<config>"])
+ args["<config>"]["tasks"] = validate_tasks(args["<config>"])
- init_tasks = get_initial_tasks(ctx["--lock"], ctx["<config>"], machine_type)
+ init_tasks = get_initial_tasks(args["--lock"], args["<config>"], machine_type)
- ctx["<config>"]['tasks'].insert(0, init_tasks)
+ # prepend init_tasks to the front of the task list
+ args["<config>"]['tasks'][:0] = init_tasks
- if ctx["--suite-path"] is not None:
- ctx["<config>"]['suite_path'] = ctx["--suite-path"]
+ if args["--suite-path"] is not None:
+ args["<config>"]['suite_path'] = args["--suite-path"]
# fetches the tasks and returns a new suite_path if needed
- ctx["<config>"]["suite_path"] = fetch_tasks_if_needed(ctx["<config>"])
+ args["<config>"]["suite_path"] = fetch_tasks_if_needed(args["<config>"])
# create a FakeNamespace instance that mimics the old argparse way of doing things
# we do this so we can pass it to run_tasks without porting those tasks to the
# new way of doing things right now
- fake_ctx = FakeNamespace(ctx)
+ fake_ctx = FakeNamespace(args)
try:
- run_tasks(tasks=ctx["<config>"]['tasks'], ctx=fake_ctx)
+ run_tasks(tasks=args["<config>"]['tasks'], ctx=fake_ctx)
finally:
# print to stdout the results and possibly send an email on any errors
- report_outcome(ctx["<config>"], ctx["--archive"], ctx["summary"], fake_ctx)
+ report_outcome(args["<config>"], args["--archive"], args["summary"], fake_ctx)
conf_obj = self.test_class.from_dict(in_dict)
assert conf_obj.foo == 'bar'
+    def test_contains(self):
+        # Keys set via from_dict and via attribute assignment are both
+        # visible to the `in` operator; absent keys are not.
+        conf_obj = self.test_class.from_dict(dict(foo='bar'))
+        conf_obj.bar = "foo"
+        for present in ("bar", "foo"):
+            assert present in conf_obj
+        assert "baz" not in conf_obj
+
def test_to_dict(self):
in_dict = dict(foo='bar')
conf_obj = self.test_class.from_dict(in_dict)
del conf_obj.foo
assert conf_obj.foo is None
+    def test_get(self):
+        cfg = self.test_class()
+        cfg.foo = "bar"
+        # A present key returns its stored value.
+        assert cfg.get("foo") == "bar"
+        # A missing key falls back to the supplied default.
+        assert cfg.get("not_there", "default") == "default"
+
+    def test_assignment(self):
+        cfg = self.test_class()
+        cfg["foo"] = "bar"
+        # Item assignment must be visible through both access styles.
+        assert cfg["foo"] == "bar"
+        assert cfg.foo == "bar"
+
+    def test_used_with_update(self):
+        # Implementing the mapping protocol lets the config object feed
+        # dict.update() directly.
+        cfg = self.test_class.from_dict({"foo": "bar"})
+        plain = dict()
+        plain.update(cfg)
+        assert plain["foo"] == "bar"
+
class TestTeuthologyConfig(TestYamlConfig):
def setup(self):
and that defaults are properly used. However, we won't check all
the defaults.
"""
- conf_obj = self.test_class(dict())
- assert conf_obj.teuthology_config
+ conf_obj = self.test_class(dict(foo="bar"))
+ assert conf_obj["foo"] == "bar"
+ assert conf_obj.foo == "bar"
assert conf_obj.teuthology_config.archive_base
assert not conf_obj.teuthology_config.automated_scheduling
assert conf_obj.teuthology_config.ceph_git_base_url == 'https://github.com/ceph/'
+ assert conf_obj.teuthology_config["ceph_git_base_url"] == 'https://github.com/ceph/'
class TestMergeConfigs(object):
""" Tests merge_config and deep_merge in teuthology.misc """
+ @patch("os.path.exists")
@patch("yaml.safe_load")
@patch("__builtin__.file")
- def test_merge_configs(self, m_file, m_safe_load):
+ def test_merge_configs(self, m_file, m_safe_load, m_exists):
""" Only tests with one yaml file being passed, mainly just to test
the loop logic. The actual merge will be tested in subsequent
tests.
"""
expected = {"a": "b", "b": "c"}
+ m_exists.return_value = True
m_safe_load.return_value = expected
result = misc.merge_configs(["path/to/config1"])
assert result == expected
m_merge_configs.return_value = config
result = run.setup_config(["some/config.yaml"])
assert m_merge_configs.called
- assert result.job_id == "1"
+ assert result["job_id"] == "1"
assert result["foo"] == "bar"
@patch("teuthology.run.merge_configs")
config = {"targets": range(4), "roles": range(2)}
m_merge_configs.return_value = config
result = run.setup_config(["some/config.yaml"])
- assert result.targets
- assert result.roles
+ assert result["targets"] == [0, 1, 2, 3]
+ assert result["roles"] == [0, 1]
@patch("teuthology.run.merge_configs")
def test_setup_config_targets_invalid(self, m_merge_configs):
result = run.validate_tasks({})
assert result == []
+    def test_validate_tasks_valid(self):
+        # A config containing a tasks entry gets that entry back unchanged.
+        tasks = {"foo": "bar"}
+        assert run.validate_tasks({"tasks": tasks}) == tasks
+
def test_get_initial_tasks_invalid(self):
with pytest.raises(AssertionError):
run.get_initial_tasks(True, {"targets": "can't be here"}, "machine_type")