import errno
import os
import time
+from typing import Any, Dict, Optional
AT_NO_ATTR_SYNC = 0x4000
AT_SYMLINK_NOFOLLOW = 0x100
ceph_seekdir(self.lib.cluster, self.handle, _offset)
-def cstr(val, name, encoding="utf-8", opt=False):
+def cstr(val, name, encoding="utf-8", opt=False) -> bytes:
"""
Create a byte string from a Python string
:param str name: Name of the string parameter, for exceptions
:param str encoding: Encoding to use
:param bool opt: If True, None is allowed
- :rtype: bytes
:raises: :class:`InvalidArgument`
"""
if opt and val is None:
return [cstr(s, name) for s in list_str]
-def decode_cstr(val, encoding="utf-8"):
+def decode_cstr(val, encoding="utf-8") -> Optional[str]:
"""
Decode a byte string into a Python string.
:param bytes val: byte string
- :rtype: str or None
"""
if val is None:
return None
if ret < 0:
raise make_ex(ret, "fallocate failed")
- def getcwd(self):
+ def getcwd(self) -> bytes:
"""
Get the current working directory.
- :rtype the path to the current working directory
+ :returns: the path to the current working directory
"""
self.require_state("mounted")
with nogil:
"""
Change the current working directory.
- :param path the path to the working directory to change into.
+ :param path: the path to the working directory to change into.
"""
self.require_state("mounted")
if ret < 0:
raise make_ex(ret, "chdir failed")
- def opendir(self, path):
+ def opendir(self, path) -> DirResult:
"""
Open the given directory.
:param path: the path name of the directory to open. Must be either an absolute path
or a path relative to the current working directory.
- :rtype handle: the open directory stream handle
+ :returns: the open directory stream handle
"""
self.require_state("mounted")
d.handle = handle
return d
- def readdir(self, DirResult handle):
+ def readdir(self, DirResult handle) -> Optional[DirEntry]:
"""
Get the next entry in an open directory.
:param handle: the open directory stream handle
- :rtype dentry: the next directory entry or None if at the end of the
+ :returns: the next directory entry or None if at the end of the
directory (or the directory is empty. This pointer
should not be freed by the caller, and is only safe to
access between return and the next call to readdir or
if ret < 0:
raise make_ex(ret, "error in mkdir {}".format(path.decode('utf-8')))
- def mksnap(self, path, name, mode, metadata={}):
+ def mksnap(self, path, name, mode, metadata={}) -> int:
"""
Create a snapshot.
:raises: :class: `TypeError`
:raises: :class: `Error`
- :returns: int: 0 on success
+ :returns: 0 on success
"""
self.require_state("mounted")
raise make_ex(ret, "mksnap error")
return 0
- def rmsnap(self, path, name):
+ def rmsnap(self, path, name) -> int:
"""
Remove a snapshot.
:param name: snapshot name
:raises: :class: `Error`
- :returns: int: 0 on success
+ :returns: 0 on success
"""
self.require_state("mounted")
path = cstr(path, 'path')
raise make_ex(ret, "rmsnap error")
return 0
- def snap_info(self, path):
+ def snap_info(self, path) -> Dict[str, Any]:
"""
Fetch sapshot info
:param path: snapshot path
:raises: :class: `Error`
- :returns: dict: snapshot metadata
+ :returns: snapshot metadata
"""
self.require_state("mounted")
path = cstr(path, 'path')
ceph_free_snap_info_buffer(&info)
return {'id': info.id, 'metadata': md}
- def chmod(self, path, mode) :
+ def chmod(self, path, mode) -> None:
"""
Change directory mode.
def fchmod(self, fd, mode) :
"""
Change file mode based on fd.
+
:param fd: the file descriptor of the file to change file mode
:param mode: the permissions to be set.
"""
def lchown(self, path, uid, gid):
"""
Change ownership of a symbolic link
+
:param path: the path of the symbolic link to change.
:param uid: the uid to set
:param gid: the gid to set
def fchown(self, fd, uid, gid):
"""
Change file ownership
+
:param fd: the file descriptor of the file to change ownership
:param uid: the uid to set
:param gid: the gid to set
if ret < 0:
raise make_ex(ret, "error in link")
- def readlink(self, path, size):
+ def readlink(self, path, size) -> bytes:
"""
Read a symbolic link.
:param path: the path to the symlink to read
:param size: the length of the buffer
- :rtype buf: buffer to hold the path of the file that the symlink points to.
+ :returns: buffer to hold the path of the file that the symlink points to.
"""
self.require_state("mounted")
path = cstr(path, 'path')
def mds_command(self, mds_spec, args, input_data):
"""
- :return 3-tuple of output status int, output status string, output data
+ :returns: 3-tuple of output status int, output status string, output data
"""
mds_spec = cstr(mds_spec, 'mds_spec')
args = cstr(args, 'args')
"""
Get the file replication information from an open file descriptor.
- :param fd : the open file descriptor referring to the file to get
- the replication information of.
+ :param fd: the open file descriptor referring to the file to get
+ the replication information of.
"""
self.require_state("mounted")
if not isinstance(fd, int):