From: Mehdi Abaakouk Date: Mon, 15 Feb 2016 08:53:08 +0000 (+0100) Subject: Remove old rados pybinding X-Git-Tag: v10.1.0~369^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3c40524042c347e2f5565ad14ce9f7947d52d2a9;p=ceph.git Remove old rados pybinding Signed-off-by: Mehdi Abaakouk --- diff --git a/src/Makefile-client.am b/src/Makefile-client.am index 6babaf62e2e6..07413eaebe7d 100644 --- a/src/Makefile-client.am +++ b/src/Makefile-client.am @@ -34,8 +34,6 @@ bash_completion_DATA += \ $(srcdir)/bash_completion/rados \ $(srcdir)/bash_completion/radosgw-admin -python_PYTHON += pybind/rados.py - librados_config_SOURCES = librados-config.cc librados_config_LDADD = $(LIBRADOS) $(CEPH_GLOBAL) bin_PROGRAMS += librados-config diff --git a/src/pybind/rados.py b/src/pybind/rados.py deleted file mode 100644 index a599346ba0ba..000000000000 --- a/src/pybind/rados.py +++ /dev/null @@ -1,2467 +0,0 @@ -""" -This module is a thin wrapper around librados. - -Copyright 2011, Hannu Valtonen -""" -from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_char, c_int, c_long, \ - c_ulong, create_string_buffer, byref, Structure, c_uint64, c_ubyte, \ - pointer, CFUNCTYPE, c_int64, c_uint32, c_uint8 -from ctypes.util import find_library -import ctypes -import errno -import threading -import time -import sys - -from collections import Iterator -from datetime import datetime -from functools import wraps -from itertools import chain - -ANONYMOUS_AUID = 0xffffffffffffffff -ADMIN_AUID = 0 -LIBRADOS_ALL_NSPACES = '\001' - -LIBRADOS_OP_FLAG_FADVISE_RANDOM = 0x4 -LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = 0x8 -LIBRADOS_OP_FLAG_FADVISE_WILLNEED = 0x10 -LIBRADOS_OP_FLAG_FADVISE_DONTNEED = 0x20 -LIBRADOS_OP_FLAG_FADVISE_NOCACHE = 0x40 -LIBRADOS_SNAP_HEAD = -2 - - -# Are we running Python 2.x -_python2 = sys.hexversion < 0x03000000 - - -if _python2: - str_type = basestring -else: - str_type = str - - -class Error(Exception): - """ `Error` class, derived from `Exception` """ - pass - - -class InterruptedOrTimeoutError(Error): - """ `InterruptedOrTimeoutError` class, derived from `Error` """ - pass - - -class PermissionError(Error): - """ `PermissionError` class, derived from `Error` """ - pass - -class PermissionDeniedError(Error): - """ deal with EACCES related. """ - pass - -class ObjectNotFound(Error): - """ `ObjectNotFound` class, derived from `Error` """ - pass - - -class NoData(Error): - """ `NoData` class, derived from `Error` """ - pass - - -class ObjectExists(Error): - """ `ObjectExists` class, derived from `Error` """ - pass - - -class ObjectBusy(Error): - """ `ObjectBusy` class, derived from `Error` """ - pass - - -class IOError(Error): - """ `IOError` class, derived from `Error` """ - pass - - -class NoSpace(Error): - """ `NoSpace` class, derived from `Error` """ - pass - - -class IncompleteWriteError(Error): - """ `IncompleteWriteError` class, derived from `Error` """ - pass - - -class RadosStateError(Error): - """ `RadosStateError` class, derived from `Error` """ - pass - - -class IoctxStateError(Error): - """ `IoctxStateError` class, derived from `Error` """ - pass - - -class ObjectStateError(Error): - """ `ObjectStateError` class, derived from `Error` """ - pass - - -class LogicError(Error): - """ `` class, derived from `Error` """ - pass - - -class TimedOut(Error): - """ `TimedOut` class, derived from `Error` """ - pass - - -def make_ex(ret, msg): - """ - Translate a librados return code into an exception. - - :param ret: the return code - :type ret: int - :param msg: the error message to use - :type msg: str - :returns: a subclass of :class:`Error` - """ - - errors = { - errno.EPERM : PermissionError, - errno.ENOENT : ObjectNotFound, - errno.EIO : IOError, - errno.ENOSPC : NoSpace, - errno.EEXIST : ObjectExists, - errno.EBUSY : ObjectBusy, - errno.ENODATA : NoData, - errno.EINTR : InterruptedOrTimeoutError, - errno.ETIMEDOUT : TimedOut, - errno.EACCES : PermissionDeniedError - } - ret = abs(ret) - if ret in errors: - return errors[ret](msg) - else: - return Error(msg + (": errno %s" % errno.errorcode[ret])) - - -class rados_pool_stat_t(Structure): - """ Usage information for a pool """ - _fields_ = [("num_bytes", c_uint64), - ("num_kb", c_uint64), - ("num_objects", c_uint64), - ("num_object_clones", c_uint64), - ("num_object_copies", c_uint64), - ("num_objects_missing_on_primary", c_uint64), - ("num_objects_unfound", c_uint64), - ("num_objects_degraded", c_uint64), - ("num_rd", c_uint64), - ("num_rd_kb", c_uint64), - ("num_wr", c_uint64), - ("num_wr_kb", c_uint64)] - - -class rados_cluster_stat_t(Structure): - """ Cluster-wide usage information """ - _fields_ = [("kb", c_uint64), - ("kb_used", c_uint64), - ("kb_avail", c_uint64), - ("num_objects", c_uint64)] - - -class timeval(Structure): - _fields_ = [("tv_sec", c_long), ("tv_usec", c_long)] - - -class Version(object): - """ Version information """ - def __init__(self, major, minor, extra): - self.major = major - self.minor = minor - self.extra = extra - - def __str__(self): - return "%d.%d.%d" % (self.major, self.minor, self.extra) - - -class RadosThread(threading.Thread): - def __init__(self, target, args=None): - self.args = args - self.target = target - threading.Thread.__init__(self) - - def run(self): - self.retval = self.target(*self.args) - -# time in seconds between each call to t.join() for child thread -POLL_TIME_INCR = 0.5 - - -def run_in_thread(target, args, timeout=0): - interrupt = False - - countdown = timeout - t = RadosThread(target, args) - - # allow the main thread to exit (presumably, avoid a join() on this - # subthread) before this thread terminates. This allows SIGINT - # exit of a blocked call. See below. - t.daemon = True - - t.start() - try: - # poll for thread exit - while t.is_alive(): - t.join(POLL_TIME_INCR) - if timeout and t.is_alive(): - countdown = countdown - POLL_TIME_INCR - if countdown <= 0: - raise KeyboardInterrupt - - t.join() # in case t exits before reaching the join() above - except KeyboardInterrupt: - # ..but allow SIGINT to terminate the waiting. Note: this - # relies on the Linux kernel behavior of delivering the signal - # to the main thread in preference to any subthread (all that's - # strictly guaranteed is that *some* thread that has the signal - # unblocked will receive it). But there doesn't seem to be - # any interface to create t with SIGINT blocked. - interrupt = True - - if interrupt: - t.retval = -errno.EINTR - return t.retval - - -# helper to specify an optional argument, where in addition to `cls`, `None` -# is also acceptable -def opt(cls): - return (cls, None) - - -# validate argument types of an instance method -# kwargs is an un-ordered dict, so use args instead -def requires(*types): - def is_type_of(v, t): - if t is None: - return v is None - else: - return isinstance(v, t) - - def check_type(val, arg_name, arg_type): - if isinstance(arg_type, tuple): - if any(is_type_of(val, t) for t in arg_type): - return - type_names = ' or '.join('None' if t is None else t.__name__ - for t in arg_type) - raise TypeError('%s must be %s' % (arg_name, type_names)) - else: - if is_type_of(val, arg_type): - return - assert(arg_type is not None) - raise TypeError('%s must be %s' % (arg_name, arg_type.__name__)) - - def wrapper(f): - @wraps(f) - def validate_func(*args, **kwargs): - # ignore the `self` arg - pos_args = zip(args[1:], types) - named_args = ((kwargs[name], (name, spec)) for name, spec in types - if name in kwargs) - for arg_val, (arg_name, arg_type) in chain(pos_args, named_args): - check_type(arg_val, arg_name, arg_type) - return f(*args, **kwargs) - return validate_func - return wrapper - - -def cstr(val, encoding="utf-8"): - """ - Create a C-style string from a Python string - - :param str val: Python string - :param encoding: Encoding to use - :rtype: c_char_p - """ - if val is None: - return c_char_p(None) - - if _python2 and isinstance(val, str): - # Don't encode str on Python 2, as it's already an 8-bit string - return c_char_p(val) - else: - return c_char_p(val.encode(encoding)) - - -def decode_cstr(addr, size=-1, encoding="utf-8"): - """ - Decode a C-style string into a Python string. - - Return None if a the C string is a NULL pointer. - - :param c_char_p addr: C-style string - :param int: String size (assume NUL-terminated if size is -1) - :param encoding: Encoding to use - :rtype: str or None - """ - if not addr: - # NULL pointer - return None - - return ctypes.string_at(addr, size).decode(encoding) - - -class Rados(object): - """librados python wrapper""" - def require_state(self, *args): - """ - Checks if the Rados object is in a special state - - :raises: RadosStateError - """ - if self.state in args: - return - raise RadosStateError("You cannot perform that operation on a \ -Rados object in state %s." % self.state) - - @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)), - ('conffile', opt(str_type))) - def __init__(self, rados_id=None, name=None, clustername=None, - conf_defaults=None, conffile=None, conf=None, flags=0): - library_path = find_library('rados') - # maybe find_library can not find it correctly on all platforms, - # so fall back to librados.so.2 in such case. - self.librados = CDLL(library_path if library_path is not None else 'librados.so.2') - - self.parsed_args = [] - self.conf_defaults = conf_defaults - self.conffile = conffile - self.cluster = c_void_p() - self.rados_id = rados_id - if rados_id and name: - raise Error("Rados(): can't supply both rados_id and name") - elif rados_id: - name = 'client.' + rados_id - elif name is None: - name = 'client.admin' - if clustername is None: - clustername = 'ceph' - ret = run_in_thread(self.librados.rados_create2, - (byref(self.cluster), cstr(clustername), - cstr(name), c_uint64(flags))) - - if ret != 0: - raise Error("rados_initialize failed with error code: %d" % ret) - self.state = "configuring" - # order is important: conf_defaults, then conffile, then conf - if conf_defaults: - for key, value in conf_defaults.items(): - self.conf_set(key, value) - if conffile is not None: - # read the default conf file when '' is given - if conffile == '': - conffile = None - self.conf_read_file(conffile) - if conf: - for key, value in conf.items(): - self.conf_set(key, value) - - def shutdown(self): - """ - Disconnects from the cluster. Call this explicitly when a - Rados.connect()ed object is no longer used. - """ - if hasattr(self, "state") and self.state != "shutdown": - run_in_thread(self.librados.rados_shutdown, (self.cluster,)) - self.state = "shutdown" - - def __enter__(self): - self.connect() - return self - - def __exit__(self, type_, value, traceback): - self.shutdown() - return False - - def version(self): - """ - Get the version number of the ``librados`` C library. - - :returns: a tuple of ``(major, minor, extra)`` components of the - librados version - """ - major = c_int(0) - minor = c_int(0) - extra = c_int(0) - run_in_thread(self.librados.rados_version, - (byref(major), byref(minor), byref(extra))) - return Version(major.value, minor.value, extra.value) - - - @requires(('path', opt(str_type))) - def conf_read_file(self, path=None): - """ - Configure the cluster handle using a Ceph config file. - - :param path: path to the config file - :type path: str - """ - self.require_state("configuring", "connected") - ret = run_in_thread(self.librados.rados_conf_read_file, - (self.cluster, cstr(path))) - if (ret != 0): - raise make_ex(ret, "error calling conf_read_file") - - def conf_parse_argv(self, args): - """ - Parse known arguments from args, and remove; returned - args contain only those unknown to ceph - """ - self.require_state("configuring", "connected") - if not args: - return - # create instances of arrays of c_char_p's, both len(args) long - # cretargs will always be a subset of cargs (perhaps identical) - cargs = (c_char_p * len(args))(*map(cstr, args)) - cretargs = (c_char_p * len(args))() - ret = run_in_thread(self.librados.rados_conf_parse_argv_remainder, - (self.cluster, len(args), cargs, cretargs)) - if ret: - raise make_ex(ret, "error calling conf_parse_argv_remainder") - - # cretargs was allocated with fixed length; collapse return - # list to eliminate any missing args - retargs = [a.decode('utf-8') for a in cretargs if a is not None] - self.parsed_args = args - return retargs - - def conf_parse_env(self, var='CEPH_ARGS'): - """ - Parse known arguments from an environment variable, normally - CEPH_ARGS. - """ - self.require_state("configuring", "connected") - if not var: - return - ret = run_in_thread(self.librados.rados_conf_parse_env, - (self.cluster, cstr(var))) - if (ret != 0): - raise make_ex(ret, "error calling conf_parse_env") - - @requires(('option', str_type)) - def conf_get(self, option): - """ - Get the value of a configuration option - - :param option: which option to read - :type option: str - - :returns: str - value of the option or None - :raises: :class:`TypeError` - """ - self.require_state("configuring", "connected") - length = 20 - while True: - ret_buf = create_string_buffer(length) - ret = run_in_thread(self.librados.rados_conf_get, - (self.cluster, cstr(option), ret_buf, - c_size_t(length))) - if (ret == 0): - return decode_cstr(ret_buf) - elif (ret == -errno.ENAMETOOLONG): - length = length * 2 - elif (ret == -errno.ENOENT): - return None - else: - raise make_ex(ret, "error calling conf_get") - - @requires(('option', str_type), ('val', str_type)) - def conf_set(self, option, val): - """ - Set the value of a configuration option - - :param option: which option to set - :type option: str - :param option: value of the option - :type option: str - - :raises: :class:`TypeError`, :class:`ObjectNotFound` - """ - self.require_state("configuring", "connected") - ret = run_in_thread(self.librados.rados_conf_set, - (self.cluster, cstr(option), cstr(val))) - if (ret != 0): - raise make_ex(ret, "error calling conf_set") - - def ping_monitor(self, mon_id): - """ - Ping a monitor to assess liveness - - May be used as a simply way to assess liveness, or to obtain - information about the monitor in a simple way even in the - absence of quorum. - - :param mon_id: the ID portion of the monitor's name (i.e., mon.) - :type mon_id: str - :returns: the string reply from the monitor - """ - - self.require_state("configuring", "connected") - - outstrp = pointer(pointer(c_char())) - outstrlen = c_long() - - ret = run_in_thread(self.librados.rados_ping_monitor, - (self.cluster, cstr(mon_id), - outstrp, byref(outstrlen))) - - my_outstr = outstrp.contents[:(outstrlen.value)] - if outstrlen.value: - run_in_thread(self.librados.rados_buffer_free, (outstrp.contents,)) - - if ret != 0: - raise make_ex(ret, "error calling ping_monitor") - return decode_cstr(my_outstr) - - def connect(self, timeout=0): - """ - Connect to the cluster. Use shutdown() to release resources. - """ - self.require_state("configuring") - ret = run_in_thread(self.librados.rados_connect, (self.cluster,), - timeout) - if (ret != 0): - raise make_ex(ret, "error connecting to the cluster") - self.state = "connected" - - def get_cluster_stats(self): - """ - Read usage info about the cluster - - This tells you total space, space used, space available, and number - of objects. These are not updated immediately when data is written, - they are eventually consistent. - - :returns: dict - contains the following keys: - - - ``kb`` (int) - total space - - - ``kb_used`` (int) - space used - - - ``kb_avail`` (int) - free space available - - - ``num_objects`` (int) - number of objects - - """ - stats = rados_cluster_stat_t() - ret = run_in_thread(self.librados.rados_cluster_stat, - (self.cluster, byref(stats))) - if ret < 0: - raise make_ex( - ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id) - return {'kb': stats.kb, - 'kb_used': stats.kb_used, - 'kb_avail': stats.kb_avail, - 'num_objects': stats.num_objects} - - @requires(('pool_name', str_type)) - def pool_exists(self, pool_name): - """ - Checks if a given pool exists. - - :param pool_name: name of the pool to check - :type pool_name: str - - :raises: :class:`TypeError`, :class:`Error` - :returns: bool - whether the pool exists, false otherwise. - """ - self.require_state("connected") - ret = run_in_thread(self.librados.rados_pool_lookup, - (self.cluster, cstr(pool_name))) - if (ret >= 0): - return True - elif (ret == -errno.ENOENT): - return False - else: - raise make_ex(ret, "error looking up pool '%s'" % pool_name) - - @requires(('pool_name', str_type)) - def pool_lookup(self, pool_name): - """ - Returns a pool's ID based on its name. - - :param pool_name: name of the pool to look up - :type pool_name: str - - :raises: :class:`TypeError`, :class:`Error` - :returns: int - pool ID, or None if it doesn't exist - """ - self.require_state("connected") - ret = run_in_thread(self.librados.rados_pool_lookup, - (self.cluster, cstr(pool_name))) - if (ret >= 0): - return int(ret) - elif (ret == -errno.ENOENT): - return None - else: - raise make_ex(ret, "error looking up pool '%s'" % pool_name) - - @requires(('pool_id', int)) - def pool_reverse_lookup(self, pool_id): - """ - Returns a pool's name based on its ID. - - :param pool_id: ID of the pool to look up - :type pool_id: int - - :raises: :class:`TypeError`, :class:`Error` - :returns: string - pool name, or None if it doesn't exist - """ - self.require_state("connected") - size = c_size_t(512) - while True: - c_name = create_string_buffer(size.value) - ret = run_in_thread(self.librados.rados_pool_reverse_lookup, - (self.cluster, c_int64(pool_id), byref(c_name), size)) - if ret > size.value: - size = c_size_t(ret) - elif ret == -errno.ENOENT: - return None - elif ret < 0: - raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id) - else: - return decode_cstr(c_name.value) - break - - @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int))) - def create_pool(self, pool_name, auid=None, crush_rule=None): - """ - Create a pool: - - with default settings: if auid=None and crush_rule=None - - owned by a specific auid: auid given and crush_rule=None - - with a specific CRUSH rule: if auid=None and crush_rule given - - with a specific CRUSH rule and auid: if auid and crush_rule given - - :param pool_name: name of the pool to create - :type pool_name: str - :param auid: the id of the owner of the new pool - :type auid: int - :param crush_rule: rule to use for placement in the new pool - :type crush_rule: int - - :raises: :class:`TypeError`, :class:`Error` - """ - self.require_state("connected") - if auid is None: - if crush_rule is None: - ret = run_in_thread(self.librados.rados_pool_create, - (self.cluster, cstr(pool_name))) - else: - ret = run_in_thread(self.librados. - rados_pool_create_with_crush_rule, - (self.cluster, cstr(pool_name), - c_ubyte(crush_rule))) - - elif crush_rule is None: - ret = run_in_thread(self.librados.rados_pool_create_with_auid, - (self.cluster, cstr(pool_name), - c_uint64(auid))) - else: - ret = run_in_thread(self.librados.rados_pool_create_with_all, - (self.cluster, cstr(pool_name), - c_uint64(auid), c_ubyte(crush_rule))) - if ret < 0: - raise make_ex(ret, "error creating pool '%s'" % pool_name) - - @requires(('pool_id', int)) - def get_pool_base_tier(self, pool_id): - """ - Get base pool - - :returns: base pool, or pool_id if tiering is not configured for the pool - """ - self.require_state("connected") - base_tier = c_int64(0) - ret = run_in_thread(self.librados.rados_pool_get_base_tier, - (self.cluster, c_int64(pool_id), byref(base_tier))) - if ret < 0: - raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id) - return base_tier.value - - @requires(('pool_name', str_type)) - def delete_pool(self, pool_name): - """ - Delete a pool and all data inside it. - - The pool is removed from the cluster immediately, - but the actual data is deleted in the background. - - :param pool_name: name of the pool to delete - :type pool_name: str - - :raises: :class:`TypeError`, :class:`Error` - """ - self.require_state("connected") - ret = run_in_thread(self.librados.rados_pool_delete, - (self.cluster, cstr(pool_name))) - if ret < 0: - raise make_ex(ret, "error deleting pool '%s'" % pool_name) - - def list_pools(self): - """ - Gets a list of pool names. - - :returns: list - of pool names. - """ - self.require_state("connected") - size = c_size_t(512) - while True: - c_names = create_string_buffer(size.value) - ret = run_in_thread(self.librados.rados_pool_list, - (self.cluster, byref(c_names), size)) - if ret > size.value: - size = c_size_t(ret) - else: - break - - return [decode_cstr(name) for name in c_names.raw.split(b'\0') if len(name) > 0] - - def get_fsid(self): - """ - Get the fsid of the cluster as a hexadecimal string. - - :raises: :class:`Error` - :returns: str - cluster fsid - """ - self.require_state("connected") - buf_len = 37 - fsid = create_string_buffer(buf_len) - ret = run_in_thread(self.librados.rados_cluster_fsid, - (self.cluster, byref(fsid), c_size_t(buf_len))) - if ret < 0: - raise make_ex(ret, "error getting cluster fsid") - return fsid.value - - @requires(('ioctx_name', str_type)) - def open_ioctx(self, ioctx_name): - """ - Create an io context - - The io context allows you to perform operations within a particular - pool. - - :param ioctx_name: name of the pool - :type ioctx_name: str - - :raises: :class:`TypeError`, :class:`Error` - :returns: Ioctx - Rados Ioctx object - """ - self.require_state("connected") - ioctx = c_void_p() - ret = run_in_thread(self.librados.rados_ioctx_create, - (self.cluster, cstr(ioctx_name), byref(ioctx))) - if ret < 0: - raise make_ex(ret, "error opening pool '%s'" % ioctx_name) - return Ioctx(ioctx_name, self.librados, ioctx) - - def mon_command(self, cmd, inbuf, timeout=0, target=None): - """ - mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen) - returns (int ret, string outbuf, string outs) - """ - self.require_state("connected") - outbufp = pointer(pointer(c_char())) - outbuflen = c_long() - outsp = pointer(pointer(c_char())) - outslen = c_long() - cmdarr = (c_char_p * len(cmd))(*map(cstr, cmd)) - - if target: - ret = run_in_thread(self.librados.rados_mon_command_target, - (self.cluster, cstr(target), cmdarr, - len(cmd), c_char_p(inbuf), len(inbuf), - outbufp, byref(outbuflen), outsp, - byref(outslen)), timeout) - else: - ret = run_in_thread(self.librados.rados_mon_command, - (self.cluster, cmdarr, len(cmd), - c_char_p(inbuf), len(inbuf), - outbufp, byref(outbuflen), outsp, byref(outslen)), - timeout) - - # copy returned memory (ctypes makes a copy, not a reference) - my_outbuf = outbufp.contents[:(outbuflen.value)] - my_outs = decode_cstr(outsp.contents, outslen.value) - - # free callee's allocations - if outbuflen.value: - run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,)) - if outslen.value: - run_in_thread(self.librados.rados_buffer_free, (outsp.contents,)) - - return (ret, my_outbuf, my_outs) - - def osd_command(self, osdid, cmd, inbuf, timeout=0): - """ - osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen) - returns (int ret, string outbuf, string outs) - """ - self.require_state("connected") - outbufp = pointer(pointer(c_char())) - outbuflen = c_long() - outsp = pointer(pointer(c_char())) - outslen = c_long() - cmdarr = (c_char_p * len(cmd))(*map(cstr, cmd)) - ret = run_in_thread(self.librados.rados_osd_command, - (self.cluster, osdid, cmdarr, len(cmd), - c_char_p(inbuf), len(inbuf), - outbufp, byref(outbuflen), outsp, byref(outslen)), - timeout) - - # copy returned memory (ctypes makes a copy, not a reference) - my_outbuf = outbufp.contents[:(outbuflen.value)] - my_outs = decode_cstr(outsp.contents, outslen.value) - - # free callee's allocations - if outbuflen.value: - run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,)) - if outslen.value: - run_in_thread(self.librados.rados_buffer_free, (outsp.contents,)) - - return (ret, my_outbuf, my_outs) - - def pg_command(self, pgid, cmd, inbuf, timeout=0): - """ - pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen) - returns (int ret, string outbuf, string outs) - """ - self.require_state("connected") - outbufp = pointer(pointer(c_char())) - outbuflen = c_long() - outsp = pointer(pointer(c_char())) - outslen = c_long() - cmdarr = (c_char_p * len(cmd))(*map(cstr, cmd)) - ret = run_in_thread(self.librados.rados_pg_command, - (self.cluster, cstr(pgid), cmdarr, len(cmd), - c_char_p(inbuf), len(inbuf), - outbufp, byref(outbuflen), outsp, byref(outslen)), - timeout) - - # copy returned memory (ctypes makes a copy, not a reference) - my_outbuf = outbufp.contents[:(outbuflen.value)] - my_outs = decode_cstr(outsp.contents, outslen.value) - - # free callee's allocations - if outbuflen.value: - run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,)) - if outslen.value: - run_in_thread(self.librados.rados_buffer_free, (outsp.contents,)) - - return (ret, my_outbuf, my_outs) - - def wait_for_latest_osdmap(self): - self.require_state("connected") - return run_in_thread(self.librados.rados_wait_for_latest_osdmap, (self.cluster,)) - - def blacklist_add(self, client_address, expire_seconds=0): - """ - Blacklist a client from the OSDs - - :param client_address: client address - :type client_address: str - :param expire_seconds: number of seconds to blacklist - :type expire_seconds: int - - :raises: :class:`Error` - """ - self.require_state("connected") - ret = run_in_thread(self.librados.rados_blacklist_add, - (self.cluster, cstr(client_address), - c_uint32(expire_seconds))) - if ret < 0: - raise make_ex(ret, "error blacklisting client '%s'" % client_address) - - -class OmapIterator(Iterator): - """Omap iterator""" - def __init__(self, ioctx, ctx): - self.ioctx = ioctx - self.ctx = ctx - - def __iter__(self): - return self - - def next(self): - return self.__next__() - - def __next__(self): - """ - Get the next key-value pair in the object - :returns: next rados.OmapItem - """ - key_ = c_char_p(0) - val_ = c_char_p(0) - len_ = c_int(0) - ret = run_in_thread(self.ioctx.librados.rados_omap_get_next, - (self.ctx, byref(key_), byref(val_), byref(len_))) - if (ret != 0): - raise make_ex(ret, "error iterating over the omap") - if key_.value is None: - raise StopIteration() - key = decode_cstr(key_) - val = None - if val_.value is not None: - val = ctypes.string_at(val_, len_) - return (key, val) - - def __del__(self): - run_in_thread(self.ioctx.librados.rados_omap_get_end, (self.ctx,)) - - -class ObjectIterator(Iterator): - """rados.Ioctx Object iterator""" - def __init__(self, ioctx): - self.ioctx = ioctx - self.ctx = c_void_p() - ret = run_in_thread(self.ioctx.librados.rados_nobjects_list_open, - (self.ioctx.io, byref(self.ctx))) - if ret < 0: - raise make_ex(ret, "error iterating over the objects in ioctx '%s'" - % self.ioctx.name) - - def __iter__(self): - return self - - def next(self): - return self.__next__() - - def __next__(self): - """ - Get the next object name and locator in the pool - - :raises: StopIteration - :returns: next rados.Ioctx Object - """ - key_ = c_char_p() - locator_ = c_char_p() - nspace_ = c_char_p() - ret = run_in_thread(self.ioctx.librados.rados_nobjects_list_next, - (self.ctx, byref(key_), byref(locator_), byref(nspace_))) - if ret < 0: - raise StopIteration() - - key = decode_cstr(key_) - locator = decode_cstr(locator_) - nspace = decode_cstr(nspace_) - return Object(self.ioctx, key, locator, nspace) - - def __del__(self): - run_in_thread(self.ioctx.librados.rados_nobjects_list_close, (self.ctx,)) - - -class XattrIterator(Iterator): - """Extended attribute iterator""" - def __init__(self, ioctx, it, oid): - self.ioctx = ioctx - self.it = it - self.oid = oid - - def __iter__(self): - return self - - def next(self): - return self.__next__() - - def __next__(self): - """ - Get the next xattr on the object - - :raises: StopIteration - :returns: pair - of name and value of the next Xattr - """ - name_ = c_char_p(0) - val_ = c_char_p(0) - len_ = c_int(0) - ret = run_in_thread(self.ioctx.librados.rados_getxattrs_next, - (self.it, byref(name_), byref(val_), byref(len_))) - if (ret != 0): - raise make_ex(ret, "error iterating over the extended attributes \ -in '%s'" % self.oid) - if name_.value is None: - raise StopIteration() - name = decode_cstr(name_) - val = ctypes.string_at(val_, len_) - return (name, val) - - def __del__(self): - run_in_thread(self.ioctx.librados.rados_getxattrs_end, (self.it,)) - - -class SnapIterator(Iterator): - """Snapshot iterator""" - def __init__(self, ioctx): - self.ioctx = ioctx - # We don't know how big a buffer we need until we've called the - # function. So use the exponential doubling strategy. - num_snaps = 10 - while True: - self.snaps = (ctypes.c_uint64 * num_snaps)() - ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_list, - (self.ioctx.io, self.snaps, c_int(num_snaps))) - if (ret >= 0): - self.max_snap = ret - break - elif (ret != -errno.ERANGE): - raise make_ex(ret, "error calling rados_snap_list for \ -ioctx '%s'" % self.ioctx.name) - num_snaps = num_snaps * 2 - self.cur_snap = 0 - - def __iter__(self): - return self - - def next(self): - return self.__next__() - - def __next__(self): - """ - Get the next Snapshot - - :raises: :class:`Error`, StopIteration - :returns: Snap - next snapshot - """ - if (self.cur_snap >= self.max_snap): - raise StopIteration - snap_id = self.snaps[self.cur_snap] - name_len = 10 - while True: - name = create_string_buffer(name_len) - ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_get_name, - (self.ioctx.io, c_uint64(snap_id), byref(name), - c_int(name_len))) - if (ret == 0): - name_len = ret - break - elif (ret != -errno.ERANGE): - raise make_ex(ret, "rados_snap_get_name error") - name_len = name_len * 2 - snap = Snap(self.ioctx, decode_cstr(name), snap_id) - self.cur_snap = self.cur_snap + 1 - return snap - - -class Snap(object): - """Snapshot object""" - def __init__(self, ioctx, name, snap_id): - self.ioctx = ioctx - self.name = name - self.snap_id = snap_id - - def __str__(self): - return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \ - % (str(self.ioctx), self.name, self.snap_id) - - def get_timestamp(self): - """ - Find when a snapshot in the current pool occurred - - :raises: :class:`Error` - :returns: datetime - the data and time the snapshot was created - """ - snap_time = c_long(0) - ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_get_stamp, - (self.ioctx.io, self.snap_id, byref(snap_time))) - if (ret != 0): - raise make_ex(ret, "rados_ioctx_snap_get_stamp error") - return datetime.fromtimestamp(snap_time.value) - - -class Completion(object): - """completion object""" - def __init__(self, ioctx, rados_comp, oncomplete, onsafe, - complete_cb, safe_cb): - self.rados_comp = rados_comp - self.oncomplete = oncomplete - self.onsafe = onsafe - self.ioctx = ioctx - self.complete_cb = complete_cb - self.safe_cb = safe_cb - - def is_safe(self): - """ - Is an asynchronous operation safe? - - This does not imply that the safe callback has finished. - - :returns: True if the operation is safe - """ - return run_in_thread(self.ioctx.librados.rados_aio_is_safe, - (self.rados_comp,)) == 1 - - def is_complete(self): - """ - Has an asynchronous operation completed? - - This does not imply that the safe callback has finished. - - :returns: True if the operation is completed - """ - return run_in_thread(self.ioctx.librados.rados_aio_is_complete, - (self.rados_comp,)) == 1 - - def wait_for_safe(self): - """ - Wait for an asynchronous operation to be marked safe - - This does not imply that the safe callback has finished. - """ - run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe, - (self.rados_comp,)) - - def wait_for_complete(self): - """ - Wait for an asynchronous operation to complete - - This does not imply that the complete callback has finished. - """ - run_in_thread(self.ioctx.librados.rados_aio_wait_for_complete, - (self.rados_comp,)) - - def wait_for_safe_and_cb(self): - """ - Wait for an asynchronous operation to be marked safe and for - the safe callback to have returned - """ - run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe_and_cb, - (self.rados_comp,)) - - def wait_for_complete_and_cb(self): - """ - Wait for an asynchronous operation to complete and for the - complete callback to have returned - - :returns: whether the operation is completed - """ - return run_in_thread( - self.ioctx.librados.rados_aio_wait_for_complete_and_cb, - (self.rados_comp,) - ) - - def get_return_value(self): - """ - Get the return value of an asychronous operation - - The return value is set when the operation is complete or safe, - whichever comes first. - - :returns: int - return value of the operation - """ - return run_in_thread(self.ioctx.librados.rados_aio_get_return_value, - (self.rados_comp,)) - - def __del__(self): - """ - Release a completion - - Call this when you no longer need the completion. It may not be - freed immediately if the operation is not acked and committed. - """ - run_in_thread(self.ioctx.librados.rados_aio_release, - (self.rados_comp,)) - - -class WriteOpCtx(object): - """write operation context manager""" - def __init__(self, ioctx): - self.ioctx = ioctx - - def __enter__(self): - self.ioctx.librados.rados_create_write_op.restype = c_void_p - ret = run_in_thread(self.ioctx.librados.rados_create_write_op, (None,)) - self.write_op = ret - return ret - - def __exit__(self, type, msg, traceback): - self.ioctx.librados.rados_release_write_op.argtypes = [c_void_p] - run_in_thread(self.ioctx.librados.rados_release_write_op, (c_void_p(self.write_op),)) - - -class ReadOpCtx(object): - """read operation context manager""" - def __init__(self, ioctx): - self.ioctx = ioctx - - def __enter__(self): - self.ioctx.librados.rados_create_read_op.restype = c_void_p - ret = run_in_thread(self.ioctx.librados.rados_create_read_op, (None,)) - self.read_op = ret - return ret - - def __exit__(self, type, msg, traceback): - self.ioctx.librados.rados_release_read_op.argtypes = [c_void_p] - run_in_thread(self.ioctx.librados.rados_release_read_op, (c_void_p(self.read_op),)) - - -RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p) - - -class Ioctx(object): - """rados.Ioctx object""" - def __init__(self, name, librados, io): - self.name = name - self.librados = librados - self.io = io - self.state = "open" - self.locator_key = "" - self.nspace = "" - self.safe_cbs = {} - self.complete_cbs = {} - self.lock = threading.Lock() - - def __enter__(self): - return self - - def __exit__(self, type_, value, traceback): - self.close() - return False - - def __del__(self): - self.close() - - def __aio_safe_cb(self, completion, _): - """ - Callback to onsafe() for asynchronous operations - """ - cb = None - with self.lock: - cb = self.safe_cbs[completion] - del self.safe_cbs[completion] - cb.onsafe(cb) - return 0 - - def __aio_complete_cb(self, completion, _): - """ - Callback to oncomplete() for asynchronous operations - """ - cb = None - with self.lock: - cb = self.complete_cbs[completion] - del self.complete_cbs[completion] - cb.oncomplete(cb) - return 0 - - def __get_completion(self, oncomplete, onsafe): - """ - Constructs a completion to use with asynchronous operations - - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - completion = c_void_p(0) - complete_cb = None - safe_cb = None - if oncomplete: - complete_cb = RADOS_CB(self.__aio_complete_cb) - if onsafe: - safe_cb = RADOS_CB(self.__aio_safe_cb) - ret = run_in_thread(self.librados.rados_aio_create_completion, - (c_void_p(0), complete_cb, safe_cb, - byref(completion))) - if ret < 0: - raise make_ex(ret, "error getting a completion") - with self.lock: - completion_obj = Completion(self, completion, oncomplete, onsafe, - complete_cb, safe_cb) - if oncomplete: - self.complete_cbs[completion.value] = completion_obj - if onsafe: - self.safe_cbs[completion.value] = completion_obj - return completion_obj - - def aio_write(self, object_name, to_write, offset=0, - oncomplete=None, onsafe=None): - """ - Write data to an object asynchronously - - Queues the write and returns. - - :param object_name: name of the object - :type object_name: str - :param to_write: data to write - :type to_write: str - :param offset: byte offset in the object to begin writing at - :type offset: int - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - completion = self.__get_completion(oncomplete, onsafe) - ret = run_in_thread(self.librados.rados_aio_write, - (self.io, cstr(object_name), - completion.rados_comp, c_char_p(to_write), - c_size_t(len(to_write)), c_uint64(offset))) - if ret < 0: - raise make_ex(ret, "error writing object %s" % object_name) - return completion - - def aio_write_full(self, object_name, to_write, - oncomplete=None, onsafe=None): - """ - Asychronously write an entire object - - The object is filled with the provided data. If the object exists, - it is atomically truncated and then written. - Queues the write and returns. - - :param object_name: name of the object - :type object_name: str - :param to_write: data to write - :type to_write: str - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - completion = self.__get_completion(oncomplete, onsafe) - ret = run_in_thread(self.librados.rados_aio_write_full, - (self.io, cstr(object_name), - completion.rados_comp, c_char_p(to_write), - c_size_t(len(to_write)))) - if ret < 0: - raise make_ex(ret, "error writing object %s" % object_name) - return completion - - def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None): - """ - Asychronously append data to an object - - Queues the write and returns. - - :param object_name: name of the object - :type object_name: str - :param to_append: data to append - :type to_append: str - :param offset: byte offset in the object to begin writing at - :type offset: int - :param oncomplete: what to do when the write is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the write is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - completion = self.__get_completion(oncomplete, onsafe) - ret = run_in_thread(self.librados.rados_aio_append, - (self.io, cstr(object_name), - completion.rados_comp, c_char_p(to_append), - c_size_t(len(to_append)))) - if ret < 0: - raise make_ex(ret, "error appending to object %s" % object_name) - return completion - - def aio_flush(self): - """ - Block until all pending writes in an io context are safe - - :raises: :class:`Error` - """ - ret = run_in_thread(self.librados.rados_aio_flush, (self.io,)) - if ret < 0: - raise make_ex(ret, "error flushing") - - def aio_read(self, object_name, length, offset, oncomplete): - """ - Asychronously read data from an object - - oncomplete will be called with the returned read value as - well as the completion: - - oncomplete(completion, data_read) - - :param object_name: name of the object to read from - :type object_name: str - :param length: the number of bytes to read - :type length: int - :param offset: byte offset in the object to begin reading from - :type offset: int - :param oncomplete: what to do when the read is complete - :type oncomplete: completion - - :raises: :class:`Error` - :returns: completion object - """ - buf = create_string_buffer(length) - - def oncomplete_(completion_v): - return_value = completion_v.get_return_value() - return oncomplete(completion_v, - ctypes.string_at(buf, return_value) if return_value >= 0 else None) - - completion = self.__get_completion(oncomplete_, None) - ret = run_in_thread(self.librados.rados_aio_read, - (self.io, cstr(object_name), - completion.rados_comp, buf, c_size_t(length), - c_uint64(offset))) - if ret < 0: - raise make_ex(ret, "error reading %s" % object_name) - return completion - - def aio_remove(self, object_name, oncomplete=None, onsafe=None): - """ - Asychronously remove an object - - :param object_name: name of the object to remove - :type object_name: str - :param oncomplete: what to do when the remove is safe and complete in memory - on all replicas - :type oncomplete: completion - :param onsafe: what to do when the remove is safe and complete on storage - on all replicas - :type onsafe: completion - - :raises: :class:`Error` - :returns: completion object - """ - completion = self.__get_completion(oncomplete, onsafe) - ret = run_in_thread(self.librados.rados_aio_remove, - (self.io, cstr(object_name), - completion.rados_comp)) - if ret < 0: - raise make_ex(ret, "error removing %s" % object_name) - return completion - - def require_ioctx_open(self): - """ - Checks if the rados.Ioctx object state is 'open' - - :raises: IoctxStateError - """ - if self.state != "open": - raise IoctxStateError("The pool is %s" % self.state) - - def change_auid(self, auid): - """ - Attempt to change an io context's associated auid "owner." - - Requires that you have write permission on both the current and new - auid. - - :raises: :class:`Error` - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_ioctx_pool_set_auid, - (self.io, ctypes.c_uint64(auid))) - if ret < 0: - raise make_ex(ret, "error changing auid of '%s' to %d" - % (self.name, auid)) - - @requires(('loc_key', str_type)) - def set_locator_key(self, loc_key): - """ - Set the key for mapping objects to pgs within an io context. - - The key is used instead of the object name to determine which - placement groups an object is put in. This affects all subsequent - operations of the io context - until a different locator key is - set, all objects in this io context will be placed in the same pg. - - :param loc_key: the key to use as the object locator, or NULL to discard - any previously set key - :type loc_key: str - - :raises: :class:`TypeError` - """ - self.require_ioctx_open() - run_in_thread(self.librados.rados_ioctx_locator_set_key, - (self.io, cstr(loc_key))) - self.locator_key = loc_key - - def get_locator_key(self): - """ - Get the locator_key of context - - :returns: locator_key - """ - return self.locator_key - - @requires(('snap_id', int)) - def set_read(self, snap_id): - """ - Set the snapshot for reading objects. - - To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD) - - :param snap_id: the snapshot Id - :type snap_id: int - - :raises: :class:`TypeError` - """ - self.require_ioctx_open() - run_in_thread(self.librados.rados_ioctx_snap_set_read, - (self.io, c_uint64(snap_id))) - - @requires(('nspace', str_type)) - def set_namespace(self, nspace): - """ - Set the namespace for objects within an io context. - - The namespace in addition to the object name fully identifies - an object. This affects all subsequent operations of the io context - - until a different namespace is set, all objects in this io context - will be placed in the same namespace. - - :param nspace: the namespace to use, or None/"" for the default namespace - :type nspace: str - - :raises: :class:`TypeError` - """ - self.require_ioctx_open() - if nspace is None: - nspace = "" - run_in_thread(self.librados.rados_ioctx_set_namespace, - (self.io, cstr(nspace))) - self.nspace = nspace - - def get_namespace(self): - """ - Get the namespace of context - - :returns: namespace - """ - return self.nspace - - def close(self): - """ - Close a rados.Ioctx object. - - This just tells librados that you no longer need to use the io context. - It may not be freed immediately if there are pending asynchronous - requests on it, but you should not use an io context again after - calling this function on it. - """ - if self.state == "open": - self.require_ioctx_open() - run_in_thread(self.librados.rados_ioctx_destroy, (self.io,)) - self.state = "closed" - - - @requires(('key', str_type), ('data', bytes)) - def write(self, key, data, offset=0): - """ - Write data to an object synchronously - - :param key: name of the object - :type key: str - :param data: data to write - :type data: bytes - :param offset: byte offset in the object to begin writing at - :type offset: int - - :raises: :class:`TypeError` - :raises: :class:`LogicError` - :returns: int - 0 on success - """ - self.require_ioctx_open() - length = len(data) - ret = run_in_thread(self.librados.rados_write, - (self.io, cstr(key), c_char_p(data), - c_size_t(length), c_uint64(offset))) - if ret == 0: - return ret - elif ret < 0: - raise make_ex(ret, "Ioctx.write(%s): failed to write %s" - % (self.name, key)) - else: - raise LogicError("Ioctx.write(%s): rados_write \ -returned %d, but should return zero on success." % (self.name, ret)) - - @requires(('key', str_type), ('data', bytes)) - def write_full(self, key, data): - """ - Write an entire object synchronously. - - The object is filled with the provided data. If the object exists, - it is atomically truncated and then written. - - :param key: name of the object - :type key: str - :param data: data to write - :type data: bytes - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: int - 0 on success - """ - self.require_ioctx_open() - length = len(data) - ret = run_in_thread(self.librados.rados_write_full, - (self.io, cstr(key), c_char_p(data), - c_size_t(length))) - if ret == 0: - return ret - elif ret < 0: - raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s" - % (self.name, key)) - else: - raise LogicError("Ioctx.write_full(%s): rados_write_full \ -returned %d, but should return zero on success." % (self.name, ret)) - - @requires(('key', str_type), ('data', bytes)) - def append(self, key, data): - """ - Append data to an object synchronously - - :param key: name of the object - :type key: str - :param data: data to write - :type data: bytes - - :raises: :class:`TypeError` - :raises: :class:`LogicError` - :returns: int - 0 on success - """ - self.require_ioctx_open() - length = len(data) - ret = run_in_thread(self.librados.rados_append, - (self.io, cstr(key), c_char_p(data), - c_size_t(length))) - if ret == 0: - return ret - elif ret < 0: - raise make_ex(ret, "Ioctx.append(%s): failed to append %s" - % (self.name, key)) - else: - raise LogicError("Ioctx.append(%s): rados_append \ -returned %d, but should return zero on success." % (self.name, ret)) - - @requires(('key', str_type)) - def read(self, key, length=8192, offset=0): - """ - Read data from an object synchronously - - :param key: name of the object - :type key: str - :param length: the number of bytes to read (default=8192) - :type length: int - :param offset: byte offset in the object to begin reading at - :type offset: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: str - data read from object - """ - self.require_ioctx_open() - ret_buf = create_string_buffer(length) - ret = run_in_thread(self.librados.rados_read, - (self.io, cstr(key), ret_buf, c_size_t(length), - c_uint64(offset))) - if ret < 0: - raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key)) - return ctypes.string_at(ret_buf, ret) - - @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes)) - def execute(self, key, cls, method, data, length=8192): - """ - Execute an OSD class method on an object. - - :param key: name of the object - :type key: str - :param cls: name of the object class - :type cls: str - :param method: name of the method - :type method: str - :param data: input data - :type data: bytes - :param length: size of output buffer in bytes (default=8291) - :type length: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: (ret, method output) - """ - self.require_ioctx_open() - ret_buf = create_string_buffer(length) - ret = run_in_thread(self.librados.rados_exec, - (self.io, cstr(key), cstr(cls), cstr(method), - c_char_p(data), c_size_t(len(data)), ret_buf, - c_size_t(length))) - if ret < 0: - raise make_ex(ret, "Ioctx.exec(%s): failed to exec %s:%s on %s" % - (self.name, cls, method, key)) - return ret, ctypes.string_at(ret_buf, min(ret, length)) - - def get_stats(self): - """ - Get pool usage statistics - - :returns: dict - contains the following keys: - - - ``num_bytes`` (int) - size of pool in bytes - - - ``num_kb`` (int) - size of pool in kbytes - - - ``num_objects`` (int) - number of objects in the pool - - - ``num_object_clones`` (int) - number of object clones - - - ``num_object_copies`` (int) - number of object copies - - - ``num_objects_missing_on_primary`` (int) - number of objets - missing on primary - - - ``num_objects_unfound`` (int) - number of unfound objects - - - ``num_objects_degraded`` (int) - number of degraded objects - - - ``num_rd`` (int) - bytes read - - - ``num_rd_kb`` (int) - kbytes read - - - ``num_wr`` (int) - bytes written - - - ``num_wr_kb`` (int) - kbytes written - """ - self.require_ioctx_open() - stats = rados_pool_stat_t() - ret = run_in_thread(self.librados.rados_ioctx_pool_stat, - (self.io, byref(stats))) - if ret < 0: - raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name) - return {'num_bytes': stats.num_bytes, - 'num_kb': stats.num_kb, - 'num_objects': stats.num_objects, - 'num_object_clones': stats.num_object_clones, - 'num_object_copies': stats.num_object_copies, - "num_objects_missing_on_primary": stats.num_objects_missing_on_primary, - "num_objects_unfound": stats.num_objects_unfound, - "num_objects_degraded": stats.num_objects_degraded, - "num_rd": stats.num_rd, - "num_rd_kb": stats.num_rd_kb, - "num_wr": stats.num_wr, - "num_wr_kb": stats.num_wr_kb} - - @requires(('key', str_type)) - def remove_object(self, key): - """ - Delete an object - - This does not delete any snapshots of the object. - - :param key: the name of the object to delete - :type key: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: bool - True on success - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_remove, - (self.io, cstr(key))) - if ret < 0: - raise make_ex(ret, "Failed to remove '%s'" % key) - return True - - @requires(('key', str_type)) - def trunc(self, key, size): - """ - Resize an object - - If this enlarges the object, the new area is logically filled with - zeroes. If this shrinks the object, the excess data is removed. - - :param key: the name of the object to resize - :type key: str - :param size: the new size of the object in bytes - :type size: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: int - 0 on success, otherwise raises error - """ - - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_trunc, - (self.io, cstr(key), c_uint64(size))) - if ret < 0: - raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key)) - return ret - - @requires(('key', str_type)) - def stat(self, key): - """ - Get object stats (size/mtime) - - :param key: the name of the object to get stats from - :type key: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: (size,timestamp) - """ - self.require_ioctx_open() - psize = c_uint64() - pmtime = c_uint64() - - ret = run_in_thread(self.librados.rados_stat, - (self.io, cstr(key), pointer(psize), - pointer(pmtime))) - if ret < 0: - raise make_ex(ret, "Failed to stat %r" % key) - return psize.value, time.localtime(pmtime.value) - - @requires(('key', str_type), ('xattr_name', str_type)) - def get_xattr(self, key, xattr_name): - """ - Get the value of an extended attribute on an object. - - :param key: the name of the object to get xattr from - :type key: str - :param xattr_name: which extended attribute to read - :type xattr_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: str - value of the xattr - """ - self.require_ioctx_open() - ret_length = 4096 - while ret_length < 4096 * 1024 * 1024: - ret_buf = create_string_buffer(ret_length) - ret = run_in_thread(self.librados.rados_getxattr, - (self.io, cstr(key), cstr(xattr_name), - ret_buf, c_size_t(ret_length))) - if (ret == -errno.ERANGE): - ret_length *= 2 - elif ret < 0: - raise make_ex(ret, "Failed to get xattr %r" % xattr_name) - else: - break - return ctypes.string_at(ret_buf, ret) - - @requires(('oid', str_type)) - def get_xattrs(self, oid): - """ - Start iterating over xattrs on an object. - - :param oid: the name of the object to get xattrs from - :type oid: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: XattrIterator - """ - self.require_ioctx_open() - it = c_void_p(0) - ret = run_in_thread(self.librados.rados_getxattrs, - (self.io, cstr(oid), byref(it))) - if ret != 0: - raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid) - return XattrIterator(self, it, oid) - - @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes)) - def set_xattr(self, key, xattr_name, xattr_value): - """ - Set an extended attribute on an object. - - :param key: the name of the object to set xattr to - :type key: str - :param xattr_name: which extended attribute to set - :type xattr_name: str - :param xattr_value: the value of the extended attribute - :type xattr_value: bytes - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: bool - True on success, otherwise raise an error - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_setxattr, - (self.io, cstr(key), cstr(xattr_name), - c_char_p(xattr_value), c_size_t(len(xattr_value)))) - if ret < 0: - raise make_ex(ret, "Failed to set xattr %r" % xattr_name) - return True - - @requires(('key', str_type), ('xattr_name', str_type)) - def rm_xattr(self, key, xattr_name): - """ - Removes an extended attribute on from an object. - - :param key: the name of the object to remove xattr from - :type key: str - :param xattr_name: which extended attribute to remove - :type xattr_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: bool - True on success, otherwise raise an error - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_rmxattr, - (self.io, cstr(key), cstr(xattr_name))) - if ret < 0: - raise make_ex(ret, "Failed to delete key %r xattr %r" % - (key, xattr_name)) - return True - - def list_objects(self): - """ - Get ObjectIterator on rados.Ioctx object. - - :returns: ObjectIterator - """ - self.require_ioctx_open() - return ObjectIterator(self) - - def list_snaps(self): - """ - Get SnapIterator on rados.Ioctx object. - - :returns: SnapIterator - """ - self.require_ioctx_open() - return SnapIterator(self) - - @requires(('snap_name', str_type)) - def create_snap(self, snap_name): - """ - Create a pool-wide snapshot - - :param snap_name: the name of the snapshot - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_ioctx_snap_create, - (self.io, cstr(snap_name))) - if (ret != 0): - raise make_ex(ret, "Failed to create snap %s" % snap_name) - - @requires(('snap_name', str_type)) - def remove_snap(self, snap_name): - """ - Removes a pool-wide snapshot - - :param snap_name: the name of the snapshot - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_ioctx_snap_remove, - (self.io, cstr(snap_name))) - if (ret != 0): - raise make_ex(ret, "Failed to remove snap %s" % snap_name) - - @requires(('snap_name', str_type)) - def lookup_snap(self, snap_name): - """ - Get the id of a pool snapshot - - :param snap_name: the name of the snapshot to lookop - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - :returns: Snap - on success - """ - self.require_ioctx_open() - snap_id = c_uint64() - ret = run_in_thread(self.librados.rados_ioctx_snap_lookup, - (self.io, cstr(snap_name), byref(snap_id))) - if (ret != 0): - raise make_ex(ret, "Failed to lookup snap %s" % snap_name) - return Snap(self, snap_name, snap_id) - - @requires(('oid', str_type), ('snap_name', str_type)) - def snap_rollback(self, oid, snap_name): - """ - Rollback an object to a snapshot - - :param oid: the name of the object - :type oid: str - :param snap_name: the name of the snapshot - :type snap_name: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - ret = run_in_thread(self.librados.rados_ioctx_snap_rollback, - (self.io, cstr(oid), cstr(snap_name))) - if (ret != 0): - raise make_ex(ret, "Failed to rollback %s" % oid) - - def get_last_version(self): - """ - Return the version of the last object read or written to. - - This exposes the internal version number of the last object read or - written via this io context - - :returns: version of the last object used - """ - self.require_ioctx_open() - return run_in_thread(self.librados.rados_get_last_version, (self.io,)) - - def create_write_op(self): - """ - create write operation object. - need call release_write_op after use - """ - self.librados.rados_create_write_op.restype = c_void_p - return run_in_thread(self.librados.rados_create_write_op, (None,)) - - def create_read_op(self): - """ - create read operation object. - need call release_read_op after use - """ - self.librados.rados_create_read_op.restype = c_void_p - return run_in_thread(self.librados.rados_create_read_op, (None,)) - - def release_write_op(self, write_op): - """ - release memory alloc by create_write_op - """ - self.librados.rados_release_write_op.argtypes = [c_void_p] - run_in_thread(self.librados.rados_release_write_op, (c_void_p(write_op),)) - - def release_read_op(self, read_op): - """ - release memory alloc by create_read_op - :para read_op: read_op object - :type: int - """ - self.librados.rados_release_read_op.argtypes = [c_void_p] - run_in_thread(self.librados.rados_release_read_op, (c_void_p(read_op),)) - - @requires(('write_op', int), ('keys', tuple), ('values', tuple)) - def set_omap(self, write_op, keys, values): - """ - set keys values to write_op - :para write_op: write_operation object - :type write_op: int - :para keys: a tuple of keys - :type keys: tuple - :para values: a tuple of values - :type values: tuple - """ - if len(keys) != len(values): - raise Error("Rados(): keys and values must have the same number of items") - key_num = len(keys) - key_array_type = c_char_p*key_num - key_array = key_array_type() - key_array[:] = [cstr(key) for key in keys] - - value_array_type = c_char_p*key_num - value_array = value_array_type() - value_array[:] = values - - lens_array_type = c_size_t*key_num - lens_array = lens_array_type() - for index, value in enumerate(values): - lens_array[index] = c_size_t(len(value)) - - run_in_thread(self.librados.rados_write_op_omap_set, - (c_void_p(write_op), byref(key_array), byref(value_array), - byref(lens_array), c_int(key_num),)) - - @requires(('write_op', int), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int))) - def operate_write_op(self, write_op, oid, mtime=0, flags=0): - """ - excute the real write operation - :para write_op: write operation object - :type write_op: int - :para oid: object name - :type oid: str - :para mtime: the time to set the mtime to, 0 for the current time - :type mtime: int - :para flags: flags to apply to the entire operation - :type flags: int - """ - run_in_thread(self.librados.rados_write_op_operate, - (c_void_p(write_op), self.io, cstr(oid), - c_long(mtime), c_int(flags),)) - - @requires(('read_op', int), ('oid', str_type), ('flag', opt(int))) - def operate_read_op(self, read_op, oid, flag=0): - """ - excute the real read operation - :para read_op: read operation object - :type read_op: int - :para oid: object name - :type oid: str - :para flag: flags to apply to the entire operation - :type flag: int - """ - run_in_thread(self.librados.rados_read_op_operate, - (c_void_p(read_op), self.io, cstr(oid), c_int(flag),)) - - @requires(('read_op', int), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int)) - def get_omap_vals(self, read_op, start_after, filter_prefix, max_return): - """ - get the omap values - :para read_op: read operation object - :type read_op: int - :para start_after: list keys starting after start_after - :type start_after: str - :para filter_prefix: list only keys beginning with filter_prefix - :type filter_prefix: str - :para max_return: list no more than max_return key/value pairs - :type max_return: int - :returns: an iterator over the the requested omap values, return value from this action - """ - prval = c_int() - iter_addr = c_void_p() - run_in_thread(self.librados.rados_read_op_omap_get_vals, - (c_void_p(read_op), cstr(start_after), - cstr(filter_prefix), c_int(max_return), - byref(iter_addr), pointer(prval))) - return OmapIterator(self, iter_addr), prval.value - - @requires(('read_op', int), ('start_after', str_type), ('max_return', int)) - def get_omap_keys(self, read_op, start_after, max_return): - """ - get the omap keys - :para read_op: read operation object - :type read_op: int - :para start_after: list keys starting after start_after - :type start_after: str - :para max_return: list no more than max_return key/value pairs - :type max_return: int - :returns: an iterator over the the requested omap values, return value from this action - """ - prval = c_int() - iter_addr = c_void_p() - run_in_thread(self.librados.rados_read_op_omap_get_keys, - (c_void_p(read_op), cstr(start_after), - c_int(max_return), byref(iter_addr), pointer(prval))) - return OmapIterator(self, iter_addr), prval.value - - @requires(('read_op', int), ('keys', tuple)) - def get_omap_vals_by_keys(self, read_op, keys): - """ - get the omap values by keys - :para read_op: read operation object - :type read_op: int - :para keys: input key tuple - :type keys: tuple - :returns: an iterator over the the requested omap values, return value from this action - """ - prval = c_int() - iter_addr = c_void_p() - key_num = len(keys) - key_array_type = c_char_p*key_num - key_array = key_array_type() - key_array[:] = [cstr(key) for key in keys] - run_in_thread(self.librados.rados_read_op_omap_get_vals_by_keys, - (c_void_p(read_op), byref(key_array), c_int(key_num), - byref(iter_addr), pointer(prval))) - return OmapIterator(self, iter_addr), prval.value - - @requires(('write_op', int), ('keys', tuple)) - def remove_omap_keys(self, write_op, keys): - """ - remove omap keys specifiled - :para write_op: write operation object - :type write_op: int - :para keys: input key tuple - :type keys: tuple - """ - key_num = len(keys) - key_array_type = c_char_p*key_num - key_array = key_array_type() - key_array[:] = [cstr(key) for key in keys] - run_in_thread(self.librados.rados_write_op_omap_rm_keys, - (c_void_p(write_op), byref(key_array), c_int(key_num))) - - @requires(('write_op', int)) - def clear_omap(self, write_op): - """ - Remove all key/value pairs from an object - :para write_op: write operation object - :type write_op: int - """ - run_in_thread(self.librados.rados_write_op_omap_clear, - (c_void_p(write_op),)) - - @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type), - ('duration', opt(int)), ('flags', int)) - def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0): - - """ - Take an exclusive lock on an object - - :param key: name of the object - :type key: str - :param name: name of the lock - :type name: str - :param cookie: cookie of the lock - :type cookie: str - :param desc: description of the lock - :type desc: str - :param duration: duration of the lock in seconds - :type duration: int - :param flags: flags - :type flags: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - - ret = run_in_thread(self.librados.rados_lock_exclusive, - (self.io, cstr(key), cstr(name), cstr(cookie), - cstr(desc), - timeval(duration, None) if duration is None else None, - c_uint8(flags))) - if ret < 0: - raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key)) - - @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type), - ('desc', str_type), ('duration', opt(int)), ('flags', int)) - def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0): - - """ - Take a shared lock on an object - - :param key: name of the object - :type key: str - :param name: name of the lock - :type name: str - :param cookie: cookie of the lock - :type cookie: str - :param tag: tag of the lock - :type tag: str - :param desc: description of the lock - :type desc: str - :param duration: duration of the lock in seconds - :type duration: int - :param flags: flags - :type flags: int - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - - ret = run_in_thread(self.librados.rados_lock_shared, - (self.io, cstr(key), cstr(name), cstr(cookie), - cstr(tag), cstr(desc), - timeval(duration, None) if duration is None else None, - c_uint8(flags))) - if ret < 0: - raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key)) - - @requires(('key', str_type), ('name', str_type), ('cookie', str_type)) - def unlock(self, key, name, cookie): - - """ - Release a shared or exclusive lock on an object - - :param key: name of the object - :type key: str - :param name: name of the lock - :type name: str - :param cookie: cookie of the lock - :type cookie: str - - :raises: :class:`TypeError` - :raises: :class:`Error` - """ - self.require_ioctx_open() - - ret = run_in_thread(self.librados.rados_unlock, - (self.io, cstr(key), cstr(name), cstr(cookie))) - if ret < 0: - raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key)) - - -def set_object_locator(func): - def retfunc(self, *args, **kwargs): - if self.locator_key is not None: - old_locator = self.ioctx.get_locator_key() - self.ioctx.set_locator_key(self.locator_key) - retval = func(self, *args, **kwargs) - self.ioctx.set_locator_key(old_locator) - return retval - else: - return func(self, *args, **kwargs) - return retfunc - - -def set_object_namespace(func): - def retfunc(self, *args, **kwargs): - if self.nspace is None: - raise LogicError("Namespace not set properly in context") - old_nspace = self.ioctx.get_namespace() - self.ioctx.set_namespace(self.nspace) - retval = func(self, *args, **kwargs) - self.ioctx.set_namespace(old_nspace) - return retval - return retfunc - - -class Object(object): - """Rados object wrapper, makes the object look like a file""" - def __init__(self, ioctx, key, locator_key=None, nspace=None): - self.key = key - self.ioctx = ioctx - self.offset = 0 - self.state = "exists" - self.locator_key = locator_key - self.nspace = "" if nspace is None else nspace - - def __str__(self): - return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \ - (str(self.ioctx), self.key, "--default--" - if self.nspace is "" else self.nspace, self.locator_key) - - def require_object_exists(self): - if self.state != "exists": - raise ObjectStateError("The object is %s" % self.state) - - @set_object_locator - @set_object_namespace - def read(self, length=1024 * 1024): - self.require_object_exists() - ret = self.ioctx.read(self.key, length, self.offset) - self.offset += len(ret) - return ret - - @set_object_locator - @set_object_namespace - def write(self, string_to_write): - self.require_object_exists() - ret = self.ioctx.write(self.key, string_to_write, self.offset) - if ret == 0: - self.offset += len(string_to_write) - return ret - - @set_object_locator - @set_object_namespace - def remove(self): - self.require_object_exists() - self.ioctx.remove_object(self.key) - self.state = "removed" - - @set_object_locator - @set_object_namespace - def stat(self): - self.require_object_exists() - return self.ioctx.stat(self.key) - - def seek(self, position): - self.require_object_exists() - self.offset = position - - @set_object_locator - @set_object_namespace - def get_xattr(self, xattr_name): - self.require_object_exists() - return self.ioctx.get_xattr(self.key, xattr_name) - - @set_object_locator - @set_object_namespace - def get_xattrs(self): - self.require_object_exists() - return self.ioctx.get_xattrs(self.key) - - @set_object_locator - @set_object_namespace - def set_xattr(self, xattr_name, xattr_value): - self.require_object_exists() - return self.ioctx.set_xattr(self.key, xattr_name, xattr_value) - - @set_object_locator - @set_object_namespace - def rm_xattr(self, xattr_name): - self.require_object_exists() - return self.ioctx.rm_xattr(self.key, xattr_name) - -MONITOR_LEVELS = [ - "debug", - "info", - "warn", "warning", - "err", "error", - "sec", - ] - - -class MonitorLog(object): - """ - For watching cluster log messages. Instantiate an object and keep - it around while callback is periodically called. Construct with - 'level' to monitor 'level' messages (one of MONITOR_LEVELS). - arg will be passed to the callback. - - callback will be called with: - arg (given to __init__) - line (the full line, including timestamp, who, level, msg) - who (which entity issued the log message) - timestamp_sec (sec of a struct timespec) - timestamp_nsec (sec of a struct timespec) - seq (sequence number) - level (string representing the level of the log message) - msg (the message itself) - callback's return value is ignored - """ - - def monitor_log_callback(self, arg, line, who, sec, nsec, seq, level, msg): - """ - Local callback wrapper, in case we decide to do something - """ - self.callback(arg, line, who, sec, nsec, seq, level, msg) - return 0 - - def __init__(self, cluster, level, callback, arg): - if level not in MONITOR_LEVELS: - raise LogicError("invalid monitor level " + level) - if not callable(callback): - raise LogicError("callback must be a callable function") - self.level = level - self.callback = callback - self.arg = arg - callback_factory = CFUNCTYPE(c_int, # return type (really void) - c_void_p, # arg - c_char_p, # line - c_char_p, # who - c_uint64, # timestamp_sec - c_uint64, # timestamp_nsec - c_ulong, # seq - c_char_p, # level - c_char_p) # msg - self.internal_callback = callback_factory(self.monitor_log_callback) - - r = run_in_thread(cluster.librados.rados_monitor_log, - (cluster.cluster, level, self.internal_callback, arg)) - if r: - raise make_ex(r, 'error calling rados_monitor_log')