To keep consistency between the C and C++ APIs.
Signed-off-by: Venky Shankar <vshankar@redhat.com>
bufferlist& bl, ///< optional broadcast payload
uint64_t timeout_ms, ///< timeout (in ms)
bufferlist *pbl); ///< reply buffer
+ /*
+ * Decode a notify response into acks and timeout vectors.
+ */
+ void decode_notify_response(bufferlist &bl,
+ std::vector<librados::notify_ack_t> *acks,
+ std::vector<librados::notify_timeout_t> *timeouts);
int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
int list_snaps(const std::string& o, snap_set_t *out_snaps);
*/
const std::string all_nspaces(LIBRADOS_ALL_NSPACES);
+struct notify_ack_t {
+ uint64_t notifier_id;
+ uint64_t cookie;
+ ceph::bufferlist payload_bl;
+};
+
+struct notify_timeout_t {
+ uint64_t notifier_id;
+ uint64_t cookie;
+};
}
#endif
NULL);
}
+void librados::IoCtx::decode_notify_response(bufferlist &bl,
+ std::vector<librados::notify_ack_t> *acks,
+ std::vector<librados::notify_timeout_t> *timeouts)
+{
+ map<pair<uint64_t,uint64_t>,bufferlist> acked;
+ set<pair<uint64_t,uint64_t>> missed;
+
+ auto iter = bl.cbegin();
+ decode(acked, iter);
+ decode(missed, iter);
+
+ for (auto &[who, payload] : acked) {
+ acks->emplace_back(librados::notify_ack_t{who.first, who.second, payload});
+ }
+ for (auto &[notifier_id, cookie] : missed) {
+ timeouts->emplace_back(librados::notify_timeout_t{notifier_id, cookie});
+ }
+}
+
void librados::IoCtx::notify_ack(const std::string& o,
uint64_t notify_id, uint64_t handle,
bufferlist& bl)
ASSERT_EQ(0, comp->wait_for_complete());
ASSERT_EQ(0, comp->get_return_value());
comp->release();
- auto p = bl_reply.cbegin();
- std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
- std::set<std::pair<uint64_t,uint64_t> > missed_map;
- decode(reply_map, p);
- decode(missed_map, p);
+ std::vector<librados::notify_ack_t> acks;
+ std::vector<librados::notify_timeout_t> timeouts;
+ ioctx.decode_notify_response(bl_reply, &acks, &timeouts);
ASSERT_EQ(1u, notify_cookies.size());
ASSERT_EQ(1u, notify_cookies.count(handle));
- ASSERT_EQ(1u, reply_map.size());
- ASSERT_EQ(5u, reply_map.begin()->second.length());
- ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
- ASSERT_EQ(0u, missed_map.size());
+ ASSERT_EQ(1u, acks.size());
+ ASSERT_EQ(5u, acks[0].payload_bl.length());
+ ASSERT_EQ(0, strncmp("reply", acks[0].payload_bl.c_str(), acks[0].payload_bl.length()));
+ ASSERT_EQ(0u, timeouts.size());
ASSERT_GT(ioctx.watch_check(handle), 0);
ioctx.unwatch2(handle);
cluster.watch_flush();