- Destination to write the keyring
required: false
default: /etc/ceph/
- fetch_initial_keys:
- description:
- - Fetch client.admin and bootstrap key.
- This is only needed for Nautilus and above.
- Writes down to the filesystem the initial keys generated by the monitor. # noqa E501
- This command can ONLY run from a monitor node.
- required: false
- default: false
'''
EXAMPLES = '''
keys_to_create:
- - { name: client.key, key: "AQAin8tUUK84ExAA/QgBtI7gEMWdmnvKBzlXdQ==", caps: { mon: "allow rwx", mds: "allow *" } , mode: "0600" } # noqa e501
- - { name: client.cle, caps: { mon: "allow r", osd: "allow *" } , mode: "0600" } # noqa e501
+ - { name: client.key, key: "AQAin8tUUK84ExAA/QgBtI7gEMWdmnvKBzlXdQ==", caps: { mon: "allow rwx", mds: "allow *" } , mode: "0600" }
+ - { name: client.cle, caps: { mon: "allow r", osd: "allow *" } , mode: "0600" }
caps:
mon: "allow rwx"
- name: list cephx keys
ceph_key:
state: list
-
-- name: fetch cephx keys
- ceph_key:
- state: fetch_initial_keys
'''
RETURN = '''# '''
-from ansible.module_utils.basic import AnsibleModule # noqa E402
-import datetime # noqa E402
-import grp # noqa E402
-import json # noqa E402
-import os # noqa E402
-import pwd # noqa E402
-import stat # noqa E402
-import struct # noqa E402
-import time # noqa E402
-import base64 # noqa E402
-import socket # noqa E402
+from ansible.module_utils.basic import AnsibleModule
+import datetime
+import os
+import struct
+import time
+import base64
def fatal(message, module):
return cmd
-def generate_ceph_cmd(cluster, args, user, user_key, containerized=None):
+def generate_ceph_cmd(cluster, args, containerized=None):
'''
Generate 'ceph' command line to execute
'''
base_cmd = [
'ceph',
- '-n',
- user,
- '-k',
- user_key,
'--cluster',
cluster,
'auth',
return cmd
-def generate_ceph_authtool_cmd(cluster, name, secret, caps, auid, dest, containerized=None): # noqa E501
+def generate_ceph_authtool_cmd(cluster, name, secret, caps, auid, dest, containerized=None):
'''
Generate 'ceph-authtool' command line to execute
'''
return cmd
-def create_key(module, result, cluster, name, secret, caps, import_key, auid, dest, containerized=None): # noqa E501
+def create_key(module, result, cluster, name, secret, caps, import_key, auid, dest, containerized=None):
'''
Create a CephX key
'''
cluster, name, secret, caps, auid, dest, containerized))
if import_key:
- user = "client.admin"
- user = "client.admin"
- user_key = os.path.join(
- "/etc/ceph/" + cluster + ".client.admin.keyring")
- cmd_list.append(generate_ceph_cmd(
- cluster, args, user, user_key, containerized))
+ cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
return cmd_list
]
args = generate_caps(args, "ceph", caps)
- user = "client.admin"
- user_key = os.path.join(
- "/etc/ceph/" + cluster + ".client.admin.keyring")
- cmd_list.append(generate_ceph_cmd(
- cluster, args, user, user_key, containerized))
+ cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
return cmd_list
name,
]
- user = "client.admin"
- user_key = os.path.join(
- "/etc/ceph/" + cluster + ".client.admin.keyring")
- cmd_list.append(generate_ceph_cmd(
- cluster, args, user, user_key, containerized))
+ cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
return cmd_list
-def info_key(cluster, name, user, user_key, output_format, containerized=None):
+def info_key(cluster, name, containerized=None):
'''
Get information about a CephX key
'''
'get',
name,
'-f',
- output_format,
+ 'json',
]
- cmd_list.append(generate_ceph_cmd(
- cluster, args, user, user_key, containerized))
+ cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
return cmd_list
-def list_keys(cluster, user, user_key, containerized=None):
+def list_keys(cluster, containerized=None):
'''
List all CephX keys
'''
'json',
]
- cmd_list.append(generate_ceph_cmd(
- cluster, args, user, user_key, containerized))
+ cmd_list.append(generate_ceph_cmd(cluster, args, containerized))
return cmd_list
return rc, cmd, out, err
-def lookup_ceph_initial_entities(out):
- '''
- Lookup Ceph initial keys entries in the auth map
- '''
-
- # convert out to json, ansible returns a string...
- try:
- out_dict = json.loads(out)
- except ValueError as e:
- fatal("Could not decode 'ceph auth list' json output: {}".format(e), module) # noqa E501
-
- entities = []
- if "auth_dump" in out_dict:
- for key in out_dict["auth_dump"]:
- for k, v in key.items():
- if k == "entity":
- if "client." in v:
- entities.append(v)
- else:
- fatal("'auth_dump' key not present in json output:", module) # noqa E501
-
- return entities
-
-
-def build_key_path(cluster, entity):
- '''
- Build key path depending on the key type
- '''
-
- if "admin" in entity:
- path = "/etc/ceph"
- key_path = os.path.join(
- path + "/" + cluster + "." + entity + ".keyring")
- elif "bootstrap" in entity:
- path = "/var/lib/ceph"
- # bootstrap keys show up as 'client.boostrap-osd'
- # however the directory is called '/var/lib/ceph/bootstrap-osd'
- # so we need to substring 'client.'
- entity_split = entity.split('.')[1]
- key_path = os.path.join(
- path + "/" + entity_split + "/" + cluster + ".keyring")
- else:
- return None
-
- return key_path
-
-
def run_module():
module_args = dict(
cluster=dict(type='str', required=False, default='ceph'),
# We only want to run this check when a key needs to be added
# There is no guarantee that any cluster is running and we don't need one
if import_key:
- user = "client.admin"
- user_key = os.path.join(
- "/etc/ceph/" + cluster + ".client.admin.keyring")
- output_format = "json"
rc, cmd, out, err = exec_commands(
- module, info_key(cluster, name, user, user_key, output_format, containerized)) # noqa E501
+ module, info_key(cluster, name, containerized))
if state == "present":
if not caps:
- fatal("Capabilities must be provided when state is 'present'", module) # noqa E501
+ fatal("Capabilities must be provided when state is 'present'", module)
# We allow 'present' to override any existing key
# ONLY if a secret is provided
# if not we skip the creation
if import_key:
if rc == 0 and not secret:
- result["stdout"] = "skipped, since {0} already exists, if you want to update a key use 'state: update'".format( # noqa E501
+ result["stdout"] = "skipped, since {0} already exists, if you want to update a key use 'state: update'".format(
name)
result['rc'] = rc
module.exit_json(**result)
rc, cmd, out, err = exec_commands(module, create_key(
- module, result, cluster, name, secret, caps, import_key, auid, dest, containerized)) # noqa E501
+ module, result, cluster, name, secret, caps, import_key, auid, dest, containerized))
file_path = os.path.join(
dest + "/" + cluster + "." + name + ".keyring")
module.set_fs_attributes_if_different(file_args, False)
elif state == "update":
if not caps:
- fatal("Capabilities must be provided when state is 'update'", module) # noqa E501
+ fatal("Capabilities must be provided when state is 'update'", module)
if rc != 0:
result["stdout"] = "skipped, since {0} does not exist".format(name)
result['rc'] = 0
module.exit_json(**result)
- user = "client.admin"
- user_key = os.path.join(
- "/etc/ceph/" + cluster + ".client.admin.keyring")
- output_format = "json"
rc, cmd, out, err = exec_commands(
- module, info_key(cluster, name, user, user_key, output_format, containerized)) # noqa E501
+ module, info_key(cluster, name, containerized))
elif state == "list":
- user = "client.admin"
- user_key = os.path.join(
- "/etc/ceph/" + cluster + ".client.admin.keyring")
rc, cmd, out, err = exec_commands(
- module, list_keys(cluster, user, user_key, containerized))
-
- elif state == "fetch_initial_keys":
- hostname = socket.gethostname()
- user = "mon."
- user_key = os.path.join(
- "/var/lib/ceph/mon/" + cluster + "-" + hostname + "/keyring")
- rc, cmd, out, err = exec_commands(
- module, list_keys(cluster, user, user_key, containerized))
- if rc != 0:
- result["stdout"] = "failed to retrieve ceph keys".format(name)
- result['rc'] = 0
- module.exit_json(**result)
-
- entities = lookup_ceph_initial_entities(out)
-
- # get ceph's group and user id
- ceph_uid = pwd.getpwnam('ceph').pw_uid
- ceph_grp = grp.getgrnam('ceph').gr_gid
-
- output_format = "plain"
- for entity in entities:
- key_path = build_key_path(cluster, entity)
- if key_path is None:
- fatal("Failed to build key path, no entity yet?", module)
- elif os.path.isfile(key_path):
- # if the key is already on the filesystem
- # there is no need to fetch it again
- continue
-
- extra_args = [
- '-o',
- key_path,
- ]
-
- info_cmd = info_key(cluster, entity, user,
- user_key, output_format, containerized)
- # we use info_cmd[0] because info_cmd is an array made of an array
- info_cmd[0].extend(extra_args)
- rc, cmd, out, err = exec_commands(
- module, info_cmd) # noqa E501
-
- # apply ceph:ceph ownership and mode 0400 on keys
- try:
- os.chown(key_path, ceph_uid, ceph_grp)
- os.chmod(key_path, stat.S_IRUSR)
- except OSError as e:
- fatal("Failed to set owner/group/permissions of %s: %s" % (
- key_path, str(e)), module)
+ module, list_keys(cluster, containerized))
else:
module.fail_json(
- msg='State must either be "present" or "absent" or "update" or "list" or "info" or "fetch_initial_keys".', changed=False, rc=1) # noqa E501
+ msg='State must either be "present" or "absent" or "update" or "list" or "info".', changed=False, rc=1)
endd = datetime.datetime.now()
delta = endd - startd
-import json
import os
+import pytest
from . import ceph_key
-from ansible.compat.tests.mock import MagicMock
class TestCephKeyModule(object):
def test_generate_ceph_cmd_list_non_container(self):
fake_cluster = "fake"
fake_args = ['arg']
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
expected_command_list = [
'ceph',
- '-n',
- "fake-user",
- '-k',
- "/tmp/my-key",
'--cluster',
fake_cluster,
'auth',
'arg'
]
- result = ceph_key.generate_ceph_cmd(
- fake_cluster, fake_args, fake_user, fake_key)
+ result = ceph_key.generate_ceph_cmd(fake_cluster, fake_args)
assert result == expected_command_list
def test_generate_ceph_cmd_list_container(self):
fake_cluster = "fake"
fake_args = ['arg']
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
fake_containerized = "docker exec -ti ceph-mon"
expected_command_list = [
'docker',
'-ti',
'ceph-mon',
'ceph',
- '-n',
- "fake-user",
- '-k',
- "/tmp/my-key",
'--cluster',
fake_cluster,
'auth',
'arg'
]
result = ceph_key.generate_ceph_cmd(
- fake_cluster, fake_args, fake_user, fake_key, fake_containerized)
+ fake_cluster, fake_args, fake_containerized)
assert result == expected_command_list
def test_generate_ceph_authtool_cmd_non_container_no_auid(self):
}
fake_dest = "/fake/ceph"
fake_file_destination = os.path.join(
- fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
+ fake_dest + "/"+ fake_cluster + "." + fake_name + ".keyring")
fake_auid = None
expected_command_list = [
'ceph-authtool',
'allow rwx',
]
result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest) # noqa E501
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest)
assert result == expected_command_list
def test_generate_ceph_authtool_cmd_non_container_auid(self):
'allow rwx',
]
result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest) # noqa E501
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest)
assert result == expected_command_list
def test_generate_ceph_authtool_cmd_container(self):
'allow rwx'
]
result = ceph_key.generate_ceph_authtool_cmd(
- fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest, fake_containerized) # noqa E501
+ fake_cluster, fake_name, fake_secret, fake_caps, fake_auid, fake_dest, fake_containerized)
assert result == expected_command_list
def test_create_key_non_container(self):
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
expected_command_list = [
- ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name, # noqa E501
- '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], # noqa E501
- ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', # noqa E501
+ ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name,
+ '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'],
+ ['ceph', '--cluster', fake_cluster, 'auth',
'import', '-i', fake_file_destination],
]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
- fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest) # noqa E501
+ fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest)
assert result == expected_command_list
def test_create_key_container(self):
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph-authtool', '--create-keyring', fake_file_destination, # noqa E501
- '--name', fake_name, '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], # noqa E501
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', # noqa E501
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph-authtool', '--create-keyring', fake_file_destination,
+ '--name', fake_name, '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'],
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster',
fake_cluster, 'auth', 'import', '-i', fake_file_destination],
]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
- fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest, fake_containerized) # noqa E501
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name,
+ fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest, fake_containerized)
assert result == expected_command_list
def test_create_key_non_container_no_import(self):
fake_auid = None
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
- # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
+ # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array
expected_command_list = [[
'ceph-authtool',
'--create-keyring',
'allow rwx', ]
]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster,
- fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest) # noqa E501
+ fake_name, fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest)
assert result == expected_command_list
def test_create_key_container_no_import(self):
fake_file_destination = os.path.join(
fake_dest + "/" + fake_cluster + "." + fake_name + ".keyring")
fake_auid = None
- # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
+ # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array
expected_command_list = [[
'docker',
'exec',
'osd',
'allow rwx', ]
]
- result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, # noqa E501
- fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest, fake_containerized) # noqa E501
+ result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name,
+ fake_secret, fake_caps, fake_import_key, fake_auid, fake_dest, fake_containerized)
assert result == expected_command_list
def test_update_key_non_container(self):
'osd': 'allow rwx',
}
expected_command_list = [
- ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', 'caps', # noqa E501
+ ['ceph', '--cluster', fake_cluster, 'auth', 'caps',
fake_name, 'mon', 'allow *', 'osd', 'allow rwx'],
]
result = ceph_key.update_key(fake_cluster, fake_name, fake_caps)
'osd': 'allow rwx',
}
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, # noqa E501
- 'auth', 'caps', fake_name, 'mon', 'allow *', 'osd', 'allow rwx'], # noqa E501
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster', fake_cluster,
+ 'auth', 'caps', fake_name, 'mon', 'allow *', 'osd', 'allow rwx'],
]
result = ceph_key.update_key(
fake_cluster, fake_name, fake_caps, fake_containerized)
fake_cluster = "fake"
fake_name = "client.fake"
expected_command_list = [
- ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', # noqa E501
- '--cluster', fake_cluster, 'auth', 'del', fake_name],
+ ['ceph', '--cluster', fake_cluster, 'auth', 'del', fake_name],
]
result = ceph_key.delete_key(fake_cluster, fake_name)
assert result == expected_command_list
fake_name = "client.fake"
fake_containerized = "docker exec -ti ceph-mon"
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', # noqa E501
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph',
'--cluster', fake_cluster, 'auth', 'del', fake_name],
]
result = ceph_key.delete_key(
def test_info_key_non_container(self):
fake_cluster = "fake"
fake_name = "client.fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- fake_output_format = "json"
expected_command_list = [
- ['ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', fake_cluster, 'auth', # noqa E501
+ ['ceph', '--cluster', fake_cluster, 'auth',
'get', fake_name, '-f', 'json'],
]
- result = ceph_key.info_key(
- fake_cluster, fake_name, fake_user, fake_key, fake_output_format)
+ result = ceph_key.info_key(fake_cluster, fake_name)
assert result == expected_command_list
def test_info_key_container(self):
fake_cluster = "fake"
fake_name = "client.fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
fake_containerized = "docker exec -ti ceph-mon"
- fake_output_format = "json"
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', # noqa E501
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster',
fake_cluster, 'auth', 'get', fake_name, '-f', 'json'],
]
- result = ceph_key.info_key(
- fake_cluster, fake_name, fake_user, fake_key, fake_output_format, fake_containerized) # noqa E501
+ result = ceph_key.info_key(fake_cluster, fake_name, fake_containerized)
assert result == expected_command_list
def test_list_key_non_container(self):
fake_cluster = "fake"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
- expected_command_list = [
- ['ceph', '-n', "fake-user", '-k', "/tmp/my-key",
- '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
- ]
- result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
- assert result == expected_command_list
-
- def test_list_key_non_container_with_mon_key(self):
- fake_hostname = "mon01"
- fake_cluster = "fake"
- fake_user = "mon."
- fake_key = os.path.join("/var/lib/ceph/mon/" + fake_cluster + "-" + fake_hostname + "/keyring") # noqa E501
- expected_command_list = [
- ['ceph', '-n', "mon.", '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
- '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
- ]
- result = ceph_key.list_keys(fake_cluster, fake_user, fake_key)
- assert result == expected_command_list
-
- def test_list_key_container_with_mon_key(self):
- fake_hostname = "mon01"
- fake_cluster = "fake"
- fake_containerized = "docker exec -ti ceph-mon"
- fake_user = "mon."
- fake_key = os.path.join("/var/lib/ceph/mon/" + fake_cluster + "-" + fake_hostname + "/keyring") # noqa E501
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon','ceph', '-n', "mon.", '-k', "/var/lib/ceph/mon/fake-mon01/keyring", # noqa E501
- '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
+ ['ceph', '--cluster', fake_cluster, 'auth', 'ls', '-f', 'json'],
]
- result = ceph_key.list_keys(fake_cluster, fake_user, fake_key, fake_containerized) # noqa E501
+ result = ceph_key.list_keys(fake_cluster)
assert result == expected_command_list
def test_list_key_container(self):
fake_cluster = "fake"
fake_containerized = "docker exec -ti ceph-mon"
- fake_user = "fake-user"
- fake_key = "/tmp/my-key"
expected_command_list = [
- ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', # noqa E501
+ ['docker', 'exec', '-ti', 'ceph-mon', 'ceph', '--cluster',
fake_cluster, 'auth', 'ls', '-f', 'json'],
]
- result = ceph_key.list_keys(
- fake_cluster, fake_user, fake_key, fake_containerized)
+ result = ceph_key.list_keys(fake_cluster, fake_containerized)
assert result == expected_command_list
-
- def test_lookup_ceph_initial_entities(self):
-
- # fake_module = MagicMock()
- fake_ceph_dict = {"auth_dump":[{"entity":"osd.0","key":"AQAJkMhbszeBBBAA4/V1tDFXGlft1GnHJS5wWg==","caps":{"mgr":"allow profile osd","mon":"allow profile osd","osd":"allow *"}},{"entity":"osd.1","key":"AQAjkMhbshueAhAAjZec50aBgd1NObLz57SQvg==","caps":{"mgr":"allow profile osd","mon":"allow profile osd","osd":"allow *"}},{"entity":"client.admin","key":"AQDZjshbrJv6EhAAY9v6LzLYNDpPdlC3HD5KHA==","auid":0,"caps":{"mds":"allow","mgr":"allow *","mon":"allow *","osd":"allow *"}},{"entity":"client.bootstrap-mds","key":"AQDojshbc4QCHhAA1ZTrkt9dbSZRVU2GzI6U4A==","caps":{"mon":"allow profile bootstrap-mds"}},{"entity":"client.bootstrap-osd","key":"AQDjjshbYW+uGxAAyHcPCXXmVoL8VsTBI8z1Ng==","caps":{"mon":"allow profile bootstrap-osd"}},{"entity":"client.bootstrap-rbd","key":"AQDyjshb522eIhAAtAz6nUPMOdG4H9u0NgpXhA==","caps":{"mon":"allow profile bootstrap-rbd"}},{"entity":"client.bootstrap-rgw","key":"AQDtjshbDl8oIBAAq1SfSYQKDR49hJNWJVwDQw==","caps":{"mon":"allow profile bootstrap-rgw"}},{"entity":"mgr.mon0","key":"AQA0j8hbgGapORAAoDkyAvXVkM5ej4wNn4cwTQ==","caps":{"mds":"allow *","mon":"allow profile mgr","osd":"allow *"}}]} # noqa E501
- fake_ceph_dict_str = json.dumps(fake_ceph_dict) # convert to string
- expected_entity_list = ['client.admin', 'client.bootstrap-mds', 'client.bootstrap-osd', 'client.bootstrap-rbd', 'client.bootstrap-rgw'] # noqa E501
- result = ceph_key.lookup_ceph_initial_entities(fake_ceph_dict_str)
- assert result == expected_entity_list
-
- def test_build_key_path_admin(self):
- fake_cluster = "fake"
- entity = "client.admin"
- expected_result = "/etc/ceph/fake.client.admin.keyring"
- result = ceph_key.build_key_path(fake_cluster, entity)
- assert result == expected_result
-
- def test_build_key_path_bootstrap_osd(self):
- fake_cluster = "fake"
- entity = "bootstrap-osd"
- expected_result = "/var/lib/ceph/bootstrap-osd/fake.keyring"
- result = ceph_key.build_key_path(fake_cluster, entity)
- assert result == expected_result