from collections.abc import Callable
except ImportError:
from collections import Callable
-from datetime import datetime
+from datetime import datetime, timedelta
from functools import partial, wraps
from itertools import chain
uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
+ ctypedef void (*rados_watchcb2_t)(void *arg, int64_t notify_id,
+ uint64_t handle, uint64_t notifier_id,
+ void *data, size_t data_len)
+ ctypedef void (*rados_watcherrcb_t)(void *pre, uint64_t cookie, int err)
cdef struct rados_cluster_stat_t:
int rados_conf_set(rados_t cluster, char *option, const char *value)
int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
+ rados_t rados_ioctx_get_cluster(rados_ioctx_t io)
int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
void rados_omap_get_end(rados_omap_iter_t iter)
int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
+ int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len)
+ int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, void *arg)
+ int rados_watch_check(rados_ioctx_t io, uint64_t cookie)
+ int rados_unwatch2(rados_ioctx_t io, uint64_t cookie)
+ int rados_watch_flush(rados_t cluster)
LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
super(NoSpace, self).__init__(
"RADOS no space (%s)" % message, errno)
+class NotConnected(OSError):
+ """ `NotConnected` class, derived from `OSError` """
+ def __init__(self, message, errno=None):
+ super(NotConnected, self).__init__(
+ "RADOS not connected (%s)" % message, errno)
class RadosStateError(Error):
""" `RadosStateError` class, derived from `Error` """
errno.EINPROGRESS : InProgress,
errno.EISCONN : IsConnected,
errno.EINVAL : InvalidArgumentError,
+ errno.ENOTCONN : NotConnected,
}
ELSE:
cdef errno_to_exception = {
errno.EINPROGRESS : InProgress,
errno.EISCONN : IsConnected,
errno.EINVAL : InvalidArgumentError,
+ errno.ENOTCONN : NotConnected,
}
"""read operation context manager"""
+cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
+ uint64_t _notifier_id, void *_data,
+ size_t _data_len) with gil:
+ """
+ Watch callback
+ """
+ cdef object watch = <object>_arg
+ data = None
+ if _data != NULL:
+ data = (<char *>_data)[:_data_len]
+ watch._callback(_notify_id, _notifier_id, _cookie, data)
+
+cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
+ int _error) with gil:
+ """
+ Watch error callback
+ """
+ cdef object watch = <object>_arg
+ watch._error_callback(_cookie, _error)
+
+
+cdef class Watch(object):
+ """watch object"""
+
+ cdef:
+ object id
+ Ioctx ioctx
+ object oid
+ object callback
+ object error_callback
+
+ def __cinit__(self, Ioctx ioctx, object oid, object callback,
+ object error_callback, object timeout):
+ self.id = 0
+ self.ioctx = ioctx.dup()
+ self.oid = cstr(oid, 'oid')
+ self.callback = callback
+ self.error_callback = error_callback
+
+ if timeout is None:
+ timeout = 0
+
+ cdef:
+ char *_oid = self.oid
+ uint64_t _cookie;
+ uint32_t _timeout = timeout;
+ void *_args = <PyObject*>self
+
+ with nogil:
+ ret = rados_watch3(self.ioctx.io, _oid, &_cookie,
+ <rados_watchcb2_t>&__watch_callback,
+ <rados_watcherrcb_t>&__watch_error_callback,
+ _timeout, _args)
+ if ret < 0:
+ raise make_ex(ret, "watch error")
+
+ self.id = int(_cookie);
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type_, value, traceback):
+ self.close()
+ return False
+
+ def __dealloc__(self):
+ self.close()
+
+ def _callback(self, notify_id, notifier_id, watch_id, data):
+ replay = self.callback(notify_id, notifier_id, watch_id, data)
+
+ cdef:
+ rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
+ char *_obj = self.oid
+ int64_t _notify_id = notify_id
+ uint64_t _cookie = watch_id
+ char *_replay = NULL
+ int _replay_len = 0
+
+ if replay is not None:
+ replay = cstr(replay, 'replay')
+ _replay = replay
+ _replaylen = len(replay)
+
+ with nogil:
+ rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
+ _replaylen)
+
+ def _error_callback(self, watch_id, error):
+ if self.error_callback is None:
+ return
+ self.error_callback(watch_id, error)
+
+ def get_id(self):
+ return self.id
+
+ def check(self):
+ """
+ Check on watch validity.
+
+ :raises: :class:`Error`
+ :returns: timedelta since last confirmed valid
+ """
+ self.ioctx.require_ioctx_open()
+
+ cdef:
+ uint64_t _cookie = self.id
+
+ with nogil:
+ ret = rados_watch_check(self.ioctx.io, _cookie)
+ if ret < 0:
+ raise make_ex(ret, "check error")
+
+ return timedelta(milliseconds=ret)
+
+ def close(self):
+ """
+ Unregister an interest in an object.
+
+ :raises: :class:`Error`
+ """
+ if self.id == 0:
+ return
+
+ self.ioctx.require_ioctx_open()
+
+ cdef:
+ uint64_t _cookie = self.id
+
+ with nogil:
+ ret = rados_unwatch2(self.ioctx.io, _cookie)
+ if ret < 0 and ret != -errno.ENOENT:
+ raise make_ex(ret, "unwatch error")
+ self.id = 0
+
+ with nogil:
+ cluster = rados_ioctx_get_cluster(self.ioctx.io)
+ ret = rados_watch_flush(cluster);
+ if ret < 0:
+ raise make_ex(ret, "watch_flush error")
+
+ self.ioctx.close()
+
+
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
"""
Callback to oncomplete() for asynchronous operations
cb._complete()
return 0
-
cdef class Ioctx(object):
"""rados.Ioctx object"""
# NOTE(sileht): attributes declared in .pyd
raise make_ex(ret, "Failed to notify %r" % (obj))
return True
+ @requires(('obj', str_type), ('callback', opt(Callable)),
+ ('error_callback', opt(Callable)), ('timeout', int))
+ def watch(self, obj, callback, error_callback=None, timeout=None):
+ """
+ Register an interest in an object.
+
+ :param obj: the name of the object to notify
+ :type obj: str
+ :param callback: what to do when a notify is received on this object
+ :type callback: callable
+ :param error_callback: what to do when the watch session encounters an error
+ :type error_callback: callable
+ :param timeout: how many seconds the connection will keep after disconnection
+ :type timeout: int
+
+ :raises: :class:`TypeError`
+ :raises: :class:`Error`
+ :returns: watch_id - internal id assigned to this watch
+ """
+ self.require_ioctx_open()
+
+ return Watch(self, obj, callback, error_callback, timeout)
+
def list_objects(self):
"""
Get ObjectIterator on rados.Ioctx object.
from nose import SkipTest
from nose.tools import eq_ as eq, ok_ as ok, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
- ObjectNotFound, ObjectBusy, requires, opt,
+ ObjectNotFound, ObjectBusy, NotConnected, requires, opt,
LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
+from datetime import timedelta
import time
import threading
import json
eq(ret, 0)
assert len(out) > 0
eq(u"pool '\u9ec5' created", out)
+
+
+class TestWatchNotify(object):
+ OID = "test_watch_notify"
+
+ def setUp(self):
+ self.rados = Rados(conffile='')
+ self.rados.connect()
+ self.rados.create_pool('test_pool')
+ assert self.rados.pool_exists('test_pool')
+ self.ioctx = self.rados.open_ioctx('test_pool')
+ self.ioctx.write(self.OID, b'test watch notify')
+ self.lock = threading.Condition()
+ self.notify_cnt = {}
+ self.notify_data = {}
+ self.notify_error = {}
+
+ def tearDown(self):
+ self.ioctx.close()
+ self.rados.delete_pool('test_pool')
+ self.rados.shutdown()
+
+ def make_callback(self):
+ def callback(notify_id, notifier_id, watch_id, data):
+ with self.lock:
+ if watch_id not in self.notify_cnt:
+ self.notify_cnt[watch_id] = 0
+ self.notify_cnt[watch_id] += 1
+ self.notify_data[watch_id] = data
+ return callback
+
+ def make_error_callback(self):
+ def callback(watch_id, error):
+ with self.lock:
+ self.notify_error[watch_id] = error
+ return callback
+
+
+ def test(self):
+ with self.ioctx.watch(self.OID, self.make_callback(),
+ self.make_error_callback()) as watch1:
+ watch_id1 = watch1.get_id()
+ assert(watch_id1 > 0)
+
+ with self.rados.open_ioctx('test_pool') as ioctx:
+ watch2 = ioctx.watch(self.OID, self.make_callback(),
+ self.make_error_callback())
+ watch_id2 = watch2.get_id()
+ assert(watch_id2 > 0)
+
+ assert(self.ioctx.notify(self.OID, 'test'))
+ with self.lock:
+ assert(watch_id1 in self.notify_cnt)
+ assert(watch_id2 in self.notify_cnt)
+ eq(self.notify_cnt[watch_id1], 1)
+ eq(self.notify_cnt[watch_id2], 1)
+ eq(self.notify_data[watch_id1], b'test')
+ eq(self.notify_data[watch_id2], b'test')
+
+ assert(watch1.check() >= timedelta())
+ assert(watch2.check() >= timedelta())
+
+ assert(self.ioctx.notify(self.OID, 'best'))
+ with self.lock:
+ eq(self.notify_cnt[watch_id1], 2)
+ eq(self.notify_cnt[watch_id2], 2)
+ eq(self.notify_data[watch_id1], b'best')
+ eq(self.notify_data[watch_id2], b'best')
+
+ watch2.close()
+
+ assert(self.ioctx.notify(self.OID, 'rest'))
+ with self.lock:
+ eq(self.notify_cnt[watch_id1], 3)
+ eq(self.notify_cnt[watch_id2], 2)
+ eq(self.notify_data[watch_id1], b'rest')
+ eq(self.notify_data[watch_id2], b'best')
+
+ assert(watch1.check() >= timedelta())
+
+ self.ioctx.remove_object(self.OID)
+
+ for i in range(10):
+ with self.lock:
+ if watch_id1 in self.notify_error:
+ break
+ time.sleep(1)
+ eq(self.notify_error[watch_id1], -errno.ENOTCONN)
+ assert_raises(NotConnected, watch1.check)
+
+ assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')