]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/rados: introduce aio_notify() python binding
authorVenky Shankar <vshankar@redhat.com>
Thu, 9 Jul 2020 10:42:30 +0000 (06:42 -0400)
committerVenky Shankar <vshankar@redhat.com>
Thu, 17 Sep 2020 04:29:52 +0000 (00:29 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/include/rados/librados.h
src/include/rados/rados_types.h
src/librados/librados_c.cc
src/pybind/rados/rados.pyx
src/test/librados/watch_notify.cc
src/test/pybind/test_rados.py

index 729fb7846e0acd36ae9b453b6a77b12db25a01ae..27031419f1f164a864b90f8edf40e1d8a5a62272 100644 (file)
@@ -2642,6 +2642,36 @@ CEPH_RADOS_API int rados_notify2(rados_ioctx_t io, const char *o,
                                 uint64_t timeout_ms,
                                 char **reply_buffer, size_t *reply_buffer_len);
 
+/**
+ * Decode a notify response
+ *
+ * Decode a notify response (from rados_aio_notify() call) into acks and
+ * timeout arrays.
+ *
+ * @param reply_buffer buffer from rados_aio_notify() call
+ * @param reply_buffer_len reply_buffer length
+ * @param acks pointer to struct notify_ack_t pointer
+ * @param nr_acks pointer to ack count
+ * @param timeouts pointer to notify_timeout_t pointer
+ * @param nr_timeouts pointer to timeout count
+ * @returns 0 on success
+ */
+CEPH_RADOS_API int rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len,
+                                                struct notify_ack_t **acks, size_t *nr_acks,
+                                                struct notify_timeout_t **timeouts, size_t *nr_timeouts);
+
+/**
+ * Free notify allocated buffer
+ *
+ * Release memory allocated by rados_decode_notify_response() call
+ *
+ * @param acks notify_ack_t struct (from rados_decode_notify_response())
+ * @param nr_acks ack count
+ * @param timeouts notify_timeout_t struct (from rados_decode_notify_response())
+ */
+CEPH_RADOS_API void rados_free_notify_response(struct notify_ack_t *acks, size_t nr_acks,
+                                               struct notify_timeout_t *timeouts);
+
 /**
  * Acknolwedge receipt of a notify
  *
index 0712f489148fc438587a706eaf55e9451f6d68d5..d308341ec50edc3cf0a0922c9f5cd6405a8c3f8a 100644 (file)
@@ -18,6 +18,18 @@ struct obj_watch_t {
   uint32_t timeout_seconds;
 }; 
 
+struct notify_ack_t {
+  uint64_t notifier_id;
+  uint64_t cookie;
+  char *payload;
+  uint64_t payload_len;
+};
+
+struct notify_timeout_t {
+  uint64_t notifier_id;
+  uint64_t cookie;
+};
+
 /**
  *
  * Pass as nspace argument to rados_ioctx_set_namespace()
index 8e8b58f8cd683e2499fa2ff4c151478e29a9d554..22146459e992e71f81781de4e9c56926520af7f9 100644 (file)
@@ -2905,6 +2905,75 @@ extern "C" int _rados_notify2(rados_ioctx_t io, const char *o,
 }
 LIBRADOS_C_API_BASE_DEFAULT(rados_notify2);
 
+extern "C" int _rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len,
+                                             struct notify_ack_t **acks, size_t *nr_acks,
+                                             struct notify_timeout_t **timeouts, size_t *nr_timeouts) {
+  if (!reply_buffer || !reply_buffer_len) {
+    return -EINVAL;
+  }
+
+  bufferlist bl;
+  bl.append(reply_buffer, reply_buffer_len);
+
+  map<pair<uint64_t,uint64_t>,bufferlist> acked;
+  set<pair<uint64_t,uint64_t>> missed;
+  auto iter = bl.cbegin();
+  decode(acked, iter);
+  decode(missed, iter);
+
+  *acks = nullptr;
+  *nr_acks = acked.size();
+  if (*nr_acks) {
+    *acks = new notify_ack_t[*nr_acks];
+    struct notify_ack_t *ack = *acks;
+    for (auto &[who, payload] : acked) {
+      ack->notifier_id = who.first;
+      ack->cookie = who.second;
+      ack->payload = nullptr;
+      ack->payload_len = payload.length();
+      if (ack->payload_len) {
+        ack->payload = (char *)malloc(ack->payload_len);
+        memcpy(ack->payload, payload.c_str(), ack->payload_len);
+      }
+
+      ack++;
+    }
+  }
+
+  *timeouts = nullptr;
+  *nr_timeouts = missed.size();
+  if (*nr_timeouts) {
+    *timeouts = new notify_timeout_t[*nr_timeouts];
+    struct notify_timeout_t *timeout = *timeouts;
+    for (auto &[notifier_id, cookie] : missed) {
+      timeout->notifier_id = notifier_id;
+      timeout->cookie = cookie;
+      timeout++;
+    }
+  }
+
+  return 0;
+}
+LIBRADOS_C_API_BASE_DEFAULT(rados_decode_notify_response);
+
+
+extern "C" void _rados_free_notify_response(struct notify_ack_t *acks, size_t nr_acks,
+                                            struct notify_timeout_t *timeouts) {
+  for (uint64_t n = 0; n < nr_acks; ++n) {
+    assert(acks);
+    if (acks[n].payload) {
+      free(acks[n].payload);
+    }
+  }
+  if (acks) {
+    delete[] acks;
+  }
+  if (timeouts) {
+    delete[] timeouts;
+  }
+}
+LIBRADOS_C_API_BASE_DEFAULT(rados_free_notify_response);
+
 extern "C" int _rados_aio_notify(rados_ioctx_t io, const char *o,
                                  rados_completion_t completion,
                                  const char *buf, int buf_len,
index e2bf59067fc88bc516305248c70ad4c34b79e816..51085e7f21a8382b295d25f4c1208a78457e668f 100644 (file)
@@ -25,7 +25,7 @@ import time
 from datetime import datetime, timedelta
 from functools import partial, wraps
 from itertools import chain
-from typing import Callable, Dict, Optional, Sequence, Tuple, Union
+from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
 
 cdef extern from "Python.h":
     # These are in cpython/string.pxd, but use "object" types instead of
@@ -55,7 +55,15 @@ cdef extern from "err.h" nogil:
 
 cdef extern from "rados/rados_types.h" nogil:
     cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
+    cdef struct notify_ack_t:
+        unsigned long notifier_id
+        unsigned long cookie
+        char *payload
+        unsigned long payload_len
 
+    cdef struct notify_timeout_t:
+        unsigned long notifier_id
+        unsigned long cookie
 
 cdef extern from "rados/librados.h" nogil:
     enum:
@@ -319,6 +327,9 @@ cdef extern from "rados/librados.h" nogil:
     int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
     void rados_omap_get_end(rados_omap_iter_t iter)
     int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
+    int rados_aio_notify(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
+    int rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len, notify_ack_t **acks, size_t *nr_acks, notify_timeout_t **timeouts, size_t *nr_timeouts)
+    void rados_free_notify_response(notify_ack_t *acks, size_t nr_acks, notify_timeout_t *timeouts)
     int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len)
     int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, void *arg)
     int rados_watch_check(rados_ioctx_t io, uint64_t cookie)
@@ -623,8 +634,6 @@ cdef char ** to_bytes_array(list_bytes):
         ret[i] = <char *>list_bytes[i]
     return ret
 
-
-
 cdef int __monitor_callback(void *arg, const char *line, const char *who,
                              uint64_t sec, uint64_t nsec, uint64_t seq,
                              const char *level, const char *msg) with gil:
@@ -3417,6 +3426,57 @@ returned %d, but should return zero on success." % (self.name, ret))
             raise make_ex(ret, "Failed to notify %r" % (obj))
         return True
 
+    def aio_notify(self, obj: str,
+                   oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
+                   msg: str = '', timeout_ms: int = 5000) -> Completion:
+        """
+        Asynchronously send a rados notification to an object
+        """
+        self.require_ioctx_open()
+
+        msglen = len(msg)
+        obj_raw = cstr(obj, 'obj')
+        msg_raw = cstr(msg, 'msg')
+
+        cdef:
+            Completion completion
+            char *_obj = obj_raw
+            char *_msg = msg_raw
+            int _msglen = msglen
+            uint64_t _timeout_ms = timeout_ms
+            char *reply
+            size_t replylen = 0
+
+        def oncomplete_(completion_v):
+            cdef:
+                Completion _completion_v = completion_v
+                notify_ack_t *acks = NULL
+                notify_timeout_t *timeouts = NULL
+                size_t nr_acks
+                size_t nr_timeouts
+            return_value = _completion_v.get_return_value()
+            if return_value == 0:
+                return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
+                rados_buffer_free(reply)
+            if return_value == 0:
+                ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
+                                                             else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
+                timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
+                rados_free_notify_response(acks, nr_acks, timeouts)
+                return oncomplete(_completion_v, 0, ack_list, timeout_list)
+            else:
+                return oncomplete(_completion_v, return_value, None, None)
+
+        completion = self.__get_completion(oncomplete_, None)
+        self.__track_completion(completion)
+        with nogil:
+            ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
+                                   _msg, _msglen, _timeout_ms, &reply, &replylen)
+        if ret < 0:
+            completion._cleanup()
+            raise make_ex(ret, "aio_notify error: %s" % obj)
+        return completion
+
     def watch(self, obj: str,
               callback: Callable[[int, str, int, bytes], None],
               error_callback: Optional[Callable[[int], None]] = None,
index 4154298f0d9d6549d433c42db174bbcda04f9bd3..148131062f786e3a704bcfbbffcca13f5e6be3ae 100644 (file)
@@ -326,20 +326,19 @@ TEST_F(LibRadosWatchNotify, AioNotify) {
   ASSERT_EQ(0, rados_aio_get_return_value(comp));
   rados_aio_release(comp);
 
-  bufferlist reply;
-  reply.append(reply_buf, reply_buf_len);
-  std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
-  std::set<std::pair<uint64_t,uint64_t> > missed_map;
-  auto reply_p = reply.cbegin();
-  decode(reply_map, reply_p);
-  decode(missed_map, reply_p);
-  ASSERT_EQ(1u, reply_map.size());
-  ASSERT_EQ(0u, missed_map.size());
+  size_t nr_acks, nr_timeouts;
+  notify_ack_t *acks = nullptr;
+  notify_timeout_t *timeouts = nullptr;
+  ASSERT_EQ(0, rados_decode_notify_response(reply_buf, reply_buf_len,
+                                            &acks, &nr_acks, &timeouts, &nr_timeouts));
+  ASSERT_EQ(1u, nr_acks);
+  ASSERT_EQ(0u, nr_timeouts);
   ASSERT_EQ(1u, notify_cookies.size());
   ASSERT_EQ(1u, notify_cookies.count(handle));
-  ASSERT_EQ(5u, reply_map.begin()->second.length());
-  ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  ASSERT_EQ(5u, acks[0].payload_len);
+  ASSERT_EQ(0, strncmp("reply", acks[0].payload, acks[0].payload_len));
   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
+  rados_free_notify_response(acks, nr_acks, timeouts);
   rados_buffer_free(reply_buf);
 
   // try it on a non-existent object ... our buffer pointers
index 9b54f8028d90a240a4ea42b8c501ebcc6bdfb389..55754161095750b981b28162f537257a61c8afe4 100644 (file)
@@ -1248,6 +1248,10 @@ class TestWatchNotify(object):
         self.notify_cnt = {}
         self.notify_data = {}
         self.notify_error = {}
+        # aio related
+        self.ack_cnt = {}
+        self.ack_data = {}
+        self.instance_id = self.rados.get_instance_id()
 
     def tearDown(self):
         self.ioctx.close()
@@ -1323,3 +1327,71 @@ class TestWatchNotify(object):
             assert_raises(NotConnected, watch1.check)
 
             assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')
+
+    def make_callback_reply(self):
+        def callback(notify_id, notifier_id, watch_id, data):
+            with self.lock:
+                return data
+        return callback
+
+    def notify_callback(self, _, r, ack_list, timeout_list):
+        eq(r, 0)
+        with self.lock:
+            for notifier_id, _, notifier_data in ack_list:
+                if notifier_id not in self.ack_cnt:
+                    self.ack_cnt[notifier_id] = 0
+                self.ack_cnt[notifier_id] += 1
+                self.ack_data[notifier_id] = notifier_data
+
+    def notify_callback_err(self, _, r, ack_list, timeout_list):
+        eq(r, -errno.ENOENT)
+
+    def test_aio_notify(self):
+        with self.ioctx.watch(self.OID, self.make_callback_reply(),
+                              self.make_error_callback()) as watch1:
+            watch_id1 = watch1.get_id()
+            ok(watch_id1 > 0)
+
+            with self.rados.open_ioctx('test_pool') as ioctx:
+                watch2 = ioctx.watch(self.OID, self.make_callback_reply(),
+                                     self.make_error_callback())
+            watch_id2 = watch2.get_id()
+            ok(watch_id2 > 0)
+
+            comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test')
+            comp.wait_for_complete_and_cb()
+            with self.lock:
+                ok(self.instance_id in self.ack_cnt)
+                eq(self.ack_cnt[self.instance_id], 2)
+                eq(self.ack_data[self.instance_id], b'test')
+
+            ok(watch1.check() >= timedelta())
+            ok(watch2.check() >= timedelta())
+
+            comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best')
+            comp.wait_for_complete_and_cb()
+            with self.lock:
+                eq(self.ack_cnt[self.instance_id], 4)
+                eq(self.ack_data[self.instance_id], b'best')
+
+            watch2.close()
+
+            comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='rest')
+            comp.wait_for_complete_and_cb()
+            with self.lock:
+                eq(self.ack_cnt[self.instance_id], 5)
+                eq(self.ack_data[self.instance_id], b'rest')
+
+            assert(watch1.check() >= timedelta())
+            self.ioctx.remove_object(self.OID)
+
+            for i in range(10):
+                with self.lock:
+                    if watch_id1 in self.notify_error:
+                        break
+                time.sleep(1)
+            eq(self.notify_error[watch_id1], -errno.ENOTCONN)
+            assert_raises(NotConnected, watch1.check)
+
+            comp = self.ioctx.aio_notify(self.OID, self.notify_callback_err, msg='test')
+            comp.wait_for_complete_and_cb()