if ret != 0:
raise make_ex(ret, "error calling conf_parse_env")
- def conf_get(self, option: str):
+ def conf_get(self, option: str) -> Optional[str]:
"""
Get the value of a configuration option
raise make_ex(ret, "error connecting to the cluster")
self.state = "connected"
- def get_instance_id(self):
+ def get_instance_id(self) -> int:
"""
Get a global id for current instance
"""
ret = rados_get_instance_id(self.cluster)
return ret;
- def get_cluster_stats(self):
+ def get_cluster_stats(self) -> Dict[str, int]:
"""
Read usage info about the cluster
'kb_avail': stats.kb_avail,
'num_objects': stats.num_objects}
- def pool_exists(self, pool_name: str):
+ def pool_exists(self, pool_name: str) -> bool:
"""
Checks if a given pool exists.
else:
raise make_ex(ret, "error looking up pool '%s'" % pool_name)
- def pool_lookup(self, pool_name: str):
+ def pool_lookup(self, pool_name: str) -> int:
"""
Returns a pool's ID based on its name.
if ret < 0:
raise make_ex(ret, "error creating pool '%s'" % pool_name)
- def get_pool_base_tier(self, pool_id: int):
+ def get_pool_base_tier(self, pool_id: int) -> int:
"""
Get base pool
if ret < 0:
raise make_ex(ret, "error deleting pool '%s'" % pool_name)
- def get_inconsistent_pgs(self, pool_id: int):
+ def get_inconsistent_pgs(self, pool_id: int) -> List[str]:
"""
List inconsistent placement groups in the given pool
finally:
free(pgs)
- def list_pools(self):
+ def list_pools(self) -> List[str]:
"""
Gets a list of pool names.
finally:
free(c_names)
- def get_fsid(self):
+ def get_fsid(self) -> str:
"""
Get the fsid of the cluster as a hexadecimal string.
finally:
free(ret_buf)
- def open_ioctx(self, ioctx_name: str):
+ def open_ioctx(self, ioctx_name: str) -> Ioctx:
"""
Create an io context
finally:
free(_cmd)
- def wait_for_latest_osdmap(self):
+ def wait_for_latest_osdmap(self) -> int:
self.require_state("connected")
with nogil:
ret = rados_wait_for_latest_osdmap(self.cluster)
return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
% (str(self.ioctx), self.name, self.snap_id)
- def get_timestamp(self):
+ def get_timestamp(self) -> float:
"""
Find when a snapshot in the current pool occurred
self.onsafe = onsafe
self.ioctx = ioctx
- def is_safe(self):
+ def is_safe(self) -> bool:
"""
Is an asynchronous operation safe?
"""
return self.is_complete()
- def is_complete(self):
+ def is_complete(self) -> bool:
"""
Has an asynchronous operation completed?
ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
return ret
- def get_return_value(self):
+ def get_return_value(self) -> int:
"""
Get the return value of an asychronous operation
return
self.error_callback(watch_id, error)
- def get_id(self):
+ def get_id(self) -> int:
return self.id
def check(self):
rados_ioctx_locator_set_key(self.io, _loc_key)
self.locator_key = loc_key
- def get_locator_key(self):
+ def get_locator_key(self) -> str:
"""
Get the locator_key of context
rados_ioctx_set_namespace(self.io, _nspace)
self.nspace = nspace
- def get_namespace(self):
+ def get_namespace(self) -> str:
"""
Get the namespace of context
# itself and set ret_s to NULL, hence XDECREF).
ref.Py_XDECREF(ret_s)
- def get_stats(self):
+ def get_stats(self) -> Dict[str, int]:
"""
Get pool usage statistics
"num_wr": stats.num_wr,
"num_wr_kb": stats.num_wr_kb}
- def remove_object(self, key: str):
+ def remove_object(self, key: str) -> bool:
"""
Delete an object
raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
return ret
- def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0):
+ def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0) -> int:
'''
Compare an on-disk object range with a buffer
:param key: the name of the object
(key, xattr_name))
return True
- def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000):
+ def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000) -> bool:
"""
Send a rados notification to an object.
self.require_ioctx_open()
return ObjectIterator(self)
- def list_snaps(self):
+ def list_snaps(self) -> SnapIterator:
"""
Get SnapIterator on rados.Ioctx object.
self.require_ioctx_open()
return SnapIterator(self)
- def get_pool_id(self):
+ def get_pool_id(self) -> int:
"""
Get pool id
ret = rados_ioctx_get_id(self.io)
return ret;
- def get_pool_name(self):
+ def get_pool_name(self) -> str:
"""
Get pool name
if ret != 0:
raise make_ex(ret, "Failed to remove snap %s" % snap_name)
- def lookup_snap(self, snap_name: str):
+ def lookup_snap(self, snap_name: str) -> Snap:
"""
Get the id of a pool snapshot
if ret != 0:
raise make_ex(ret, "Failed to rollback %s" % oid)
- def get_last_version(self):
+ def get_last_version(self) -> int:
"""
Return the version of the last object read or written to.
ret = rados_get_last_version(self.io)
return int(ret)
- def create_write_op(self):
+ def create_write_op(self) -> WriteOp:
"""
create write operation object.
need call release_write_op after use
"""
return WriteOp().create()
- def create_read_op(self):
+ def create_read_op(self) -> ReadOp:
"""
create read operation object.
need call release_read_op after use
oncomplete: Optional[Callable[[Completion], None]] = None,
onsafe: Optional[Callable[[Completion], None]] = None,
mtime: int = 0,
- flags: int = LIBRADOS_OPERATION_NOFLAG):
+ flags: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
"""
execute the real write operation asynchronously
:para write_op: write operation object
if ret < 0:
raise make_ex(ret, "error enabling application")
- def application_list(self):
+ def application_list(self) -> List[str]:
"""
Returns a list of enabled applications
finally:
free(apps)
- def application_metadata_get(self, app_name: str, key: str):
+ def application_metadata_get(self, app_name: str, key: str) -> str:
"""
Gets application metadata on an OSD pool for the given key
if ret < 0:
raise make_ex(ret, "error removing application metadata")
- def application_metadata_list(self, app_name: str):
+ def application_metadata_list(self, app_name: str) -> List[Tuple[str, str]]:
"""
Returns a list of enabled applications
free(c_keys)
free(c_vals)
- def alignment(self):
+ def alignment(self) -> int:
"""
Returns pool alignment