- pip install ansible-lint pytest
script:
- if [[ -n $(grep --exclude-dir=.git -P "\xa0" -r .) ]]; then echo 'NBSP characters found'; exit 1; fi
- - pytest -vvvv library/ plugins/
+ - pytest -vvvv tests/library/ tests/plugins/filter/
- for i in $(ls -1 roles/); do ANSIBLE_LOG_PATH=/dev/null ansible-lint -x 204 -v roles/$i/; if [ $? -ne 0 ]; then exit 1; fi; done
+++ /dev/null
-from . import ceph_crush
-import pytest
-
-
-class TestCephCrushModule(object):
-
- def test_no_host(self):
- location = [
- ("chassis", "monchassis"),
- ("rack", "monrack"),
- ("row", "marow"),
- ("pdu", "monpdu"),
- ("pod", "monpod"),
- ("room", "maroom"),
- ("datacenter", "mondc"),
- ("region", "maregion"),
- ("root", "maroute"),
- ]
- with pytest.raises(Exception):
- result = ceph_crush.sort_osd_crush_location(location, None)
-
- def test_lower_than_two_bucket(self):
- location = [
- ("chassis", "monchassis"),
- ]
- with pytest.raises(Exception):
- result = ceph_crush.sort_osd_crush_location(location, None)
-
- def test_invalid_bucket_type(self):
- location = [
- ("host", "monhost"),
- ("chassis", "monchassis"),
- ("rackyyyyy", "monrack"),
- ]
- with pytest.raises(Exception):
- result = ceph_crush.sort_osd_crush_location(location, None)
-
- def test_ordering(self):
- expected_result = [
- ("host", "monhost"),
- ("chassis", "monchassis"),
- ("rack", "monrack"),
- ("row", "marow"),
- ("pdu", "monpdu"),
- ("pod", "monpod"),
- ("room", "maroom"),
- ("datacenter", "mondc"),
- ("region", "maregion"),
- ("root", "maroute"),
- ]
- expected_result_reverse = expected_result[::-1]
- result = ceph_crush.sort_osd_crush_location(expected_result_reverse, None)
- assert expected_result == result
-
- def test_generate_commands(self):
- cluster = "test"
- expected_command_list = [
- ['ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monhost", "host"],
- ['ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monchassis", "chassis"],
- ['ceph', '--cluster', cluster, 'osd', 'crush', "move", "monhost", "chassis=monchassis"],
- ['ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monrack", "rack"],
- ['ceph', '--cluster', cluster, 'osd', 'crush', "move", "monchassis", "rack=monrack"],
- ]
-
- location = [
- ("host", "monhost"),
- ("chassis", "monchassis"),
- ("rack", "monrack"),
- ]
- result = ceph_crush.create_and_move_buckets_list(cluster, location)
- assert result == expected_command_list
-
- def test_generate_commands_container(self):
- cluster = "test"
- containerized = "docker exec -ti ceph-mon"
- expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monhost", "host"],
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monchassis", "chassis"],
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "move", "monhost", "chassis=monchassis"],
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monrack", "rack"],
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "move", "monchassis", "rack=monrack"],
- ]
-
- location = [
- ("host", "monhost"),
- ("chassis", "monchassis"),
- ("rack", "monrack"),
- ]
- result = ceph_crush.create_and_move_buckets_list(cluster, location, containerized)
- assert result == expected_command_list
+++ /dev/null
-import json
-import os
-from . import ceph_key
-import mock
-
-
-@mock.patch.dict(os.environ, {'CEPH_CONTAINER_BINARY': 'docker'})
-class TestCephKeyModule(object):
-
- def test_generate_secret(self):
- expected_length = 40
- result = len(ceph_key.generate_secret())
- assert result == expected_length
-
- def test_generate_caps_ceph_authtool(self):
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_cmd = ['ceph']
- fake_type = "ceph-authtool"
- expected_command_list = [
- 'ceph',
- '--cap',
- 'mon',
- 'allow *',
- '--cap',
- 'osd',
- 'allow rwx'
- ]
- result = ceph_key.generate_caps(fake_cmd, fake_type, fake_caps)
- assert result == expected_command_list
-
- def test_generate_caps_not_ceph_authtool(self):
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_cmd = ['ceph']
- fake_type = ""
- expected_command_list = [
- 'ceph',
- 'mon',
- 'allow *',
- 'osd',
- 'allow rwx'
- ]
- result = ceph_key.generate_caps(fake_cmd, fake_type, fake_caps)
- assert result == expected_command_list
-
- def test_generate_ceph_cmd_list_non_container(self):
- fake_cluster = "fake"
- fake_args = ['arg']
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- expected_command_list = [
- 'ceph',
- '-n',
- "fake-user",
- '-k',
- "/tmp/my-key",
- '--cluster',
- fake_cluster,
- 'auth',
- 'arg'
- ]
- result = ceph_key.generate_ceph_cmd(
- fake_cluster, fake_args, fake_user, fake_key)
- assert result == expected_command_list
-
- def test_generate_ceph_cmd_list_container(self):
- fake_cluster = "fake"
- fake_args = ['arg']
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker',
- 'run',
- '--rm',
- '--net=host', # noqa E501
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n',
- "fake-user",
- '-k',
- "/tmp/my-key",
- '--cluster',
- fake_cluster,
- 'auth',
- 'arg']
- result = ceph_key.generate_ceph_cmd(
- fake_cluster, fake_args, fake_user, fake_key, fake_container_image)
- assert result == expected_command_list
-
- def test_generate_ceph_authtool_cmd_non_container_no_auid(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_secret = "super-secret"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_dest = "/fake/ceph"
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- expected_command_list = [
- 'ceph-authtool',
- '--create-keyring',
- fake_file_destination,
- '--name',
- fake_name,
- '--add-key',
- fake_secret,
- '--cap',
- 'mon',
- 'allow *',
- '--cap',
- 'osd',
- 'allow rwx',
- ]
- result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_file_destination) # noqa E501
- assert result == expected_command_list
-
- def test_generate_ceph_authtool_cmd_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_secret = "super-secret"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_dest = "/fake/ceph"
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker',
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-authtool',
- 'docker.io/ceph/daemon:latest-luminous',
- '--create-keyring',
- fake_file_destination,
- '--name',
- fake_name,
- '--add-key',
- fake_secret,
- '--cap',
- 'mon',
- 'allow *',
- '--cap',
- 'osd',
- 'allow rwx']
- result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_file_destination, fake_container_image) # noqa E501
- assert result == expected_command_list
-
- def test_create_key_non_container(self):
- fake_module = "fake"
- fake_result = " fake"
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_secret = "super-secret"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_import_key = True
- fake_dest = "/fake/ceph"
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- expected_command_list = [
- ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name, # noqa E501
- '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], # noqa E501
- ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', # noqa E501
- 'import', '-i', fake_file_destination],
- ]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
- fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) # noqa E501
- assert result == expected_command_list
-
- def test_create_key_container(self):
- fake_module = "fake"
- fake_result = "fake"
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_secret = "super-secret"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_dest = "/fake/ceph"
- fake_import_key = True
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [
- ['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-authtool',
- 'docker.io/ceph/daemon:latest-luminous',
- '--create-keyring', fake_file_destination,
- '--name', fake_name,
- '--add-key', fake_secret,
- '--cap', 'mon', 'allow *',
- '--cap', 'osd', 'allow rwx'],
- ['docker',
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', 'client.admin',
- '-k', '/etc/ceph/fake.client.admin.keyring',
- '--cluster', fake_cluster,
- 'auth', 'import',
- '-i', fake_file_destination]
- ]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
- fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image) # noqa E501
- assert result == expected_command_list
-
- def test_create_key_non_container_no_import(self):
- fake_module = "fake"
- fake_result = "fake"
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_secret = "super-secret"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_dest = "/fake/ceph"
- fake_import_key = False
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
- expected_command_list = [[
- 'ceph-authtool',
- '--create-keyring',
- fake_file_destination,
- '--name',
- fake_name,
- '--add-key',
- fake_secret,
- '--cap',
- 'mon',
- 'allow *',
- '--cap',
- 'osd',
- 'allow rwx', ]
- ]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
- fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) # noqa E501
- assert result == expected_command_list
-
- def test_create_key_container_no_import(self):
- fake_module = "fake"
- fake_result = "fake"
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_secret = "super-secret"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_dest = "/fake/ceph"
- fake_import_key = False
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-authtool',
- 'docker.io/ceph/daemon:latest-luminous',
- '--create-keyring',
- fake_file_destination,
- '--name',
- fake_name,
- '--add-key',
- fake_secret,
- '--cap',
- 'mon',
- 'allow *',
- '--cap',
- 'osd',
- 'allow rwx']]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
- fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image) # noqa E501
- assert result == expected_command_list
-
- def test_update_key_non_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- expected_command_list = [
- ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', 'caps', # noqa E501
- fake_name, 'mon', 'allow *', 'osd', 'allow rwx'],
- ]
- result = ceph_key.update_key(fake_cluster, fake_name, fake_caps)
- assert result == expected_command_list
-
- def test_update_key_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_caps = {
- 'mon': 'allow *',
- 'osd': 'allow rwx',
- }
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', 'client.admin',
- '-k', '/etc/ceph/fake.client.admin.keyring',
- '--cluster', fake_cluster,
- 'auth',
- 'caps', fake_name,
- 'mon', 'allow *', 'osd', 'allow rwx']
- ]
- result = ceph_key.update_key(
- fake_cluster, fake_name, fake_caps, fake_container_image)
- assert result == expected_command_list
-
- def test_delete_key_non_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- expected_command_list = [
- ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', # noqa E501
- '--cluster', fake_cluster, 'auth', 'del', fake_name],
- ]
- result = ceph_key.delete_key(fake_cluster, fake_name)
- assert result == expected_command_list
-
- def test_delete_key_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', 'client.admin',
- '-k', '/etc/ceph/fake.client.admin.keyring',
- '--cluster', fake_cluster,
- 'auth', 'del', fake_name]
- ]
- result = ceph_key.delete_key(
- fake_cluster, fake_name, fake_container_image)
- assert result == expected_command_list
-
- def test_info_key_non_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- fake_output_format = "json"
- expected_command_list = [
- ['ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', fake_cluster, 'auth', # noqa E501
- 'get', fake_name, '-f', 'json'],
- ]
- result = ceph_key.info_key(
- fake_cluster, fake_name, fake_user, fake_key, fake_output_format)
- assert result == expected_command_list
-
- def test_info_key_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- fake_output_format = "json"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', "fake-user",
- '-k', "/tmp/my-key",
- '--cluster', fake_cluster,
- 'auth', 'get', fake_name,
- '-f', 'json']
- ]
- result = ceph_key.info_key(
- fake_cluster, fake_name, fake_user, fake_key, fake_output_format, fake_container_image) # noqa E501
- assert result == expected_command_list
-
- def test_list_key_non_container(self):
- fake_cluster = "fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- expected_command_list = [
- ['ceph', '-n', "fake-user", '-k', "/tmp/my-key",
- '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
- ]
- result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
- assert result == expected_command_list
-
- def test_get_key_container(self):
- fake_cluster = "fake"
- fake_name = "client.fake"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- fake_dest = "/fake/ceph"
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', "client.admin",
- '-k', "/etc/ceph/fake.client.admin.keyring", # noqa E501
- '--cluster', fake_cluster,
- 'auth', 'get',
- fake_name, '-o', fake_file_destination],
- ]
- result = ceph_key.get_key(
- fake_cluster, fake_name, fake_file_destination, fake_container_image) # noqa E501
- assert result == expected_command_list
-
- def test_get_key_non_container(self):
- fake_cluster = "fake"
- fake_dest = "/fake/ceph"
- fake_name = "client.fake"
- fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
- fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
- expected_command_list = [
- ['ceph', '-n', "client.admin", '-k', "/etc/ceph/fake.client.admin.keyring", # noqa E501
- '--cluster', fake_cluster, 'auth', 'get', fake_name, '-o', fake_file_destination], # noqa E501
- ]
- result = ceph_key.get_key(
- fake_cluster, fake_name, fake_file_destination) # noqa E501
- assert result == expected_command_list
-
- def test_list_key_non_container_with_mon_key(self):
- fake_hostname = "mon01"
- fake_cluster = "fake"
- fake_user = "mon."
- fake_keyring_dirname = fake_cluster + "-" + fake_hostname
- fake_key = os.path.join("/var/lib/ceph/mon/", fake_keyring_dirname, 'keyring') # noqa E501
- expected_command_list = [
- ['ceph', '-n', "mon.", '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
- '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
- ]
- result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
- assert result == expected_command_list
-
- def test_list_key_container_with_mon_key(self):
- fake_hostname = "mon01"
- fake_cluster = "fake"
- fake_user = "mon."
- fake_keyring_dirname = fake_cluster + "-" + fake_hostname
- fake_key = os.path.join("/var/lib/ceph/mon/", fake_keyring_dirname, 'keyring') # noqa E501
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', "mon.",
- '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
- '--cluster', fake_cluster,
- 'auth', 'ls',
- '-f', 'json'],
- ]
- result = ceph_key.list_keys(fake_cluster, fake_user, fake_key, fake_container_image) # noqa E501
- assert result == expected_command_list
-
- def test_list_key_container(self):
- fake_cluster = "fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = [['docker', # noqa E128
- 'run',
- '--rm',
- '--net=host',
- '-v', '/etc/ceph:/etc/ceph:z',
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph',
- 'docker.io/ceph/daemon:latest-luminous',
- '-n', "fake-user",
- '-k', "/tmp/my-key",
- '--cluster', fake_cluster,
- 'auth', 'ls',
- '-f', 'json'],
- ]
- result = ceph_key.list_keys(
- fake_cluster, fake_user, fake_key, fake_container_image)
- assert result == expected_command_list
-
- def test_lookup_ceph_initial_entities(self):
- fake_module = "fake"
- fake_ceph_dict = { "auth_dump":[ { "entity":"osd.0", "key":"AQAJkMhbszeBBBAA4/V1tDFXGlft1GnHJS5wWg==", "caps":{ "mgr":"allow profile osd", "mon":"allow profile osd", "osd":"allow *" } }, { "entity":"osd.1", "key":"AQAjkMhbshueAhAAjZec50aBgd1NObLz57SQvg==", "caps":{ "mgr":"allow profile osd", "mon":"allow profile osd", "osd":"allow *" } }, { "entity":"client.admin", "key":"AQDZjshbrJv6EhAAY9v6LzLYNDpPdlC3HD5KHA==", "auid":0, "caps":{ "mds":"allow", "mgr":"allow *", "mon":"allow *", "osd":"allow *" } }, { "entity":"client.bootstrap-mds", "key":"AQDojshbc4QCHhAA1ZTrkt9dbSZRVU2GzI6U4A==", "caps":{ "mon":"allow profile bootstrap-mds" } }, { "entity":"client.bootstrap-mgr", "key":"AQBfiu5bAAAAABAARcNG24hUMlk4AdstVA5MVQ==", "caps":{ "mon":"allow profile bootstrap-mgr" } }, { "entity":"client.bootstrap-osd", "key":"AQDjjshbYW+uGxAAyHcPCXXmVoL8VsTBI8z1Ng==", "caps":{ "mon":"allow profile bootstrap-osd" } }, { "entity":"client.bootstrap-rbd", "key":"AQDyjshb522eIhAAtAz6nUPMOdG4H9u0NgpXhA==", "caps":{ "mon":"allow profile bootstrap-rbd" } }, { "entity":"client.bootstrap-rbd-mirror", "key":"AQDfh+5bAAAAABAAEGBD59Lj2vAKIdN8pq4lbQ==", "caps":{ "mon":"allow profile bootstrap-rbd-mirror" } }, { "entity":"client.bootstrap-rgw", "key":"AQDtjshbDl8oIBAAq1SfSYQKDR49hJNWJVwDQw==", "caps":{ "mon":"allow profile bootstrap-rgw" } }, { "entity":"mgr.mon0", "key":"AQA0j8hbgGapORAAoDkyAvXVkM5ej4wNn4cwTQ==", "caps":{ "mds":"allow *", "mon":"allow profile mgr", "osd":"allow *" } } ] } # noqa E501
- fake_ceph_dict_str = json.dumps(fake_ceph_dict) # convert to string
- expected_entity_list = ['client.admin', 'client.bootstrap-mds', 'client.bootstrap-mgr', # noqa E501
- 'client.bootstrap-osd', 'client.bootstrap-rbd', 'client.bootstrap-rbd-mirror', 'client.bootstrap-rgw'] # noqa E501
- result = ceph_key.lookup_ceph_initial_entities(fake_module, fake_ceph_dict_str)
- assert result == expected_entity_list
-
- def test_build_key_path_admin(self):
- fake_cluster = "fake"
- entity = "client.admin"
- expected_result = "/etc/ceph/fake.client.admin.keyring"
- result = ceph_key.build_key_path(fake_cluster, entity)
- assert result == expected_result
-
- def test_build_key_path_bootstrap_osd(self):
- fake_cluster = "fake"
- entity = "client.bootstrap-osd"
- expected_result = "/var/lib/ceph/bootstrap-osd/fake.keyring"
- result = ceph_key.build_key_path(fake_cluster, entity)
- assert result == expected_result
+++ /dev/null
-from . import ceph_volume
-import mock
-import os
-
-
-# Python 3
-try:
- from unittest.mock import MagicMock
-except ImportError:
- # Python 2
- try:
- from mock import MagicMock
- except ImportError:
- print('You need the mock library installed on python2.x to run tests')
-
-
-@mock.patch.dict(os.environ, {'CEPH_CONTAINER_BINARY': 'docker'})
-class TestCephVolumeModule(object):
-
- def test_data_no_vg(self):
- result = ceph_volume.get_data("/dev/sda", None)
- assert result == "/dev/sda"
-
- def test_data_with_vg(self):
- result = ceph_volume.get_data("data-lv", "data-vg")
- assert result == "data-vg/data-lv"
-
- def test_journal_no_vg(self):
- result = ceph_volume.get_journal("/dev/sda1", None)
- assert result == "/dev/sda1"
-
- def test_journal_with_vg(self):
- result = ceph_volume.get_journal("journal-lv", "journal-vg")
- assert result == "journal-vg/journal-lv"
-
- def test_db_no_vg(self):
- result = ceph_volume.get_db("/dev/sda1", None)
- assert result == "/dev/sda1"
-
- def test_db_with_vg(self):
- result = ceph_volume.get_db("db-lv", "db-vg")
- assert result == "db-vg/db-lv"
-
- def test_wal_no_vg(self):
- result = ceph_volume.get_wal("/dev/sda1", None)
- assert result == "/dev/sda1"
-
- def test_wal_with_vg(self):
- result = ceph_volume.get_wal("wal-lv", "wal-vg")
- assert result == "wal-vg/wal-lv"
-
- def test_container_exec(self):
- fake_binary = "ceph-volume"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous']
- result = ceph_volume.container_exec(fake_binary, fake_container_image)
- assert result == expected_command_list
-
- def test_zap_osd_container(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda'}
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous',
- 'lvm',
- 'zap',
- '--destroy',
- '/dev/sda']
- result = ceph_volume.zap_devices(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_zap_osd(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda'}
- fake_container_image = None
- expected_command_list = ['ceph-volume',
- 'lvm',
- 'zap',
- '--destroy',
- '/dev/sda']
- result = ceph_volume.zap_devices(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_zap_osd_fsid(self):
- fake_module = MagicMock()
- fake_module.params = {'osd_fsid': 'a_uuid'}
- fake_container_image = None
- expected_command_list = ['ceph-volume',
- 'lvm',
- 'zap',
- '--destroy',
- '--osd-fsid',
- 'a_uuid']
- result = ceph_volume.zap_devices(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_activate_osd(self):
- expected_command_list = ['ceph-volume',
- 'lvm',
- 'activate',
- '--all']
- result = ceph_volume.activate_osd()
- assert result == expected_command_list
-
- def test_list_osd(self):
- fake_module = MagicMock()
- fake_module.params = {'cluster': 'ceph', 'data': '/dev/sda'}
- fake_container_image = None
- expected_command_list = ['ceph-volume',
- '--cluster',
- 'ceph',
- 'lvm',
- 'list',
- '/dev/sda',
- '--format=json',
- ]
- result = ceph_volume.list_osd(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_list_osd_container(self):
- fake_module = MagicMock()
- fake_module.params = {'cluster': 'ceph', 'data': '/dev/sda'}
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous',
- '--cluster',
- 'ceph',
- 'lvm',
- 'list',
- '/dev/sda',
- '--format=json',
- ]
- result = ceph_volume.list_osd(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_list_storage_inventory(self):
- fake_module = MagicMock()
- fake_container_image = None
- expected_command_list = ['ceph-volume',
- 'inventory',
- '--format=json',
- ]
- result = ceph_volume.list_storage_inventory(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_list_storage_inventory_container(self):
- fake_module = MagicMock()
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous',
- 'inventory',
- '--format=json',
- ]
- result = ceph_volume.list_storage_inventory(fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_create_osd_container(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda',
- 'objectstore': 'filestore',
- 'cluster': 'ceph', }
-
- fake_action = "create"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous',
- '--cluster',
- 'ceph',
- 'lvm',
- 'create',
- '--filestore',
- '--data',
- '/dev/sda']
- result = ceph_volume.prepare_or_create_osd(
- fake_module, fake_action, fake_container_image)
- assert result == expected_command_list
-
- def test_create_osd(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda',
- 'objectstore': 'filestore',
- 'cluster': 'ceph', }
-
- fake_container_image = None
- fake_action = "create"
- expected_command_list = ['ceph-volume',
- '--cluster',
- 'ceph',
- 'lvm',
- 'create',
- '--filestore',
- '--data',
- '/dev/sda']
- result = ceph_volume.prepare_or_create_osd(
- fake_module, fake_action, fake_container_image)
- assert result == expected_command_list
-
- def test_prepare_osd_container(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda',
- 'objectstore': 'filestore',
- 'cluster': 'ceph', }
-
- fake_action = "prepare"
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous',
- '--cluster',
- 'ceph',
- 'lvm',
- 'prepare',
- '--filestore',
- '--data',
- '/dev/sda']
- result = ceph_volume.prepare_or_create_osd(
- fake_module, fake_action, fake_container_image)
- assert result == expected_command_list
-
- def test_prepare_osd(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda',
- 'objectstore': 'filestore',
- 'cluster': 'ceph', }
-
- fake_container_image = None
- fake_action = "prepare"
- expected_command_list = ['ceph-volume',
- '--cluster',
- 'ceph',
- 'lvm',
- 'prepare',
- '--filestore',
- '--data',
- '/dev/sda']
- result = ceph_volume.prepare_or_create_osd(
- fake_module, fake_action, fake_container_image)
- assert result == expected_command_list
-
- def test_batch_osd_container(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda',
- 'objectstore': 'filestore',
- 'journal_size': '100',
- 'cluster': 'ceph',
- 'batch_devices': ["/dev/sda", "/dev/sdb"]}
-
- fake_container_image = "docker.io/ceph/daemon:latest-luminous"
- expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
- '--ulimit', 'nofile=1024:4096',
- '-v', '/run/lock/lvm:/run/lock/lvm:z',
- '-v', '/var/run/udev/:/var/run/udev/:z',
- '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
- '-v', '/run/lvm/:/run/lvm/', # noqa E501
- '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
- '-v', '/var/log/ceph/:/var/log/ceph/:z',
- '--entrypoint=ceph-volume',
- 'docker.io/ceph/daemon:latest-luminous',
- '--cluster',
- 'ceph',
- 'lvm',
- 'batch',
- '--filestore',
- '--yes',
- '--prepare',
- '--journal-size',
- '100',
- '/dev/sda',
- '/dev/sdb']
- result = ceph_volume.batch(
- fake_module, fake_container_image)
- assert result == expected_command_list
-
- def test_batch_osd(self):
- fake_module = MagicMock()
- fake_module.params = {'data': '/dev/sda',
- 'objectstore': 'filestore',
- 'journal_size': '100',
- 'cluster': 'ceph',
- 'batch_devices': ["/dev/sda", "/dev/sdb"]}
-
- fake_container_image = None
- expected_command_list = ['ceph-volume',
- '--cluster',
- 'ceph',
- 'lvm',
- 'batch',
- '--filestore',
- '--yes',
- '--journal-size',
- '100',
- '/dev/sda',
- '/dev/sdb']
- result = ceph_volume.batch(
- fake_module, fake_container_image)
- assert result == expected_command_list
+++ /dev/null
-from . import ipaddrs_in_ranges
-
-filter_plugin = ipaddrs_in_ranges.FilterModule()
-
-class TestIpaddrsInRanges(object):
-
- def test_one_ip_one_range(self):
- ips = ['10.10.10.1']
- ranges = ['10.10.10.1/24']
- result = filter_plugin.ips_in_ranges(ips, ranges)
- assert ips[0] in result
- assert len(result) == 1
-
- def test_two_ip_one_range(self):
- ips = ['192.168.1.1', '10.10.10.1']
- ranges = ['10.10.10.1/24']
- result = filter_plugin.ips_in_ranges(ips, ranges)
- assert ips[0] not in result
- assert ips[1] in result
- assert len(result) == 1
-
- def test_one_ip_two_ranges(self):
- ips = ['10.10.10.1']
- ranges = ['192.168.1.0/24', '10.10.10.1/24']
- result = filter_plugin.ips_in_ranges(ips, ranges)
- assert ips[0] in result
- assert len(result) == 1
-
- def test_multiple_ips_multiple_ranges(self):
- ips = ['10.10.10.1', '192.168.1.1', '172.16.10.1']
- ranges = ['192.168.1.0/24', '10.10.10.1/24', '172.16.17.0/24']
- result = filter_plugin.ips_in_ranges(ips, ranges)
- assert ips[0] in result
- assert ips[1] in result
- assert ips[2] not in result
- assert len(result) == 2
-
- def test_no_ips_in_ranges(self):
- ips = ['10.10.20.1', '192.168.2.1', '172.16.10.1']
- ranges = ['192.168.1.0/24', '10.10.10.1/24', '172.16.17.0/24']
- result = filter_plugin.ips_in_ranges(ips, ranges)
- assert result == []
-
--- /dev/null
+import sys
+sys.path.append('./library')
+import ceph_crush
+import pytest
+
+
+class TestCephCrushModule(object):
+
+ def test_no_host(self):
+ location = [
+ ("chassis", "monchassis"),
+ ("rack", "monrack"),
+ ("row", "marow"),
+ ("pdu", "monpdu"),
+ ("pod", "monpod"),
+ ("room", "maroom"),
+ ("datacenter", "mondc"),
+ ("region", "maregion"),
+ ("root", "maroute"),
+ ]
+ with pytest.raises(Exception):
+ result = ceph_crush.sort_osd_crush_location(location, None)
+
+ def test_lower_than_two_bucket(self):
+ location = [
+ ("chassis", "monchassis"),
+ ]
+ with pytest.raises(Exception):
+ result = ceph_crush.sort_osd_crush_location(location, None)
+
+ def test_invalid_bucket_type(self):
+ location = [
+ ("host", "monhost"),
+ ("chassis", "monchassis"),
+ ("rackyyyyy", "monrack"),
+ ]
+ with pytest.raises(Exception):
+ result = ceph_crush.sort_osd_crush_location(location, None)
+
+ def test_ordering(self):
+ expected_result = [
+ ("host", "monhost"),
+ ("chassis", "monchassis"),
+ ("rack", "monrack"),
+ ("row", "marow"),
+ ("pdu", "monpdu"),
+ ("pod", "monpod"),
+ ("room", "maroom"),
+ ("datacenter", "mondc"),
+ ("region", "maregion"),
+ ("root", "maroute"),
+ ]
+ expected_result_reverse = expected_result[::-1]
+ result = ceph_crush.sort_osd_crush_location(expected_result_reverse, None)
+ assert expected_result == result
+
+ def test_generate_commands(self):
+ cluster = "test"
+ expected_command_list = [
+ ['ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monhost", "host"],
+ ['ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monchassis", "chassis"],
+ ['ceph', '--cluster', cluster, 'osd', 'crush', "move", "monhost", "chassis=monchassis"],
+ ['ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monrack", "rack"],
+ ['ceph', '--cluster', cluster, 'osd', 'crush', "move", "monchassis", "rack=monrack"],
+ ]
+
+ location = [
+ ("host", "monhost"),
+ ("chassis", "monchassis"),
+ ("rack", "monrack"),
+ ]
+ result = ceph_crush.create_and_move_buckets_list(cluster, location)
+ assert result == expected_command_list
+
+ def test_generate_commands_container(self):
+ cluster = "test"
+ containerized = "docker exec -ti ceph-mon"
+ expected_command_list = [
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monhost", "host"],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monchassis", "chassis"],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "move", "monhost", "chassis=monchassis"],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "add-bucket", "monrack", "rack"],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', cluster, 'osd', 'crush', "move", "monchassis", "rack=monrack"],
+ ]
+
+ location = [
+ ("host", "monhost"),
+ ("chassis", "monchassis"),
+ ("rack", "monrack"),
+ ]
+ result = ceph_crush.create_and_move_buckets_list(cluster, location, containerized)
+ assert result == expected_command_list
--- /dev/null
+import json
+import os
+import sys
+sys.path.append('./library')
+import ceph_key
+import mock
+
+
+@mock.patch.dict(os.environ, {'CEPH_CONTAINER_BINARY': 'docker'})
+class TestCephKeyModule(object):
+
+ def test_generate_secret(self):
+ expected_length = 40
+ result = len(ceph_key.generate_secret())
+ assert result == expected_length
+
+ def test_generate_caps_ceph_authtool(self):
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_cmd = ['ceph']
+ fake_type = "ceph-authtool"
+ expected_command_list = [
+ 'ceph',
+ '--cap',
+ 'mon',
+ 'allow *',
+ '--cap',
+ 'osd',
+ 'allow rwx'
+ ]
+ result = ceph_key.generate_caps(fake_cmd, fake_type, fake_caps)
+ assert result == expected_command_list
+
+ def test_generate_caps_not_ceph_authtool(self):
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_cmd = ['ceph']
+ fake_type = ""
+ expected_command_list = [
+ 'ceph',
+ 'mon',
+ 'allow *',
+ 'osd',
+ 'allow rwx'
+ ]
+ result = ceph_key.generate_caps(fake_cmd, fake_type, fake_caps)
+ assert result == expected_command_list
+
+ def test_generate_ceph_cmd_list_non_container(self):
+ fake_cluster = "fake"
+ fake_args = ['arg']
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ expected_command_list = [
+ 'ceph',
+ '-n',
+ "fake-user",
+ '-k',
+ "/tmp/my-key",
+ '--cluster',
+ fake_cluster,
+ 'auth',
+ 'arg'
+ ]
+ result = ceph_key.generate_ceph_cmd(
+ fake_cluster, fake_args, fake_user, fake_key)
+ assert result == expected_command_list
+
+ def test_generate_ceph_cmd_list_container(self):
+ fake_cluster = "fake"
+ fake_args = ['arg']
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker',
+ 'run',
+ '--rm',
+ '--net=host', # noqa E501
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n',
+ "fake-user",
+ '-k',
+ "/tmp/my-key",
+ '--cluster',
+ fake_cluster,
+ 'auth',
+ 'arg']
+ result = ceph_key.generate_ceph_cmd(
+ fake_cluster, fake_args, fake_user, fake_key, fake_container_image)
+ assert result == expected_command_list
+
+ def test_generate_ceph_authtool_cmd_non_container_no_auid(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_secret = "super-secret"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_dest = "/fake/ceph"
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ expected_command_list = [
+ 'ceph-authtool',
+ '--create-keyring',
+ fake_file_destination,
+ '--name',
+ fake_name,
+ '--add-key',
+ fake_secret,
+ '--cap',
+ 'mon',
+ 'allow *',
+ '--cap',
+ 'osd',
+ 'allow rwx',
+ ]
+ result = ceph_key.generate_ceph_authtool_cmd(
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_file_destination) # noqa E501
+ assert result == expected_command_list
+
+ def test_generate_ceph_authtool_cmd_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_secret = "super-secret"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_dest = "/fake/ceph"
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker',
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-authtool',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--create-keyring',
+ fake_file_destination,
+ '--name',
+ fake_name,
+ '--add-key',
+ fake_secret,
+ '--cap',
+ 'mon',
+ 'allow *',
+ '--cap',
+ 'osd',
+ 'allow rwx']
+ result = ceph_key.generate_ceph_authtool_cmd(
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_file_destination, fake_container_image) # noqa E501
+ assert result == expected_command_list
+
+ def test_create_key_non_container(self):
+ fake_module = "fake"
+ fake_result = " fake"
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_secret = "super-secret"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_import_key = True
+ fake_dest = "/fake/ceph"
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ expected_command_list = [
+ ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name, # noqa E501
+ '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], # noqa E501
+ ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', # noqa E501
+ 'import', '-i', fake_file_destination],
+ ]
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
+ fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) # noqa E501
+ assert result == expected_command_list
+
+ def test_create_key_container(self):
+ fake_module = "fake"
+ fake_result = "fake"
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_secret = "super-secret"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_dest = "/fake/ceph"
+ fake_import_key = True
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [
+ ['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-authtool',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--create-keyring', fake_file_destination,
+ '--name', fake_name,
+ '--add-key', fake_secret,
+ '--cap', 'mon', 'allow *',
+ '--cap', 'osd', 'allow rwx'],
+ ['docker',
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', 'client.admin',
+ '-k', '/etc/ceph/fake.client.admin.keyring',
+ '--cluster', fake_cluster,
+ 'auth', 'import',
+ '-i', fake_file_destination]
+ ]
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
+ fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image) # noqa E501
+ assert result == expected_command_list
+
+ def test_create_key_non_container_no_import(self):
+ fake_module = "fake"
+ fake_result = "fake"
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_secret = "super-secret"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_dest = "/fake/ceph"
+ fake_import_key = False
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
+ expected_command_list = [[
+ 'ceph-authtool',
+ '--create-keyring',
+ fake_file_destination,
+ '--name',
+ fake_name,
+ '--add-key',
+ fake_secret,
+ '--cap',
+ 'mon',
+ 'allow *',
+ '--cap',
+ 'osd',
+ 'allow rwx', ]
+ ]
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
+ fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) # noqa E501
+ assert result == expected_command_list
+
+ def test_create_key_container_no_import(self):
+ fake_module = "fake"
+ fake_result = "fake"
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_secret = "super-secret"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_dest = "/fake/ceph"
+ fake_import_key = False
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-authtool',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--create-keyring',
+ fake_file_destination,
+ '--name',
+ fake_name,
+ '--add-key',
+ fake_secret,
+ '--cap',
+ 'mon',
+ 'allow *',
+ '--cap',
+ 'osd',
+ 'allow rwx']]
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
+ fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image) # noqa E501
+ assert result == expected_command_list
+
+ def test_update_key_non_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ expected_command_list = [
+ ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', 'caps', # noqa E501
+ fake_name, 'mon', 'allow *', 'osd', 'allow rwx'],
+ ]
+ result = ceph_key.update_key(fake_cluster, fake_name, fake_caps)
+ assert result == expected_command_list
+
+ def test_update_key_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_caps = {
+ 'mon': 'allow *',
+ 'osd': 'allow rwx',
+ }
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', 'client.admin',
+ '-k', '/etc/ceph/fake.client.admin.keyring',
+ '--cluster', fake_cluster,
+ 'auth',
+ 'caps', fake_name,
+ 'mon', 'allow *', 'osd', 'allow rwx']
+ ]
+ result = ceph_key.update_key(
+ fake_cluster, fake_name, fake_caps, fake_container_image)
+ assert result == expected_command_list
+
+ def test_delete_key_non_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ expected_command_list = [
+ ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', # noqa E501
+ '--cluster', fake_cluster, 'auth', 'del', fake_name],
+ ]
+ result = ceph_key.delete_key(fake_cluster, fake_name)
+ assert result == expected_command_list
+
+ def test_delete_key_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', 'client.admin',
+ '-k', '/etc/ceph/fake.client.admin.keyring',
+ '--cluster', fake_cluster,
+ 'auth', 'del', fake_name]
+ ]
+ result = ceph_key.delete_key(
+ fake_cluster, fake_name, fake_container_image)
+ assert result == expected_command_list
+
+ def test_info_key_non_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ fake_output_format = "json"
+ expected_command_list = [
+ ['ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', fake_cluster, 'auth', # noqa E501
+ 'get', fake_name, '-f', 'json'],
+ ]
+ result = ceph_key.info_key(
+ fake_cluster, fake_name, fake_user, fake_key, fake_output_format)
+ assert result == expected_command_list
+
+ def test_info_key_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ fake_output_format = "json"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', "fake-user",
+ '-k', "/tmp/my-key",
+ '--cluster', fake_cluster,
+ 'auth', 'get', fake_name,
+ '-f', 'json']
+ ]
+ result = ceph_key.info_key(
+ fake_cluster, fake_name, fake_user, fake_key, fake_output_format, fake_container_image) # noqa E501
+ assert result == expected_command_list
+
+ def test_list_key_non_container(self):
+ fake_cluster = "fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ expected_command_list = [
+ ['ceph', '-n', "fake-user", '-k', "/tmp/my-key",
+ '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
+ assert result == expected_command_list
+
+ def test_get_key_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ fake_dest = "/fake/ceph"
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', "client.admin",
+ '-k', "/etc/ceph/fake.client.admin.keyring", # noqa E501
+ '--cluster', fake_cluster,
+ 'auth', 'get',
+ fake_name, '-o', fake_file_destination],
+ ]
+ result = ceph_key.get_key(
+ fake_cluster, fake_name, fake_file_destination, fake_container_image) # noqa E501
+ assert result == expected_command_list
+
+ def test_get_key_non_container(self):
+ fake_cluster = "fake"
+ fake_dest = "/fake/ceph"
+ fake_name = "client.fake"
+ fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
+ fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
+ expected_command_list = [
+ ['ceph', '-n', "client.admin", '-k', "/etc/ceph/fake.client.admin.keyring", # noqa E501
+ '--cluster', fake_cluster, 'auth', 'get', fake_name, '-o', fake_file_destination], # noqa E501
+ ]
+ result = ceph_key.get_key(
+ fake_cluster, fake_name, fake_file_destination) # noqa E501
+ assert result == expected_command_list
+
+ def test_list_key_non_container_with_mon_key(self):
+ fake_hostname = "mon01"
+ fake_cluster = "fake"
+ fake_user = "mon."
+ fake_keyring_dirname = fake_cluster + "-" + fake_hostname
+ fake_key = os.path.join("/var/lib/ceph/mon/", fake_keyring_dirname, 'keyring') # noqa E501
+ expected_command_list = [
+ ['ceph', '-n', "mon.", '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
+ '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
+ assert result == expected_command_list
+
+ def test_list_key_container_with_mon_key(self):
+ fake_hostname = "mon01"
+ fake_cluster = "fake"
+ fake_user = "mon."
+ fake_keyring_dirname = fake_cluster + "-" + fake_hostname
+ fake_key = os.path.join("/var/lib/ceph/mon/", fake_keyring_dirname, 'keyring') # noqa E501
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', "mon.",
+ '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
+ '--cluster', fake_cluster,
+ 'auth', 'ls',
+ '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(fake_cluster, fake_user, fake_key, fake_container_image) # noqa E501
+ assert result == expected_command_list
+
+ def test_list_key_container(self):
+ fake_cluster = "fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = [['docker', # noqa E128
+ 'run',
+ '--rm',
+ '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '-n', "fake-user",
+ '-k', "/tmp/my-key",
+ '--cluster', fake_cluster,
+ 'auth', 'ls',
+ '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(
+ fake_cluster, fake_user, fake_key, fake_container_image)
+ assert result == expected_command_list
+
+ def test_lookup_ceph_initial_entities(self):
+ fake_module = "fake"
+ fake_ceph_dict = { "auth_dump":[ { "entity":"osd.0", "key":"AQAJkMhbszeBBBAA4/V1tDFXGlft1GnHJS5wWg==", "caps":{ "mgr":"allow profile osd", "mon":"allow profile osd", "osd":"allow *" } }, { "entity":"osd.1", "key":"AQAjkMhbshueAhAAjZec50aBgd1NObLz57SQvg==", "caps":{ "mgr":"allow profile osd", "mon":"allow profile osd", "osd":"allow *" } }, { "entity":"client.admin", "key":"AQDZjshbrJv6EhAAY9v6LzLYNDpPdlC3HD5KHA==", "auid":0, "caps":{ "mds":"allow", "mgr":"allow *", "mon":"allow *", "osd":"allow *" } }, { "entity":"client.bootstrap-mds", "key":"AQDojshbc4QCHhAA1ZTrkt9dbSZRVU2GzI6U4A==", "caps":{ "mon":"allow profile bootstrap-mds" } }, { "entity":"client.bootstrap-mgr", "key":"AQBfiu5bAAAAABAARcNG24hUMlk4AdstVA5MVQ==", "caps":{ "mon":"allow profile bootstrap-mgr" } }, { "entity":"client.bootstrap-osd", "key":"AQDjjshbYW+uGxAAyHcPCXXmVoL8VsTBI8z1Ng==", "caps":{ "mon":"allow profile bootstrap-osd" } }, { "entity":"client.bootstrap-rbd", "key":"AQDyjshb522eIhAAtAz6nUPMOdG4H9u0NgpXhA==", "caps":{ "mon":"allow profile bootstrap-rbd" } }, { "entity":"client.bootstrap-rbd-mirror", "key":"AQDfh+5bAAAAABAAEGBD59Lj2vAKIdN8pq4lbQ==", "caps":{ "mon":"allow profile bootstrap-rbd-mirror" } }, { "entity":"client.bootstrap-rgw", "key":"AQDtjshbDl8oIBAAq1SfSYQKDR49hJNWJVwDQw==", "caps":{ "mon":"allow profile bootstrap-rgw" } }, { "entity":"mgr.mon0", "key":"AQA0j8hbgGapORAAoDkyAvXVkM5ej4wNn4cwTQ==", "caps":{ "mds":"allow *", "mon":"allow profile mgr", "osd":"allow *" } } ] } # noqa E501
+ fake_ceph_dict_str = json.dumps(fake_ceph_dict) # convert to string
+ expected_entity_list = ['client.admin', 'client.bootstrap-mds', 'client.bootstrap-mgr', # noqa E501
+ 'client.bootstrap-osd', 'client.bootstrap-rbd', 'client.bootstrap-rbd-mirror', 'client.bootstrap-rgw'] # noqa E501
+ result = ceph_key.lookup_ceph_initial_entities(fake_module, fake_ceph_dict_str)
+ assert result == expected_entity_list
+
+ def test_build_key_path_admin(self):
+ fake_cluster = "fake"
+ entity = "client.admin"
+ expected_result = "/etc/ceph/fake.client.admin.keyring"
+ result = ceph_key.build_key_path(fake_cluster, entity)
+ assert result == expected_result
+
+ def test_build_key_path_bootstrap_osd(self):
+ fake_cluster = "fake"
+ entity = "client.bootstrap-osd"
+ expected_result = "/var/lib/ceph/bootstrap-osd/fake.keyring"
+ result = ceph_key.build_key_path(fake_cluster, entity)
+ assert result == expected_result
--- /dev/null
+import sys
+sys.path.append('./library')
+import ceph_volume
+import mock
+import os
+
+
+# Python 3
+try:
+ from unittest.mock import MagicMock
+except ImportError:
+ # Python 2
+ try:
+ from mock import MagicMock
+ except ImportError:
+ print('You need the mock library installed on python2.x to run tests')
+
+
+@mock.patch.dict(os.environ, {'CEPH_CONTAINER_BINARY': 'docker'})
+class TestCephVolumeModule(object):
+
+ def test_data_no_vg(self):
+ result = ceph_volume.get_data("/dev/sda", None)
+ assert result == "/dev/sda"
+
+ def test_data_with_vg(self):
+ result = ceph_volume.get_data("data-lv", "data-vg")
+ assert result == "data-vg/data-lv"
+
+ def test_journal_no_vg(self):
+ result = ceph_volume.get_journal("/dev/sda1", None)
+ assert result == "/dev/sda1"
+
+ def test_journal_with_vg(self):
+ result = ceph_volume.get_journal("journal-lv", "journal-vg")
+ assert result == "journal-vg/journal-lv"
+
+ def test_db_no_vg(self):
+ result = ceph_volume.get_db("/dev/sda1", None)
+ assert result == "/dev/sda1"
+
+ def test_db_with_vg(self):
+ result = ceph_volume.get_db("db-lv", "db-vg")
+ assert result == "db-vg/db-lv"
+
+ def test_wal_no_vg(self):
+ result = ceph_volume.get_wal("/dev/sda1", None)
+ assert result == "/dev/sda1"
+
+ def test_wal_with_vg(self):
+ result = ceph_volume.get_wal("wal-lv", "wal-vg")
+ assert result == "wal-vg/wal-lv"
+
+ def test_container_exec(self):
+ fake_binary = "ceph-volume"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous']
+ result = ceph_volume.container_exec(fake_binary, fake_container_image)
+ assert result == expected_command_list
+
+ def test_zap_osd_container(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda'}
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous',
+ 'lvm',
+ 'zap',
+ '--destroy',
+ '/dev/sda']
+ result = ceph_volume.zap_devices(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_zap_osd(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda'}
+ fake_container_image = None
+ expected_command_list = ['ceph-volume',
+ 'lvm',
+ 'zap',
+ '--destroy',
+ '/dev/sda']
+ result = ceph_volume.zap_devices(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_zap_osd_fsid(self):
+ fake_module = MagicMock()
+ fake_module.params = {'osd_fsid': 'a_uuid'}
+ fake_container_image = None
+ expected_command_list = ['ceph-volume',
+ 'lvm',
+ 'zap',
+ '--destroy',
+ '--osd-fsid',
+ 'a_uuid']
+ result = ceph_volume.zap_devices(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_activate_osd(self):
+ expected_command_list = ['ceph-volume',
+ 'lvm',
+ 'activate',
+ '--all']
+ result = ceph_volume.activate_osd()
+ assert result == expected_command_list
+
+ def test_list_osd(self):
+ fake_module = MagicMock()
+ fake_module.params = {'cluster': 'ceph', 'data': '/dev/sda'}
+ fake_container_image = None
+ expected_command_list = ['ceph-volume',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'list',
+ '/dev/sda',
+ '--format=json',
+ ]
+ result = ceph_volume.list_osd(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_list_osd_container(self):
+ fake_module = MagicMock()
+ fake_module.params = {'cluster': 'ceph', 'data': '/dev/sda'}
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'list',
+ '/dev/sda',
+ '--format=json',
+ ]
+ result = ceph_volume.list_osd(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_list_storage_inventory(self):
+ fake_module = MagicMock()
+ fake_container_image = None
+ expected_command_list = ['ceph-volume',
+ 'inventory',
+ '--format=json',
+ ]
+ result = ceph_volume.list_storage_inventory(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_list_storage_inventory_container(self):
+ fake_module = MagicMock()
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous',
+ 'inventory',
+ '--format=json',
+ ]
+ result = ceph_volume.list_storage_inventory(fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_create_osd_container(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda',
+ 'objectstore': 'filestore',
+ 'cluster': 'ceph', }
+
+ fake_action = "create"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'create',
+ '--filestore',
+ '--data',
+ '/dev/sda']
+ result = ceph_volume.prepare_or_create_osd(
+ fake_module, fake_action, fake_container_image)
+ assert result == expected_command_list
+
+ def test_create_osd(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda',
+ 'objectstore': 'filestore',
+ 'cluster': 'ceph', }
+
+ fake_container_image = None
+ fake_action = "create"
+ expected_command_list = ['ceph-volume',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'create',
+ '--filestore',
+ '--data',
+ '/dev/sda']
+ result = ceph_volume.prepare_or_create_osd(
+ fake_module, fake_action, fake_container_image)
+ assert result == expected_command_list
+
+ def test_prepare_osd_container(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda',
+ 'objectstore': 'filestore',
+ 'cluster': 'ceph', }
+
+ fake_action = "prepare"
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'prepare',
+ '--filestore',
+ '--data',
+ '/dev/sda']
+ result = ceph_volume.prepare_or_create_osd(
+ fake_module, fake_action, fake_container_image)
+ assert result == expected_command_list
+
+ def test_prepare_osd(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda',
+ 'objectstore': 'filestore',
+ 'cluster': 'ceph', }
+
+ fake_container_image = None
+ fake_action = "prepare"
+ expected_command_list = ['ceph-volume',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'prepare',
+ '--filestore',
+ '--data',
+ '/dev/sda']
+ result = ceph_volume.prepare_or_create_osd(
+ fake_module, fake_action, fake_container_image)
+ assert result == expected_command_list
+
+ def test_batch_osd_container(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda',
+ 'objectstore': 'filestore',
+ 'journal_size': '100',
+ 'cluster': 'ceph',
+ 'batch_devices': ["/dev/sda", "/dev/sdb"]}
+
+ fake_container_image = "docker.io/ceph/daemon:latest-luminous"
+ expected_command_list = ['docker', 'run', '--rm', '--privileged', '--net=host', '--ipc=host', # noqa E501
+ '--ulimit', 'nofile=1024:4096',
+ '-v', '/run/lock/lvm:/run/lock/lvm:z',
+ '-v', '/var/run/udev/:/var/run/udev/:z',
+ '-v', '/dev:/dev', '-v', '/etc/ceph:/etc/ceph:z', # noqa E501
+ '-v', '/run/lvm/:/run/lvm/', # noqa E501
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph-volume',
+ 'docker.io/ceph/daemon:latest-luminous',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'batch',
+ '--filestore',
+ '--yes',
+ '--prepare',
+ '--journal-size',
+ '100',
+ '/dev/sda',
+ '/dev/sdb']
+ result = ceph_volume.batch(
+ fake_module, fake_container_image)
+ assert result == expected_command_list
+
+ def test_batch_osd(self):
+ fake_module = MagicMock()
+ fake_module.params = {'data': '/dev/sda',
+ 'objectstore': 'filestore',
+ 'journal_size': '100',
+ 'cluster': 'ceph',
+ 'batch_devices': ["/dev/sda", "/dev/sdb"]}
+
+ fake_container_image = None
+ expected_command_list = ['ceph-volume',
+ '--cluster',
+ 'ceph',
+ 'lvm',
+ 'batch',
+ '--filestore',
+ '--yes',
+ '--journal-size',
+ '100',
+ '/dev/sda',
+ '/dev/sdb']
+ result = ceph_volume.batch(
+ fake_module, fake_container_image)
+ assert result == expected_command_list
--- /dev/null
+import sys
+sys.path.append('./plugins/filter')
+import ipaddrs_in_ranges
+
+filter_plugin = ipaddrs_in_ranges.FilterModule()
+
+class TestIpaddrsInRanges(object):
+
+ def test_one_ip_one_range(self):
+ ips = ['10.10.10.1']
+ ranges = ['10.10.10.1/24']
+ result = filter_plugin.ips_in_ranges(ips, ranges)
+ assert ips[0] in result
+ assert len(result) == 1
+
+ def test_two_ip_one_range(self):
+ ips = ['192.168.1.1', '10.10.10.1']
+ ranges = ['10.10.10.1/24']
+ result = filter_plugin.ips_in_ranges(ips, ranges)
+ assert ips[0] not in result
+ assert ips[1] in result
+ assert len(result) == 1
+
+ def test_one_ip_two_ranges(self):
+ ips = ['10.10.10.1']
+ ranges = ['192.168.1.0/24', '10.10.10.1/24']
+ result = filter_plugin.ips_in_ranges(ips, ranges)
+ assert ips[0] in result
+ assert len(result) == 1
+
+ def test_multiple_ips_multiple_ranges(self):
+ ips = ['10.10.10.1', '192.168.1.1', '172.16.10.1']
+ ranges = ['192.168.1.0/24', '10.10.10.1/24', '172.16.17.0/24']
+ result = filter_plugin.ips_in_ranges(ips, ranges)
+ assert ips[0] in result
+ assert ips[1] in result
+ assert ips[2] not in result
+ assert len(result) == 2
+
+ def test_no_ips_in_ranges(self):
+ ips = ['10.10.20.1', '192.168.2.1', '172.16.10.1']
+ ranges = ['192.168.1.0/24', '10.10.10.1/24', '172.16.17.0/24']
+ result = filter_plugin.ips_in_ranges(ips, ranges)
+ assert result == []
+