_RBD_POOL_STAT_OPTION_TRASH_MAX_PROVISIONED_BYTES "RBD_POOL_STAT_OPTION_TRASH_MAX_PROVISIONED_BYTES"
_RBD_POOL_STAT_OPTION_TRASH_SNAPSHOTS "RBD_POOL_STAT_OPTION_TRASH_SNAPSHOTS"
+ ctypedef enum rbd_encryption_format_t:
+ _RBD_ENCRYPTION_FORMAT_LUKS1 "RBD_ENCRYPTION_FORMAT_LUKS1"
+ _RBD_ENCRYPTION_FORMAT_LUKS2 "RBD_ENCRYPTION_FORMAT_LUKS2"
+
+ ctypedef enum rbd_encryption_algorithm_t:
+ _RBD_ENCRYPTION_ALGORITHM_AES128 "RBD_ENCRYPTION_ALGORITHM_AES128"
+ _RBD_ENCRYPTION_ALGORITHM_AES256 "RBD_ENCRYPTION_ALGORITHM_AES256"
+
+ ctypedef struct rbd_encryption_luks1_format_options_t:
+ rbd_encryption_algorithm_t alg
+ const char* passphrase
+ size_t passphrase_size
+
+ ctypedef struct rbd_encryption_luks2_format_options_t:
+ rbd_encryption_algorithm_t alg
+ const char* passphrase
+ size_t passphrase_size
+
+ ctypedef void* rbd_encryption_options_t
+
ctypedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg)
void rbd_version(int *major, int *minor, int *extra)
int rbd_pool_stats_option_add_uint64(rbd_pool_stats_t stats,
int stat_option, uint64_t* stat_val)
int rbd_pool_stats_get(rados_ioctx_t io, rbd_pool_stats_t stats)
+
+ int rbd_encryption_format(rbd_image_t image,
+ rbd_encryption_format_t format,
+ rbd_encryption_options_t opts, size_t opts_size)
+ int rbd_encryption_load(rbd_image_t image,
+ rbd_encryption_format_t format,
+ rbd_encryption_options_t opts, size_t opts_size)
ctypedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg)
+ ctypedef enum rbd_encryption_format_t:
+ _RBD_ENCRYPTION_FORMAT_LUKS1 "RBD_ENCRYPTION_FORMAT_LUKS1"
+ _RBD_ENCRYPTION_FORMAT_LUKS2 "RBD_ENCRYPTION_FORMAT_LUKS2"
+
+ ctypedef enum rbd_encryption_algorithm_t:
+ _RBD_ENCRYPTION_ALGORITHM_AES128 "RBD_ENCRYPTION_ALGORITHM_AES128"
+ _RBD_ENCRYPTION_ALGORITHM_AES256 "RBD_ENCRYPTION_ALGORITHM_AES256"
+
+ ctypedef struct rbd_encryption_luks1_format_options_t:
+ rbd_encryption_algorithm_t alg
+ const char* passphrase
+ size_t passphrase_size
+
+ ctypedef struct rbd_encryption_luks2_format_options_t:
+ rbd_encryption_algorithm_t alg
+ const char* passphrase
+ size_t passphrase_size
+
+ ctypedef void* rbd_encryption_options_t
+
void rbd_version(int *major, int *minor, int *extra):
pass
void rbd_image_spec_list_cleanup(rbd_image_spec_t *image, size_t num_images):
pass
int rbd_pool_stats_get(rados_ioctx_t io, rbd_pool_stats_t stats):
pass
+ int rbd_encryption_format(rbd_image_t image,
+ rbd_encryption_format_t format,
+ rbd_encryption_options_t opts, size_t opts_size):
+ pass
+ int rbd_encryption_load(rbd_image_t image,
+ rbd_encryption_format_t format,
+ rbd_encryption_options_t opts, size_t opts_size):
+ pass
except ImportError:
from collections import Iterable
from datetime import datetime
+import errno
from itertools import chain
import time
RBD_SNAP_REMOVE_FLATTEN = _RBD_SNAP_REMOVE_FLATTEN
RBD_SNAP_REMOVE_FORCE = _RBD_SNAP_REMOVE_FORCE
+RBD_ENCRYPTION_FORMAT_LUKS1 = _RBD_ENCRYPTION_FORMAT_LUKS1
+RBD_ENCRYPTION_FORMAT_LUKS2 = _RBD_ENCRYPTION_FORMAT_LUKS2
+RBD_ENCRYPTION_ALGORITHM_AES128 = _RBD_ENCRYPTION_ALGORITHM_AES128
+RBD_ENCRYPTION_ALGORITHM_AES256 = _RBD_ENCRYPTION_ALGORITHM_AES256
+
RBD_WRITE_ZEROES_FLAG_THICK_PROVISION = _RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
class Error(Exception):
&sn, sizeof(rbd_snap_mirror_namespace_t))
return info
+ @requires_not_closed
+ def encryption_format(self, format, passphrase,
+ cipher_alg=RBD_ENCRYPTION_ALGORITHM_AES256):
+ passphrase = cstr(passphrase, "passphrase")
+ cdef rbd_encryption_format_t _format = format
+ cdef rbd_encryption_luks1_format_options_t _luks1_opts
+ cdef rbd_encryption_luks1_format_options_t _luks2_opts
+ cdef char* _passphrase = passphrase
+
+ if (format == RBD_ENCRYPTION_FORMAT_LUKS1):
+ _luks1_opts.alg = cipher_alg
+ _luks1_opts.passphrase = _passphrase
+ _luks1_opts.passphrase_size = len(passphrase)
+ with nogil:
+ ret = rbd_encryption_format(self.image, _format, &_luks1_opts,
+ sizeof(_luks1_opts))
+ if ret != 0:
+ raise make_ex(
+ ret,
+ 'error formatting image %s with format luks1' % self.name)
+ elif (format == RBD_ENCRYPTION_FORMAT_LUKS2):
+ _luks2_opts.alg = cipher_alg
+ _luks2_opts.passphrase = _passphrase
+ _luks2_opts.passphrase_size = len(passphrase)
+ with nogil:
+ ret = rbd_encryption_format(self.image, _format, &_luks2_opts,
+ sizeof(_luks2_opts))
+ if ret != 0:
+ raise make_ex(
+ ret,
+ 'error formatting image %s with format luks2' % self.name)
+ else:
+ raise make_ex(-errno.ENOTSUP, 'Unsupported encryption format')
+
+ @requires_not_closed
+ def encryption_load(self, format, passphrase):
+ passphrase = cstr(passphrase, "passphrase")
+ cdef rbd_encryption_format_t _format = format
+ cdef rbd_encryption_luks1_format_options_t _luks1_opts
+ cdef rbd_encryption_luks1_format_options_t _luks2_opts
+ cdef char* _passphrase = passphrase
+
+ if (format == RBD_ENCRYPTION_FORMAT_LUKS1):
+ _luks1_opts.passphrase = _passphrase
+ _luks1_opts.passphrase_size = len(passphrase)
+ with nogil:
+ ret = rbd_encryption_load(self.image, _format, &_luks1_opts,
+ sizeof(_luks1_opts))
+ if ret != 0:
+ raise make_ex(
+ ret,
+ ('error loading encryption on image %s '
+ 'with format luks1') % self.name)
+ elif (format == RBD_ENCRYPTION_FORMAT_LUKS2):
+ _luks2_opts.passphrase = _passphrase
+ _luks2_opts.passphrase_size = len(passphrase)
+ with nogil:
+ ret = rbd_encryption_load(self.image, _format, &_luks2_opts,
+ sizeof(_luks2_opts))
+ if ret != 0:
+ raise make_ex(
+ ret,
+ ('error loading encryption on image %s '
+ 'with format luks2') % self.name)
+ else:
+ raise make_ex(-errno.ENOTSUP, 'Unsupported encryption format')
+
cdef class ImageIterator(object):
"""
import json
import socket
import os
+import platform
import time
import sys
RBD_SNAP_MIRROR_STATE_PRIMARY_DEMOTED,
RBD_SNAP_CREATE_SKIP_QUIESCE,
RBD_SNAP_CREATE_IGNORE_QUIESCE_ERROR,
- RBD_WRITE_ZEROES_FLAG_THICK_PROVISION)
+ RBD_WRITE_ZEROES_FLAG_THICK_PROVISION,
+ RBD_ENCRYPTION_FORMAT_LUKS1, RBD_ENCRYPTION_FORMAT_LUKS2)
rados = None
ioctx = None
return functools.wraps(fn)(_require_features)
return wrapper
+def require_linux():
+ def wrapper(fn):
+ def _require_linux(*args, **kwargs):
+ if platform.system() != "Linux":
+ raise SkipTest
+ return fn(*args, **kwargs)
+ return functools.wraps(fn)(_require_linux)
+ return wrapper
+
def blocklist_features(blocklisted_features):
def wrapper(fn):
def _blocklist_features(*args, **kwargs):
assert_raises(InvalidArgument, self.image.sparsify, 16)
self.image.sparsify(4096)
+ @require_linux()
+ @blocklist_features([RBD_FEATURE_JOURNALING])
+ def test_encryption_luks1(self):
+ self.image.write(b'hello world', 0)
+ self.image.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, "password")
+ read_bytes = self.image.read(0, 11)
+ assert_not_equal(b'hello world', read_bytes)
+ self.image.encryption_load(RBD_ENCRYPTION_FORMAT_LUKS1, "password")
+ assert_not_equal(read_bytes, self.image.read(0, 11))
+
+ @require_linux()
+ @blocklist_features([RBD_FEATURE_JOURNALING])
+ def test_encryption_luks2(self):
+ self.image.resize(256 << 20)
+ self.image.write(b'hello world', 0)
+ self.image.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, "password")
+ read_bytes = self.image.read(0, 11)
+ assert_not_equal(b'hello world', read_bytes)
+ self.image.encryption_load(RBD_ENCRYPTION_FORMAT_LUKS2, "password")
+ assert_not_equal(read_bytes, self.image.read(0, 11))
+
class TestImageId(object):
def setUp(self):