- Destination to write the keyring
required: false
default: /etc/ceph/
+ fetch_initial_keys:
+ description:
+ - Fetch client.admin and bootstrap key.
+ This is only needed for Nautilus and above.
+ Writes down to the filesystem the initial keys generated by the monitor. # noqa E501
+ This command can ONLY run from a monitor node.
+ required: false
+ default: false
'''
EXAMPLES = '''
keys_to_create:
- - { name: client.key, key: "AQAin8tUUK84ExAA/QgBtI7gEMWdmnvKBzlXdQ==", caps: { mon: "allow rwx", mds: "allow *" } , mode: "0600" }
- - { name: client.cle, caps: { mon: "allow r", osd: "allow *" } , mode: "0600" }
+ - { name: client.key, key: "AQAin8tUUK84ExAA/QgBtI7gEMWdmnvKBzlXdQ==", caps: { mon: "allow rwx", mds: "allow *" } , mode: "0600" } # noqa e501
+ - { name: client.cle, caps: { mon: "allow r", osd: "allow *" } , mode: "0600" } # noqa e501
caps:
mon: "allow rwx"
secret: AQAin8tUMICVFBAALRHNrV0Z4MXupRw4v9JQ6Q==
caps:
mon: allow *
- dest: "/var/lib/ceph/tmp/keyring.mon"
+ dest: "/var/lib/ceph/tmp/"
import_key: False
- name: create cephx key
- name: list cephx keys
ceph_key:
state: list
+
+- name: fetch cephx keys
+ ceph_key:
+ state: fetch_initial_keys
'''
RETURN = '''# '''
-from ansible.module_utils.basic import AnsibleModule
-import datetime
-import os
-import struct
-import time
-import base64
+from ansible.module_utils.basic import AnsibleModule # noqa E402
+import datetime # noqa E402
+import grp # noqa E402
+import json # noqa E402
+import os # noqa E402
+import pwd # noqa E402
+import stat # noqa E402
+import struct # noqa E402
+import time # noqa E402
+import base64 # noqa E402
+import socket # noqa E402
+
+CEPH_INITIAL_KEYS = ['client.admin', 'client.bootstrap-mds', 'client.bootstrap-mgr', # noqa E501
+ 'client.bootstrap-osd', 'client.bootstrap-rbd', 'client.bootstrap-rbd-mirror', 'client.bootstrap-rgw'] # noqa E501
def fatal(message, module):
return cmd
-def generate_ceph_cmd(cluster, args, containerized=None):
+def generate_ceph_cmd(cluster, args, user, user_key, containerized=None):
'''
Generate 'ceph' command line to execute
'''
base_cmd = [
'ceph',
+ '-n',
+ user,
+ '-k',
+ user_key,
'--cluster',
cluster,
'auth',
return cmd
-def generate_ceph_authtool_cmd(cluster, name, secret, caps, auid, dest, containerized=None):
+def generate_ceph_authtool_cmd(cluster, name, secret, caps, auid, dest, containerized=None): # noqa E501
'''
Generate 'ceph-authtool' command line to execute
'''
- file_destination = os.path.join(
- dest + "/" + cluster + "." + name + ".keyring")
-
cmd = [
'ceph-authtool',
'--create-keyring',
- file_destination,
+ dest,
'--name',
name,
'--add-key',
return cmd
-def create_key(module, result, cluster, name, secret, caps, import_key, auid, dest, containerized=None):
+def create_key(module, result, cluster, name, secret, caps, import_key, auid, dest, containerized=None): # noqa E501
'''
Create a CephX key
'''
- file_path = os.path.join(dest + "/" + cluster + "." + name + ".keyring")
-
args = [
'import',
'-i',
- file_path,
+ dest,
]
cmd_list = []
cluster, name, secret, caps, auid, dest, containerized))
if import_key:
- cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
+ user = "client.admin"
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
+ cmd_list.append(generate_ceph_cmd(
+ cluster, args, user, user_key, containerized))
return cmd_list
]
args = generate_caps(args, "ceph", caps)
- cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
+ cmd_list.append(generate_ceph_cmd(
+ cluster, args, user, user_key, containerized))
return cmd_list
name,
]
- cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
+ cmd_list.append(generate_ceph_cmd(
+ cluster, args, user, user_key, containerized))
+
+ return cmd_list
+
+
+def get_key(cluster, name, dest, containerized=None):
+ '''
+ Get a CephX key (write on the filesystem)
+ '''
+
+ cmd_list = []
+
+ args = [
+ 'get',
+ name,
+ '-o',
+ dest,
+ ]
+
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
+ cmd_list.append(generate_ceph_cmd(
+ cluster, args, user, user_key, containerized))
return cmd_list
-def info_key(cluster, name, containerized=None):
+def info_key(cluster, name, user, user_key, output_format, containerized=None):
'''
Get information about a CephX key
'''
'get',
name,
'-f',
- 'json',
+ output_format,
]
- cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
+ cmd_list.append(generate_ceph_cmd(
+ cluster, args, user, user_key, containerized))
return cmd_list
-def list_keys(cluster, containerized=None):
+def list_keys(cluster, user, user_key, containerized=None):
'''
List all CephX keys
'''
'json',
]
- cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
+ cmd_list.append(generate_ceph_cmd(
+ cluster, args, user, user_key, containerized))
return cmd_list
return rc, cmd, out, err
+def lookup_ceph_initial_entities(out):
+ '''
+ Lookup Ceph initial keys entries in the auth map
+ '''
+
+ # convert out to json, ansible returns a string...
+ try:
+ out_dict = json.loads(out)
+ except ValueError as e:
+ fatal("Could not decode 'ceph auth list' json output: {}".format(e), module) # noqa E501
+
+ entities = []
+ if "auth_dump" in out_dict:
+ for key in out_dict["auth_dump"]:
+ for k, v in key.items():
+ if k == "entity":
+ if v in CEPH_INITIAL_KEYS:
+ entities.append(v)
+ else:
+ fatal("'auth_dump' key not present in json output:", module) # noqa E501
+
+ if len(entities) != len(CEPH_INITIAL_KEYS):
+ return None
+
+ return entities
+
+
+def build_key_path(cluster, entity):
+ '''
+ Build key path depending on the key type
+ '''
+
+ if "admin" in entity:
+ path = "/etc/ceph"
+ key_path = os.path.join(
+ path + "/" + cluster + "." + entity + ".keyring")
+ elif "bootstrap" in entity:
+ path = "/var/lib/ceph"
+ # bootstrap keys show up as 'client.boostrap-osd'
+ # however the directory is called '/var/lib/ceph/bootstrap-osd'
+ # so we need to substring 'client.'
+ entity_split = entity.split('.')[1]
+ key_path = os.path.join(
+ path + "/" + entity_split + "/" + cluster + ".keyring")
+ else:
+ return None
+
+ return key_path
+
+
def run_module():
module_args = dict(
cluster=dict(type='str', required=False, default='ceph'),
secret=dict(type='str', required=False, default=None),
import_key=dict(type='bool', required=False, default=True),
auid=dict(type='str', required=False, default=None),
- dest=dict(type='str', required=False, default='/etc/ceph/'),
+ dest=dict(type='str', required=False, default='/etc/ceph'),
)
module = AnsibleModule(
# We only want to run this check when a key needs to be added
# There is no guarantee that any cluster is running and we don't need one
if import_key:
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
+ output_format = "json"
rc, cmd, out, err = exec_commands(
- module, info_key(cluster, name, containerized))
+ module, info_key(cluster, name, user, user_key, output_format, containerized)) # noqa E501
if state == "present":
if not caps:
- fatal("Capabilities must be provided when state is 'present'", module)
+ fatal("Capabilities must be provided when state is 'present'", module) # noqa E501
+
+ # Build a different path for bootstrap keys as there are stored as
+ # /var/lib/ceph/bootstrap-rbd/ceph.keyring
+ if 'bootstrap' in dest:
+ file_path = os.path.join(dest + "/" + cluster + ".keyring")
+ else:
+ file_path = os.path.join(dest + "/" + cluster +
+ "." + name + ".keyring")
# We allow 'present' to override any existing key
# ONLY if a secret is provided
# if not we skip the creation
if import_key:
if rc == 0 and not secret:
- result["stdout"] = "skipped, since {0} already exists, if you want to update a key use 'state: update'".format(
- name)
+ # If the key exists in Ceph we must fetch it on the system
+ # because nothing tells us it exists on the fs or not
+ rc, cmd, out, err = exec_commands(module, get_key(cluster, name, file_path, containerized)) # noqa E501
+ result["stdout"] = "skipped, since {0} already exists, we only fetched the key at {1}. If you want to update a key use 'state: update'".format( # noqa E501
+ name, file_path)
result['rc'] = rc
module.exit_json(**result)
rc, cmd, out, err = exec_commands(module, create_key(
- module, result, cluster, name, secret, caps, import_key, auid, dest, containerized))
+ module, result, cluster, name, secret, caps, import_key, auid, file_path, containerized)) # noqa E501
- file_path = os.path.join(
- dest + "/" + cluster + "." + name + ".keyring")
file_args = module.load_file_common_arguments(module.params)
file_args['path'] = file_path
module.set_fs_attributes_if_different(file_args, False)
elif state == "update":
if not caps:
- fatal("Capabilities must be provided when state is 'update'", module)
+ fatal("Capabilities must be provided when state is 'update'", module) # noqa E501
if rc != 0:
result["stdout"] = "skipped, since {0} does not exist".format(name)
rc, cmd, out, err = exec_commands(
module, update_key(cluster, name, caps, containerized))
+ # After the update we don't need to overwrite the key on the filesystem
+ # since the secret has not changed
elif state == "absent":
rc, cmd, out, err = exec_commands(
result['rc'] = 0
module.exit_json(**result)
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
+ output_format = "json"
rc, cmd, out, err = exec_commands(
- module, info_key(cluster, name, containerized))
+ module, info_key(cluster, name, user, user_key, output_format, containerized)) # noqa E501
elif state == "list":
+ user = "client.admin"
+ user_key = os.path.join(
+ "/etc/ceph/" + cluster + ".client.admin.keyring")
rc, cmd, out, err = exec_commands(
- module, list_keys(cluster, containerized))
+ module, list_keys(cluster, user, user_key, containerized))
+
+ elif state == "fetch_initial_keys":
+ hostname = socket.gethostname()
+ user = "mon."
+ user_key = os.path.join(
+ "/var/lib/ceph/mon/" + cluster + "-" + hostname + "/keyring")
+ rc, cmd, out, err = exec_commands(
+ module, list_keys(cluster, user, user_key, containerized))
+ if rc != 0:
+ result["stdout"] = "failed to retrieve ceph keys".format(name)
+ result['rc'] = 0
+ module.exit_json(**result)
+
+ entities = lookup_ceph_initial_entities(out)
+ if entities is None:
+ fatal("Failed to find some of the initial entities", module)
+
+ # get ceph's group and user id
+ ceph_uid = pwd.getpwnam('ceph').pw_uid
+ ceph_grp = grp.getgrnam('ceph').gr_gid
+
+ output_format = "plain"
+ for entity in entities:
+ key_path = build_key_path(cluster, entity)
+ if key_path is None:
+ fatal("Failed to build key path, no entity yet?", module)
+ elif os.path.isfile(key_path):
+ # if the key is already on the filesystem
+ # there is no need to fetch it again
+ continue
+
+ extra_args = [
+ '-o',
+ key_path,
+ ]
+
+ info_cmd = info_key(cluster, entity, user,
+ user_key, output_format, containerized)
+ # we use info_cmd[0] because info_cmd is an array made of an array
+ info_cmd[0].extend(extra_args)
+ rc, cmd, out, err = exec_commands(
+ module, info_cmd) # noqa E501
+
+ # apply ceph:ceph ownership and mode 0400 on keys
+ # FIXME by using
+ # file_args = module.load_file_common_arguments(module.params)
+ # file_args['path'] = dest
+ # module.set_fs_attributes_if_different(file_args, False)
+ try:
+ os.chown(key_path, ceph_uid, ceph_grp)
+ os.chmod(key_path, stat.S_IRUSR)
+ except OSError as e:
+ fatal("Failed to set owner/group/permissions of %s: %s" % (
+ key_path, str(e)), module)
else:
module.fail_json(
- msg='State must either be "present" or "absent" or "update" or "list" or "info".', changed=False, rc=1)
+ msg='State must either be "present" or "absent" or "update" or "list" or "info" or "fetch_initial_keys".', changed=False, rc=1) # noqa E501
endd = datetime.datetime.now()
delta = endd - startd
+import json
import os
-import pytest
from . import ceph_key
+from ansible.compat.tests.mock import MagicMock
class TestCephKeyModule(object):
def test_generate_ceph_cmd_list_non_container(self):
fake_cluster = "fake"
fake_args = ['arg']
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
expected_command_list = [
'ceph',
+ '-n',
+ "fake-user",
+ '-k',
+ "/tmp/my-key",
'--cluster',
fake_cluster,
'auth',
'arg'
]
- result = ceph_key.generate_ceph_cmd(fake_cluster, fake_args)
+ result = ceph_key.generate_ceph_cmd(
+ fake_cluster, fake_args, fake_user, fake_key)
assert result == expected_command_list
def test_generate_ceph_cmd_list_container(self):
fake_cluster = "fake"
fake_args = ['arg']
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
fake_containerized = "docker exec -ti ceph-mon"
expected_command_list = [
'docker',
'-ti',
'ceph-mon',
'ceph',
+ '-n',
+ "fake-user",
+ '-k',
+ "/tmp/my-key",
'--cluster',
fake_cluster,
'auth',
'arg'
]
result = ceph_key.generate_ceph_cmd(
- fake_cluster, fake_args, fake_containerized)
+ fake_cluster, fake_args, fake_user, fake_key, fake_containerized)
assert result == expected_command_list
def test_generate_ceph_authtool_cmd_non_container_no_auid(self):
}
fake_dest = "/fake/ceph"
fake_file_destination = os.path.join(
- fake_dest + "/"+ fake_cluster + "." + fake_name + ".keyring")
+ fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
fake_auid = None
expected_command_list = [
'ceph-authtool',
'allow rwx',
]
result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest)
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_file_destination) # noqa E501
assert result == expected_command_list
def test_generate_ceph_authtool_cmd_non_container_auid(self):
'allow rwx',
]
result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest)
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_file_destination) # noqa E501
assert result == expected_command_list
def test_generate_ceph_authtool_cmd_container(self):
'allow rwx'
]
result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest, fake_containerized)
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_file_destination, fake_containerized) # noqa E501
assert result == expected_command_list
def test_create_key_non_container(self):
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
expected_command_list = [
- ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name,
- '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'],
- ['ceph', '--cluster', fake_cluster, 'auth',
+ ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name, # noqa E501
+ '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], # noqa E501
+ ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', # noqa E501
'import', '-i', fake_file_destination],
]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
- fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest)
+ fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_file_destination) # noqa E501
assert result == expected_command_list
def test_create_key_container(self):
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph-authtool', '--create-keyring', fake_file_destination,
- '--name', fake_name, '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'],
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster',
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph-authtool', '--create-keyring', fake_file_destination, # noqa E501
+ '--name', fake_name, '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], # noqa E501
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', # noqa E501
fake_cluster, 'auth', 'import', '-i', fake_file_destination],
]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name,
- fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest, fake_containerized)
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
+ fake_secret, fake_caps, fake_import_key, fake_auid, fake_file_destination, fake_containerized) # noqa E501
assert result == expected_command_list
def test_create_key_non_container_no_import(self):
fake_auid = None
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
- # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array
+ # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
expected_command_list = [[
'ceph-authtool',
'--create-keyring',
'allow rwx', ]
]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
- fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest)
+ fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_file_destination) # noqa E501
assert result == expected_command_list
def test_create_key_container_no_import(self):
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
fake_auid = None
- # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array
+ # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
expected_command_list = [[
'docker',
'exec',
'osd',
'allow rwx', ]
]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name,
- fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest, fake_containerized)
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
+ fake_secret, fake_caps, fake_import_key, fake_auid, fake_file_destination, fake_containerized) # noqa E501
assert result == expected_command_list
def test_update_key_non_container(self):
'osd': 'allow rwx',
}
expected_command_list = [
- ['ceph', '--cluster', fake_cluster, 'auth', 'caps',
+ ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', 'caps', # noqa E501
fake_name, 'mon', 'allow *', 'osd', 'allow rwx'],
]
result = ceph_key.update_key(fake_cluster, fake_name, fake_caps)
'osd': 'allow rwx',
}
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', fake_cluster,
- 'auth', 'caps', fake_name, 'mon', 'allow *', 'osd', 'allow rwx'],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, # noqa E501
+ 'auth', 'caps', fake_name, 'mon', 'allow *', 'osd', 'allow rwx'], # noqa E501
]
result = ceph_key.update_key(
fake_cluster, fake_name, fake_caps, fake_containerized)
fake_cluster = "fake"
fake_name = "client.fake"
expected_command_list = [
- ['ceph', '--cluster', fake_cluster, 'auth', 'del', fake_name],
+ ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', # noqa E501
+ '--cluster', fake_cluster, 'auth', 'del', fake_name],
]
result = ceph_key.delete_key(fake_cluster, fake_name)
assert result == expected_command_list
fake_name = "client.fake"
fake_containerized = "docker exec -ti ceph-mon"
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph',
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', # noqa E501
'--cluster', fake_cluster, 'auth', 'del', fake_name],
]
result = ceph_key.delete_key(
def test_info_key_non_container(self):
fake_cluster = "fake"
fake_name = "client.fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ fake_output_format = "json"
expected_command_list = [
- ['ceph', '--cluster', fake_cluster, 'auth',
+ ['ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', fake_cluster, 'auth', # noqa E501
'get', fake_name, '-f', 'json'],
]
- result = ceph_key.info_key(fake_cluster, fake_name)
+ result = ceph_key.info_key(
+ fake_cluster, fake_name, fake_user, fake_key, fake_output_format)
assert result == expected_command_list
def test_info_key_container(self):
fake_cluster = "fake"
fake_name = "client.fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
fake_containerized = "docker exec -ti ceph-mon"
+ fake_output_format = "json"
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster',
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', # noqa E501
fake_cluster, 'auth', 'get', fake_name, '-f', 'json'],
]
- result = ceph_key.info_key(fake_cluster, fake_name, fake_containerized)
+ result = ceph_key.info_key(
+ fake_cluster, fake_name, fake_user, fake_key, fake_output_format, fake_containerized) # noqa E501
assert result == expected_command_list
def test_list_key_non_container(self):
fake_cluster = "fake"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
+ expected_command_list = [
+ ['ceph', '-n', "fake-user", '-k', "/tmp/my-key",
+ '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
+ assert result == expected_command_list
+
+ def test_get_key_container(self):
+ fake_cluster = "fake"
+ fake_name = "client.fake"
+ fake_containerized = "docker exec -ti ceph-mon"
+ fake_dest = "/fake/ceph"
+ fake_file_destination = os.path.join(
+ fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
expected_command_list = [
- ['ceph', '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', "client.admin", '-k', "/etc/ceph/fake.client.admin.keyring", '--cluster', # noqa E501
+ fake_cluster, 'auth', 'get', fake_name, '-o', fake_file_destination], # noqa E501
]
- result = ceph_key.list_keys(fake_cluster)
+ result = ceph_key.get_key(
+ fake_cluster, fake_name, fake_file_destination, fake_containerized) # noqa E501
+ assert result == expected_command_list
+
+ def test_get_key_non_container(self):
+ fake_cluster = "fake"
+ fake_dest = "/fake/ceph"
+ fake_name = "client.fake"
+ fake_file_destination = os.path.join(
+ fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
+ expected_command_list = [
+ ['ceph', '-n', "client.admin", '-k', "/etc/ceph/fake.client.admin.keyring", # noqa E501
+ '--cluster', fake_cluster, 'auth', 'get', fake_name, '-o', fake_file_destination], # noqa E501
+ ]
+ result = ceph_key.get_key(
+ fake_cluster, fake_name, fake_file_destination) # noqa E501
+ assert result == expected_command_list
+
+ def test_list_key_non_container_with_mon_key(self):
+ fake_hostname = "mon01"
+ fake_cluster = "fake"
+ fake_user = "mon."
+ fake_key = os.path.join("/var/lib/ceph/mon/" + fake_cluster + "-" + fake_hostname + "/keyring") # noqa E501
+ expected_command_list = [
+ ['ceph', '-n', "mon.", '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
+ '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
+ assert result == expected_command_list
+
+ def test_list_key_container_with_mon_key(self):
+ fake_hostname = "mon01"
+ fake_cluster = "fake"
+ fake_containerized = "docker exec -ti ceph-mon"
+ fake_user = "mon."
+ fake_key = os.path.join("/var/lib/ceph/mon/" + fake_cluster + "-" + fake_hostname + "/keyring") # noqa E501
+ expected_command_list = [
+ ['docker', 'exec', '-ti', 'ceph-mon','ceph', '-n', "mon.", '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
+ '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ]
+ result = ceph_key.list_keys(fake_cluster, fake_user, fake_key, fake_containerized) # noqa E501
assert result == expected_command_list
def test_list_key_container(self):
fake_cluster = "fake"
fake_containerized = "docker exec -ti ceph-mon"
+ fake_user = "fake-user"
+ fake_key = "/tmp/my-key"
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster',
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', # noqa E501
fake_cluster, 'auth', 'ls', '-f', 'json'],
]
- result = ceph_key.list_keys(fake_cluster, fake_containerized)
+ result = ceph_key.list_keys(
+ fake_cluster, fake_user, fake_key, fake_containerized)
assert result == expected_command_list
+
+ def test_lookup_ceph_initial_entities(self):
+
+ # fake_module = MagicMock()
+ fake_ceph_dict = {"auth_dump":[{"entity":"osd.0","key":"AQAJkMhbszeBBBAA4/V1tDFXGlft1GnHJS5wWg==","caps":{"mgr":"allow profile osd","mon":"allow profile osd","osd":"allow *"}},{"entity":"osd.1","key":"AQAjkMhbshueAhAAjZec50aBgd1NObLz57SQvg==","caps":{"mgr":"allow profile osd","mon":"allow profile osd","osd":"allow *"}},{"entity":"client.admin","key":"AQDZjshbrJv6EhAAY9v6LzLYNDpPdlC3HD5KHA==","auid":0,"caps":{"mds":"allow","mgr":"allow *","mon":"allow *","osd":"allow *"}},{"entity":"client.bootstrap-mds","key":"AQDojshbc4QCHhAA1ZTrkt9dbSZRVU2GzI6U4A==","caps":{"mon":"allow profile bootstrap-mds"}},{"entity":"client.bootstrap-osd","key":"AQDjjshbYW+uGxAAyHcPCXXmVoL8VsTBI8z1Ng==","caps":{"mon":"allow profile bootstrap-osd"}},{"entity":"client.bootstrap-rbd","key":"AQDyjshb522eIhAAtAz6nUPMOdG4H9u0NgpXhA==","caps":{"mon":"allow profile bootstrap-rbd"}},{"entity":"client.bootstrap-rgw","key":"AQDtjshbDl8oIBAAq1SfSYQKDR49hJNWJVwDQw==","caps":{"mon":"allow profile bootstrap-rgw"}},{"entity":"mgr.mon0","key":"AQA0j8hbgGapORAAoDkyAvXVkM5ej4wNn4cwTQ==","caps":{"mds":"allow *","mon":"allow profile mgr","osd":"allow *"}}]} # noqa E501
+ fake_ceph_dict_str = json.dumps(fake_ceph_dict) # convert to string
+ expected_entity_list = ['client.admin', 'client.bootstrap-mds', 'client.bootstrap-osd', 'client.bootstrap-rbd', 'client.bootstrap-rgw'] # noqa E501
+ result = ceph_key.lookup_ceph_initial_entities(fake_ceph_dict_str)
+ assert result == expected_entity_list
+
+ def test_build_key_path_admin(self):
+ fake_cluster = "fake"
+ entity = "client.admin"
+ expected_result = "/etc/ceph/fake.client.admin.keyring"
+ result = ceph_key.build_key_path(fake_cluster, entity)
+ assert result == expected_result
+
+ def test_build_key_path_bootstrap_osd(self):
+ fake_cluster = "fake"
+ entity = "bootstrap-osd"
+ expected_result = "/var/lib/ceph/bootstrap-osd/fake.keyring"
+ result = ceph_key.build_key_path(fake_cluster, entity)
+ assert result == expected_result
\ No newline at end of file