-from mock import patch
+from mock import patch, DEFAULT
from pytest import raises
from teuthology.config import FakeNamespace
class TestTask(object):
def setup(self):
+ self.klass = Task
self.ctx = FakeNamespace()
self.ctx.config = dict()
+ self.task_config = dict()
def test_overrides(self):
- self.ctx.config['overrides'] = dict(
- task=dict(
- key_1='overridden',
- ),
+ self.ctx.config['overrides'] = dict()
+ self.ctx.config['overrides'][self.klass.name] = dict(
+ key_1='overridden',
)
- task_config = dict(
+ self.task_config.update(dict(
key_1='default',
key_2='default',
- )
- with Task(self.ctx, task_config) as task:
- assert task.config['key_1'] == 'overridden'
- assert task.config['key_2'] == 'default'
+ ))
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ assert task.config['key_1'] == 'overridden'
+ assert task.config['key_2'] == 'default'
def test_hosts_no_filter(self):
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1'), ['role1'])
- self.ctx.cluster.add(Remote('remote2'), ['role2'])
- task_config = dict()
- with Task(self.ctx, task_config) as task:
- task_hosts = task.cluster.remotes.keys()
- assert len(task_hosts) == 2
- assert sorted(host.name for host in task_hosts) == ['remote1',
- 'remote2']
+ self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
+ self.ctx.cluster.add(Remote('user@remote2'), ['role2'])
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task_hosts = task.cluster.remotes.keys()
+ assert len(task_hosts) == 2
+ assert sorted(host.shortname for host in task_hosts) == \
+ ['remote1', 'remote2']
def test_hosts_no_results(self):
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1'), ['role1'])
- task_config = dict(
+ self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
+ self.task_config.update(dict(
hosts=['role2'],
- )
- with raises(RuntimeError):
- with Task(self.ctx, task_config):
- pass
+ ))
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with raises(RuntimeError):
+ with self.klass(self.ctx, self.task_config):
+ pass
def test_hosts_one_role(self):
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1'), ['role1'])
- self.ctx.cluster.add(Remote('remote2'), ['role2'])
- task_config = dict(
+ self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
+ self.ctx.cluster.add(Remote('user@remote2'), ['role2'])
+ self.task_config.update(dict(
hosts=['role1'],
- )
- with Task(self.ctx, task_config) as task:
- task_hosts = task.cluster.remotes.keys()
- assert len(task_hosts) == 1
- assert task_hosts[0].name == 'remote1'
+ ))
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task_hosts = task.cluster.remotes.keys()
+ assert len(task_hosts) == 1
+ assert task_hosts[0].shortname == 'remote1'
def test_hosts_two_roles(self):
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1'), ['role1'])
- self.ctx.cluster.add(Remote('remote2'), ['role2'])
- self.ctx.cluster.add(Remote('remote3'), ['role3'])
- task_config = dict(
+ self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
+ self.ctx.cluster.add(Remote('user@remote2'), ['role2'])
+ self.ctx.cluster.add(Remote('user@remote3'), ['role3'])
+ self.task_config.update(dict(
hosts=['role1', 'role3'],
- )
- with Task(self.ctx, task_config) as task:
- task_hosts = task.cluster.remotes.keys()
- assert len(task_hosts) == 2
- hostnames = [host.name for host in task_hosts]
- assert sorted(hostnames) == ['remote1', 'remote3']
+ ))
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task_hosts = task.cluster.remotes.keys()
+ assert len(task_hosts) == 2
+ hostnames = [host.shortname for host in task_hosts]
+ assert sorted(hostnames) == ['remote1', 'remote3']
def test_hosts_two_hostnames(self):
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1.example.com'), ['role1'])
- self.ctx.cluster.add(Remote('remote2.example.com'), ['role2'])
- self.ctx.cluster.add(Remote('remote3.example.com'), ['role3'])
- task_config = dict(
+ self.ctx.cluster.add(Remote('user@remote1.example.com'), ['role1'])
+ self.ctx.cluster.add(Remote('user@remote2.example.com'), ['role2'])
+ self.ctx.cluster.add(Remote('user@remote3.example.com'), ['role3'])
+ self.task_config.update(dict(
hosts=['remote1', 'remote2.example.com'],
- )
- with Task(self.ctx, task_config) as task:
- task_hosts = task.cluster.remotes.keys()
- assert len(task_hosts) == 2
- hostnames = [host.name for host in task_hosts]
- assert sorted(hostnames) == ['remote1.example.com',
- 'remote2.example.com']
+ ))
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task_hosts = task.cluster.remotes.keys()
+ assert len(task_hosts) == 2
+ hostnames = [host.hostname for host in task_hosts]
+ assert sorted(hostnames) == ['remote1.example.com',
+ 'remote2.example.com']
def test_hosts_one_role_one_hostname(self):
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1.example.com'), ['role1'])
- self.ctx.cluster.add(Remote('remote2.example.com'), ['role2'])
- self.ctx.cluster.add(Remote('remote3.example.com'), ['role3'])
- task_config = dict(
+ self.ctx.cluster.add(Remote('user@remote1.example.com'), ['role1'])
+ self.ctx.cluster.add(Remote('user@remote2.example.com'), ['role2'])
+ self.ctx.cluster.add(Remote('user@remote3.example.com'), ['role3'])
+ self.task_config.update(dict(
hosts=['role1', 'remote2.example.com'],
- )
- with Task(self.ctx, task_config) as task:
- task_hosts = task.cluster.remotes.keys()
- assert len(task_hosts) == 2
- hostnames = [host.name for host in task_hosts]
- assert sorted(hostnames) == ['remote1.example.com',
- 'remote2.example.com']
-
- @patch.object(Task, 'setup')
- def test_setup_called(self, m_setup):
- task_config = dict()
- with Task(self.ctx, task_config):
- m_setup.assert_called_once_with()
-
- @patch.object(Task, 'begin')
- def test_begin_called(self, m_begin):
- task_config = dict()
- with Task(self.ctx, task_config):
- m_begin.assert_called_once_with()
-
- @patch.object(Task, 'end')
- def test_end_called(self, m_end):
- task_config = dict()
- with Task(self.ctx, task_config):
- pass
- m_end.assert_called_once_with()
-
- @patch.object(Task, 'teardown')
- def test_teardown_called(self, m_teardown):
- task_config = dict()
- with Task(self.ctx, task_config):
- pass
- m_teardown.assert_called_once_with()
+ ))
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task_hosts = task.cluster.remotes.keys()
+ assert len(task_hosts) == 2
+ hostnames = [host.hostname for host in task_hosts]
+ assert sorted(hostnames) == ['remote1.example.com',
+ 'remote2.example.com']
+
+ def test_setup_called(self):
+ with patch.multiple(
+ self.klass,
+ setup=DEFAULT,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task.setup.assert_called_once_with()
+
+ def test_begin_called(self):
+ with patch.multiple(
+ self.klass,
+ setup=DEFAULT,
+ begin=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ task.begin.assert_called_once_with()
+
+ def test_end_called(self):
+ self.task_config.update(dict())
+ with patch.multiple(
+ self.klass,
+ begin=DEFAULT,
+ end=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ pass
+ task.end.assert_called_once_with()
+
+ def test_teardown_called(self):
+ self.task_config.update(dict())
+ with patch.multiple(
+ self.klass,
+ setup=DEFAULT,
+ begin=DEFAULT,
+ teardown=DEFAULT,
+ ):
+ with self.klass(self.ctx, self.task_config) as task:
+ pass
+ task.teardown.assert_called_once_with()
def test_skip_teardown(self):
- task_config = dict(
+ self.task_config.update(dict(
skip_teardown=True,
- )
+ ))
def fake_teardown(self):
assert False
- with patch.object(Task, 'teardown', fake_teardown):
- with Task(self.ctx, task_config):
+ with patch.multiple(
+ self.klass,
+ setup=DEFAULT,
+ begin=DEFAULT,
+ teardown=fake_teardown,
+ ):
+ with self.klass(self.ctx, self.task_config):
pass
class TestAnsibleTask(TestTask):
def setup(self):
+ self.klass = Ansible
self.ctx = FakeNamespace()
self.ctx.cluster = Cluster()
self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
self.ctx.cluster.add(Remote('user@remote2'), ['role2'])
self.ctx.config = dict()
+ self.task_config = dict(playbook=[])
def test_setup(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[]
- )
+ ))
def fake_get_playbook(self):
self.playbook_file = 'fake'
with patch.multiple(
- ansible.Ansible,
+ self.klass,
find_repo=DEFAULT,
get_playbook=fake_get_playbook,
get_inventory=DEFAULT,
generate_hosts_file=DEFAULT,
generate_playbook=Mock(side_effect=Exception),
):
- task = Ansible(self.ctx, task_config)
+ task = self.klass(self.ctx, self.task_config)
task.setup()
def test_setup_generate_playbook(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[]
- )
+ ))
with patch.multiple(
- ansible.Ansible,
+ self.klass,
find_repo=DEFAULT,
get_playbook=DEFAULT,
get_inventory=DEFAULT,
generate_hosts_file=DEFAULT,
generate_playbook=DEFAULT,
):
- task = Ansible(self.ctx, task_config)
+ task = self.klass(self.ctx, self.task_config)
task.setup()
task.generate_playbook.assert_called_once_with()
def test_find_repo_path(self):
- task_config = dict(
+ self.task_config.update(dict(
repo='~/my/repo',
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.find_repo()
- assert task.repo_path == os.path.expanduser(task_config['repo'])
+ assert task.repo_path == os.path.expanduser(self.task_config['repo'])
@patch('teuthology.task.ansible.fetch_repo')
def test_find_repo_path_remote(self, m_fetch_repo):
- task_config = dict(
+ self.task_config.update(dict(
repo='git://fake_host/repo.git',
- )
+ ))
m_fetch_repo.return_value = '/tmp/repo'
- task = Ansible(self.ctx, task_config)
+ task = self.klass(self.ctx, self.task_config)
task.find_repo()
assert task.repo_path == os.path.expanduser('/tmp/repo')
@patch('teuthology.task.ansible.fetch_repo')
def test_find_repo_http(self, m_fetch_repo):
- task_config = dict(
+ self.task_config.update(dict(
repo='http://example.com/my/repo',
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.find_repo()
- m_fetch_repo.assert_called_once_with(task_config['repo'], 'master')
+ m_fetch_repo.assert_called_once_with(self.task_config['repo'],
+ 'master')
@patch('teuthology.task.ansible.fetch_repo')
def test_find_repo_git(self, m_fetch_repo):
- task_config = dict(
+ self.task_config.update(dict(
repo='git@example.com/my/repo',
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.find_repo()
- m_fetch_repo.assert_called_once_with(task_config['repo'], 'master')
+ m_fetch_repo.assert_called_once_with(self.task_config['repo'],
+ 'master')
def test_playbook_none(self):
- task_config = dict()
- task = Ansible(self.ctx, task_config)
+ del self.task_config['playbook']
+ task = self.klass(self.ctx, self.task_config)
with raises(KeyError):
task.get_playbook()
def test_playbook_wrong_type(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=dict(),
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
with raises(TypeError):
task.get_playbook()
roles=['role1'],
),
]
- task_config = dict(
+ self.task_config.update(dict(
playbook=playbook,
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.get_playbook()
assert task.playbook == playbook
m_get.return_value = Mock()
m_get.return_value.text = 'fake playbook text'
playbook = "http://example.com/my_playbook.yml"
- task_config = dict(
+ self.task_config.update(dict(
playbook=playbook,
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.get_playbook()
m_get.assert_called_once_with(playbook)
def test_playbook_file(self):
fake_playbook = [dict(fake_playbook=True)]
fake_playbook_obj = StringIO(yaml.safe_dump(fake_playbook))
- task_config = dict(
+ self.task_config.update(dict(
playbook='~/fake/playbook',
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
with patch('teuthology.task.ansible.file', create=True) as m_file:
m_file.return_value = fake_playbook_obj
task.get_playbook()
assert task.playbook == fake_playbook
def test_playbook_file_missing(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook='~/fake/playbook',
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
with raises(IOError):
task.get_playbook()
def test_inventory_none(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[]
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
with patch.object(ansible.os.path, 'exists') as m_exists:
m_exists.return_value = False
task.get_inventory()
def test_inventory_path(self):
inventory = '/my/inventory'
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
inventory=inventory,
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.get_inventory()
assert task.inventory == inventory
assert task.generated_inventory is False
def test_inventory_etc(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[]
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
with patch.object(ansible.os.path, 'exists') as m_exists:
m_exists.return_value = True
task.get_inventory()
assert task.generated_inventory is False
def test_generate_hosts_file(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[]
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
hosts_file_path = '/my/hosts/file'
hosts_file_obj = StringIO()
hosts_file_obj.name = hosts_file_path
roles=['role1', 'role2'],
),
]
- task_config = dict(
+ self.task_config.update(dict(
playbook=playbook
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
playbook_file_path = '/my/playbook/file'
playbook_file_obj = StringIO()
playbook_file_obj.name = playbook_file_path
def test_execute_playbook(self):
playbook = '/my/playbook'
- task_config = dict(
+ self.task_config.update(dict(
playbook=playbook
- )
+ ))
fake_playbook = [dict(fake_playbook=True)]
fake_playbook_obj = StringIO(yaml.safe_dump(fake_playbook))
fake_playbook_obj.name = playbook
- task = Ansible(self.ctx, task_config)
+ task = self.klass(self.ctx, self.task_config)
with patch('teuthology.task.ansible.file', create=True) as m_file:
m_file.return_value = fake_playbook_obj
task.setup()
)
def test_execute_playbook_fail(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.setup()
with patch.object(ansible.pexpect, 'run') as m_run:
m_run.return_value = ('', 1)
task.execute_playbook()
def test_build_args_no_tags(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.setup()
args = task._build_args()
assert '--tags' not in args
def test_build_args_tags(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
tags="user,pubkeys"
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.setup()
args = task._build_args()
assert args.count('--tags') == 1
assert args[args.index('--tags') + 1] == 'user,pubkeys'
def test_build_args_no_vars(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.setup()
args = task._build_args()
assert args.count('--extra-vars') == 1
dict1=dict(key='value'),
)
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
vars=extra_vars,
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.setup()
args = task._build_args()
assert args.count('--extra-vars') == 1
assert got_extra_vars['dict1'] == extra_vars['dict1']
def test_teardown_inventory(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.generated_inventory = True
task.inventory = 'fake'
with patch.object(ansible.os, 'remove') as m_remove:
assert m_remove.called_once_with('fake')
def test_teardown_playbook(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.generated_playbook = True
task.playbook_file = Mock()
task.playbook_file.name = 'fake'
assert m_remove.called_once_with('fake')
def test_teardown_cleanup_with_vars(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
cleanup=True,
vars=dict(yum_repos="testing"),
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.inventory = "fake"
task.generated_playbook = True
task.playbook_file = Mock()
task.playbook_file.name = 'fake'
- with patch.object(Ansible, 'execute_playbook') as m_execute:
+ with patch.object(self.klass, 'execute_playbook') as m_execute:
with patch.object(ansible.os, 'remove'):
task.teardown()
task._build_args()
assert 'yum_repos' in task.config['vars']
def test_teardown_cleanup_with_no_vars(self):
- task_config = dict(
+ self.task_config.update(dict(
playbook=[],
cleanup=True,
- )
- task = Ansible(self.ctx, task_config)
+ ))
+ task = self.klass(self.ctx, self.task_config)
task.inventory = "fake"
task.generated_playbook = True
task.playbook_file = Mock()
task.playbook_file.name = 'fake'
- with patch.object(Ansible, 'execute_playbook') as m_execute:
+ with patch.object(self.klass, 'execute_playbook') as m_execute:
with patch.object(ansible.os, 'remove'):
task.teardown()
task._build_args()
class TestCephLabTask(TestTask):
+ klass = CephLab
+
def setup(self):
self.ctx = FakeNamespace()
self.ctx.cluster = Cluster()
- self.ctx.cluster.add(Remote('remote1'), ['role1'])
- self.ctx.cluster.add(Remote('remote2'), ['role2'])
+ self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
+ self.ctx.cluster.add(Remote('user@remote2'), ['role2'])
self.ctx.config = dict()
+ self.task_config = dict()
+ self.patcher_fetch_repo = patch('teuthology.task.ansible.fetch_repo')
+ self.patcher_fetch_repo.return_value = 'PATH'
+ self.patcher_fetch_repo.start()
+
+ def fake_get_playbook(self):
+ self.playbook_file = Mock()
+ self.playbook_file.name = 'cephlab.yml'
+
+ self.patcher_get_playbook = patch(
+ 'teuthology.task.ansible.CephLab.get_playbook',
+ new=fake_get_playbook,
+ )
+ self.patcher_get_playbook.start()
+
+ def teardown(self):
+ self.patcher_fetch_repo.stop()
+ self.patcher_get_playbook.stop()
@patch('teuthology.task.ansible.fetch_repo')
def test_find_repo_http(self, m_fetch_repo):
repo = os.path.join(config.ceph_git_base_url,
'ceph-cm-ansible.git')
- task = CephLab(self.ctx, dict())
+ task = self.klass(self.ctx, dict())
task.find_repo()
m_fetch_repo.assert_called_once_with(repo, 'master')
fake_playbook_obj = StringIO(yaml.safe_dump(fake_playbook))
playbook = 'cephlab.yml'
fake_playbook_obj.name = playbook
- task = CephLab(self.ctx, dict())
+ task = self.klass(self.ctx, dict())
task.repo_path = '/tmp/fake/repo'
with patch('teuthology.task.ansible.file', create=True) as m_file:
m_file.return_value = fake_playbook_obj