from ceph_volume.util import encryption
-from mock.mock import patch
+from mock.mock import patch, Mock
import base64
+import pytest
+
+
+class TestNoWorkqueue:
+ def setup_method(self):
+ encryption.conf.dmcrypt_no_workqueue = None
+
+ @patch('ceph_volume.util.encryption.process.call',
+ Mock(return_value=(['cryptsetup 2.7.2 flags: UDEV BLKID KEYRING' \
+ 'FIPS KERNEL_CAPI PWQUALITY '], [''], 0)))
+ def test_set_dmcrypt_no_workqueue_true(self):
+ encryption.set_dmcrypt_no_workqueue()
+ assert encryption.conf.dmcrypt_no_workqueue
+
+ @patch('ceph_volume.util.encryption.process.call',
+ Mock(return_value=(['cryptsetup 2.0.0'], [''], 0)))
+ def test_set_dmcrypt_no_workqueue_false(self):
+ encryption.set_dmcrypt_no_workqueue()
+ assert encryption.conf.dmcrypt_no_workqueue is None
+
+ @patch('ceph_volume.util.encryption.process.call',
+ Mock(return_value=([''], ['fake error'], 1)))
+ def test_set_dmcrypt_no_workqueue_cryptsetup_version_fails(self):
+ with pytest.raises(RuntimeError):
+ encryption.set_dmcrypt_no_workqueue()
+
+ @patch('ceph_volume.util.encryption.process.call',
+ Mock(return_value=(['unexpected output'], [''], 0)))
+ def test_set_dmcrypt_no_workqueue_pattern_not_found(self):
+ with pytest.raises(RuntimeError):
+ encryption.set_dmcrypt_no_workqueue()
+
+ @patch('ceph_volume.util.encryption.process.call',
+ Mock(return_value=([], [''], 0)))
+ def test_set_dmcrypt_no_workqueue_index_error(self):
+ with pytest.raises(RuntimeError):
+ encryption.set_dmcrypt_no_workqueue()
+
class TestGetKeySize(object):
def test_get_size_from_conf_default(self, conf_ceph_stub):
# This regex extracts the version number from
# the `cryptsetup --version` output
- pattern: str = r'\b\d+(\.\d+)*\b'
+ pattern: str = r'(\d+\.?)+'
if rc:
raise RuntimeError(f"Can't retrieve cryptsetup version: {err}")
try:
- cryptsetup_version = re.match(pattern, out[0])
+ cryptsetup_version = re.search(pattern, out[0])
if cryptsetup_version is None:
_output: str = "\n".join(out)