From: Adam Emerson
Date: Fri, 15 Sep 2023 20:08:51 +0000 (-0400)
Subject: neorados: Add Asio-idiomatic watch support
X-Git-Tag: testing/wip-vshankar-testing-20250407.170244-debug~16^2~43
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=9b395c8a237f47be40a00a9ca193f33d46900181;p=ceph-ci.git

neorados: Add Asio-idiomatic watch support

In addition to the librados-style callback function, add a variant of
`RADOS::watch` and a `next_notification` function that retrieves the
next notification.

Rationale: The callback creates an inversion of control that is more
noticeable in coroutine-heavy workflows. You could co_spawn and detach
a coroutine from the callback, but even then there is no obvious way to
report errors.

Signed-off-by: Adam Emerson
---

diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp
index 3ac42edb96b..639a0bce57a 100644
--- a/src/include/neorados/RADOS.hpp
+++ b/src/include/neorados/RADOS.hpp
@@ -1281,6 +1281,14 @@ private:
   std::aligned_storage_t impl;
 };
 
+// Result from `next_notification`
+struct Notification {
+  std::uint64_t notify_id = 0;
+  std::uint64_t cookie = 0;
+  std::uint64_t notifier_id = 0;
+  ceph::buffer::list bl;
+};
+
 // Clang reports a spurious warning that a captured `this` is unused
 // in the public 'wrapper' functions that construct the completion
 // handler and pass it to the actual worker member functions. The `this` is
@@ -1617,6 +1625,38 @@ public:
     }, consigned, std::move(o), std::move(ioc), std::move(cb));
   }
 
+  template<boost::asio::completion_token_for<WatchSig> CompletionToken>
+  auto watch(Object o, IOContext ioc, CompletionToken&& token,
+             std::optional<std::chrono::seconds> timeout = std::nullopt,
+             std::uint32_t queue_size = 128u) {
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+        boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), WatchSig>(
+      [timeout, queue_size, this]
+      (auto&& handler, Object o, IOContext ioc) mutable {
+        watch_(std::move(o), std::move(ioc), std::move(handler), timeout,
+               queue_size);
+      }, consigned, std::move(o), std::move(ioc));
+  }
+
+  using NextNotificationSig = void(boost::system::error_code ec,
+                                   Notification);
+  using NextNotificationComp =
+    boost::asio::any_completion_handler<NextNotificationSig>;
+  template<boost::asio::completion_token_for<NextNotificationSig>
+           CompletionToken>
+  auto next_notification(uint64_t cookie, CompletionToken&& token) {
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+        boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<
+      decltype(consigned), NextNotificationSig>(
+        [cookie, this](auto&& handler) mutable {
+          next_notification_(cookie, std::move(handler));
+        }, consigned);
+  }
+
   template CompletionToken>
   auto notify_ack(Object o, IOContext ioc,
                   uint64_t notify_id, uint64_t cookie,
@@ -1832,6 +1872,12 @@ private:
   void watch_(Object o, IOContext ioc,
               std::optional<std::chrono::seconds> timeout,
               WatchCB cb, WatchComp c);
+  void watch_(Object o, IOContext ioc, WatchComp c,
+              std::optional<std::chrono::seconds> timeout,
+              std::uint32_t queue_size);
+  void next_notification_(uint64_t cookie, NextNotificationComp c);
+  tl::expected
+  watch_check_(std::uint64_t cookie);
   void notify_ack_(Object o, IOContext _ioc,
                    uint64_t notify_id,
                    uint64_t cookie,
@@ -1882,7 +1928,13 @@ private:
 enum class errc {
   pool_dne = 1,
   snap_dne,
-  invalid_snapcontext
+  invalid_snapcontext,
+  // Indicates that notifications were received while the queue was
+  // full. The watch is still valid and `next_notification` may be
+  // called again.
+  notification_overflow,
+  // Attempted to poll a callback watch
+  polled_callback_watch
 };
 
 const boost::system::error_category& error_category() noexcept;
diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc
index b5d0eb4a882..1f7e89db882 100644
--- a/src/neorados/RADOS.cc
+++ b/src/neorados/RADOS.cc
@@ -12,15 +12,36 @@
  *
  */
 
-#define BOOST_BIND_NO_PLACEHOLDERS
-
+#include
+#include
 #include
+#include
+#include
 #include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include "common/async/service.h"
+
+#define BOOST_BIND_NO_PLACEHOLDERS
+
 #include
 #include
+#include "include/any.h"
 #include "include/ceph_fs.h"
 #include "common/ceph_context.h"
@@ -45,6 +66,8 @@ namespace bc = boost::container;
 namespace bs = boost::system;
 namespace cb = ceph::buffer;
 
+namespace async = ceph::async;
+
 namespace neorados {
 // Object
@@ -1341,6 +1364,129 @@ void RADOS::stat_fs_(std::optional _pool,
 
 // --- Watch/Notify
 
+class Notifier : public async::service_list_base_hook {
+  friend ceph::async::service<Notifier>;
+
+  struct id_and_handler {
+    uint64_t id = 0;
+    RADOS::NextNotificationComp comp;
+  };
+
+  asio::io_context::executor_type ex;
+  // Zero for unbounded. I would not recommend this.
+  const uint32_t capacity;
+
+  async::service<Notifier>& svc;
+  std::queue<std::pair<bs::error_code, Notification>> notifications;
+  std::deque<id_and_handler> handlers;
+  std::mutex m;
+  uint64_t next_id = 0;
+
+  void service_shutdown() {
+    std::unique_lock l(m);
+    handlers.clear();
+  }
+
+public:
+
+  Notifier(asio::io_context::executor_type ex, uint32_t capacity)
+    : ex(ex), capacity(capacity),
+      svc(asio::use_service<async::service<Notifier>>(
+            asio::query(ex, boost::asio::execution::context))) {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+  }
+
+  ~Notifier() {
+    while (!handlers.empty()) {
+      auto h = std::move(handlers.front());
+      asio::post(ex, asio::append(std::move(h.comp),
+                                  asio::error::operation_aborted,
+                                  Notification{}));
+      handlers.pop_front();
+    }
+    svc.remove(*this);
+  }
+
+  auto next_tid() {
+    std::unique_lock l(m);
+    return ++next_id;
+  }
+
+  void add_handler(uint64_t id, RADOS::NextNotificationComp&& h) {
+    std::unique_lock l(m);
+    if (notifications.empty()) {
+      handlers.push_back({id, std::move(h)});
+    } else {
+      auto [e, n] = std::move(notifications.front());
+      notifications.pop();
+      l.unlock();
+      dispatch(asio::append(std::move(h), std::move(e), std::move(n)));
+    }
+  }
+
+  void cancel(uint64_t id) {
+    std::unique_lock l(m);
+    for (auto i = handlers.begin(); i != handlers.end(); ++i) {
+      if (i->id == id) {
+        dispatch(asio::append(std::move(i->comp),
+                              asio::error::operation_aborted, Notification{}));
+        handlers.erase(i);
+        break;
+      }
+    }
+  }
+
+  void operator ()(bs::error_code ec, uint64_t notify_id, uint64_t cookie,
+                   uint64_t notifier_id, buffer::list&& bl) {
+    std::unique_lock l(m);
+    if (!handlers.empty()) {
+      auto h = std::move(handlers.front());
+      handlers.pop_front();
+      l.unlock();
+      dispatch(asio::append(std::move(h.comp), std::move(ec),
+                            Notification{
+                              .notify_id = notify_id,
+                              .cookie = cookie,
+                              .notifier_id = notifier_id,
+                              .bl = std::move(bl),
+                            }));
+    } else if (capacity && notifications.size() >= capacity) {
+      // We are allowed one over, so the client knows where in the
+      // sequence of notifications we started losing data.
+      notifications.push({errc::notification_overflow, {}});
+    } else {
+      notifications.push({{},
+                          Notification{
+                            .notify_id = notify_id,
+                            .cookie = cookie,
+                            .notifier_id = notifier_id,
+                            .bl = std::move(bl),
+                          }});
+    }
+  }
+};
+
+struct next_notify_cancellation {
+  std::weak_ptr<Notifier> ptr;
+  uint64_t id;
+
+  next_notify_cancellation(std::weak_ptr<Notifier> ptr, uint64_t id)
+    : ptr(std::move(ptr)), id(id) {}
+
+  void operator ()(asio::cancellation_type_t type) {
+    if (type == asio::cancellation_type::total ||
+        type == asio::cancellation_type::terminal) {
+      // Since nobody can cancel until we return (I hope) we shouldn't
+      // need a mutex or anything.
+      auto notifier = ptr.lock();
+      if (notifier) {
+        notifier->cancel(id);
+      }
+    }
+  }
+};
+
+
 void RADOS::watch_(Object o, IOContext _ioc,
                    std::optional<std::chrono::seconds> timeout,
                    WatchCB cb, WatchComp c) {
@@ -1366,6 +1512,60 @@ void RADOS::watch_(Object o, IOContext _ioc,
     }), nullptr);
 }
 
+void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
+                   std::optional<std::chrono::seconds> timeout,
+                   std::uint32_t queue_size = 128u) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+  ObjectOperation op;
+
+  auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc,
+                                                   ioc->extra_op_flags);
+  uint64_t cookie = linger_op->get_cookie();
+  // Shared pointer to avoid a potential race condition
+  linger_op->user_data.emplace<std::shared_ptr<Notifier>>(
+    std::make_shared<Notifier>(get_executor(), queue_size));
+  auto& n = ceph::any_cast<std::shared_ptr<Notifier>&>(
+    linger_op->user_data);
+  linger_op->handle = std::ref(*n);
+  op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+  bufferlist bl;
+  auto e = asio::prefer(get_executor(),
+                        asio::execution::outstanding_work.tracked);
+  impl->objecter->linger_watch(
+    linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
+    asio::bind_executor(
+      std::move(e),
+      [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+        asio::dispatch(asio::append(std::move(c), e, cookie));
+      }), nullptr);
+}
+
+void RADOS::next_notification_(uint64_t cookie, NextNotificationComp c) {
+  Objecter::LingerOp* linger_op =
+    reinterpret_cast<Objecter::LingerOp*>(cookie);
+  if (!impl->objecter->is_valid_watch(linger_op)) {
+    dispatch(asio::append(std::move(c),
+                          bs::error_code(ENOTCONN, bs::generic_category()),
+                          Notification{}));
+  } else try {
+    auto n = ceph::any_cast<std::shared_ptr<Notifier>&>(
+      linger_op->user_data);
+    // Arrange for cancellation
+    auto slot = boost::asio::get_associated_cancellation_slot(c);
+    auto id = n->next_tid();
+    if (slot.is_connected()) {
+      slot.template emplace<next_notify_cancellation>(
+        std::weak_ptr<Notifier>(n), id);
+    }
+    n->add_handler(id, std::move(c));
+  } catch (const std::bad_any_cast&) {
+    dispatch(asio::append(std::move(c),
+                          bs::error_code(errc::polled_callback_watch),
+                          Notification{}));
+  }
+}
+
 void RADOS::notify_ack_(Object o, IOContext _ioc,
                         uint64_t notify_id,
                         uint64_t cookie,
@@ -1775,6 +1975,10 @@ const char* category::message(int ev, char*,
     return "Snapshot does not exist";
   case errc::invalid_snapcontext:
     return "Invalid snapcontext";
+  case errc::notification_overflow:
+    return "Notification overflow";
+  case errc::polled_callback_watch:
+    return "Attempted to poll a callback watch";
   }
 
   return "Unknown error";
@@ -1792,6 +1996,12 @@ bs::error_condition category::default_error_condition(int ev) const noexcept {
     return ceph::errc::does_not_exist;
   case errc::invalid_snapcontext:
     return bs::errc::invalid_argument;
+  case errc::notification_overflow:
+    // I don't know if this is the right choice, but it at least has a
+    // sense of "Try again". Maybe just map it to itself?
+    return bs::errc::interrupted;
+  case errc::polled_callback_watch:
+    return bs::errc::bad_file_descriptor;
   }
 
   return { ev, *this };
@@ -1808,6 +2018,16 @@ bool category::equivalent(int ev, const bs::error_condition& c) const noexcept {
       return true;
     }
   }
+  if (static_cast<errc>(ev) == errc::notification_overflow) {
+    if (c == bs::errc::interrupted) {
+      return true;
+    }
+  }
+  if (static_cast<errc>(ev) == errc::polled_callback_watch) {
+    if (c == bs::errc::invalid_argument) {
+      return true;
+    }
+  }
 
   return default_error_condition(ev) == c;
 }
@@ -1820,6 +2040,10 @@ int category::from_code(int ev) const noexcept {
     return -ENOENT;
   case errc::invalid_snapcontext:
     return -EINVAL;
+  case errc::notification_overflow:
+    return -EINTR;
+  case errc::polled_callback_watch:
+    return -EBADF;
   }
   return -EDOM;
 }
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 862d4d0cdaf..36b6f29cb91 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -2377,6 +2377,11 @@ public:
                     uint64_t cookie, uint64_t notifier_id,
                     ceph::buffer::list&& bl)> handle;
+  // I am sorely tempted to replace function2 with cxx_function, as
+  // cxx_function has the `target` method from `std::function` and I
+  // keep having to do annoying circumlocutions like this one to be
+  // able to access function object data with function2.
+  ceph::unique_any user_data;
   OSDSession *session{nullptr};
 
   int ctx_budget{-1};
diff --git a/src/test/neorados/watch_notify.cc b/src/test/neorados/watch_notify.cc
index 901a1e7491a..25efab9058b 100644
--- a/src/test/neorados/watch_notify.cc
+++ b/src/test/neorados/watch_notify.cc
@@ -9,17 +9,19 @@
  *
  */
 
+#include
 #include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
 #include
+#include
+
 #include
 #include
@@ -47,6 +49,8 @@ using neorados::WriteOp;
 
 using std::uint64_t;
 
+using namespace boost::asio::experimental::awaitable_operators;
+
 class NeoRadosWatchNotifyTest : public NeoRadosTest {
 protected:
   buffer::list notify_bl;
@@ -171,3 +175,121 @@ CORO_TEST_F(NeoRadosWatchNotify, WatchNotifyTimeout, NeoRadosWatchNotifyTest) {
 
   co_return;
 }
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotify, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+  EXPECT_TRUE(rados().check_watch(handle));
+  std::vector<neorados::ObjWatcher> watchers;
+  co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+  EXPECT_EQ(1u, watchers.size());
+  auto notify = [](neorados::RADOS& r, neorados::IOContext ioc)
+    -> asio::awaitable<void> {
+    auto reply = co_await r.notify(oid, ioc, {}, {}, asio::use_awaitable);
+    std::map<std::pair<uint64_t, uint64_t>, buffer::list> reply_map;
+    std::set<std::pair<uint64_t, uint64_t>> missed_set;
+    auto p = reply.cbegin();
+    decode(reply_map, p);
+    decode(missed_set, p);
+    EXPECT_EQ(1u, reply_map.size());
+    EXPECT_EQ(5u, reply_map.begin()->second.length());
+    EXPECT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+    EXPECT_EQ(0u, missed_set.size());
+
+    co_return;
+  }(rados(), pool());
+  auto poll = [](neorados::RADOS& r, neorados::IOContext ioc,
+                 uint64_t handle) -> asio::awaitable<void> {
+    auto notification = co_await r.next_notification(handle,
+                                                     asio::use_awaitable);
+    co_await r.notify_ack(oid, ioc, notification.notify_id, handle,
+                          to_buffer_list("reply"sv), asio::use_awaitable);
+    EXPECT_EQ(handle, notification.cookie);
+  }(rados(), pool(), handle);
+
+  co_await (std::move(notify) && std::move(poll));
+
+  EXPECT_TRUE(rados().check_watch(handle));
+  co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+  co_return;
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotifyTimeout, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+  static constexpr auto timeout = 1s;
+  static constexpr auto delay = 3s;
+
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+  EXPECT_TRUE(rados().check_watch(handle));
+  std::vector<neorados::ObjWatcher> watchers;
+  co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+  EXPECT_EQ(1u, watchers.size());
+
+  auto notify = [](neorados::RADOS& r, neorados::IOContext ioc)
+    -> asio::awaitable<void> {
+    co_await expect_error_code(r.notify(oid, ioc, {}, timeout,
+                                        asio::use_awaitable),
+                               sys::errc::timed_out);
+  }(rados(), pool());
+
+  auto ack_slowly = [](neorados::RADOS& r, neorados::IOContext ioc,
+                       uint64_t handle) -> asio::awaitable<void> {
+    auto notification = co_await r.next_notification(handle,
+                                                     asio::use_awaitable);
+    EXPECT_EQ(handle, notification.cookie);
+    co_await wait_for(delay);
+    co_await r.notify_ack(oid, ioc, notification.notify_id, handle,
+                          to_buffer_list("reply"sv), asio::use_awaitable);
+  }(rados(), pool(), handle);
+
+
+  co_await (std::move(notify) || std::move(ack_slowly));
+
+  EXPECT_TRUE(rados().check_watch(handle));
+  co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+  co_await rados().flush_watch(asio::use_awaitable);
+
+  co_return;
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WrongWatchType, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), std::nullopt,
+                                       [](auto&&...) { std::terminate(); },
+                                       asio::use_awaitable);
+  co_await expect_error_code(
+    rados().next_notification(handle, asio::use_awaitable),
+    neorados::errc::polled_callback_watch);
+  co_await expect_error_code(
+    rados().next_notification(handle, asio::use_awaitable),
+    sys::errc::bad_file_descriptor);
+  co_await expect_error_code(
+    rados().next_notification(handle, asio::use_awaitable),
+    sys::errc::invalid_argument);
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotifyCancel, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+  EXPECT_TRUE(rados().check_watch(handle));
+  std::vector<neorados::ObjWatcher> watchers;
+  co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+  EXPECT_EQ(1u, watchers.size());
+
+  co_await (rados().next_notification(handle, asio::use_awaitable) ||
+            wait_for(50us));
+  EXPECT_TRUE(rados().check_watch(handle));
+  co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+  co_await rados().flush_watch(asio::use_awaitable);
+
+  co_return;
+}
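
For illustration only, and not part of the patch: a minimal sketch of how a
consumer might drive the poll-style API from a coroutine. The function and
variable names (`watch_loop`, `rados`, `ioc`, `oid`) are invented for the
example; it assumes an already-connected `neorados::RADOS`, an `IOContext`,
and an existing object, and elides error handling and loop termination.

    // Sketch only: poll-style watch loop using the new API.
    boost::asio::awaitable<void> watch_loop(neorados::RADOS& rados,
                                            neorados::IOContext ioc,
                                            neorados::Object oid) {
      namespace asio = boost::asio;
      // No callback is supplied: notifications are queued inside the watch.
      auto cookie = co_await rados.watch(oid, ioc, asio::use_awaitable);
      for (;;) {
        // May fail with errc::notification_overflow (surfaced as an exception
        // under use_awaitable) if the internal queue filled up; the watch
        // itself remains valid and polling may continue.
        auto n = co_await rados.next_notification(cookie, asio::use_awaitable);
        // Acknowledge so the notifier's notify() call can complete.
        co_await rados.notify_ack(oid, ioc, n.notify_id, cookie, {},
                                  asio::use_awaitable);
      }
      // A real consumer would break out of the loop and then call
      // co_await rados.unwatch(cookie, ioc, asio::use_awaitable);
    }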