neorados: Add Asio-idiomatic watch support
author    Adam Emerson <aemerson@redhat.com>
          Fri, 15 Sep 2023 20:08:51 +0000 (16:08 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
          Tue, 1 Apr 2025 15:10:13 +0000 (11:10 -0400)
In addition to the librados-style callback interface, add a variant of
`RADOS::watch` that registers no callback, along with a
`next_notification` function that waits for and returns the next
notification on such a watch.

Rationale:

The callback creates an inversion of control that is more noticeable in
coroutine-heavy workflows. You could co_spawn and detach, but even then
there is no obvious way to report errors.
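
A rough usage sketch of the polling style (coroutine flavor; `rados`,
`pool`, and `oid` stand in for real objects, error handling omitted):

    auto handle = co_await rados.watch(oid, pool, asio::use_awaitable);
    for (;;) {
      auto n = co_await rados.next_notification(handle, asio::use_awaitable);
      co_await rados.notify_ack(oid, pool, n.notify_id, n.cookie,
                                {}, asio::use_awaitable);
    }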

Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/include/neorados/RADOS.hpp
src/neorados/RADOS.cc
src/osdc/Objecter.h
src/test/neorados/watch_notify.cc

src/include/neorados/RADOS.hpp
index 3ac42edb96b386e7aa5148f9f6a52a051b882521..639a0bce57a78a9acbd1579a8e5901e4ed742e88 100644 (file)
@@ -1281,6 +1281,14 @@ private:
   std::aligned_storage_t<impl_size> impl;
 };
 
+// Result from `next_notification`
+struct Notification {
+  std::uint64_t notify_id = 0;
+  std::uint64_t cookie = 0;
+  std::uint64_t notifier_id = 0;
+  ceph::buffer::list bl;
+};
+
 // Clang reports a spurious warning that a captured `this` is unused
 // in the public 'wrapper' functions that construct the completion
 // handler and pass it to the actual worker member functions. The `this` is
@@ -1617,6 +1625,38 @@ public:
       }, consigned, std::move(o), std::move(ioc), std::move(cb));
   }
 
+  template<boost::asio::completion_token_for<WatchSig> CompletionToken>
+  auto watch(Object o, IOContext ioc, CompletionToken&& token,
+            std::optional<std::chrono::seconds> timeout = std::nullopt,
+            std::uint32_t queue_size = 128u) {
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), WatchSig>(
+      [timeout, queue_size, this]
+      (auto&& handler, Object o, IOContext ioc) mutable {
+       watch_(std::move(o), std::move(ioc), std::move(handler), timeout,
+              queue_size);
+      }, consigned, std::move(o), std::move(ioc));
+  }
+
+  using NextNotificationSig = void(boost::system::error_code ec,
+                                  Notification);
+  using NextNotificationComp =
+    boost::asio::any_completion_handler<NextNotificationSig>;
+  template<boost::asio::completion_token_for<
+            NextNotificationSig> CompletionToken>
+  auto next_notification(uint64_t cookie, CompletionToken&& token) {
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<
+      decltype(consigned), NextNotificationSig>(
+       [cookie, this](auto&& handler) mutable {
+       next_notification_(cookie, std::move(handler));
+       }, consigned);
+  }
+
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto notify_ack(Object o, IOContext ioc,
                  uint64_t notify_id, uint64_t cookie,
@@ -1832,6 +1872,12 @@ private:
   void watch_(Object o, IOContext ioc,
              std::optional<std::chrono::seconds> timeout,
              WatchCB cb, WatchComp c);
+  void watch_(Object o, IOContext ioc, WatchComp c,
+             std::optional<std::chrono::seconds> timeout,
+             std::uint32_t queue_size);
+  void next_notification_(uint64_t cookie, NextNotificationComp c);
+  tl::expected<ceph::timespan, boost::system::error_code>
+  watch_check_(std::uint64_t cookie);
   void notify_ack_(Object o, IOContext _ioc,
                   uint64_t notify_id,
                   uint64_t cookie,
@@ -1882,7 +1928,13 @@ private:
 enum class errc {
   pool_dne = 1,
   snap_dne,
-  invalid_snapcontext
+  invalid_snapcontext,
+  // Indicates that notifications were received while the queue was
+  // full. The watch is still valid and `next_notification` may be
+  // called again.
+  notification_overflow,
+  // Attempted to poll a callback watch
+  polled_callback_watch
 };
 
 const boost::system::error_category& error_category() noexcept;
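
A consumer-side sketch of reacting to the overflow error declared above,
assuming the polling loop from the commit message (illustrative only;
`rados`, `pool`, `oid`, and `handle` are placeholders):

    for (;;) {
      auto [ec, n] = co_await rados.next_notification(
        handle, asio::as_tuple(asio::use_awaitable));
      if (ec == neorados::errc::notification_overflow) {
        // Notifications were dropped while the queue was full, but the
        // watch is still valid; keep polling.
        continue;
      } else if (ec) {
        break;
      }
      co_await rados.notify_ack(oid, pool, n.notify_id, n.cookie,
                                {}, asio::use_awaitable);
    }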
src/neorados/RADOS.cc
index b5d0eb4a8823ef602fbf45ae7781cbea51f76111..1f7e89db8829bb4f02dee9535ebd6a3480246500 100644 (file)
  *
  */
 
-#define BOOST_BIND_NO_PLACEHOLDERS
-
+#include <boost/asio/associated_executor.hpp>
+#include <boost/asio/error.hpp>
 #include <optional>
+#include <deque>
+#include <queue>
 #include <string_view>
 
+#include <boost/asio/execution/context.hpp>
+
+#include <boost/asio/any_completion_handler.hpp>
+#include <boost/asio/append.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/consign.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/execution_context.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/query.hpp>
+#include <boost/asio/strand.hpp>
+
+#include <boost/system/error_code.hpp>
+
+#include "common/async/service.h"
+
+#define BOOST_BIND_NO_PLACEHOLDERS
+
 #include <boost/intrusive_ptr.hpp>
 
 #include <fmt/format.h>
 
+#include "include/any.h"
 #include "include/ceph_fs.h"
 
 #include "common/ceph_context.h"
@@ -45,6 +66,8 @@ namespace bc = boost::container;
 namespace bs = boost::system;
 namespace cb = ceph::buffer;
 
+namespace async = ceph::async;
+
 namespace neorados {
 // Object
 
@@ -1341,6 +1364,129 @@ void RADOS::stat_fs_(std::optional<std::int64_t> _pool,
 
 // --- Watch/Notify
 
+class Notifier : public async::service_list_base_hook {
+  friend ceph::async::service<Notifier>;
+
+  struct id_and_handler {
+    uint64_t id = 0;
+    RADOS::NextNotificationComp comp;
+  };
+
+  asio::io_context::executor_type ex;
+  // Zero for unbounded. I would not recommend this.
+  const uint32_t capacity;
+
+  async::service<Notifier>& svc;
+  std::queue<std::pair<bs::error_code, Notification>> notifications;
+  std::deque<id_and_handler> handlers;
+  std::mutex m;
+  uint64_t next_id = 0;
+
+  void service_shutdown() {
+    std::unique_lock l(m);
+    handlers.clear();
+  }
+
+public:
+
+  Notifier(asio::io_context::executor_type ex, uint32_t capacity)
+    : ex(ex), capacity(capacity),
+      svc(asio::use_service<async::service<Notifier>>(
+           asio::query(ex, boost::asio::execution::context))) {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+  }
+
+  ~Notifier() {
+    while (!handlers.empty()) {
+      auto h = std::move(handlers.front());
+      asio::post(ex, asio::append(std::move(h.comp),
+                                 asio::error::operation_aborted,
+                                 Notification{}));
+      handlers.pop_front();
+    }
+    svc.remove(*this);
+  }
+
+  auto next_tid() {
+    std::unique_lock l(m);
+    return ++next_id;
+  }
+  void add_handler(uint64_t id, RADOS::NextNotificationComp&& h) {
+    std::unique_lock l(m);
+    if (notifications.empty()) {
+      handlers.push_back({id, std::move(h)});
+    } else {
+      auto [e, n] = std::move(notifications.front());
+      notifications.pop();
+      l.unlock();
+      dispatch(asio::append(std::move(h), std::move(e), std::move(n)));
+    }
+  }
+
+  void cancel(uint64_t id) {
+    std::unique_lock l(m);
+    for (auto i = handlers.begin(); i != handlers.end(); ++i) {
+      if (i->id == id) {
+       dispatch(asio::append(std::move(i->comp),
+                             asio::error::operation_aborted, Notification{}));
+       handlers.erase(i);
+       break;
+      }
+    }
+  }
+
+  void operator ()(bs::error_code ec, uint64_t notify_id, uint64_t cookie,
+                  uint64_t notifier_id, buffer::list&& bl) {
+    std::unique_lock l(m);
+    if (!handlers.empty()) {
+      auto h = std::move(handlers.front());
+      handlers.pop_front();
+      l.unlock();
+      dispatch(asio::append(std::move(h.comp), std::move(ec),
+                           Notification{
+                             .notify_id = notify_id,
+                             .cookie = cookie,
+                             .notifier_id = notifier_id,
+                             .bl = std::move(bl),
+                           }));
+    } else if (capacity && notifications.size() >= capacity) {
+      // We are allowed one over, so the client knows where in the
+      // sequence of notifications we started losing data.
+      notifications.push({errc::notification_overflow, {}});
+    } else {
+      notifications.push({{},
+                         Notification{
+                             .notify_id = notify_id,
+                             .cookie = cookie,
+                             .notifier_id = notifier_id,
+                             .bl = std::move(bl),
+                         }});
+    }
+  }
+};
+
+struct next_notify_cancellation {
+  std::weak_ptr<Notifier> ptr;
+  uint64_t id;
+
+  next_notify_cancellation(std::weak_ptr<Notifier> ptr, uint64_t id)
+    : ptr(std::move(ptr)), id(id) {}
+
+  void operator ()(asio::cancellation_type_t type) {
+    if (type == asio::cancellation_type::total ||
+       type == asio::cancellation_type::terminal) {
+      // Since nobody can cancel until we return (I hope) we shouldn't
+      // need a mutex or anything.
+      auto notifier = ptr.lock();
+      if (notifier) {
+       notifier->cancel(id);
+      }
+    }
+  }
+};
+
+
 void RADOS::watch_(Object o, IOContext _ioc,
                   std::optional<std::chrono::seconds> timeout, WatchCB cb,
                   WatchComp c) {
@@ -1366,6 +1512,60 @@ void RADOS::watch_(Object o, IOContext _ioc,
       }), nullptr);
 }
 
+void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
+                  std::optional<std::chrono::seconds> timeout,
+                  std::uint32_t queue_size = 128u) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+  ObjectOperation op;
+
+  auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc,
+                                                   ioc->extra_op_flags);
+  uint64_t cookie = linger_op->get_cookie();
+  // Shared pointer to avoid a potential race condition
+  linger_op->user_data.emplace<std::shared_ptr<Notifier>>(
+    std::make_shared<Notifier>(get_executor(), queue_size));
+  auto& n = ceph::any_cast<std::shared_ptr<Notifier>&>(
+    linger_op->user_data);
+  linger_op->handle = std::ref(*n);
+  op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+  bufferlist bl;
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
+  impl->objecter->linger_watch(
+    linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
+    asio::bind_executor(
+      std::move(e),
+      [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+       asio::dispatch(asio::append(std::move(c), e, cookie));
+      }), nullptr);
+}
+
+void RADOS::next_notification_(uint64_t cookie, NextNotificationComp c) {
+  Objecter::LingerOp* linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+  if (!impl->objecter->is_valid_watch(linger_op)) {
+    dispatch(asio::append(std::move(c),
+                         bs::error_code(ENOTCONN, bs::generic_category()),
+                         Notification{}));
+  } else try {
+      auto n = ceph::any_cast<std::shared_ptr<Notifier>&>(
+       linger_op->user_data);
+      // Arrange for cancellation
+      auto slot = boost::asio::get_associated_cancellation_slot(c);
+      auto id = n->next_tid();
+      if (slot.is_connected()) {
+       slot.template emplace<next_notify_cancellation>(
+         std::weak_ptr<Notifier>(n), id);
+      }
+      n->add_handler(id, std::move(c));
+    } catch (const std::bad_any_cast&) {
+      dispatch(asio::append(std::move(c),
+                           bs::error_code(errc::polled_callback_watch),
+                           Notification{}));
+    }
+}
+
 void RADOS::notify_ack_(Object o, IOContext _ioc,
                        uint64_t notify_id,
                        uint64_t cookie,
@@ -1775,6 +1975,10 @@ const char* category::message(int ev, char*,
     return "Snapshot does not exist";
   case errc::invalid_snapcontext:
     return "Invalid snapcontext";
+  case errc::notification_overflow:
+    return "Notificaton overflow";
+  case errc::polled_callback_watch:
+    return "Attempted to poll a callback watch";
   }
 
   return "Unknown error";
@@ -1792,6 +1996,12 @@ bs::error_condition category::default_error_condition(int ev) const noexcept {
     return ceph::errc::does_not_exist;
   case errc::invalid_snapcontext:
     return bs::errc::invalid_argument;
+  case errc::notification_overflow:
+    // I don't know if this is the right choice, but it at least has a
+    // sense of "Try again". Maybe just map it to itself?
+    return bs::errc::interrupted;
+  case errc::polled_callback_watch:
+    return bs::errc::bad_file_descriptor;
   }
 
   return { ev, *this };
@@ -1808,6 +2018,16 @@ bool category::equivalent(int ev, const bs::error_condition& c) const noexcept {
       return true;
     }
   }
+  if (static_cast<errc>(ev) == errc::notification_overflow) {
+    if (c == bs::errc::interrupted) {
+      return true;
+    }
+  }
+  if (static_cast<errc>(ev) == errc::polled_callback_watch) {
+    if (c == bs::errc::invalid_argument) {
+      return true;
+    }
+  }
 
   return default_error_condition(ev) == c;
 }
@@ -1820,6 +2040,10 @@ int category::from_code(int ev) const noexcept {
     return -ENOENT;
   case errc::invalid_snapcontext:
     return -EINVAL;
+  case errc::notification_overflow:
+    return -EINTR;
+  case errc::polled_callback_watch:
+    return -EBADF;
   }
   return -EDOM;
 }
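
Since `next_notification_` wires the handler's cancellation slot through
`next_notify_cancellation`, a pending poll can be abandoned without
disturbing the watch itself. A minimal sketch along the lines of the
WatchNotifyCancel test below (`wait_for` is the test suite's timer
helper, assumed here):

    using namespace boost::asio::experimental::awaitable_operators;

    // Stop waiting after one second; the watch handle stays usable.
    co_await (rados.next_notification(handle, asio::use_awaitable) ||
              wait_for(1s));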
src/osdc/Objecter.h
index 862d4d0cdaf1d59028eddbaf5805f259a80d8798..36b6f29cb916461a48c3ba39aac93c5c2ab4b6fe 100644 (file)
@@ -2377,6 +2377,11 @@ public:
                              uint64_t cookie,
                              uint64_t notifier_id,
                              ceph::buffer::list&& bl)> handle;
+    // I am sorely tempted to replace function2 with cxx_function, as
+    // cxx_function has the `target` method from `std::function` and I
+    // keep having to do annoying circumlocutions like this one to be
+    // able to access function object data with function2.
+    ceph::unique_any user_data;
     OSDSession *session{nullptr};
 
     int ctx_budget{-1};
src/test/neorados/watch_notify.cc
index 901a1e7491ae7c4155638b1e815172c47b980883..25efab9058b3c8a5cebaf2053934362c4fa0a878 100644 (file)
@@ -9,17 +9,19 @@
  *
  */
 
+#include <boost/system/detail/errc.hpp>
 #include <coroutine>
 #include <cstdint>
 #include <iostream>
 #include <utility>
 #include <vector>
 
-#include <boost/asio/as_tuple.hpp>
 #include <boost/asio/awaitable.hpp>
 #include <boost/asio/co_spawn.hpp>
 #include <boost/asio/use_awaitable.hpp>
 
+#include <boost/asio/experimental/awaitable_operators.hpp>
+
 #include <boost/container/flat_set.hpp>
 
 #include <boost/system/errc.hpp>
@@ -47,6 +49,8 @@ using neorados::WriteOp;
 
 using std::uint64_t;
 
+using namespace boost::asio::experimental::awaitable_operators;
+
 class NeoRadosWatchNotifyTest : public NeoRadosTest {
 protected:
   buffer::list notify_bl;
@@ -171,3 +175,121 @@ CORO_TEST_F(NeoRadosWatchNotify, WatchNotifyTimeout, NeoRadosWatchNotifyTest) {
 
   co_return;
 }
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotify, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+  EXPECT_TRUE(rados().check_watch(handle));
+  std::vector<neorados::ObjWatcher> watchers;
+  co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+  EXPECT_EQ(1u, watchers.size());
+  auto notify = [](neorados::RADOS& r, neorados::IOContext ioc)
+    -> asio::awaitable<void> {
+    auto reply = co_await r.notify(oid, ioc, {}, {}, asio::use_awaitable);
+    std::map<std::pair<uint64_t, uint64_t>, buffer::list> reply_map;
+    std::set<std::pair<uint64_t, uint64_t>> missed_set;
+    auto p = reply.cbegin();
+    decode(reply_map, p);
+    decode(missed_set, p);
+    EXPECT_EQ(1u, reply_map.size());
+    EXPECT_EQ(5u, reply_map.begin()->second.length());
+    EXPECT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+    EXPECT_EQ(0u, missed_set.size());
+
+    co_return;
+  }(rados(), pool());
+  auto poll = [](neorados::RADOS& r, neorados::IOContext ioc,
+                uint64_t handle) -> asio::awaitable<void> {
+    auto notification = co_await r.next_notification(handle,
+                                                    asio::use_awaitable);
+    co_await r.notify_ack(oid, ioc, notification.notify_id, handle,
+                         to_buffer_list("reply"sv), asio::use_awaitable);
+    EXPECT_EQ(handle, notification.cookie);
+  }(rados(), pool(), handle);
+
+  co_await (std::move(notify) && std::move(poll));
+
+  EXPECT_TRUE(rados().check_watch(handle));
+  co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+  co_return;
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotifyTimeout, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+  static constexpr auto timeout = 1s;
+  static constexpr auto delay = 3s;
+
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+  EXPECT_TRUE(rados().check_watch(handle));
+  std::vector<neorados::ObjWatcher> watchers;
+  co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+  EXPECT_EQ(1u, watchers.size());
+
+  auto notify = [](neorados::RADOS& r, neorados::IOContext ioc)
+    -> asio::awaitable<void> {
+    co_await expect_error_code(r.notify(oid, ioc, {}, timeout,
+                                       asio::use_awaitable),
+                              sys::errc::timed_out);
+  }(rados(), pool());
+
+  auto ack_slowly = [](neorados::RADOS& r, neorados::IOContext ioc,
+                      uint64_t handle) -> asio::awaitable<void> {
+    auto notification = co_await r.next_notification(handle,
+                                                    asio::use_awaitable);
+    EXPECT_EQ(handle, notification.cookie);
+    co_await wait_for(delay);
+    co_await r.notify_ack(oid, ioc, notification.notify_id, handle,
+                         to_buffer_list("reply"sv), asio::use_awaitable);
+  }(rados(), pool(), handle);
+
+
+  co_await (std::move(notify) || std::move(ack_slowly));
+
+  EXPECT_TRUE(rados().check_watch(handle));
+  co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+  co_await rados().flush_watch(asio::use_awaitable);
+
+  co_return;
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WrongWatchType, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), std::nullopt,
+                                       [](auto&&...) { std::terminate(); },
+                                       asio::use_awaitable);
+  co_await expect_error_code(
+    rados().next_notification(handle, asio::use_awaitable),
+    neorados::errc::polled_callback_watch);
+  co_await expect_error_code(
+    rados().next_notification(handle, asio::use_awaitable),
+    sys::errc::bad_file_descriptor);
+  co_await expect_error_code(
+    rados().next_notification(handle, asio::use_awaitable),
+    sys::errc::invalid_argument);
+}
+
+CORO_TEST_F(NeoRadosWatchNotifyPoll, WatchNotifyCancel, NeoRadosTest) {
+  static constexpr auto oid = "obj"sv;
+
+  co_await create_obj(oid);
+  auto handle = co_await rados().watch(oid, pool(), asio::use_awaitable);
+  EXPECT_TRUE(rados().check_watch(handle));
+  std::vector<neorados::ObjWatcher> watchers;
+  co_await execute(oid, ReadOp{}.list_watchers(&watchers));
+  EXPECT_EQ(1u, watchers.size());
+
+  co_await (rados().next_notification(handle, asio::use_awaitable) ||
+           wait_for(50us));
+  EXPECT_TRUE(rados().check_watch(handle));
+  co_await rados().unwatch(handle, pool(), asio::use_awaitable);
+
+  co_await rados().flush_watch(asio::use_awaitable);
+
+  co_return;
+}