std::aligned_storage_t<impl_size> impl;
};
+// Result from `next_notification`
+struct Notification {
+ std::uint64_t notify_id = 0; // identifies this notify; pass to notify_ack
+ std::uint64_t cookie = 0; // watch handle the notification arrived on
+ std::uint64_t notifier_id = 0; // global ID of the notifying client
+ ceph::buffer::list bl; // payload supplied by the notifier
+};
+
// Clang reports a spurious warning that a captured `this` is unused
// in the public 'wrapper' functions that construct the completion
// handler and pass it to the actual worker member functions. The `this` is
}, consigned, std::move(o), std::move(ioc), std::move(cb));
}
+ template<boost::asio::completion_token_for<WatchSig> CompletionToken>
+ auto watch(Object o, IOContext ioc, CompletionToken&& token,
+ std::optional<std::chrono::seconds> timeout = std::nullopt,
+ std::uint32_t queue_size = 128u) {
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), WatchSig>(
+ [timeout, queue_size, this]
+ (auto&& handler, Object o, IOContext ioc) mutable {
+ watch_(std::move(o), std::move(ioc), std::move(handler), timeout,
+ queue_size);
+ }, consigned, std::move(o), std::move(ioc));
+ }
+
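+ // Poll-style watch/notify. A watch created with the callback-free
+ // `watch()` overload above queues incoming notifications; each
+ // `next_notification` call completes with the next queued entry,
+ // fails with errc::notification_overflow if the queue overflowed, or
+ // fails with errc::polled_callback_watch if the watch was registered
+ // with a callback instead.
+ //
+ // Rough usage sketch (hypothetical coroutine; `rados`, `ioc`, `oid`
+ // and `reply` stand in for the caller's own objects, and `asio` for
+ // `boost::asio`):
+ //
+ //   auto handle = co_await rados.watch(oid, ioc, asio::use_awaitable);
+ //   auto n = co_await rados.next_notification(handle,
+ //                                             asio::use_awaitable);
+ //   co_await rados.notify_ack(oid, ioc, n.notify_id, handle,
+ //                             std::move(reply), asio::use_awaitable);
+ //   co_await rados.unwatch(handle, ioc, asio::use_awaitable);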
+ using NextNotificationSig = void(boost::system::error_code ec,
+ Notification);
+ using NextNotificationComp =
+ boost::asio::any_completion_handler<NextNotificationSig>;
+ template<boost::asio::completion_token_for<
+ NextNotificationSig> CompletionToken>
+ auto next_notification(uint64_t cookie, CompletionToken&& token) {
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<
+ decltype(consigned), NextNotificationSig>(
+ [cookie, this](auto&& handler) mutable {
+ next_notification_(cookie, std::move(handler));
+ }, consigned);
+ }
+
template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto notify_ack(Object o, IOContext ioc,
uint64_t notify_id, uint64_t cookie,
void watch_(Object o, IOContext ioc,
std::optional<std::chrono::seconds> timeout,
WatchCB cb, WatchComp c);
+ void watch_(Object o, IOContext ioc, WatchComp c,
+ std::optional<std::chrono::seconds> timeout,
+ std::uint32_t queue_size);
+ void next_notification_(uint64_t cookie, NextNotificationComp c);
+ tl::expected<ceph::timespan, boost::system::error_code>
+ watch_check_(std::uint64_t cookie);
void notify_ack_(Object o, IOContext _ioc,
uint64_t notify_id,
uint64_t cookie,
enum class errc {
pool_dne = 1,
snap_dne,
- invalid_snapcontext
+ invalid_snapcontext,
+ // Indicates that notifications were received while the queue was
+ // full. The watch is still valid and `next_notification` may be
+ // called again; see the recovery sketch after this enum.
+ notification_overflow,
+ // Attempted to poll a callback watch
+ polled_callback_watch
};
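+// A minimal recovery sketch for notification_overflow (hypothetical
+// coroutine caller; `rados` and `handle` stand in for the caller's own
+// objects): the watch stays valid, so after noting the gap the caller
+// can simply keep polling.
+//
+//   auto [ec, n] = co_await rados.next_notification(
+//     handle, boost::asio::as_tuple(boost::asio::use_awaitable));
+//   if (ec == neorados::errc::notification_overflow) {
+//     // Some notifications were dropped; resynchronize state here,
+//     // then call next_notification again.
+//   }
+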
const boost::system::error_category& error_category() noexcept;
*
*/
-#define BOOST_BIND_NO_PLACEHOLDERS
-
+#include <deque>
#include <optional>
+#include <queue>
#include <string_view>
+
+#include <boost/asio/any_completion_handler.hpp>
+#include <boost/asio/append.hpp>
+#include <boost/asio/associated_executor.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/consign.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/execution/context.hpp>
+#include <boost/asio/execution_context.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/query.hpp>
+#include <boost/asio/strand.hpp>
+
+#include <boost/system/error_code.hpp>
+
+#include "common/async/service.h"
+
+#define BOOST_BIND_NO_PLACEHOLDERS
+
#include <boost/intrusive_ptr.hpp>
#include <fmt/format.h>
+#include "include/any.h"
#include "include/ceph_fs.h"
#include "common/ceph_context.h"
namespace bs = boost::system;
namespace cb = ceph::buffer;
+namespace async = ceph::async;
+
namespace neorados {
// Object
// --- Watch/Notify
+class Notifier : public async::service_list_base_hook {
+ friend ceph::async::service<Notifier>;
+
+ struct id_and_handler {
+ uint64_t id = 0;
+ RADOS::NextNotificationComp comp;
+ };
+
+ asio::io_context::executor_type ex;
+ // Maximum number of queued notifications. Zero means unbounded,
+ // which I would not recommend.
+ const uint32_t capacity;
+
+ async::service<Notifier>& svc;
+ std::queue<std::pair<bs::error_code, Notification>> notifications;
+ std::deque<id_and_handler> handlers;
+ std::mutex m;
+ uint64_t next_id = 0;
+
+ void service_shutdown() {
+ std::unique_lock l(m);
+ handlers.clear();
+ }
+
+public:
+
+ Notifier(asio::io_context::executor_type ex, uint32_t capacity)
+ : ex(ex), capacity(capacity),
+ svc(asio::use_service<async::service<Notifier>>(
+ asio::query(ex, boost::asio::execution::context))) {
+ // register for service_shutdown() notifications
+ svc.add(*this);
+ }
+
+ ~Notifier() {
+ while (!handlers.empty()) {
+ auto h = std::move(handlers.front());
+ asio::post(ex, asio::append(std::move(h.comp),
+ asio::error::operation_aborted,
+ Notification{}));
+ handlers.pop_front();
+ }
+ svc.remove(*this);
+ }
+
+ auto next_tid() {
+ std::unique_lock l(m);
+ return ++next_id;
+ }
+ void add_handler(uint64_t id, RADOS::NextNotificationComp&& h) {
+ std::unique_lock l(m);
+ if (notifications.empty()) {
+ handlers.push_back({id, std::move(h)});
+ } else {
+ auto [e, n] = std::move(notifications.front());
+ notifications.pop();
+ l.unlock();
+ dispatch(asio::append(std::move(h), std::move(e), std::move(n)));
+ }
+ }
+
+ void cancel(uint64_t id) {
+ std::unique_lock l(m);
+ for (auto i = handlers.begin(); i != handlers.end(); ++i) {
+ if (i->id == id) {
+ dispatch(asio::append(std::move(i->comp),
+ asio::error::operation_aborted, Notification{}));
+ handlers.erase(i);
+ break;
+ }
+ }
+ }
+
+ void operator ()(bs::error_code ec, uint64_t notify_id, uint64_t cookie,
+ uint64_t notifier_id, buffer::list&& bl) {
+ std::unique_lock l(m);
+ if (!handlers.empty()) {
+ auto h = std::move(handlers.front());
+ handlers.pop_front();
+ l.unlock();
+ dispatch(asio::append(std::move(h.comp), std::move(ec),
+ Notification{
+ .notify_id = notify_id,
+ .cookie = cookie,
+ .notifier_id = notifier_id,
+ .bl = std::move(bl),
+ }));
+ } else if (capacity && notifications.size() >= capacity) {
+ // We are allowed one over, so the client knows where in the
+ // sequence of notifications we started losing data. Queue the
+ // overflow marker only once; subsequent notifications are simply
+ // dropped until the client drains the queue.
+ if (notifications.back().first != errc::notification_overflow) {
+ notifications.push({errc::notification_overflow, {}});
+ }
+ } else {
+ notifications.push({{},
+ Notification{
+ .notify_id = notify_id,
+ .cookie = cookie,
+ .notifier_id = notifier_id,
+ .bl = std::move(bl),
+ }});
+ }
+ }
+};
+
+struct next_notify_cancellation {
+ std::weak_ptr<Notifier> ptr;
+ uint64_t id;
+
+ next_notify_cancellation(std::weak_ptr<Notifier> ptr, uint64_t id)
+ : ptr(std::move(ptr)), id(id) {}
+
+ void operator ()(asio::cancellation_type_t type) {
+ if (type == asio::cancellation_type::total ||
+ type == asio::cancellation_type::terminal) {
+ // Since nobody can cancel until we return (I hope) we shouldn't
+ // need a mutex or anything.
+ auto notifier = ptr.lock();
+ if (notifier) {
+ notifier->cancel(id);
+ }
+ }
+ }
+};
+
+
void RADOS::watch_(Object o, IOContext _ioc,
std::optional<std::chrono::seconds> timeout, WatchCB cb,
WatchComp c) {
}), nullptr);
}
+void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
+ std::optional<std::chrono::seconds> timeout,
+ std::uint32_t queue_size) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+ ObjectOperation op;
+
+ auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc,
+ ioc->extra_op_flags);
+ uint64_t cookie = linger_op->get_cookie();
+ // Shared pointer so the Notifier can be referenced safely (e.g. via
+ // the weak_ptr held by a pending cancellation handler) without
+ // racing teardown of the linger op's user data.
+ linger_op->user_data.emplace<std::shared_ptr<Notifier>>(
+ std::make_shared<Notifier>(get_executor(), queue_size));
+ auto& n = ceph::any_cast<std::shared_ptr<Notifier>&>(
+ linger_op->user_data);
+ linger_op->handle = std::ref(*n);
+ op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+ bufferlist bl;
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
+ impl->objecter->linger_watch(
+ linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
+ asio::bind_executor(
+ std::move(e),
+ [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+ asio::dispatch(asio::append(std::move(c), e, cookie));
+ }), nullptr);
+}
+
+void RADOS::next_notification_(uint64_t cookie, NextNotificationComp c) {
+ Objecter::LingerOp* linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+ if (!impl->objecter->is_valid_watch(linger_op)) {
+ dispatch(asio::append(std::move(c),
+ bs::error_code(ENOTCONN, bs::generic_category()),
+ Notification{}));
+ } else try {
+ auto n = ceph::any_cast<std::shared_ptr<Notifier>&>(
+ linger_op->user_data);
+ // Arrange for cancellation
+ auto slot = boost::asio::get_associated_cancellation_slot(c);
+ auto id = n->next_tid();
+ if (slot.is_connected()) {
+ slot.template emplace<next_notify_cancellation>(
+ std::weak_ptr<Notifier>(n), id);
+ }
+ n->add_handler(id, std::move(c));
+ } catch (const std::bad_any_cast&) {
+ dispatch(asio::append(std::move(c),
+ bs::error_code(errc::polled_callback_watch),
+ Notification{}));
+ }
+}
+
void RADOS::notify_ack_(Object o, IOContext _ioc,
uint64_t notify_id,
uint64_t cookie,
return "Snapshot does not exist";
case errc::invalid_snapcontext:
return "Invalid snapcontext";
+ case errc::notification_overflow:
+ return "Notificaton overflow";
+ case errc::polled_callback_watch:
+ return "Attempted to poll a callback watch";
}
return "Unknown error";
return ceph::errc::does_not_exist;
case errc::invalid_snapcontext:
return bs::errc::invalid_argument;
+ case errc::notification_overflow:
+ // I don't know if this is the right choice, but it at least has a
+ // sense of "Try again". Maybe just map it to itself?
+ return bs::errc::interrupted;
+ case errc::polled_callback_watch:
+ return bs::errc::bad_file_descriptor;
}
return { ev, *this };
return true;
}
}
+ if (static_cast<errc>(ev) == errc::notification_overflow) {
+ if (c == bs::errc::interrupted) {
+ return true;
+ }
+ }
+ if (static_cast<errc>(ev) == errc::polled_callback_watch) {
+ if (c == bs::errc::invalid_argument) {
+ return true;
+ }
+ }
return default_error_condition(ev) == c;
}
return -ENOENT;
case errc::invalid_snapcontext:
return -EINVAL;
+ case errc::notification_overflow:
+ return -EINTR;
+ case errc::polled_callback_watch:
+ return -EBADF;
}
return -EDOM;
}
uint64_t cookie,
uint64_t notifier_id,
ceph::buffer::list&& bl)> handle;
+ // I am sorely tempted to replace function2 with cxx_function, as
+ // cxx_function has the `target` method from `std::function` and I
+ // keep having to do annoying circumlocutions like this one to be
+ // able to access function object data with function2.
+ ceph::unique_any user_data;
OSDSession *session{nullptr};
int ctx_budget{-1};
*
*/
#include <coroutine>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>
-#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>
+#include <boost/asio/experimental/awaitable_operators.hpp>
+
#include <boost/container/flat_set.hpp>
#include <boost/system/errc.hpp>
using std::uint64_t;
+using namespace boost::asio::experimental::awaitable_operators;
+
class NeoRadosWatchNotifyTest : public NeoRadosTest {
protected:
buffer::list notify_bl;
co_return;
}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotify, NeoRadosTest) {
+ static constexpr auto oid = "obj"sv;
+ co_await create_obj(oid);
+ auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+ EXPECT_TRUE(rados().check_watch(handle));
+ std::vector<neorados::ObjWatcher> watchers;
+ co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+ EXPECT_EQ(1u, watchers.size());
+ auto notify = [](neorados::RADOS& r, neorados::IOContext ioc)
+ -> asio::awaitable<void> {
+ auto reply = co_await r.notify(oid, ioc, {}, {}, asio::use_awaitable);
+ std::map<std::pair<uint64_t, uint64_t>, buffer::list> reply_map;
+ std::set<std::pair<uint64_t, uint64_t>> missed_set;
+ auto p = reply.cbegin();
+ decode(reply_map, p);
+ decode(missed_set, p);
+ EXPECT_EQ(1u, reply_map.size());
+ EXPECT_EQ(5u, reply_map.begin()->second.length());
+ EXPECT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+ EXPECT_EQ(0u, missed_set.size());
+
+ co_return;
+ }(rados(), pool());
+ auto poll = [](neorados::RADOS& r, neorados::IOContext ioc,
+ uint64_t handle) -> asio::awaitable<void> {
+ auto notification = co_await r.next_notification(handle,
+ asio::use_awaitable);
+ co_await r.notify_ack(oid, ioc, notification.notify_id, handle,
+ to_buffer_list("reply"sv), asio::use_awaitable);
+ EXPECT_EQ(handle, notification.cookie);
+ }(rados(), pool(), handle);
+
+ co_await (std::move(notify) && std::move(poll));
+
+ EXPECT_TRUE(rados().check_watch(handle));
+ co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+ co_return;
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotifyTimeout, NeoRadosTest) {
+ static constexpr auto oid = "obj"sv;
+ static constexpr auto timeout = 1s;
+ static constexpr auto delay = 3s;
+
+ co_await create_obj(oid);
+ auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+ EXPECT_TRUE(rados().check_watch(handle));
+ std::vector<neorados::ObjWatcher> watchers;
+ co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+ EXPECT_EQ(1u, watchers.size());
+
+ auto notify = [](neorados::RADOS& r, neorados::IOContext ioc)
+ -> asio::awaitable<void> {
+ co_await expect_error_code(r.notify(oid, ioc, {}, timeout,
+ asio::use_awaitable),
+ sys::errc::timed_out);
+ }(rados(), pool());
+
+ auto ack_slowly = [](neorados::RADOS& r, neorados::IOContext ioc,
+ uint64_t handle) -> asio::awaitable<void> {
+ auto notification = co_await r.next_notification(handle,
+ asio::use_awaitable);
+ EXPECT_EQ(handle, notification.cookie);
+ co_await wait_for(delay);
+ co_await r.notify_ack(oid, ioc, notification.notify_id, handle,
+ to_buffer_list("reply"sv), asio::use_awaitable);
+ }(rados(), pool(), handle);
+
+ co_await (std::move(notify) || std::move(ack_slowly));
+
+ EXPECT_TRUE(rados().check_watch(handle));
+ co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+ co_await rados().flush_watch(asio::use_awaitable);
+
+ co_return;
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WrongWatchType, NeoRadosTest) {
+ static constexpr auto oid = "obj"sv;
+
+ co_await create_obj(oid);
+ auto handle = co_await rados().watch(oid, pool(), std::nullopt,
+ [](auto&&...) { std::terminate(); },
+ asio::use_awaitable);
+ co_await expect_error_code(
+ rados().next_notification(handle, asio::use_awaitable),
+ neorados::errc::polled_callback_watch);
+ co_await expect_error_code(
+ rados().next_notification(handle, asio::use_awaitable),
+ sys::errc::bad_file_descriptor);
+ co_await expect_error_code(
+ rados().next_notification(handle, asio::use_awaitable),
+ sys::errc::invalid_argument);
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotifyCancel, NeoRadosTest) {
+ static constexpr auto oid = "obj"sv;
+
+ co_await create_obj(oid);
+ auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+ EXPECT_TRUE(rados().check_watch(handle));
+ std::vector<neorados::ObjWatcher> watchers;
+ co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+ EXPECT_EQ(1u, watchers.size());
+
+ co_await (rados().next_notification(handle, asio::use_awaitable) ||
+ wait_for(50us));
+ EXPECT_TRUE(rados().check_watch(handle));
+ co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+ co_await rados().flush_watch(asio::use_awaitable);
+
+ co_return;
+}