else:
str_type = str
-cdef int AT_SYMLINK_NOFOLLOW = 0x100
-cdef int CEPH_STATX_BASIC_STATS = 0x7ffu
+AT_NO_ATTR_SYNC = 0x4000
+AT_SYMLINK_NOFOLLOW = 0x100
+cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
+CEPH_STATX_BASIC_STATS = 0x7ff
+cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
+CEPH_STATX_MODE = 0x1
+CEPH_STATX_NLINK = 0x2
+CEPH_STATX_UID = 0x4
+CEPH_STATX_GID = 0x8
+CEPH_STATX_RDEV = 0x10
+CEPH_STATX_ATIME = 0x20
+CEPH_STATX_MTIME = 0x40
+CEPH_STATX_CTIME = 0x80
+CEPH_STATX_INO = 0x100
+CEPH_STATX_SIZE = 0x200
+CEPH_STATX_BLOCKS = 0x400
+CEPH_STATX_BTIME = 0x800
+CEPH_STATX_VERSION = 0x1000
cdef extern from "Python.h":
# These are in cpython/string.pxd, but use "object" types instead of
if follow_symlink:
with nogil:
ret = ceph_statx(self.cluster, _path, &stx,
- CEPH_STATX_BASIC_STATS, 0)
+ CEPH_STATX_BASIC_STATS_CDEF, 0)
else:
with nogil:
ret = ceph_statx(self.cluster, _path, &stx,
- CEPH_STATX_BASIC_STATS, AT_SYMLINK_NOFOLLOW)
+ CEPH_STATX_BASIC_STATS_CDEF, AT_SYMLINK_NOFOLLOW_CDEF)
if ret < 0:
raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))
with nogil:
ret = ceph_fstatx(self.cluster, _fd, &stx,
- CEPH_STATX_BASIC_STATS, 0)
+ CEPH_STATX_BASIC_STATS_CDEF, 0)
if ret < 0:
raise make_ex(ret, "error in fsat")
return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
+
+ def statx(self, path, mask, flag):
+ """
+ Get a file's extended statistics and attributes.
+
+ :param path: the file or directory to get the statistics of.
+ :param mask: want bitfield of CEPH_STATX_* flags showing designed attributes.
+ :param flag: bitfield that can be used to set AT_* modifier flags (only AT_NO_ATTR_SYNC and AT_SYMLINK_NOFOLLOW)
+ """
+
+ self.require_state("mounted")
+ path = cstr(path, 'path')
+ if not isinstance(mask, int):
+ raise TypeError('flag must be a int')
+ if not isinstance(flag, int):
+ raise TypeError('flag must be a int')
+
+ cdef:
+ char* _path = path
+ statx stx
+ int _mask = mask
+ int _flag = flag
+ dict_result = dict()
+
+ with nogil:
+ ret = ceph_statx(self.cluster, _path, &stx, _mask, _flag)
+ if ret < 0:
+ raise make_ex(ret, "error in stat: %s" % path)
+
+ if (_mask & CEPH_STATX_MODE):
+ dict_result["mode"] = stx.stx_mode
+ if (_mask & CEPH_STATX_NLINK):
+ dict_result["nlink"] = stx.stx_nlink
+ if (_mask & CEPH_STATX_UID):
+ dict_result["uid"] = stx.stx_uid
+ if (_mask & CEPH_STATX_GID):
+ dict_result["gid"] = stx.stx_gid
+ if (_mask & CEPH_STATX_RDEV):
+ dict_result["rdev"] = stx.stx_rdev
+ if (_mask & CEPH_STATX_ATIME):
+ dict_result["atime"] = datetime.fromtimestamp(stx.stx_atime.tv_sec)
+ if (_mask & CEPH_STATX_MTIME):
+ dict_result["mtime"] = datetime.fromtimestamp(stx.stx_mtime.tv_sec)
+ if (_mask & CEPH_STATX_CTIME):
+ dict_result["ctime"] = datetime.fromtimestamp(stx.stx_ctime.tv_sec)
+ if (_mask & CEPH_STATX_INO):
+ dict_result["ino"] = stx.stx_ino
+ if (_mask & CEPH_STATX_SIZE):
+ dict_result["size"] = stx.stx_size
+ if (_mask & CEPH_STATX_BLOCKS):
+ dict_result["blocks"] = stx.stx_blocks
+ if (_mask & CEPH_STATX_BTIME):
+ dict_result["btime"] = datetime.fromtimestamp(stx.stx_btime.tv_sec)
+ if (_mask & CEPH_STATX_VERSION):
+ dict_result["version"] = stx.stx_version
+
+ return dict_result
def symlink(self, existing, newname):
"""