enctag = attrs.get("enctag", None)
if enctag is not None:
fs_enctag = CephFSVolumeEncryptionTag(self.fs, path)
- fs_enctag.set_tag(enctag)
+ try:
+ fs_enctag.set_tag(enctag)
+ except EncryptionTagException:
+ raise VolumeException(-errno.EINVAL,
+ "invalid enctag specified: length '{0} > {1}'".format(len(enctag), fs_enctag.ENCTAG_MAX))
+
fscrypt_auth = attrs.get("fscrypt_auth")
if fscrypt_auth is not None:
class CephFSVolumeEncryptionTag:
+ ENCTAG_MAX = 255
+
def __init__(self, fs, path: str) -> None:
self.fs = fs
self.path = path
def _handle_cephfs_error(self, e: Exception, action: str) -> None:
if isinstance(e, ValueError):
- raise EncryptionTagException(errno.EINVAL, f"Invalid encryption tag specified: {e}") from e
+ raise EncryptionTagException(-errno.EINVAL, f"Invalid encryption tag specified: {e}") from e
elif isinstance(e, OSError):
log.error(f"Error {action} encryption tag: {e}")
raise EncryptionTagException(-e.errno, e.strerror) from e
else:
log.error(f"Unexpected error {action} encryption tag: {e}")
- raise EncryptionTagException(errno.EIO, "Unexpected error") from e
+ raise EncryptionTagException(-errno.EIO, "Unexpected error") from e
def get_tag(self) -> Optional[str]:
try:
def set_tag(self, enc_tag: str):
try:
+ if len(enc_tag) > self.ENCTAG_MAX:
+ raise ValueError(f"length '{len(enc_tag)} > {self.ENCTAG_MAX}'")
+
self.fs.setxattr(self.path, XATTR_SUBVOLUME_ENCTAG_NAME, enc_tag.encode('utf-8'), 0)
log.info(f"Encryption Tag '{enc_tag}' set on {self.path}.")
except Exception as e: