]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
python-common/ceph: Fix lint and various python errors
authorChristopher Hoffman <choffman@redhat.com>
Thu, 23 Oct 2025 15:16:25 +0000 (15:16 +0000)
committerChristopher Hoffman <choffman@redhat.com>
Wed, 5 Nov 2025 13:59:36 +0000 (13:59 +0000)
Fix various python bindings and linting issues that
arose from libcephfs fscrypt testing in pipeline.

Signed-off-by: Christopher Hoffman <choffman@redhat.com>
src/pybind/cephfs/mock_cephfs.pxi
src/python-common/ceph/fs/enctag.py

index fcda27bbc357c14779284d57fb88998ffbbbae3e..42d39d92428cb0aa825ee88823f79184bafc8eec 100644 (file)
@@ -147,6 +147,8 @@ cdef nogil:
         pass
     int ceph_llistxattr(ceph_mount_info *cmount, const char *path, char *list, size_t size):
         pass
+    int ceph_fcopyfile(ceph_mount_info *cmount, const char *spath, const char *dpath, mode_t mode):
+        pass
     int ceph_write(ceph_mount_info *cmount, int fd, const char *buf, int64_t size, int64_t offset):
         pass
     int ceph_pwritev(ceph_mount_info *cmount, int fd, iovec *iov, int iovcnt, int64_t offset):
index a3633dbed43ac04659067dcd5411c9a6822a3831..544487f6c2eab06431c8f2b8f2db6ea6893c7164 100644 (file)
@@ -1,8 +1,8 @@
 """
 Module: CephFS Volume Encryption Tag
 
-This module provides the `CephFSVolumeEncryptionTag` class, which is designed to manage encryption tags
-of subvolumes within a CephFS filesystem. The encryption tag mechanism allows
+This module provides the `CephFSVolumeEncryptionTag` class, which is designed to manage encryption
+tags of subvolumes within a CephFS filesystem. The encryption tag mechanism allows
 administrators to tag specific subvolumes with identifiers that indicate encryption information,
 such as a keyid or other itentifier tags.
 
@@ -14,15 +14,26 @@ supported top-level scopes.
 """
 
 import errno
-import enum
 import logging
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Protocol
 
 log = logging.getLogger(__name__)
 
 XATTR_SUBVOLUME_ENCTAG_NAME = 'user.ceph.subvolume.enctag'
 
 
+class FSOperations(Protocol):
+    """Protocol class representing the file system operations earmarking
+    classes will perform.
+    """
+
+    def setxattr(
+        self, path: str, key: str, value: bytes, flags: int
+    ) -> None: ...
+
+    def getxattr(self, path: str, key: str) -> bytes: ...
+
+
 class EncryptionTagException(Exception):
     def __init__(self, error_code: int, error_message: str) -> None:
         self.errno = error_code
@@ -38,37 +49,47 @@ class EncryptionTagException(Exception):
 class CephFSVolumeEncryptionTag:
     ENCTAG_MAX = 255
 
-    def __init__(self, fs, path: str) -> None:
+    def __init__(self, fs: FSOperations, path: str) -> None:
         self.fs = fs
         self.path = path
 
     def _handle_cephfs_error(self, e: Exception, action: str) -> None:
         if isinstance(e, ValueError):
-            raise EncryptionTagException(-errno.EINVAL, f"Invalid encryption tag specified: {e}") from e
+            raise EncryptionTagException(
+                -errno.EINVAL, f"Invalid encryption tag specified: {e}"
+            ) from e
         elif isinstance(e, OSError):
             log.error(f"Error {action} encryption tag: {e}")
             raise EncryptionTagException(-e.errno, e.strerror) from e
         else:
             log.error(f"Unexpected error {action} encryption tag: {e}")
-            raise EncryptionTagException(-errno.EIO, "Unexpected error") from e
+            raise EncryptionTagException(
+                -errno.EIO, "Unexpected error"
+            ) from e
 
     def get_tag(self) -> Optional[str]:
         try:
-            enc_tag_value = (
-                self.fs.getxattr(self.path, XATTR_SUBVOLUME_ENCTAG_NAME)
-                .decode('utf-8')
-            )
+            enc_tag_value = self.fs.getxattr(
+                self.path, XATTR_SUBVOLUME_ENCTAG_NAME
+            ).decode('utf-8')
             return enc_tag_value
         except Exception as e:
             self._handle_cephfs_error(e, "getting")
             return None
 
-    def set_tag(self, enc_tag: str):
+    def set_tag(self, enc_tag: str) -> None:
         try:
             if len(enc_tag) > self.ENCTAG_MAX:
-                raise ValueError(f"length '{len(enc_tag)} > {self.ENCTAG_MAX}'")
-
-            self.fs.setxattr(self.path, XATTR_SUBVOLUME_ENCTAG_NAME, enc_tag.encode('utf-8'), 0)
+                raise ValueError(
+                    f"length '{len(enc_tag)} > {self.ENCTAG_MAX}'"
+                )
+
+            self.fs.setxattr(
+                self.path,
+                XATTR_SUBVOLUME_ENCTAG_NAME,
+                enc_tag.encode('utf-8'),
+                0,
+            )
             log.info(f"Encryption Tag '{enc_tag}' set on {self.path}.")
         except Exception as e:
             self._handle_cephfs_error(e, "setting")