return None
+class Secrets(object):
+
+ def __init__(self):
+ secret, stderr, ret = command(['ceph-authtool', '--gen-print-key'])
+ LOG.debug("stderr " + stderr)
+ assert ret == 0
+ self.keys = {
+ 'cephx_secret': secret.strip(),
+ }
+
+ def write_osd_keyring(self, keyring, osd_id):
+ command_check_call(
+ [
+ 'ceph-authtool', keyring,
+ '--create-keyring',
+ '--name', 'osd.' + str(osd_id),
+ '--add-key', self.keys['cephx_secret'],
+ ])
+ path_set_context(keyring)
+
+ def get_json(self):
+ return bytearray(json.dumps(self.keys), 'ascii')
+
+
+class LockboxSecrets(Secrets):
+
+ def __init__(self, args):
+ super(LockboxSecrets, self).__init__()
+
+ key_size = CryptHelpers.get_dmcrypt_keysize(args)
+ key = open('/dev/urandom', 'rb').read(key_size / 8)
+ base64_key = base64.b64encode(key).decode('ascii')
+
+ secret, stderr, ret = command(['ceph-authtool', '--gen-print-key'])
+ LOG.debug("stderr " + stderr)
+ assert ret == 0
+
+ self.keys.update({
+ 'dmcrypt_key': base64.b64encode(key),
+ 'cephx_lockbox_secret': secret.strip(),
+ })
+
+ def write_lockbox_keyring(self, path, osd_uuid):
+ keyring = os.path.join(path, 'keyring')
+ command_check_call(
+ [
+ 'ceph-authtool', keyring,
+ '--create-keyring',
+ '--name', 'client.osd-lockbox.' + osd_uuid,
+ '--add-key', self.keys['cephx_lockbox_secret'],
+ ])
+ path_set_context(keyring)
+
+
class Lockbox(object):
def __init__(self, args):
set_type=set_type):
data = main.PrepareData(args)
assert data.args.cluster_uuid == cluster_uuid
+
+
+class TestSecrets(Base):
+
+ @mock.patch('ceph_disk.main.command')
+ def test_secrets(self, m_command):
+ key = "KEY"
+ m_command.side_effect = lambda cmd: (key + "\n", '', 0)
+ s = main.Secrets()
+ assert {"cephx_secret": key} == s.keys
+ assert '{"cephx_secret": "' + key + '"}' == s.get_json()
+
+ @mock.patch('ceph_disk.main.open')
+ @mock.patch('ceph_disk.main.CryptHelpers.get_dmcrypt_keysize')
+ @mock.patch('ceph_disk.main.command')
+ def test_lockbox_secrets(self,
+ m_command,
+ m_get_dmcrypt_keysize,
+ m_open):
+ key = "KEY"
+ m_command.side_effect = lambda cmd: (key + "\n", '', 0)
+ m_get_dmcrypt_keysize.side_effect = lambda args: 32
+
+ class File:
+ def read(self, size):
+ return b'O' * size
+
+ m_open.side_effect = lambda path, mode: File()
+ s = main.LockboxSecrets({})
+ assert {
+ "dmcrypt_key": 'T09PTw==',
+ "cephx_secret": key,
+ "cephx_lockbox_secret": key,
+ } == s.keys