uint64_t timeout_ms,
char **reply_buffer, size_t *reply_buffer_len);
+/**
+ * Decode a notify response
+ *
+ * Decode a notify response (from rados_aio_notify() call) into acks and
+ * timeout arrays.
+ *
+ * @param reply_buffer buffer from rados_aio_notify() call
+ * @param reply_buffer_len reply_buffer length
+ * @param acks pointer to struct notify_ack_t pointer
+ * @param nr_acks pointer to ack count
+ * @param timeouts pointer to notify_timeout_t pointer
+ * @param nr_timeouts pointer to timeout count
+ * @returns 0 on success
+ */
+CEPH_RADOS_API int rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len,
+ struct notify_ack_t **acks, size_t *nr_acks,
+ struct notify_timeout_t **timeouts, size_t *nr_timeouts);
+
+/**
+ * Free notify allocated buffer
+ *
+ * Release memory allocated by rados_decode_notify_response() call
+ *
+ * @param acks notify_ack_t struct (from rados_decode_notify_response())
+ * @param nr_acks ack count
+ * @param timeouts notify_timeout_t struct (from rados_decode_notify_response())
+ */
+CEPH_RADOS_API void rados_free_notify_response(struct notify_ack_t *acks, size_t nr_acks,
+ struct notify_timeout_t *timeouts);
+
/**
* Acknolwedge receipt of a notify
*
uint32_t timeout_seconds;
};
+struct notify_ack_t {
+ uint64_t notifier_id;
+ uint64_t cookie;
+ char *payload;
+ uint64_t payload_len;
+};
+
+struct notify_timeout_t {
+ uint64_t notifier_id;
+ uint64_t cookie;
+};
+
/**
*
* Pass as nspace argument to rados_ioctx_set_namespace()
}
LIBRADOS_C_API_BASE_DEFAULT(rados_notify2);
+extern "C" int _rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len,
+ struct notify_ack_t **acks, size_t *nr_acks,
+ struct notify_timeout_t **timeouts, size_t *nr_timeouts) {
+ if (!reply_buffer || !reply_buffer_len) {
+ return -EINVAL;
+ }
+
+ bufferlist bl;
+ bl.append(reply_buffer, reply_buffer_len);
+
+ map<pair<uint64_t,uint64_t>,bufferlist> acked;
+ set<pair<uint64_t,uint64_t>> missed;
+ auto iter = bl.cbegin();
+ decode(acked, iter);
+ decode(missed, iter);
+
+ *acks = nullptr;
+ *nr_acks = acked.size();
+ if (*nr_acks) {
+ *acks = new notify_ack_t[*nr_acks];
+ struct notify_ack_t *ack = *acks;
+ for (auto &[who, payload] : acked) {
+ ack->notifier_id = who.first;
+ ack->cookie = who.second;
+ ack->payload = nullptr;
+ ack->payload_len = payload.length();
+ if (ack->payload_len) {
+ ack->payload = (char *)malloc(ack->payload_len);
+ memcpy(ack->payload, payload.c_str(), ack->payload_len);
+ }
+
+ ack++;
+ }
+ }
+
+ *timeouts = nullptr;
+ *nr_timeouts = missed.size();
+ if (*nr_timeouts) {
+ *timeouts = new notify_timeout_t[*nr_timeouts];
+ struct notify_timeout_t *timeout = *timeouts;
+ for (auto &[notifier_id, cookie] : missed) {
+ timeout->notifier_id = notifier_id;
+ timeout->cookie = cookie;
+ timeout++;
+ }
+ }
+
+ return 0;
+}
+LIBRADOS_C_API_BASE_DEFAULT(rados_decode_notify_response);
+
+
+extern "C" void _rados_free_notify_response(struct notify_ack_t *acks, size_t nr_acks,
+ struct notify_timeout_t *timeouts) {
+ for (uint64_t n = 0; n < nr_acks; ++n) {
+ assert(acks);
+ if (acks[n].payload) {
+ free(acks[n].payload);
+ }
+ }
+ if (acks) {
+ delete[] acks;
+ }
+ if (timeouts) {
+ delete[] timeouts;
+ }
+}
+LIBRADOS_C_API_BASE_DEFAULT(rados_free_notify_response);
+
extern "C" int _rados_aio_notify(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *buf, int buf_len,
from datetime import datetime, timedelta
from functools import partial, wraps
from itertools import chain
-from typing import Callable, Dict, Optional, Sequence, Tuple, Union
+from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
cdef extern from "Python.h":
# These are in cpython/string.pxd, but use "object" types instead of
cdef extern from "rados/rados_types.h" nogil:
cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
+ cdef struct notify_ack_t:
+ unsigned long notifier_id
+ unsigned long cookie
+ char *payload
+ unsigned long payload_len
+ cdef struct notify_timeout_t:
+ unsigned long notifier_id
+ unsigned long cookie
cdef extern from "rados/librados.h" nogil:
enum:
int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
void rados_omap_get_end(rados_omap_iter_t iter)
int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
+ int rados_aio_notify(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
+ int rados_decode_notify_response(char *reply_buffer, size_t reply_buffer_len, notify_ack_t **acks, size_t *nr_acks, notify_timeout_t **timeouts, size_t *nr_timeouts)
+ void rados_free_notify_response(notify_ack_t *acks, size_t nr_acks, notify_timeout_t *timeouts)
int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len)
int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, void *arg)
int rados_watch_check(rados_ioctx_t io, uint64_t cookie)
ret[i] = <char *>list_bytes[i]
return ret
-
-
cdef int __monitor_callback(void *arg, const char *line, const char *who,
uint64_t sec, uint64_t nsec, uint64_t seq,
const char *level, const char *msg) with gil:
raise make_ex(ret, "Failed to notify %r" % (obj))
return True
+ def aio_notify(self, obj: str,
+ oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
+ msg: str = '', timeout_ms: int = 5000) -> Completion:
+ """
+ Asynchronously send a rados notification to an object
+ """
+ self.require_ioctx_open()
+
+ msglen = len(msg)
+ obj_raw = cstr(obj, 'obj')
+ msg_raw = cstr(msg, 'msg')
+
+ cdef:
+ Completion completion
+ char *_obj = obj_raw
+ char *_msg = msg_raw
+ int _msglen = msglen
+ uint64_t _timeout_ms = timeout_ms
+ char *reply
+ size_t replylen = 0
+
+ def oncomplete_(completion_v):
+ cdef:
+ Completion _completion_v = completion_v
+ notify_ack_t *acks = NULL
+ notify_timeout_t *timeouts = NULL
+ size_t nr_acks
+ size_t nr_timeouts
+ return_value = _completion_v.get_return_value()
+ if return_value == 0:
+ return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
+ rados_buffer_free(reply)
+ if return_value == 0:
+ ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
+ else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
+ timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
+ rados_free_notify_response(acks, nr_acks, timeouts)
+ return oncomplete(_completion_v, 0, ack_list, timeout_list)
+ else:
+ return oncomplete(_completion_v, return_value, None, None)
+
+ completion = self.__get_completion(oncomplete_, None)
+ self.__track_completion(completion)
+ with nogil:
+ ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
+ _msg, _msglen, _timeout_ms, &reply, &replylen)
+ if ret < 0:
+ completion._cleanup()
+ raise make_ex(ret, "aio_notify error: %s" % obj)
+ return completion
+
def watch(self, obj: str,
callback: Callable[[int, str, int, bytes], None],
error_callback: Optional[Callable[[int], None]] = None,
ASSERT_EQ(0, rados_aio_get_return_value(comp));
rados_aio_release(comp);
- bufferlist reply;
- reply.append(reply_buf, reply_buf_len);
- std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
- std::set<std::pair<uint64_t,uint64_t> > missed_map;
- auto reply_p = reply.cbegin();
- decode(reply_map, reply_p);
- decode(missed_map, reply_p);
- ASSERT_EQ(1u, reply_map.size());
- ASSERT_EQ(0u, missed_map.size());
+ size_t nr_acks, nr_timeouts;
+ notify_ack_t *acks = nullptr;
+ notify_timeout_t *timeouts = nullptr;
+ ASSERT_EQ(0, rados_decode_notify_response(reply_buf, reply_buf_len,
+ &acks, &nr_acks, &timeouts, &nr_timeouts));
+ ASSERT_EQ(1u, nr_acks);
+ ASSERT_EQ(0u, nr_timeouts);
ASSERT_EQ(1u, notify_cookies.size());
ASSERT_EQ(1u, notify_cookies.count(handle));
- ASSERT_EQ(5u, reply_map.begin()->second.length());
- ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+ ASSERT_EQ(5u, acks[0].payload_len);
+ ASSERT_EQ(0, strncmp("reply", acks[0].payload, acks[0].payload_len));
ASSERT_GT(rados_watch_check(ioctx, handle), 0);
+ rados_free_notify_response(acks, nr_acks, timeouts);
rados_buffer_free(reply_buf);
// try it on a non-existent object ... our buffer pointers
self.notify_cnt = {}
self.notify_data = {}
self.notify_error = {}
+ # aio related
+ self.ack_cnt = {}
+ self.ack_data = {}
+ self.instance_id = self.rados.get_instance_id()
def tearDown(self):
self.ioctx.close()
assert_raises(NotConnected, watch1.check)
assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')
+
+ def make_callback_reply(self):
+ def callback(notify_id, notifier_id, watch_id, data):
+ with self.lock:
+ return data
+ return callback
+
+ def notify_callback(self, _, r, ack_list, timeout_list):
+ eq(r, 0)
+ with self.lock:
+ for notifier_id, _, notifier_data in ack_list:
+ if notifier_id not in self.ack_cnt:
+ self.ack_cnt[notifier_id] = 0
+ self.ack_cnt[notifier_id] += 1
+ self.ack_data[notifier_id] = notifier_data
+
+ def notify_callback_err(self, _, r, ack_list, timeout_list):
+ eq(r, -errno.ENOENT)
+
+ def test_aio_notify(self):
+ with self.ioctx.watch(self.OID, self.make_callback_reply(),
+ self.make_error_callback()) as watch1:
+ watch_id1 = watch1.get_id()
+ ok(watch_id1 > 0)
+
+ with self.rados.open_ioctx('test_pool') as ioctx:
+ watch2 = ioctx.watch(self.OID, self.make_callback_reply(),
+ self.make_error_callback())
+ watch_id2 = watch2.get_id()
+ ok(watch_id2 > 0)
+
+ comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test')
+ comp.wait_for_complete_and_cb()
+ with self.lock:
+ ok(self.instance_id in self.ack_cnt)
+ eq(self.ack_cnt[self.instance_id], 2)
+ eq(self.ack_data[self.instance_id], b'test')
+
+ ok(watch1.check() >= timedelta())
+ ok(watch2.check() >= timedelta())
+
+ comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best')
+ comp.wait_for_complete_and_cb()
+ with self.lock:
+ eq(self.ack_cnt[self.instance_id], 4)
+ eq(self.ack_data[self.instance_id], b'best')
+
+ watch2.close()
+
+ comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='rest')
+ comp.wait_for_complete_and_cb()
+ with self.lock:
+ eq(self.ack_cnt[self.instance_id], 5)
+ eq(self.ack_data[self.instance_id], b'rest')
+
+ assert(watch1.check() >= timedelta())
+ self.ioctx.remove_object(self.OID)
+
+ for i in range(10):
+ with self.lock:
+ if watch_id1 in self.notify_error:
+ break
+ time.sleep(1)
+ eq(self.notify_error[watch_id1], -errno.ENOTCONN)
+ assert_raises(NotConnected, watch1.check)
+
+ comp = self.ioctx.aio_notify(self.OID, self.notify_callback_err, msg='test')
+ comp.wait_for_complete_and_cb()