From: Mykola Golub Date: Tue, 3 Mar 2020 13:24:06 +0000 (+0000) Subject: pybind/rados: add watch object bindings X-Git-Tag: v15.2.10~36^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=63fc68e82fa6557bb449e84cb9264c0ef59855fe;p=ceph.git pybind/rados: add watch object bindings Signed-off-by: Mykola Golub (cherry picked from commit dfef23d6d3fdf162f48f909b90c10577f03116bc) --- diff --git a/src/pybind/rados/rados.pyx b/src/pybind/rados/rados.pyx index bb07ad1855c3..0d0f6aba4df5 100644 --- a/src/pybind/rados/rados.pyx +++ b/src/pybind/rados/rados.pyx @@ -27,7 +27,7 @@ try: from collections.abc import Callable except ImportError: from collections import Callable -from datetime import datetime +from datetime import datetime, timedelta from functools import partial, wraps from itertools import chain @@ -100,6 +100,10 @@ cdef extern from "rados/librados.h" nogil: uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg) ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name, uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg) + ctypedef void (*rados_watchcb2_t)(void *arg, int64_t notify_id, + uint64_t handle, uint64_t notifier_id, + void *data, size_t data_len) + ctypedef void (*rados_watcherrcb_t)(void *pre, uint64_t cookie, int err) cdef struct rados_cluster_stat_t: @@ -137,6 +141,7 @@ cdef extern from "rados/librados.h" nogil: int rados_conf_set(rados_t cluster, char *option, const char *value) int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len) + rados_t rados_ioctx_get_cluster(rados_ioctx_t io) int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats) int64_t rados_pool_lookup(rados_t cluster, const char *pool_name) int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen) @@ -319,6 +324,11 @@ cdef extern from "rados/librados.h" nogil: int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len) void rados_omap_get_end(rados_omap_iter_t iter) int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len) + int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len) + int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, void *arg) + int rados_watch_check(rados_ioctx_t io, uint64_t cookie) + int rados_unwatch2(rados_ioctx_t io, uint64_t cookie) + int rados_watch_flush(rados_t cluster) LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL @@ -435,6 +445,11 @@ class NoSpace(OSError): super(NoSpace, self).__init__( "RADOS no space (%s)" % message, errno) +class NotConnected(OSError): + """ `NotConnected` class, derived from `OSError` """ + def __init__(self, message, errno=None): + super(NotConnected, self).__init__( + "RADOS not connected (%s)" % message, errno) class RadosStateError(Error): """ `RadosStateError` class, derived from `Error` """ @@ -500,6 +515,7 @@ IF UNAME_SYSNAME == "FreeBSD": errno.EINPROGRESS : InProgress, errno.EISCONN : IsConnected, errno.EINVAL : InvalidArgumentError, + errno.ENOTCONN : NotConnected, } ELSE: cdef errno_to_exception = { @@ -516,6 +532,7 @@ ELSE: errno.EINPROGRESS : InProgress, errno.EISCONN : IsConnected, errno.EINVAL : InvalidArgumentError, + errno.ENOTCONN : NotConnected, } @@ -2278,6 +2295,150 @@ class ReadOpCtx(ReadOp, OpCtx): """read operation context manager""" +cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie, + uint64_t _notifier_id, void *_data, + size_t _data_len) with gil: + """ + Watch callback + """ + cdef object watch = _arg + data = None + if _data != NULL: + data = (_data)[:_data_len] + watch._callback(_notify_id, _notifier_id, _cookie, data) + +cdef void __watch_error_callback(void *_arg, uint64_t _cookie, + int _error) with gil: + """ + Watch error callback + """ + cdef object watch = _arg + watch._error_callback(_cookie, _error) + + +cdef class Watch(object): + """watch object""" + + cdef: + object id + Ioctx ioctx + object oid + object callback + object error_callback + + def __cinit__(self, Ioctx ioctx, object oid, object callback, + object error_callback, object timeout): + self.id = 0 + self.ioctx = ioctx.dup() + self.oid = cstr(oid, 'oid') + self.callback = callback + self.error_callback = error_callback + + if timeout is None: + timeout = 0 + + cdef: + char *_oid = self.oid + uint64_t _cookie; + uint32_t _timeout = timeout; + void *_args = self + + with nogil: + ret = rados_watch3(self.ioctx.io, _oid, &_cookie, + &__watch_callback, + &__watch_error_callback, + _timeout, _args) + if ret < 0: + raise make_ex(ret, "watch error") + + self.id = int(_cookie); + + def __enter__(self): + return self + + def __exit__(self, type_, value, traceback): + self.close() + return False + + def __dealloc__(self): + self.close() + + def _callback(self, notify_id, notifier_id, watch_id, data): + replay = self.callback(notify_id, notifier_id, watch_id, data) + + cdef: + rados_ioctx_t _io = self.ioctx.io + char *_obj = self.oid + int64_t _notify_id = notify_id + uint64_t _cookie = watch_id + char *_replay = NULL + int _replay_len = 0 + + if replay is not None: + replay = cstr(replay, 'replay') + _replay = replay + _replaylen = len(replay) + + with nogil: + rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay, + _replaylen) + + def _error_callback(self, watch_id, error): + if self.error_callback is None: + return + self.error_callback(watch_id, error) + + def get_id(self): + return self.id + + def check(self): + """ + Check on watch validity. + + :raises: :class:`Error` + :returns: timedelta since last confirmed valid + """ + self.ioctx.require_ioctx_open() + + cdef: + uint64_t _cookie = self.id + + with nogil: + ret = rados_watch_check(self.ioctx.io, _cookie) + if ret < 0: + raise make_ex(ret, "check error") + + return timedelta(milliseconds=ret) + + def close(self): + """ + Unregister an interest in an object. + + :raises: :class:`Error` + """ + if self.id == 0: + return + + self.ioctx.require_ioctx_open() + + cdef: + uint64_t _cookie = self.id + + with nogil: + ret = rados_unwatch2(self.ioctx.io, _cookie) + if ret < 0 and ret != -errno.ENOENT: + raise make_ex(ret, "unwatch error") + self.id = 0 + + with nogil: + cluster = rados_ioctx_get_cluster(self.ioctx.io) + ret = rados_watch_flush(cluster); + if ret < 0: + raise make_ex(ret, "watch_flush error") + + self.ioctx.close() + + cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil: """ Callback to oncomplete() for asynchronous operations @@ -2286,7 +2447,6 @@ cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil: cb._complete() return 0 - cdef class Ioctx(object): """rados.Ioctx object""" # NOTE(sileht): attributes declared in .pyd @@ -3375,6 +3535,29 @@ returned %d, but should return zero on success." % (self.name, ret)) raise make_ex(ret, "Failed to notify %r" % (obj)) return True + @requires(('obj', str_type), ('callback', opt(Callable)), + ('error_callback', opt(Callable)), ('timeout', int)) + def watch(self, obj, callback, error_callback=None, timeout=None): + """ + Register an interest in an object. + + :param obj: the name of the object to notify + :type obj: str + :param callback: what to do when a notify is received on this object + :type callback: callable + :param error_callback: what to do when the watch session encounters an error + :type error_callback: callable + :param timeout: how many seconds the connection will keep after disconnection + :type timeout: int + + :raises: :class:`TypeError` + :raises: :class:`Error` + :returns: watch_id - internal id assigned to this watch + """ + self.require_ioctx_open() + + return Watch(self, obj, callback, error_callback, timeout) + def list_objects(self): """ Get ObjectIterator on rados.Ioctx object. diff --git a/src/test/pybind/test_rados.py b/src/test/pybind/test_rados.py index a4ad02f79249..e02a393fbdd1 100644 --- a/src/test/pybind/test_rados.py +++ b/src/test/pybind/test_rados.py @@ -2,9 +2,10 @@ from __future__ import print_function from nose import SkipTest from nose.tools import eq_ as eq, ok_ as ok, assert_raises from rados import (Rados, Error, RadosStateError, Object, ObjectExists, - ObjectNotFound, ObjectBusy, requires, opt, + ObjectNotFound, ObjectBusy, NotConnected, requires, opt, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE, LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog) +from datetime import timedelta import time import threading import json @@ -1208,3 +1209,94 @@ class TestCommand(object): eq(ret, 0) assert len(out) > 0 eq(u"pool '\u9ec5' created", out) + + +class TestWatchNotify(object): + OID = "test_watch_notify" + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + self.rados.create_pool('test_pool') + assert self.rados.pool_exists('test_pool') + self.ioctx = self.rados.open_ioctx('test_pool') + self.ioctx.write(self.OID, b'test watch notify') + self.lock = threading.Condition() + self.notify_cnt = {} + self.notify_data = {} + self.notify_error = {} + + def tearDown(self): + self.ioctx.close() + self.rados.delete_pool('test_pool') + self.rados.shutdown() + + def make_callback(self): + def callback(notify_id, notifier_id, watch_id, data): + with self.lock: + if watch_id not in self.notify_cnt: + self.notify_cnt[watch_id] = 0 + self.notify_cnt[watch_id] += 1 + self.notify_data[watch_id] = data + return callback + + def make_error_callback(self): + def callback(watch_id, error): + with self.lock: + self.notify_error[watch_id] = error + return callback + + + def test(self): + with self.ioctx.watch(self.OID, self.make_callback(), + self.make_error_callback()) as watch1: + watch_id1 = watch1.get_id() + assert(watch_id1 > 0) + + with self.rados.open_ioctx('test_pool') as ioctx: + watch2 = ioctx.watch(self.OID, self.make_callback(), + self.make_error_callback()) + watch_id2 = watch2.get_id() + assert(watch_id2 > 0) + + assert(self.ioctx.notify(self.OID, 'test')) + with self.lock: + assert(watch_id1 in self.notify_cnt) + assert(watch_id2 in self.notify_cnt) + eq(self.notify_cnt[watch_id1], 1) + eq(self.notify_cnt[watch_id2], 1) + eq(self.notify_data[watch_id1], b'test') + eq(self.notify_data[watch_id2], b'test') + + assert(watch1.check() >= timedelta()) + assert(watch2.check() >= timedelta()) + + assert(self.ioctx.notify(self.OID, 'best')) + with self.lock: + eq(self.notify_cnt[watch_id1], 2) + eq(self.notify_cnt[watch_id2], 2) + eq(self.notify_data[watch_id1], b'best') + eq(self.notify_data[watch_id2], b'best') + + watch2.close() + + assert(self.ioctx.notify(self.OID, 'rest')) + with self.lock: + eq(self.notify_cnt[watch_id1], 3) + eq(self.notify_cnt[watch_id2], 2) + eq(self.notify_data[watch_id1], b'rest') + eq(self.notify_data[watch_id2], b'best') + + assert(watch1.check() >= timedelta()) + + self.ioctx.remove_object(self.OID) + + for i in range(10): + with self.lock: + if watch_id1 in self.notify_error: + break + time.sleep(1) + eq(self.notify_error[watch_id1], -errno.ENOTCONN) + assert_raises(NotConnected, watch1.check) + + assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')