self.shutdown()
return False
- def version(self):
+ def version(self) -> Version:
"""
Get the version number of the ``librados`` C library.
if ret != 0:
raise make_ex(ret, "error calling conf_read_file")
- def conf_parse_argv(self, args):
+ def conf_parse_argv(self, args: Sequence[str]):
"""
Parse known arguments from args, and remove; returned
args contain only those unknown to ceph
rados_buffer_free(outstr)
return decode_cstr(my_outstr)
- def connect(self, timeout=0):
+ def connect(self, timeout: int = 0):
"""
Connect to the cluster. Use shutdown() to release resources.
"""
raise LogicError("Ioctx.append(%s): rados_append \
returned %d, but should return zero on success." % (self.name, ret))
- def read(self, key: str, length: int = 8192, offset: int = 0):
+ def read(self, key: str, length: int = 8192, offset: int = 0) -> bytes:
"""
Read data from an object synchronously
assert ret < -MAX_ERRNO or ret == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
return ret
- def stat(self, key: str):
+ def stat(self, key: str) -> Tuple[int, time.struct_time]:
"""
Get object stats (size/mtime)
raise make_ex(ret, "Failed to stat %r" % key)
return psize, time.localtime(pmtime)
- def get_xattr(self, key: str, xattr_name: str):
+ def get_xattr(self, key: str, xattr_name: str) -> bytes:
"""
Get the value of an extended attribute on an object.
finally:
free(ret_buf)
- def get_xattrs(self, oid: str):
+ def get_xattrs(self, oid: str) -> XattrIterator:
"""
Start iterating over xattrs on an object.
self.require_ioctx_open()
return XattrIterator(self, oid)
- def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes):
+ def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes) -> bool:
"""
Set an extended attribute on an object.
raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
return True
- def rm_xattr(self, key: str, xattr_name: str):
+ def rm_xattr(self, key: str, xattr_name: str) -> bool:
"""
Removes an extended attribute on from an object.
return Watch(self, obj, callback, error_callback, timeout)
- def list_objects(self):
+ def list_objects(self) -> ObjectIterator:
"""
Get ObjectIterator on rados.Ioctx object.
@set_object_locator
@set_object_namespace
- def stat(self):
+ def stat(self) -> Tuple[int, time.struct_time]:
self.require_object_exists()
return self.ioctx.stat(self.key)
- def seek(self, position):
+ def seek(self, position: int):
self.require_object_exists()
self.offset = position
@set_object_locator
@set_object_namespace
- def get_xattr(self, xattr_name):
+ def get_xattr(self, xattr_name: str) -> bytes:
self.require_object_exists()
return self.ioctx.get_xattr(self.key, xattr_name)
@set_object_locator
@set_object_namespace
- def get_xattrs(self):
+ def get_xattrs(self) -> XattrIterator:
self.require_object_exists()
return self.ioctx.get_xattrs(self.key)
@set_object_locator
@set_object_namespace
- def set_xattr(self, xattr_name, xattr_value):
+ def set_xattr(self, xattr_name: str, xattr_value: bytes) -> bool:
self.require_object_exists()
return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
@set_object_locator
@set_object_namespace
- def rm_xattr(self, xattr_name):
+ def rm_xattr(self, xattr_name: str) -> bool:
self.require_object_exists()
return self.ioctx.rm_xattr(self.key, xattr_name)