import socket
import urlparse
import yaml
+import os
from copy import deepcopy
from libcloud.compute.providers import get_driver
assert hasattr(obj, '_security_groups')
+class TestOpenStackCustomProvisioner(TestOpenStackBase):
+ klass = cloud.openstack.OpenStackProvisioner
+ def get_obj(
+ self, name='node_name', os_type='ubuntu',
+ os_version='16.04', conf=None, test_conf=None):
+
+ if test_conf:
+ yaml_file = os.path.dirname(__file__) + '/' + test_conf
+ print ("Reading conf: %s" % yaml_file)
+ with open(yaml_file) as f:
+ teuth_conf=yaml.safe_load(f)
+ print(teuth_conf)
+ config.libcloud = deepcopy(teuth_conf['libcloud'] or test_config)
+ else:
+ config.libcloud = deepcopy(test_config)
+ return cloud.get_provisioner(
+ node_type='my_provider',
+ name=name,
+ os_type=os_type,
+ os_version=os_version,
+ conf=conf,
+ )
+
+ @mark.parametrize(
+ "conf",
+ [
+ dict(
+ path='test_openstack_userdata_conf.yaml',
+ runcmd_head=['uptime', 'date'],
+ ssh_authorized_keys=['user_public_key1', 'user_public_key2'],
+ user_ssh_pubkey='my_ssh_key',
+ os_version='16.04',
+ os_type='ubuntu',
+ ),
+ dict(
+ path='test_openstack_userdata_conf.yaml',
+ runcmd_head=['uptime', 'date'],
+ ssh_authorized_keys=['user_public_key1', 'user_public_key2'],
+ user_ssh_pubkey=None,
+ os_version='16.04',
+ os_type='ubuntu',
+ ),
+ dict(
+ os_version='16.04',
+ os_type='ubuntu',
+ path=None,
+ user_ssh_pubkey=None,
+ ),
+ ]
+ )
+ def test_userdata_conf(self, conf):
+ self.mocks['m_get_user_ssh_pubkey'].return_value = conf['user_ssh_pubkey']
+ obj = self.get_obj(os_version=conf['os_version'],
+ os_type=conf['os_type'],
+ test_conf=conf['path'])
+ userdata = yaml.safe_load(obj.userdata)
+ print(">>>> ", obj.conf)
+ print(">>>> ", obj.provider.conf)
+ print(">>>> ", obj.provider)
+ print(obj.userdata)
+ if conf and 'path' in conf and conf['path']:
+ assert userdata['runcmd'][0:len(conf['runcmd_head'])] == conf['runcmd_head']
+ assert userdata['bootcmd'] == [
+ 'SuSEfirewall2 stop || true',
+ 'service firewalld stop || true',
+ ]
+ assert 'packages' not in userdata
+ else:
+ assert 'bootcmd' not in userdata
+ assert userdata['packages'] == ['git', 'wget', 'python', 'ntp']
+ assert userdata['user'] == obj.user
+ assert userdata['hostname'] == obj.hostname
+ if 'user_ssh_pubkey' in conf and conf['user_ssh_pubkey']:
+ assert userdata['ssh_authorized_keys'][-1] == conf['user_ssh_pubkey']
+ if 'ssh_authorized_keys' in conf:
+ keys = conf['ssh_authorized_keys']
+ assert userdata['ssh_authorized_keys'][0:len(keys)] == keys
+ else:
+ if 'ssh_authorized_keys' in conf:
+ keys = conf['ssh_authorized_keys']
+ assert userdata['ssh_authorized_keys'][0:len(keys)] == keys
+ else:
+ assert 'ssh_authorized_keys' not in userdata
+
+ @mark.parametrize(
+ "conf",
+ [
+ dict(
+ path='test_openstack_userdata_conf.yaml',
+ runcmd_head=['uptime', 'date'],
+ ),
+ dict(
+ path=None,
+ ),
+ ]
+ )
+ def test_userdata_conf_runcmd(self, conf):
+ self.mocks['m_get_user_ssh_pubkey'].return_value = None
+ obj = self.get_obj(test_conf=conf['path'])
+ userdata = yaml.safe_load(obj.userdata)
+ assert userdata['runcmd'][-2:] == [['passwd', '-d', 'ubuntu'], ['touch', '/.teuth_provisioned']]
+
+ @mark.parametrize(
+ "conf",
+ [
+ dict(
+ path='test_openstack_userdata_conf.yaml',
+ packages=None,
+ ),
+ dict(
+ path=None,
+ packages=['git', 'wget', 'python', 'ntp']
+ ),
+ ]
+ )
+ def test_userdata_conf_packages(self, conf):
+ self.mocks['m_get_user_ssh_pubkey'].return_value = None
+ obj = self.get_obj(test_conf=conf['path'])
+ userdata = yaml.safe_load(obj.userdata)
+ assert userdata.get('packages', None) == conf['packages']
+
class TestOpenStackProvisioner(TestOpenStackBase):
klass = cloud.openstack.OpenStackProvisioner