-# cython: embedsignature=True
+# cython: embedsignature=True, binding=True
"""
This module is a thin wrapper around librados.
import threading
import time
-try:
- from collections.abc import Callable
-except ImportError:
- from collections import Callable
from datetime import datetime, timedelta
from functools import partial, wraps
from itertools import chain
-
+from typing import Callable, Dict, Optional, Sequence, Tuple, Union
cdef extern from "Python.h":
# These are in cpython/string.pxd, but use "object" types instead of
}
-cdef make_ex(ret, msg):
+cdef make_ex(ret: int, msg: str):
"""
Translate a librados return code into an exception.
return OSError(msg, errno=ret)
-# helper to specify an optional argument, where in addition to `cls`, `None`
-# is also acceptable
-def opt(cls):
- return (cls, None)
-
-
-# validate argument types of an instance method
-# kwargs is an un-ordered dict, so use args instead
-def requires(*types):
- def is_type_of(v, t):
- if t is None:
- return v is None
- else:
- return isinstance(v, t)
-
- def check_type(val, arg_name, arg_type):
- if isinstance(arg_type, tuple):
- if any(is_type_of(val, t) for t in arg_type):
- return
- type_names = ' or '.join('None' if t is None else t.__name__
- for t in arg_type)
- raise TypeError('%s must be %s' % (arg_name, type_names))
- else:
- if is_type_of(val, arg_type):
- return
- assert(arg_type is not None)
- raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
-
- def wrapper(f):
- # FIXME(sileht): this stop with
- # AttributeError: 'method_descriptor' object has no attribute '__module__'
- # @wraps(f)
- def validate_func(*args, **kwargs):
- # ignore the `self` arg
- pos_args = zip(args[1:], types)
- named_args = ((kwargs[name], (name, spec)) for name, spec in types
- if name in kwargs)
- for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
- check_type(arg_val, arg_name, arg_type)
- return f(*args, **kwargs)
- return validate_func
- return wrapper
-
-
def cstr(val, name, encoding="utf-8", opt=False):
"""
Create a byte string from a Python string
DEFAULT_CONF_FILES = -2
"special value that indicates the default conffiles should be read when creating a mount handle"
- @requires(('rados_id', opt(str)), ('name', opt(str)), ('clustername', opt(str)),
- ('conffile', (str, int)))
- def __setup(self, rados_id=None, name=None, clustername=None,
- conf_defaults=None, conffile=NO_CONF_FILE, conf=None, flags=0,
- context=None):
+ def __setup(self,
+ rados_id: Optional[str] = None,
+ name: Optional[str] = None,
+ clustername: Optional[str] = None,
+ conf_defaults: Optional[Dict[str, str]] = None,
+ conffile: Union[str, int, None] = NO_CONF_FILE,
+ conf: Optional[Dict[str, str]] = None,
+ flags: int = 0,
+ context: object = None):
self.monitor_callback = None
self.monitor_callback2 = None
self.parsed_args = []
if clustername is None:
clustername = ''
- name = cstr(name, 'name')
- clustername = cstr(clustername, 'clustername')
+ name_raw = cstr(name, 'name')
+ clustername_raw = cstr(clustername, 'clustername')
cdef:
- char *_name = name
- char *_clustername = clustername
+ char *_name = name_raw
+ char *_clustername = clustername_raw
int _flags = flags
int ret
rados_version(&major, &minor, &extra)
return Version(major, minor, extra)
- @requires(('path', opt(str)))
- def conf_read_file(self, path=None):
+ def conf_read_file(self, path: Optional[str] = None):
"""
Configure the cluster handle using a Ceph config file.
:type path: str
"""
self.require_state("configuring", "connected")
- path = cstr(path, 'path', opt=True)
+ path_raw = cstr(path, 'path', opt=True)
cdef:
- char *_path = opt_str(path)
+ char *_path = opt_str(path_raw)
with nogil:
ret = rados_conf_read_file(self.cluster, _path)
if ret != 0:
free(_argv)
free(_remargv)
- def conf_parse_env(self, var='CEPH_ARGS'):
+ def conf_parse_env(self, var: Optional[str] = 'CEPH_ARGS'):
"""
Parse known arguments from an environment variable, normally
CEPH_ARGS.
if not var:
return
- var = cstr(var, 'var')
+ var_raw = cstr(var, 'var')
cdef:
- char *_var = var
+ char *_var = var_raw
with nogil:
ret = rados_conf_parse_env(self.cluster, _var)
if ret != 0:
raise make_ex(ret, "error calling conf_parse_env")
- @requires(('option', str))
- def conf_get(self, option):
+ def conf_get(self, option: str):
"""
Get the value of a configuration option
:raises: :class:`TypeError`
"""
self.require_state("configuring", "connected")
- option = cstr(option, 'option')
+ option_raw = cstr(option, 'option')
cdef:
- char *_option = option
+ char *_option = option_raw
size_t length = 20
char *ret_buf = NULL
finally:
free(ret_buf)
- @requires(('option', str), ('val', str))
- def conf_set(self, option, val):
+ def conf_set(self, option: str, val: str):
"""
Set the value of a configuration option
:raises: :class:`TypeError`, :class:`ObjectNotFound`
"""
self.require_state("configuring", "connected")
- option = cstr(option, 'option')
- val = cstr(val, 'val')
+ option_raw = cstr(option, 'option')
+ val_raw = cstr(val, 'val')
cdef:
- char *_option = option
- char *_val = val
+ char *_option = option_raw
+ char *_val = val_raw
with nogil:
ret = rados_conf_set(self.cluster, _option, _val)
if ret != 0:
raise make_ex(ret, "error calling conf_set")
- def ping_monitor(self, mon_id):
+ def ping_monitor(self, mon_id: str):
"""
Ping a monitor to assess liveness
self.require_state("configuring", "connected")
- mon_id = cstr(mon_id, 'mon_id')
+ mon_id_raw = cstr(mon_id, 'mon_id')
cdef:
- char *_mon_id = mon_id
+ char *_mon_id = mon_id_raw
size_t outstrlen = 0
char *outstr
'kb_avail': stats.kb_avail,
'num_objects': stats.num_objects}
- @requires(('pool_name', str))
- def pool_exists(self, pool_name):
+ def pool_exists(self, pool_name: str):
"""
Checks if a given pool exists.
"""
self.require_state("connected")
- pool_name = cstr(pool_name, 'pool_name')
+ pool_name_raw = cstr(pool_name, 'pool_name')
cdef:
- char *_pool_name = pool_name
+ char *_pool_name = pool_name_raw
with nogil:
ret = rados_pool_lookup(self.cluster, _pool_name)
else:
raise make_ex(ret, "error looking up pool '%s'" % pool_name)
- @requires(('pool_name', str))
- def pool_lookup(self, pool_name):
+ def pool_lookup(self, pool_name: str):
"""
Returns a pool's ID based on its name.
:returns: int - pool ID, or None if it doesn't exist
"""
self.require_state("connected")
- pool_name = cstr(pool_name, 'pool_name')
+ pool_name_raw = cstr(pool_name, 'pool_name')
cdef:
- char *_pool_name = pool_name
+ char *_pool_name = pool_name_raw
with nogil:
ret = rados_pool_lookup(self.cluster, _pool_name)
else:
raise make_ex(ret, "error looking up pool '%s'" % pool_name)
- @requires(('pool_id', int))
- def pool_reverse_lookup(self, pool_id):
+ def pool_reverse_lookup(self, pool_id: int):
"""
Returns a pool's name based on its ID.
finally:
free(name)
- @requires(('pool_name', str), ('crush_rule', opt(int)), ('auid', opt(int)))
- def create_pool(self, pool_name, crush_rule=None, auid=None):
+ def create_pool(self, pool_name: str,
+ crush_rule: Optional[int] = None,
+ auid: Optional[int] = None):
"""
Create a pool:
- with default settings: if crush_rule=None and auid=None
"""
self.require_state("connected")
- pool_name = cstr(pool_name, 'pool_name')
+ pool_name_raw = cstr(pool_name, 'pool_name')
cdef:
- char *_pool_name = pool_name
+ char *_pool_name = pool_name_raw
uint8_t _crush_rule
uint64_t _auid
if ret < 0:
raise make_ex(ret, "error creating pool '%s'" % pool_name)
- @requires(('pool_id', int))
- def get_pool_base_tier(self, pool_id):
+ def get_pool_base_tier(self, pool_id: int):
"""
Get base pool
raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
return int(base_tier)
- @requires(('pool_name', str))
- def delete_pool(self, pool_name):
+ def delete_pool(self, pool_name: str):
"""
Delete a pool and all data inside it.
"""
self.require_state("connected")
- pool_name = cstr(pool_name, 'pool_name')
+ pool_name_raw = cstr(pool_name, 'pool_name')
cdef:
- char *_pool_name = pool_name
+ char *_pool_name = pool_name_raw
with nogil:
ret = rados_pool_delete(self.cluster, _pool_name)
if ret < 0:
raise make_ex(ret, "error deleting pool '%s'" % pool_name)
- @requires(('pool_id', int))
- def get_inconsistent_pgs(self, pool_id):
+ def get_inconsistent_pgs(self, pool_id: int):
"""
List inconsistent placement groups in the given pool
finally:
free(ret_buf)
- @requires(('ioctx_name', str))
- def open_ioctx(self, ioctx_name):
+ def open_ioctx(self, ioctx_name: str):
"""
Create an io context
:returns: Ioctx - Rados Ioctx object
"""
self.require_state("connected")
- ioctx_name = cstr(ioctx_name, 'ioctx_name')
+ ioctx_name_raw = cstr(ioctx_name, 'ioctx_name')
cdef:
rados_ioctx_t ioctx
- char *_ioctx_name = ioctx_name
+ char *_ioctx_name = ioctx_name_raw
with nogil:
ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
if ret < 0:
io.io = ioctx
return io
- @requires(('pool_id', int))
- def open_ioctx2(self, pool_id):
+ def open_ioctx2(self, pool_id: int) -> Ioctx:
"""
Create an io context
io.io = ioctx
return io
- @requires(('cmd': str), ('inbuf': bytes), ('timeout', opt(int)), ('target', opt(str)))
- def mon_command(self, cmd, inbuf, timeout=0, target=None):
+ def mon_command(self,
+ cmd: str,
+ inbuf: bytes,
+ timeout: int = 0,
+ target: Optional[Union[str, int]] = None) -> Tuple[int, str, bytes]:
"""
Send a command to the mon.
:param cmd: JSON formatted string.
:param inbuf: optional string.
:param timeout: This parameter is ignored.
- :param target: name of a specific mon. Optional
+ :param target: name or rank of a specific mon. Optional
:return: (int ret, string outbuf, string outs)
Example:
"""
# NOTE(sileht): timeout is ignored because C API doesn't provide
# timeout argument, but we keep it for backward compat with old python binding
-
self.require_state("connected")
- cmd = [cstr(cmd, 'cmd')]
+ cmds = [cstr(cmd, 'cmd')]
if isinstance(target, int):
# NOTE(sileht): looks weird but test_monmap_dump pass int
cdef:
char *_target = opt_str(target)
- char **_cmd = to_bytes_array(cmd)
- size_t _cmdlen = len(cmd)
+ char **_cmd = to_bytes_array(cmds)
+ size_t _cmdlen = len(cmds)
char *_inbuf = inbuf
size_t _inbuf_len = len(inbuf)
finally:
free(_cmd)
- def osd_command(self, osdid, cmd, inbuf, timeout=0):
+ def osd_command(self,
+ osdid: int,
+ cmd: str,
+ inbuf: bytes,
+ timeout: int = 0) -> Tuple[int, str, bytes]:
"""
osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
finally:
free(_cmd)
- def mgr_command(self, cmd, inbuf, timeout=0, target=None):
+ def mgr_command(self,
+ cmd: str,
+ inbuf: bytes,
+ timeout: int = 0,
+ target: Optional[str] = None) -> Tuple[int, str, bytes]:
"""
:return: (int ret, string outbuf, string outs)
"""
finally:
free(_cmd)
- def pg_command(self, pgid, cmd, inbuf, timeout=0):
+ def pg_command(self,
+ pgid: str,
+ cmd: str,
+ inbuf: bytes,
+ timeout: int = 0) -> Tuple[int, str, bytes]:
"""
pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
ret = rados_wait_for_latest_osdmap(self.cluster)
return ret
- def blocklist_add(self, client_address, expire_seconds=0):
+ def blocklist_add(self, client_address: str, expire_seconds: int = 0):
"""
Blocklist a client from the OSDs
:raises: :class:`Error`
"""
self.require_state("connected")
- client_address = cstr(client_address, 'client_address')
+ client_address_raw = cstr(client_address, 'client_address')
cdef:
uint32_t _expire_seconds = expire_seconds
- char *_client_address = client_address
+ char *_client_address = client_address_raw
with nogil:
ret = rados_blocklist_add(self.cluster, _client_address, _expire_seconds)
if ret < 0:
raise make_ex(ret, "error blocklisting client '%s'" % client_address)
- def monitor_log(self, level, callback, arg):
+ def monitor_log(self, level: str,
+ callback: Optional[Callable[[object, str, str, str, int, int, int, str, str], None]] = None,
+ arg: Optional[object] = None):
if level not in MONITOR_LEVELS:
raise LogicError("invalid monitor level " + level)
if callback is not None and not callable(callback):
raise LogicError("callback must be a callable function or None")
- level = cstr(level, 'level')
- cdef char *_level = level
+ level_raw = cstr(level, 'level')
+ cdef char *_level = level_raw
if callback is None:
with nogil:
self.monitor_callback = cb
self.monitor_callback2 = None
- def monitor_log2(self, level, callback, arg):
+ def monitor_log2(self, level: str,
+ callback: Optional[Callable[[object, str, str, str, str, int, int, int, str, str], None]] = None,
+ arg: Optional[object] = None):
if level not in MONITOR_LEVELS:
raise LogicError("invalid monitor level " + level)
if callback is not None and not callable(callback):
raise LogicError("callback must be a callable function or None")
- level = cstr(level, 'level')
- cdef char *_level = level
+ level_raw = cstr(level, 'level')
+ cdef char *_level = level_raw
if callback is None:
with nogil:
self.monitor_callback = None
self.monitor_callback2 = cb
- @requires(('service', str), ('daemon', str), ('metadata', dict))
- def service_daemon_register(self, service, daemon, metadata):
+ def service_daemon_register(self, service: str, daemon: str, metadata: Dict[str, str]):
"""
:param str service: service name (e.g. "rgw")
:param str daemon: daemon name (e.g. "gwfoo")
:param dict metadata: static metadata about the register daemon
(e.g., the version of Ceph, the kernel version.)
"""
- service = cstr(service, 'service')
- daemon = cstr(daemon, 'daemon')
+ service_raw = cstr(service, 'service')
+ daemon_raw = cstr(daemon, 'daemon')
metadata_dict = flatten_dict(metadata, 'metadata')
cdef:
- char *_service = service
- char *_daemon = daemon
+ char *_service = service_raw
+ char *_daemon = daemon_raw
char *_metadata = metadata_dict
with nogil:
if ret != 0:
raise make_ex(ret, "error calling service_register()")
- @requires(('metadata', dict))
- def service_daemon_update(self, status):
+ def service_daemon_update(self, status: Dict[str, str]):
status_dict = flatten_dict(status, 'status')
cdef:
char *_status = status_dict
with nogil:
rados_release_write_op(self.write_op)
- @requires(('exclusive', opt(int)))
- def new(self, exclusive=None):
+ def new(self, exclusive: Optional[int] = None):
"""
Create the object.
"""
with nogil:
rados_write_op_remove(self.write_op)
- @requires(('flags', int))
- def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
+ def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
"""
Set flags for the last operation added to this write_op.
:para flags: flags to apply to the last operation
with nogil:
rados_write_op_set_flags(self.write_op, _flags)
- @requires(('xattr_name', str), ('xattr_value', bytes))
- def set_xattr(self, xattr_name, xattr_value):
+ def set_xattr(self, xattr_name: str, xattr_value: bytes):
"""
Set an extended attribute on an object.
:param xattr_name: name of the xattr
:param xattr_value: buffer to set xattr to
:type xattr_value: bytes
"""
- xattr_name = cstr(xattr_name, 'xattr_name')
+ xattr_name_raw = cstr(xattr_name, 'xattr_name')
cdef:
- char *_xattr_name = xattr_name
+ char *_xattr_name = xattr_name_raw
char *_xattr_value = xattr_value
size_t _xattr_value_len = len(xattr_value)
with nogil:
rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)
- @requires(('xattr_name', str))
- def rm_xattr(self, xattr_name):
+ def rm_xattr(self, xattr_name: str):
"""
Removes an extended attribute on from an object.
:param xattr_name: name of the xattr to remove
:type xattr_name: str
"""
- xattr_name = cstr(xattr_name, 'xattr_name')
+ xattr_name_raw = cstr(xattr_name, 'xattr_name')
cdef:
- char *_xattr_name = xattr_name
+ char *_xattr_name = xattr_name_raw
with nogil:
rados_write_op_rmxattr(self.write_op, _xattr_name)
- @requires(('to_write', bytes))
- def append(self, to_write):
+ def append(self, to_write: bytes):
"""
Append data to an object synchronously
:param to_write: data to write
with nogil:
rados_write_op_append(self.write_op, _to_write, length)
- @requires(('to_write', bytes))
- def write_full(self, to_write):
+ def write_full(self, to_write: bytes):
"""
Write whole object, atomically replacing it.
:param to_write: data to write
with nogil:
rados_write_op_write_full(self.write_op, _to_write, length)
- @requires(('to_write', bytes), ('offset', int))
- def write(self, to_write, offset=0):
+ def write(self, to_write: bytes, offset: int = 0):
"""
Write to offset.
:param to_write: data to write
with nogil:
rados_write_op_write(self.write_op, _to_write, length, _offset)
- @requires(('version', int))
- def assert_version(self, version):
+ def assert_version(self, version: int):
"""
Check if object's version is the expected one.
:param version: expected version of the object
with nogil:
rados_write_op_assert_version(self.write_op, _version)
- @requires(('offset', int), ('length', int))
- def zero(self, offset, length):
+ def zero(self, offset: int, length: int):
"""
Zero part of an object.
:param offset: byte offset in the object to begin writing at
with nogil:
rados_write_op_zero(self.write_op, _length, _offset)
- @requires(('offset', int))
- def truncate(self, offset):
+ def truncate(self, offset: int):
"""
Truncate an object.
:param offset: byte offset in the object to begin truncating at
with nogil:
rados_write_op_truncate(self.write_op, _offset)
- @requires(('cls', str), ('method', str), ('data', bytes))
- def execute(self, cls, method, data):
+ def execute(self, cls: str, method: str, data: bytes):
"""
Execute an OSD class method on an object
:type data: bytes
"""
- cls = cstr(cls, 'cls')
- method = cstr(method, 'method')
+ cls_raw = cstr(cls, 'cls')
+ method_raw = cstr(method, 'method')
cdef:
- char *_cls = cls
- char *_method = method
+ char *_cls = cls_raw
+ char *_method = method_raw
char *_data = data
size_t _data_len = len(data)
with nogil:
rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)
- @requires(('to_write', bytes), ('write_len', int), ('offset', int))
- def writesame(self, to_write, write_len, offset=0):
+ def writesame(self, to_write: bytes, write_len: int, offset: int = 0):
"""
Write the same buffer multiple times
:param to_write: data to write
with nogil:
rados_release_read_op(self.read_op)
- @requires(('flags', int))
- def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
+ def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
"""
Set flags for the last operation added to this read_op.
:para flags: flags to apply to the last operation
with self.lock:
self.safe_completions.append(completion_obj)
- def __get_completion(self, oncomplete, onsafe):
+ def __get_completion(self,
+ oncomplete: Callable[[Completion], None],
+ onsafe: Callable[[Completion], None]):
"""
Constructs a completion to use with asynchronous operations
ioctx.set_namespace(self.get_namespace())
return ioctx
- @requires(('object_name', str), ('oncomplete', opt(Callable)))
- def aio_stat(self, object_name, oncomplete):
+ def aio_stat(self,
+ object_name: str,
+ oncomplete: Callable[[Completion, Optional[int], Optional[time.struct_time]], None]) -> Completion:
"""
Asynchronously get object stats (size/mtime)
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char *_object_name = object_name
+ char *_object_name = object_name_raw
uint64_t psize
time_t pmtime
raise make_ex(ret, "error stating %s" % object_name)
return completion
- @requires(('object_name', str), ('to_write', bytes), ('offset', int),
- ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
- def aio_write(self, object_name, to_write, offset=0,
- oncomplete=None, onsafe=None):
+ def aio_write(self, object_name: str, to_write: bytes, offset: int = 0,
+ oncomplete: Optional[Callable[[Completion], None]] = None,
+ onsafe: Optional[Callable[[Completion], None]] = None) -> Completion:
"""
Write data to an object asynchronously
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
char* _to_write = to_write
size_t size = len(to_write)
uint64_t _offset = offset
raise make_ex(ret, "error writing object %s" % object_name)
return completion
- @requires(('object_name', str), ('to_write', bytes), ('oncomplete', opt(Callable)),
- ('onsafe', opt(Callable)))
- def aio_write_full(self, object_name, to_write,
- oncomplete=None, onsafe=None):
+ def aio_write_full(self, object_name: str, to_write: bytes,
+ oncomplete: Optional[Callable] = None,
+ onsafe: Optional[Callable] = None) -> Completion:
"""
Asynchronously write an entire object
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
char* _to_write = to_write
size_t size = len(to_write)
raise make_ex(ret, "error writing object %s" % object_name)
return completion
- @requires(('object_name', str), ('to_write', bytes), ('write_len', int),
- ('offset', int), ('oncomplete', opt(Callable)))
- def aio_writesame(self, object_name, to_write, write_len, offset=0,
- oncomplete=None):
+ def aio_writesame(self, object_name: str, to_write: bytes,
+ write_len: int, offset: int = 0,
+ oncomplete: Optional[Callable] = None) -> Completion:
"""
Asynchronously write the same buffer multiple times
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
char* _to_write = to_write
size_t _data_len = len(to_write)
size_t _write_len = write_len
raise make_ex(ret, "error writing object %s" % object_name)
return completion
- @requires(('object_name', str), ('to_append', bytes), ('oncomplete', opt(Callable)),
- ('onsafe', opt(Callable)))
- def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
+ def aio_append(self, object_name: str, to_append: bytes,
+ oncomplete: Optional[Callable] = None,
+ onsafe: Optional[Callable] = None) -> Completion:
"""
Asynchronously append data to an object
:raises: :class:`Error`
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
char* _to_append = to_append
size_t size = len(to_append)
if ret < 0:
raise make_ex(ret, "error flushing")
- @requires(('object_name', str), ('cmp_buf', bytes), ('offset', int))
- def aio_cmpext(self, object_name, cmp_buf, offset=0, oncomplete=None):
+ def aio_cmpext(self, object_name: str, cmp_buf: bytes, offset: int = 0,
+ oncomplete: Optional[Callable] = None) -> Completion:
"""
Asynchronously compare an on-disk object range with a buffer
:param object_name: the name of the object
returns: 0 - on success, negative error code on failure,
(-MAX_ERRNO - mismatch_off) on mismatch
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
char* _cmp_buf = cmp_buf
size_t _cmp_buf_len = len(cmp_buf)
uint64_t _offset = offset
raise make_ex(ret, "failed to compare %s" % object_name)
return completion
- @requires(('object_name', str), ('length', int), ('offset', int),
- ('oncomplete', opt(Callable)))
- def aio_read(self, object_name, length, offset, oncomplete):
+ def aio_read(self, object_name: str, length: int, offset: int,
+ oncomplete: Optional[Callable] = None) -> Completion:
"""
Asynchronously read data from an object
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
uint64_t _offset = offset
char *ref_buf
raise make_ex(ret, "error reading %s" % object_name)
return completion
- @requires(('object_name', str), ('cls', str), ('method', str),
- ('data', bytes), ('length', int),
- ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
- def aio_execute(self, object_name, cls, method, data,
- length=8192, oncomplete=None, onsafe=None):
+ def aio_execute(self, object_name: str, cls: str, method: str,
+ data: bytes, length: int = 8192,
+ oncomplete: Optional[Callable[[Completion, bytes], None]] = None,
+ onsafe: Optional[Callable[[Completion, bytes], None]] = None) -> Completion:
"""
Asynchronously execute an OSD class method on an object.
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
- cls = cstr(cls, 'cls')
- method = cstr(method, 'method')
+ object_name_raw = cstr(object_name, 'object_name')
+ cls_raw = cstr(cls, 'cls')
+ method_raw = cstr(method, 'method')
cdef:
Completion completion
- char *_object_name = object_name
- char *_cls = cls
- char *_method = method
+ char *_object_name = object_name_raw
+ char *_cls = cls_raw
+ char *_method = method_raw
char *_data = data
size_t _data_len = len(data)
raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
return completion
- @requires(('object_name', str), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
- def aio_remove(self, object_name, oncomplete=None, onsafe=None):
+ def aio_remove(self, object_name: str,
+ oncomplete: Optional[Callable] = None,
+ onsafe: Optional[Callable] = None) -> Completion:
"""
Asynchronously remove an object
:raises: :class:`Error`
:returns: completion object
"""
- object_name = cstr(object_name, 'object_name')
+ object_name_raw = cstr(object_name, 'object_name')
cdef:
Completion completion
- char* _object_name = object_name
+ char* _object_name = object_name_raw
completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
if self.state != "open":
raise IoctxStateError("The pool is %s" % self.state)
- @requires(('loc_key', str))
- def set_locator_key(self, loc_key):
+ def set_locator_key(self, loc_key: str):
"""
Set the key for mapping objects to pgs within an io context.
"""
return self.locator_key
- @requires(('snap_id', long))
- def set_read(self, snap_id):
+ def set_read(self, snap_id: int):
"""
Set the snapshot for reading objects.
with nogil:
rados_ioctx_snap_set_read(self.io, _snap_id)
- @requires(('nspace', str))
- def set_namespace(self, nspace):
+ def set_namespace(self, nspace: str):
"""
Set the namespace for objects within an io context.
self.state = "closed"
- @requires(('key', str), ('data', bytes))
- def write(self, key, data, offset=0):
+ def write(self, key: str, data: bytes, offset: int = 0):
"""
Write data to an object synchronously
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
char *_data = data
size_t length = len(data)
uint64_t _offset = offset
raise LogicError("Ioctx.write(%s): rados_write \
returned %d, but should return zero on success." % (self.name, ret))
- @requires(('key', str), ('data', bytes))
- def write_full(self, key, data):
+ def write_full(self, key: str, data: bytes):
"""
Write an entire object synchronously.
:returns: int - 0 on success
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
char *_data = data
size_t length = len(data)
raise LogicError("Ioctx.write_full(%s): rados_write_full \
returned %d, but should return zero on success." % (self.name, ret))
- @requires(('key', str), ('data', bytes), ('write_len', int), ('offset', int))
- def writesame(self, key, data, write_len, offset=0):
+ def writesame(self, key: str, data: bytes, write_len: int, offset: int = 0):
"""
Write the same buffer multiple times
:param key: name of the object
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
char *_data = data
size_t _data_len = len(data)
size_t _write_len = write_len
% (self.name, key))
assert(ret == 0)
- @requires(('key', str), ('data', bytes))
- def append(self, key, data):
+ def append(self, key: str, data: bytes):
"""
Append data to an object synchronously
:returns: int - 0 on success
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
char *_data = data
size_t length = len(data)
raise LogicError("Ioctx.append(%s): rados_append \
returned %d, but should return zero on success." % (self.name, ret))
- @requires(('key', str))
- def read(self, key, length=8192, offset=0):
+ def read(self, key: str, length: int = 8192, offset: int = 0):
"""
Read data from an object synchronously
:returns: str - data read from object
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
char *ret_buf
uint64_t _offset = offset
size_t _length = length
# itself and set ret_s to NULL, hence XDECREF).
ref.Py_XDECREF(ret_s)
- @requires(('key', str), ('cls', str), ('method', str), ('data', bytes))
- def execute(self, key, cls, method, data, length=8192):
+ def execute(self, key: str, cls: str, method: str, data: bytes, length: int = 8192) -> Tuple[int, object]:
"""
Execute an OSD class method on an object.
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- cls = cstr(cls, 'cls')
- method = cstr(method, 'method')
+ key_raw = cstr(key, 'key')
+ cls_raw = cstr(cls, 'cls')
+ method_raw = cstr(method, 'method')
cdef:
- char *_key = key
- char *_cls = cls
- char *_method = method
+ char *_key = key_raw
+ char *_cls = cls_raw
+ char *_method = method_raw
char *_data = data
size_t _data_len = len(data)
"num_wr": stats.num_wr,
"num_wr_kb": stats.num_wr_kb}
- @requires(('key', str))
- def remove_object(self, key):
+ def remove_object(self, key: str):
"""
Delete an object
:returns: bool - True on success
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
with nogil:
ret = rados_remove(self.io, _key)
raise make_ex(ret, "Failed to remove '%s'" % key)
return True
- @requires(('key', str))
- def trunc(self, key, size):
+ def trunc(self, key: str, size: int):
"""
Resize an object
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
uint64_t _size = size
with nogil:
raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
return ret
- @requires(('key', str), ('cmp_buf', bytes), ('offset', int))
- def cmpext(self, key, cmp_buf, offset=0):
+ def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0):
'''
Compare an on-disk object range with a buffer
:param key: the name of the object
(-MAX_ERRNO - mismatch_off) on mismatch
'''
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
char *_cmp_buf = cmp_buf
size_t _cmp_buf_len = len(cmp_buf)
uint64_t _offset = offset
assert ret < -MAX_ERRNO or ret == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
return ret
- @requires(('key', str))
- def stat(self, key):
+ def stat(self, key: str):
"""
Get object stats (size/mtime)
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
+ key_raw = cstr(key, 'key')
cdef:
- char *_key = key
+ char *_key = key_raw
uint64_t psize
time_t pmtime
raise make_ex(ret, "Failed to stat %r" % key)
return psize, time.localtime(pmtime)
- @requires(('key', str), ('xattr_name', str))
- def get_xattr(self, key, xattr_name):
+ def get_xattr(self, key: str, xattr_name: str):
"""
Get the value of an extended attribute on an object.
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- xattr_name = cstr(xattr_name, 'xattr_name')
+ key_raw = cstr(key, 'key')
+ xattr_name_raw = cstr(xattr_name, 'xattr_name')
cdef:
- char *_key = key
- char *_xattr_name = xattr_name
+ char *_key = key_raw
+ char *_xattr_name = xattr_name_raw
size_t ret_length = 4096
char *ret_buf = NULL
finally:
free(ret_buf)
- @requires(('oid', str))
- def get_xattrs(self, oid):
+ def get_xattrs(self, oid: str):
"""
Start iterating over xattrs on an object.
self.require_ioctx_open()
return XattrIterator(self, oid)
- @requires(('key', str), ('xattr_name', str), ('xattr_value', bytes))
- def set_xattr(self, key, xattr_name, xattr_value):
+ def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes):
"""
Set an extended attribute on an object.
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- xattr_name = cstr(xattr_name, 'xattr_name')
+ key_raw = cstr(key, 'key')
+ xattr_name_raw = cstr(xattr_name, 'xattr_name')
cdef:
- char *_key = key
- char *_xattr_name = xattr_name
+ char *_key = key_raw
+ char *_xattr_name = xattr_name_raw
char *_xattr_value = xattr_value
size_t _xattr_value_len = len(xattr_value)
raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
return True
- @requires(('key', str), ('xattr_name', str))
- def rm_xattr(self, key, xattr_name):
+ def rm_xattr(self, key: str, xattr_name: str):
"""
Removes an extended attribute on from an object.
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- xattr_name = cstr(xattr_name, 'xattr_name')
+ key_raw = cstr(key, 'key')
+ xattr_name_raw = cstr(xattr_name, 'xattr_name')
cdef:
- char *_key = key
- char *_xattr_name = xattr_name
+ char *_key = key_raw
+ char *_xattr_name = xattr_name_raw
with nogil:
ret = rados_rmxattr(self.io, _key, _xattr_name)
(key, xattr_name))
return True
- @requires(('obj', str), ('msg', str), ('timeout_ms', int))
- def notify(self, obj, msg='', timeout_ms=5000):
+ def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000):
"""
Send a rados notification to an object.
self.require_ioctx_open()
msglen = len(msg)
- obj = cstr(obj, 'obj')
- msg = cstr(msg, 'msg')
+ obj_raw = cstr(obj, 'obj')
+ msg_raw = cstr(msg, 'msg')
cdef:
- char *_obj = obj
- char *_msg = msg
+ char *_obj = obj_raw
+ char *_msg = msg_raw
int _msglen = msglen
uint64_t _timeout_ms = timeout_ms
raise make_ex(ret, "Failed to notify %r" % (obj))
return True
- @requires(('obj', str), ('callback', opt(Callable)),
- ('error_callback', opt(Callable)), ('timeout', int))
- def watch(self, obj, callback, error_callback=None, timeout=None):
+ def watch(self, obj: str,
+ callback: Callable[[int, str, int, bytes], None],
+ error_callback: Optional[Callable[[int], None]] = None,
+ timeout: Optional[int] = None) -> Watch:
"""
Register an interest in an object.
free(name)
- @requires(('snap_name', str))
- def create_snap(self, snap_name):
+ def create_snap(self, snap_name: str):
"""
Create a pool-wide snapshot
:raises: :class:`Error`
"""
self.require_ioctx_open()
- snap_name = cstr(snap_name, 'snap_name')
- cdef char *_snap_name = snap_name
+ snap_name_raw = cstr(snap_name, 'snap_name')
+ cdef char *_snap_name = snap_name_raw
with nogil:
ret = rados_ioctx_snap_create(self.io, _snap_name)
if ret != 0:
raise make_ex(ret, "Failed to create snap %s" % snap_name)
- @requires(('snap_name', str))
- def remove_snap(self, snap_name):
+ def remove_snap(self, snap_name: str):
"""
Removes a pool-wide snapshot
:raises: :class:`Error`
"""
self.require_ioctx_open()
- snap_name = cstr(snap_name, 'snap_name')
- cdef char *_snap_name = snap_name
+ snap_name_raw = cstr(snap_name, 'snap_name')
+ cdef char *_snap_name = snap_name_raw
with nogil:
ret = rados_ioctx_snap_remove(self.io, _snap_name)
if ret != 0:
raise make_ex(ret, "Failed to remove snap %s" % snap_name)
- @requires(('snap_name', str))
- def lookup_snap(self, snap_name):
+ def lookup_snap(self, snap_name: str):
"""
Get the id of a pool snapshot
raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
return Snap(self, snap_name, int(snap_id))
- @requires(('oid', str), ('snap_name', str))
- def snap_rollback(self, oid, snap_name):
+ def snap_rollback(self, oid: str, snap_name: str):
"""
Rollback an object to a snapshot
:raises: :class:`Error`
"""
self.require_ioctx_open()
- oid = cstr(oid, 'oid')
- snap_name = cstr(snap_name, 'snap_name')
+ oid_raw = cstr(oid, 'oid')
+ snap_name_raw = cstr(snap_name, 'snap_name')
cdef:
- char *_snap_name = snap_name
- char *_oid = oid
+ char *_oid = oid_raw
+ char *_snap_name = snap_name_raw
with nogil:
ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
raise make_ex(ret, "Failed to create self-managed snapshot")
return int(_snap_id)
- @requires(('snap_id', int))
- def remove_self_managed_snap(self, snap_id):
+ def remove_self_managed_snap(self, snap_id: int):
"""
Removes a self-managed snapshot
finally:
free(_snaps)
- @requires(('oid', str), ('snap_id', int))
- def rollback_self_managed_snap(self, oid, snap_id):
+ def rollback_self_managed_snap(self, oid: str, snap_id: int):
"""
Rolls an specific object back to a self-managed snapshot revision
:raises: :class:`Error`
"""
self.require_ioctx_open()
- oid = cstr(oid, 'oid')
+ oid_raw = cstr(oid, 'oid')
cdef:
- char *_oid = oid
+ char *_oid = oid_raw
rados_snap_t _snap_id = snap_id
with nogil:
ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
"""
read_op.release()
- @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
- def set_omap(self, write_op, keys, values):
+ def set_omap(self, write_op: WriteOp, keys: Sequence[str], values: Sequence[bytes]):
"""
set keys values to write_op
:para write_op: write_operation object
free(_values)
free(_lens)
- @requires(('write_op', WriteOp), ('oid', str), ('mtime', opt(int)), ('flags', opt(int)))
- def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
+ def operate_write_op(self,
+ write_op: WriteOp,
+ oid: str,
+ mtime: int = 0,
+ flags: int = LIBRADOS_OPERATION_NOFLAG):
"""
execute the real write operation
:para write_op: write operation object
:type flags: int
"""
- oid = cstr(oid, 'oid')
+ oid_raw = cstr(oid, 'oid')
cdef:
WriteOp _write_op = write_op
- char *_oid = oid
+ char *_oid = oid_raw
time_t _mtime = mtime
int _flags = flags
if ret != 0:
raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
- @requires(('write_op', WriteOp), ('oid', str), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
- def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
+ def operate_aio_write_op(self, write_op: WriteOp, oid: str,
+ oncomplete: Optional[Callable[[Completion], None]] = None,
+ onsafe: Optional[Callable[[Completion], None]] = None,
+ mtime: int = 0,
+ flags: int = LIBRADOS_OPERATION_NOFLAG):
"""
execute the real write operation asynchronously
:para write_op: write operation object
:returns: completion object
"""
- oid = cstr(oid, 'oid')
+ oid_raw = cstr(oid, 'oid')
cdef:
WriteOp _write_op = write_op
- char *_oid = oid
+ char *_oid = oid_raw
Completion completion
time_t _mtime = mtime
int _flags = flags
raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
return completion
- @requires(('read_op', ReadOp), ('oid', str), ('flag', opt(int)))
- def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
+ def operate_read_op(self, read_op: ReadOp, oid: str, flag: int = LIBRADOS_OPERATION_NOFLAG):
"""
execute the real read operation
:para read_op: read operation object
:para flag: flags to apply to the entire operation
:type flag: int
"""
- oid = cstr(oid, 'oid')
+ oid_raw = cstr(oid, 'oid')
cdef:
ReadOp _read_op = read_op
- char *_oid = oid
+ char *_oid = oid_raw
int _flag = flag
with nogil:
if ret != 0:
raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
- @requires(('read_op', ReadOp), ('oid', str), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
- def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
+ def operate_aio_read_op(self, read_op: ReadOp, oid: str,
+ oncomplete: Optional[Callable[[Completion], None]] = None,
+ onsafe: Optional[Callable[[Completion], None]] = None,
+ flag: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
"""
execute the real read operation
:para read_op: read operation object
:para flag: flags to apply to the entire operation
:type flag: int
"""
- oid = cstr(oid, 'oid')
+ oid_raw = cstr(oid, 'oid')
cdef:
ReadOp _read_op = read_op
- char *_oid = oid
+ char *_oid = oid_raw
Completion completion
int _flag = flag
raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
return completion
- @requires(('read_op', ReadOp), ('start_after', str), ('filter_prefix', str), ('max_return', int))
- def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
+ def get_omap_vals(self,
+ read_op: ReadOp,
+ start_after: str,
+ filter_prefix: str,
+ max_return: int) -> Tuple[OmapIterator, int]:
"""
get the omap values
:para read_op: read operation object
:returns: an iterator over the requested omap values, return value from this action
"""
- start_after = cstr(start_after, 'start_after') if start_after else None
- filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
+ start_after_raw = cstr(start_after, 'start_after') if start_after else None
+ filter_prefix_raw = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
cdef:
- char *_start_after = opt_str(start_after)
- char *_filter_prefix = opt_str(filter_prefix)
+ char *_start_after = opt_str(start_after_raw)
+ char *_filter_prefix = opt_str(filter_prefix_raw)
ReadOp _read_op = read_op
rados_omap_iter_t iter_addr = NULL
int _max_return = max_return
it.ctx = iter_addr
return it, 0 # 0 is meaningless; there for backward-compat
- @requires(('read_op', ReadOp), ('start_after', str), ('max_return', int))
- def get_omap_keys(self, read_op, start_after, max_return):
+ def get_omap_keys(self, read_op: ReadOp, start_after: str, max_return: int) -> Tuple[OmapIterator, int]:
"""
get the omap keys
:para read_op: read operation object
it.ctx = iter_addr
return it, 0 # 0 is meaningless; there for backward-compat
- @requires(('read_op', ReadOp), ('keys', tuple))
- def get_omap_vals_by_keys(self, read_op, keys):
+ def get_omap_vals_by_keys(self, read_op: ReadOp, keys: Sequence[str]) -> Tuple[OmapIterator, int]:
"""
get the omap values by keys
:para read_op: read operation object
finally:
free(_keys)
- @requires(('write_op', WriteOp), ('keys', tuple))
- def remove_omap_keys(self, write_op, keys):
+ def remove_omap_keys(self, write_op: WriteOp, keys: Sequence[str]):
"""
remove omap keys specifiled
:para write_op: write operation object
finally:
free(_keys)
- @requires(('write_op', WriteOp))
- def clear_omap(self, write_op):
+ def clear_omap(self, write_op: WriteOp):
"""
Remove all key/value pairs from an object
:para write_op: write operation object
with nogil:
rados_write_op_omap_clear(_write_op.write_op)
- @requires(('key', str), ('name', str), ('cookie', str), ('desc', str),
- ('duration', opt(int)), ('flags', int))
- def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
+ def lock_exclusive(self, key: str, name: str, cookie: str, desc: str = "",
+ duration: Optional[int] = None,
+ flags: int = 0):
"""
Take an exclusive lock on an object
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- name = cstr(name, 'name')
- cookie = cstr(cookie, 'cookie')
- desc = cstr(desc, 'desc')
+ key_raw = cstr(key, 'key')
+ name_raw = cstr(name, 'name')
+ cookie_raw = cstr(cookie, 'cookie')
+ desc_raw = cstr(desc, 'desc')
cdef:
- char* _key = key
- char* _name = name
- char* _cookie = cookie
- char* _desc = desc
+ char* _key = key_raw
+ char* _name = name_raw
+ char* _cookie = cookie_raw
+ char* _desc = desc_raw
uint8_t _flags = flags
timeval _duration
if ret < 0:
raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
- @requires(('key', str), ('name', str), ('cookie', str), ('tag', str),
- ('desc', str), ('duration', opt(int)), ('flags', int))
- def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
+ def lock_shared(self, key: str, name: str, cookie: str, tag: str, desc: str = "",
+ duration: Optional[int] = None,
+ flags: int = 0):
"""
Take a shared lock on an object
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- tag = cstr(tag, 'tag')
- name = cstr(name, 'name')
- cookie = cstr(cookie, 'cookie')
- desc = cstr(desc, 'desc')
+ key_raw = cstr(key, 'key')
+ tag_raw = cstr(tag, 'tag')
+ name_raw = cstr(name, 'name')
+ cookie_raw = cstr(cookie, 'cookie')
+ desc_raw = cstr(desc, 'desc')
cdef:
- char* _key = key
- char* _tag = tag
- char* _name = name
- char* _cookie = cookie
- char* _desc = desc
+ char* _key = key_raw
+ char* _tag = tag_raw
+ char* _name = name_raw
+ char* _cookie = cookie_raw
+ char* _desc = desc_raw
uint8_t _flags = flags
timeval _duration
if ret < 0:
raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
- @requires(('key', str), ('name', str), ('cookie', str))
- def unlock(self, key, name, cookie):
+ def unlock(self, key: str, name: str, cookie: str):
"""
Release a shared or exclusive lock on an object
"""
self.require_ioctx_open()
- key = cstr(key, 'key')
- name = cstr(name, 'name')
- cookie = cstr(cookie, 'cookie')
+ key_raw = cstr(key, 'key')
+ name_raw = cstr(name, 'name')
+ cookie_raw = cstr(cookie, 'cookie')
cdef:
- char* _key = key
- char* _name = name
- char* _cookie = cookie
+ char* _key = key_raw
+ char* _name = name_raw
+ char* _cookie = cookie_raw
with nogil:
ret = rados_unlock(self.io, _key, _name, _cookie)
with nogil:
rados_unset_osdmap_full_try(self.io)
- def application_enable(self, app_name, force=False):
+ def application_enable(self, app_name: str, force: bool = False):
"""
Enable an application on an OSD pool
:raises: :class:`Error`
"""
- app_name = cstr(app_name, 'app_name')
+ app_name_raw = cstr(app_name, 'app_name')
cdef:
- char *_app_name = app_name
+ char *_app_name = app_name_raw
int _force = (1 if force else 0)
with nogil:
finally:
free(apps)
- def application_metadata_get(self, app_name, key):
+ def application_metadata_get(self, app_name: str, key: str):
"""
Gets application metadata on an OSD pool for the given key
:raises: :class:`Error`
"""
- app_name = cstr(app_name, 'app_name')
- key = cstr(key, 'key')
+ app_name_raw = cstr(app_name, 'app_name')
+ key_raw = cstr(key, 'key')
cdef:
- char *_app_name = app_name
- char *_key = key
+ char *_app_name = app_name_raw
+ char *_key = key_raw
size_t size = 129
char *value = NULL
int ret
finally:
free(value)
- def application_metadata_set(self, app_name, key, value):
+ def application_metadata_set(self, app_name: str, key: str, value: str):
"""
Sets application metadata on an OSD pool
:raises: :class:`Error`
"""
- app_name = cstr(app_name, 'app_name')
- key = cstr(key, 'key')
- value = cstr(value, 'value')
+ app_name_raw = cstr(app_name, 'app_name')
+ key_raw = cstr(key, 'key')
+ value_raw = cstr(value, 'value')
cdef:
- char *_app_name = app_name
- char *_key = key
- char *_value = value
+ char *_app_name = app_name_raw
+ char *_key = key_raw
+ char *_value = value_raw
with nogil:
ret = rados_application_metadata_set(self.io, _app_name, _key,
if ret < 0:
raise make_ex(ret, "error setting application metadata")
- def application_metadata_remove(self, app_name, key):
+ def application_metadata_remove(self, app_name: str, key: str):
"""
Remove application metadata from an OSD pool
:raises: :class:`Error`
"""
- app_name = cstr(app_name, 'app_name')
- key = cstr(key, 'key')
+ app_name_raw = cstr(app_name, 'app_name')
+ key_raw = cstr(key, 'key')
cdef:
- char *_app_name = app_name
- char *_key = key
+ char *_app_name = app_name_raw
+ char *_key = key_raw
with nogil:
ret = rados_application_metadata_remove(self.io, _app_name, _key)
if ret < 0:
raise make_ex(ret, "error removing application metadata")
- def application_metadata_list(self, app_name):
+ def application_metadata_list(self, app_name: str):
"""
Returns a list of enabled applications
:type app_name: str
:returns: list of key/value tuples
"""
- app_name = cstr(app_name, 'app_name')
+ app_name_raw = cstr(app_name, 'app_name')
cdef:
- char *_app_name = app_name
+ char *_app_name = app_name_raw
size_t key_length = 128
size_t val_length = 128
char *c_keys = NULL