]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/rados: add watch object bindings
authorMykola Golub <mgolub@suse.com>
Tue, 3 Mar 2020 13:24:06 +0000 (13:24 +0000)
committerJason Dillaman <dillaman@redhat.com>
Tue, 9 Feb 2021 15:42:03 +0000 (10:42 -0500)
Signed-off-by: Mykola Golub <mgolub@suse.com>
(cherry picked from commit dfef23d6d3fdf162f48f909b90c10577f03116bc)

src/pybind/rados/rados.pyx
src/test/pybind/test_rados.py

index bb07ad1855c3478b4b6ac0879f75780d6e09ee81..0d0f6aba4df51147967ae7707a5c9bb74f2314bc 100644 (file)
@@ -27,7 +27,7 @@ try:
     from collections.abc import Callable
 except ImportError:
     from collections import Callable
-from datetime import datetime
+from datetime import datetime, timedelta
 from functools import partial, wraps
 from itertools import chain
 
@@ -100,6 +100,10 @@ cdef extern from "rados/librados.h" nogil:
                                           uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
     ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
                                           uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
+    ctypedef void (*rados_watchcb2_t)(void *arg, int64_t notify_id,
+                                      uint64_t handle, uint64_t notifier_id,
+                                      void *data, size_t data_len)
+    ctypedef void (*rados_watcherrcb_t)(void *pre, uint64_t cookie, int err)
 
 
     cdef struct rados_cluster_stat_t:
@@ -137,6 +141,7 @@ cdef extern from "rados/librados.h" nogil:
     int rados_conf_set(rados_t cluster, char *option, const char *value)
     int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
 
+    rados_t rados_ioctx_get_cluster(rados_ioctx_t io)
     int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
     int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
     int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
@@ -319,6 +324,11 @@ cdef extern from "rados/librados.h" nogil:
     int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
     void rados_omap_get_end(rados_omap_iter_t iter)
     int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
+    int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len)
+    int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, void *arg)
+    int rados_watch_check(rados_ioctx_t io, uint64_t cookie)
+    int rados_unwatch2(rados_ioctx_t io, uint64_t cookie)
+    int rados_watch_flush(rados_t cluster)
 
 
 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
@@ -435,6 +445,11 @@ class NoSpace(OSError):
         super(NoSpace, self).__init__(
                 "RADOS no space (%s)" % message, errno)
 
+class NotConnected(OSError):
+    """ `NotConnected` class, derived from `OSError` """
+    def __init__(self, message, errno=None):
+        super(NotConnected, self).__init__(
+                "RADOS not connected (%s)" % message, errno)
 
 class RadosStateError(Error):
     """ `RadosStateError` class, derived from `Error` """
@@ -500,6 +515,7 @@ IF UNAME_SYSNAME == "FreeBSD":
         errno.EINPROGRESS : InProgress,
         errno.EISCONN   : IsConnected,
         errno.EINVAL    : InvalidArgumentError,
+        errno.ENOTCONN  : NotConnected,
     }
 ELSE:
     cdef errno_to_exception = {
@@ -516,6 +532,7 @@ ELSE:
         errno.EINPROGRESS : InProgress,
         errno.EISCONN   : IsConnected,
         errno.EINVAL    : InvalidArgumentError,
+        errno.ENOTCONN  : NotConnected,
     }
 
 
@@ -2278,6 +2295,150 @@ class ReadOpCtx(ReadOp, OpCtx):
     """read operation context manager"""
 
 
+cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
+                           uint64_t _notifier_id, void *_data,
+                           size_t _data_len) with gil:
+    """
+    Watch callback
+    """
+    cdef object watch = <object>_arg
+    data = None
+    if _data != NULL:
+        data = (<char *>_data)[:_data_len]
+    watch._callback(_notify_id, _notifier_id, _cookie, data)
+
+cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
+                                 int _error) with gil:
+    """
+    Watch error callback
+    """
+    cdef object watch = <object>_arg
+    watch._error_callback(_cookie, _error)
+
+
+cdef class Watch(object):
+    """watch object"""
+
+    cdef:
+        object id
+        Ioctx ioctx
+        object oid
+        object callback
+        object error_callback
+
+    def __cinit__(self, Ioctx ioctx, object oid, object callback,
+                  object error_callback, object timeout):
+        self.id = 0
+        self.ioctx = ioctx.dup()
+        self.oid = cstr(oid, 'oid')
+        self.callback = callback
+        self.error_callback = error_callback
+
+        if timeout is None:
+            timeout = 0
+
+        cdef:
+            char *_oid = self.oid
+            uint64_t _cookie;
+            uint32_t _timeout = timeout;
+            void *_args = <PyObject*>self
+
+        with nogil:
+            ret = rados_watch3(self.ioctx.io, _oid, &_cookie,
+                               <rados_watchcb2_t>&__watch_callback,
+                               <rados_watcherrcb_t>&__watch_error_callback,
+                               _timeout, _args)
+        if ret < 0:
+            raise make_ex(ret, "watch error")
+
+        self.id = int(_cookie);
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_, value, traceback):
+        self.close()
+        return False
+
+    def __dealloc__(self):
+        self.close()
+
+    def _callback(self, notify_id, notifier_id, watch_id, data):
+        replay = self.callback(notify_id, notifier_id, watch_id, data)
+
+        cdef:
+            rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
+            char *_obj = self.oid
+            int64_t _notify_id = notify_id
+            uint64_t _cookie = watch_id
+            char *_replay = NULL
+            int _replay_len = 0
+
+        if replay is not None:
+            replay = cstr(replay, 'replay')
+            _replay = replay
+            _replaylen = len(replay)
+
+        with nogil:
+            rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
+                             _replaylen)
+
+    def _error_callback(self, watch_id, error):
+        if self.error_callback is None:
+            return
+        self.error_callback(watch_id, error)
+
+    def get_id(self):
+        return self.id
+
+    def check(self):
+        """
+        Check on watch validity.
+
+        :raises: :class:`Error`
+        :returns: timedelta since last confirmed valid
+        """
+        self.ioctx.require_ioctx_open()
+
+        cdef:
+            uint64_t _cookie = self.id
+
+        with nogil:
+            ret = rados_watch_check(self.ioctx.io, _cookie)
+        if ret < 0:
+            raise make_ex(ret, "check error")
+
+        return timedelta(milliseconds=ret)
+
+    def close(self):
+        """
+        Unregister an interest in an object.
+
+        :raises: :class:`Error`
+        """
+        if self.id == 0:
+            return
+
+        self.ioctx.require_ioctx_open()
+
+        cdef:
+            uint64_t _cookie = self.id
+
+        with nogil:
+            ret = rados_unwatch2(self.ioctx.io, _cookie)
+        if ret < 0 and ret != -errno.ENOENT:
+            raise make_ex(ret, "unwatch error")
+        self.id = 0
+
+        with nogil:
+            cluster = rados_ioctx_get_cluster(self.ioctx.io)
+            ret = rados_watch_flush(cluster);
+        if ret < 0:
+            raise make_ex(ret, "watch_flush error")
+
+        self.ioctx.close()
+
+
 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
     """
     Callback to oncomplete() for asynchronous operations
@@ -2286,7 +2447,6 @@ cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
     cb._complete()
     return 0
 
-
 cdef class Ioctx(object):
     """rados.Ioctx object"""
     # NOTE(sileht): attributes declared in .pyd
@@ -3375,6 +3535,29 @@ returned %d, but should return zero on success." % (self.name, ret))
             raise make_ex(ret, "Failed to notify %r" % (obj))
         return True
 
+    @requires(('obj', str_type), ('callback', opt(Callable)),
+              ('error_callback', opt(Callable)), ('timeout', int))
+    def watch(self, obj, callback, error_callback=None, timeout=None):
+        """
+        Register an interest in an object.
+
+        :param obj: the name of the object to notify
+        :type obj: str
+        :param callback: what to do when a notify is received on this object
+        :type callback: callable
+        :param error_callback: what to do when the watch session encounters an error
+        :type error_callback: callable
+        :param timeout: how many seconds the connection will keep after disconnection
+        :type timeout: int
+
+        :raises: :class:`TypeError`
+        :raises: :class:`Error`
+        :returns: watch_id - internal id assigned to this watch
+        """
+        self.require_ioctx_open()
+
+        return Watch(self, obj, callback, error_callback, timeout)
+
     def list_objects(self):
         """
         Get ObjectIterator on rados.Ioctx object.
index a4ad02f79249199dfeaa0d22b588ab0eef1ecfbd..e02a393fbdd1a784d6a6ce461ea38d74d50e4365 100644 (file)
@@ -2,9 +2,10 @@ from __future__ import print_function
 from nose import SkipTest
 from nose.tools import eq_ as eq, ok_ as ok, assert_raises
 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
-                   ObjectNotFound, ObjectBusy, requires, opt,
+                   ObjectNotFound, ObjectBusy, NotConnected, requires, opt,
                    LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
                    LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
+from datetime import timedelta
 import time
 import threading
 import json
@@ -1208,3 +1209,94 @@ class TestCommand(object):
         eq(ret, 0)
         assert len(out) > 0
         eq(u"pool '\u9ec5' created", out)
+
+
+class TestWatchNotify(object):
+    OID = "test_watch_notify"
+
+    def setUp(self):
+        self.rados = Rados(conffile='')
+        self.rados.connect()
+        self.rados.create_pool('test_pool')
+        assert self.rados.pool_exists('test_pool')
+        self.ioctx = self.rados.open_ioctx('test_pool')
+        self.ioctx.write(self.OID, b'test watch notify')
+        self.lock = threading.Condition()
+        self.notify_cnt = {}
+        self.notify_data = {}
+        self.notify_error = {}
+
+    def tearDown(self):
+        self.ioctx.close()
+        self.rados.delete_pool('test_pool')
+        self.rados.shutdown()
+
+    def make_callback(self):
+        def callback(notify_id, notifier_id, watch_id, data):
+            with self.lock:
+                if watch_id not in self.notify_cnt:
+                    self.notify_cnt[watch_id] = 0
+                self.notify_cnt[watch_id] += 1
+                self.notify_data[watch_id] = data
+        return callback
+
+    def make_error_callback(self):
+        def callback(watch_id, error):
+            with self.lock:
+                self.notify_error[watch_id] = error
+        return callback
+
+
+    def test(self):
+        with self.ioctx.watch(self.OID, self.make_callback(),
+                              self.make_error_callback()) as watch1:
+            watch_id1 = watch1.get_id()
+            assert(watch_id1 > 0)
+
+            with self.rados.open_ioctx('test_pool') as ioctx:
+                watch2 = ioctx.watch(self.OID, self.make_callback(),
+                                     self.make_error_callback())
+            watch_id2 = watch2.get_id()
+            assert(watch_id2 > 0)
+
+            assert(self.ioctx.notify(self.OID, 'test'))
+            with self.lock:
+                assert(watch_id1 in self.notify_cnt)
+                assert(watch_id2 in self.notify_cnt)
+                eq(self.notify_cnt[watch_id1], 1)
+                eq(self.notify_cnt[watch_id2], 1)
+                eq(self.notify_data[watch_id1], b'test')
+                eq(self.notify_data[watch_id2], b'test')
+
+            assert(watch1.check() >= timedelta())
+            assert(watch2.check() >= timedelta())
+
+            assert(self.ioctx.notify(self.OID, 'best'))
+            with self.lock:
+                eq(self.notify_cnt[watch_id1], 2)
+                eq(self.notify_cnt[watch_id2], 2)
+                eq(self.notify_data[watch_id1], b'best')
+                eq(self.notify_data[watch_id2], b'best')
+
+            watch2.close()
+
+            assert(self.ioctx.notify(self.OID, 'rest'))
+            with self.lock:
+                eq(self.notify_cnt[watch_id1], 3)
+                eq(self.notify_cnt[watch_id2], 2)
+                eq(self.notify_data[watch_id1], b'rest')
+                eq(self.notify_data[watch_id2], b'best')
+
+            assert(watch1.check() >= timedelta())
+
+            self.ioctx.remove_object(self.OID)
+
+            for i in range(10):
+                with self.lock:
+                    if watch_id1 in self.notify_error:
+                        break
+                time.sleep(1)
+            eq(self.notify_error[watch_id1], -errno.ENOTCONN)
+            assert_raises(NotConnected, watch1.check)
+
+            assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')