From: Venky Shankar Date: Thu, 9 Jul 2020 10:42:30 +0000 (-0400) Subject: pybind/rados: introduce aio_notify() python binding X-Git-Tag: v16.1.0~932^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e3bd49f7b889c8dc61885d29872317f06a25363d;p=ceph.git pybind/rados: introduce aio_notify() python binding Signed-off-by: Venky Shankar --- diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 729fb7846e0a..27031419f1f1 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -2642,6 +2642,36 @@ CEPH_RADOS_API int rados_notify2(rados_ioctx_t io, const char *o, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len); +/** + * Decode a notify response + * + * Decode a notify response (from rados_aio_notify() call) into acks and + * timeout arrays. + * + * @param reply_buffer buffer from rados_aio_notify() call + * @param reply_buffer_len reply_buffer length + * @param acks pointer to struct notify_ack_t pointer + * @param nr_acks pointer to ack count + * @param timeouts pointer to notify_timeout_t pointer + * @param nr_timeouts pointer to timeout count + * @returns 0 on success + */ +CEPH_RADOS_API int rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len, + struct notify_ack_t **acks, size_t *nr_acks, + struct notify_timeout_t **timeouts, size_t *nr_timeouts); + +/** + * Free notify allocated buffer + * + * Release memory allocated by rados_decode_notify_response() call + * + * @param acks notify_ack_t struct (from rados_decode_notify_response()) + * @param nr_acks ack count + * @param timeouts notify_timeout_t struct (from rados_decode_notify_response()) + */ +CEPH_RADOS_API void rados_free_notify_response(struct notify_ack_t *acks, size_t nr_acks, + struct notify_timeout_t *timeouts); + /** * Acknolwedge receipt of a notify * diff --git a/src/include/rados/rados_types.h b/src/include/rados/rados_types.h index 0712f489148f..d308341ec50e 100644 --- a/src/include/rados/rados_types.h +++ b/src/include/rados/rados_types.h @@ -18,6 +18,18 @@ struct obj_watch_t { uint32_t timeout_seconds; }; +struct notify_ack_t { + uint64_t notifier_id; + uint64_t cookie; + char *payload; + uint64_t payload_len; +}; + +struct notify_timeout_t { + uint64_t notifier_id; + uint64_t cookie; +}; + /** * * Pass as nspace argument to rados_ioctx_set_namespace() diff --git a/src/librados/librados_c.cc b/src/librados/librados_c.cc index 8e8b58f8cd68..22146459e992 100644 --- a/src/librados/librados_c.cc +++ b/src/librados/librados_c.cc @@ -2905,6 +2905,75 @@ extern "C" int _rados_notify2(rados_ioctx_t io, const char *o, } LIBRADOS_C_API_BASE_DEFAULT(rados_notify2); +extern "C" int _rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len, + struct notify_ack_t **acks, size_t *nr_acks, + struct notify_timeout_t **timeouts, size_t *nr_timeouts) { + if (!reply_buffer || !reply_buffer_len) { + return -EINVAL; + } + + bufferlist bl; + bl.append(reply_buffer, reply_buffer_len); + + map,bufferlist> acked; + set> missed; + auto iter = bl.cbegin(); + decode(acked, iter); + decode(missed, iter); + + *acks = nullptr; + *nr_acks = acked.size(); + if (*nr_acks) { + *acks = new notify_ack_t[*nr_acks]; + struct notify_ack_t *ack = *acks; + for (auto &[who, payload] : acked) { + ack->notifier_id = who.first; + ack->cookie = who.second; + ack->payload = nullptr; + ack->payload_len = payload.length(); + if (ack->payload_len) { + ack->payload = (char *)malloc(ack->payload_len); + memcpy(ack->payload, payload.c_str(), ack->payload_len); + } + + ack++; + } + } + + *timeouts = nullptr; + *nr_timeouts = missed.size(); + if (*nr_timeouts) { + *timeouts = new notify_timeout_t[*nr_timeouts]; + struct notify_timeout_t *timeout = *timeouts; + for (auto &[notifier_id, cookie] : missed) { + timeout->notifier_id = notifier_id; + timeout->cookie = cookie; + timeout++; + } + } + + return 0; +} +LIBRADOS_C_API_BASE_DEFAULT(rados_decode_notify_response); + + +extern "C" void _rados_free_notify_response(struct notify_ack_t *acks, size_t nr_acks, + struct notify_timeout_t *timeouts) { + for (uint64_t n = 0; n < nr_acks; ++n) { + assert(acks); + if (acks[n].payload) { + free(acks[n].payload); + } + } + if (acks) { + delete[] acks; + } + if (timeouts) { + delete[] timeouts; + } +} +LIBRADOS_C_API_BASE_DEFAULT(rados_free_notify_response); + extern "C" int _rados_aio_notify(rados_ioctx_t io, const char *o, rados_completion_t completion, const char *buf, int buf_len, diff --git a/src/pybind/rados/rados.pyx b/src/pybind/rados/rados.pyx index e2bf59067fc8..51085e7f21a8 100644 --- a/src/pybind/rados/rados.pyx +++ b/src/pybind/rados/rados.pyx @@ -25,7 +25,7 @@ import time from datetime import datetime, timedelta from functools import partial, wraps from itertools import chain -from typing import Callable, Dict, Optional, Sequence, Tuple, Union +from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union cdef extern from "Python.h": # These are in cpython/string.pxd, but use "object" types instead of @@ -55,7 +55,15 @@ cdef extern from "err.h" nogil: cdef extern from "rados/rados_types.h" nogil: cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES" + cdef struct notify_ack_t: + unsigned long notifier_id + unsigned long cookie + char *payload + unsigned long payload_len + cdef struct notify_timeout_t: + unsigned long notifier_id + unsigned long cookie cdef extern from "rados/librados.h" nogil: enum: @@ -319,6 +327,9 @@ cdef extern from "rados/librados.h" nogil: int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len) void rados_omap_get_end(rados_omap_iter_t iter) int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len) + int rados_aio_notify(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len) + int rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len, notify_ack_t **acks, size_t *nr_acks, notify_timeout_t **timeouts, size_t *nr_timeouts) + void rados_free_notify_response(notify_ack_t *acks, size_t nr_acks, notify_timeout_t *timeouts) int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len) int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, void *arg) int rados_watch_check(rados_ioctx_t io, uint64_t cookie) @@ -623,8 +634,6 @@ cdef char ** to_bytes_array(list_bytes): ret[i] = list_bytes[i] return ret - - cdef int __monitor_callback(void *arg, const char *line, const char *who, uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg) with gil: @@ -3417,6 +3426,57 @@ returned %d, but should return zero on success." % (self.name, ret)) raise make_ex(ret, "Failed to notify %r" % (obj)) return True + def aio_notify(self, obj: str, + oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None], + msg: str = '', timeout_ms: int = 5000) -> Completion: + """ + Asynchronously send a rados notification to an object + """ + self.require_ioctx_open() + + msglen = len(msg) + obj_raw = cstr(obj, 'obj') + msg_raw = cstr(msg, 'msg') + + cdef: + Completion completion + char *_obj = obj_raw + char *_msg = msg_raw + int _msglen = msglen + uint64_t _timeout_ms = timeout_ms + char *reply + size_t replylen = 0 + + def oncomplete_(completion_v): + cdef: + Completion _completion_v = completion_v + notify_ack_t *acks = NULL + notify_timeout_t *timeouts = NULL + size_t nr_acks + size_t nr_timeouts + return_value = _completion_v.get_return_value() + if return_value == 0: + return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts) + rados_buffer_free(reply) + if return_value == 0: + ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \ + else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]] + timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]] + rados_free_notify_response(acks, nr_acks, timeouts) + return oncomplete(_completion_v, 0, ack_list, timeout_list) + else: + return oncomplete(_completion_v, return_value, None, None) + + completion = self.__get_completion(oncomplete_, None) + self.__track_completion(completion) + with nogil: + ret = rados_aio_notify(self.io, _obj, completion.rados_comp, + _msg, _msglen, _timeout_ms, &reply, &replylen) + if ret < 0: + completion._cleanup() + raise make_ex(ret, "aio_notify error: %s" % obj) + return completion + def watch(self, obj: str, callback: Callable[[int, str, int, bytes], None], error_callback: Optional[Callable[[int], None]] = None, diff --git a/src/test/librados/watch_notify.cc b/src/test/librados/watch_notify.cc index 4154298f0d9d..148131062f78 100644 --- a/src/test/librados/watch_notify.cc +++ b/src/test/librados/watch_notify.cc @@ -326,20 +326,19 @@ TEST_F(LibRadosWatchNotify, AioNotify) { ASSERT_EQ(0, rados_aio_get_return_value(comp)); rados_aio_release(comp); - bufferlist reply; - reply.append(reply_buf, reply_buf_len); - std::map, bufferlist> reply_map; - std::set > missed_map; - auto reply_p = reply.cbegin(); - decode(reply_map, reply_p); - decode(missed_map, reply_p); - ASSERT_EQ(1u, reply_map.size()); - ASSERT_EQ(0u, missed_map.size()); + size_t nr_acks, nr_timeouts; + notify_ack_t *acks = nullptr; + notify_timeout_t *timeouts = nullptr; + ASSERT_EQ(0, rados_decode_notify_response(reply_buf, reply_buf_len, + &acks, &nr_acks, &timeouts, &nr_timeouts)); + ASSERT_EQ(1u, nr_acks); + ASSERT_EQ(0u, nr_timeouts); ASSERT_EQ(1u, notify_cookies.size()); ASSERT_EQ(1u, notify_cookies.count(handle)); - ASSERT_EQ(5u, reply_map.begin()->second.length()); - ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5)); + ASSERT_EQ(5u, acks[0].payload_len); + ASSERT_EQ(0, strncmp("reply", acks[0].payload, acks[0].payload_len)); ASSERT_GT(rados_watch_check(ioctx, handle), 0); + rados_free_notify_response(acks, nr_acks, timeouts); rados_buffer_free(reply_buf); // try it on a non-existent object ... our buffer pointers diff --git a/src/test/pybind/test_rados.py b/src/test/pybind/test_rados.py index 9b54f8028d90..557541610957 100644 --- a/src/test/pybind/test_rados.py +++ b/src/test/pybind/test_rados.py @@ -1248,6 +1248,10 @@ class TestWatchNotify(object): self.notify_cnt = {} self.notify_data = {} self.notify_error = {} + # aio related + self.ack_cnt = {} + self.ack_data = {} + self.instance_id = self.rados.get_instance_id() def tearDown(self): self.ioctx.close() @@ -1323,3 +1327,71 @@ class TestWatchNotify(object): assert_raises(NotConnected, watch1.check) assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test') + + def make_callback_reply(self): + def callback(notify_id, notifier_id, watch_id, data): + with self.lock: + return data + return callback + + def notify_callback(self, _, r, ack_list, timeout_list): + eq(r, 0) + with self.lock: + for notifier_id, _, notifier_data in ack_list: + if notifier_id not in self.ack_cnt: + self.ack_cnt[notifier_id] = 0 + self.ack_cnt[notifier_id] += 1 + self.ack_data[notifier_id] = notifier_data + + def notify_callback_err(self, _, r, ack_list, timeout_list): + eq(r, -errno.ENOENT) + + def test_aio_notify(self): + with self.ioctx.watch(self.OID, self.make_callback_reply(), + self.make_error_callback()) as watch1: + watch_id1 = watch1.get_id() + ok(watch_id1 > 0) + + with self.rados.open_ioctx('test_pool') as ioctx: + watch2 = ioctx.watch(self.OID, self.make_callback_reply(), + self.make_error_callback()) + watch_id2 = watch2.get_id() + ok(watch_id2 > 0) + + comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test') + comp.wait_for_complete_and_cb() + with self.lock: + ok(self.instance_id in self.ack_cnt) + eq(self.ack_cnt[self.instance_id], 2) + eq(self.ack_data[self.instance_id], b'test') + + ok(watch1.check() >= timedelta()) + ok(watch2.check() >= timedelta()) + + comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best') + comp.wait_for_complete_and_cb() + with self.lock: + eq(self.ack_cnt[self.instance_id], 4) + eq(self.ack_data[self.instance_id], b'best') + + watch2.close() + + comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='rest') + comp.wait_for_complete_and_cb() + with self.lock: + eq(self.ack_cnt[self.instance_id], 5) + eq(self.ack_data[self.instance_id], b'rest') + + assert(watch1.check() >= timedelta()) + self.ioctx.remove_object(self.OID) + + for i in range(10): + with self.lock: + if watch_id1 in self.notify_error: + break + time.sleep(1) + eq(self.notify_error[watch_id1], -errno.ENOTCONN) + assert_raises(NotConnected, watch1.check) + + comp = self.ioctx.aio_notify(self.OID, self.notify_callback_err, msg='test') + comp.wait_for_complete_and_cb()