]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
neorados: Hold reference to implementation across operations
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 30 May 2025 20:54:45 +0000 (16:54 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 5 Aug 2025 20:07:42 +0000 (16:07 -0400)
Asynchrony combined with cancellations keeps leading to occasional
lifetime issues, so follow the best-practices of Asio I/O objects by
having completions keep a reference live.

The original NeoRados backing implements Asio's two-phase shutdown
properly.

The RadosClient backing does not, because it shares an Objecter with
completions that do not belong to it. In practice I don't think this
will matter since librados and neorados get shut down around the same
time.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/include/neorados/RADOS.hpp
src/neorados/RADOS.cc
src/neorados/RADOSImpl.cc
src/neorados/RADOSImpl.h
src/test/librados_test_stub/NeoradosTestStub.cc

index 27ca79f1a0d1e6f00edfd527092a1758edc786b0..c58cf5a9fd73355de3ad7db4268c0a6d91a65a35 100644 (file)
@@ -24,7 +24,6 @@
 #include <string>
 #include <string_view>
 #include <type_traits>
-#include <variant>
 
 #include <fmt/format.h>
 #include <fmt/ostream.h>
@@ -1373,8 +1372,8 @@ public:
 
   static RADOS make_with_librados(librados::Rados& rados);
 
-  RADOS(const RADOS&) = delete;
-  RADOS& operator =(const RADOS&) = delete;
+  RADOS(const RADOS&);
+  RADOS& operator =(const RADOS&);
 
   RADOS(RADOS&&);
   RADOS& operator =(RADOS&&);
@@ -1387,14 +1386,26 @@ public:
   executor_type get_executor() const;
   boost::asio::io_context& get_io_context();
 
+private:
+  template<typename CompletionToken>
+  auto consign(CompletionToken&& token) {
+    return boost::asio::consign(
+      std::forward<CompletionToken>(token),
+      std::make_pair(
+       boost::asio::make_work_guard(
+         boost::asio::get_associated_executor(token, get_executor())),
+       impl));
+  }
+
+public:
+
+
   template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
   auto execute(Object o, IOContext ioc, ReadOp op,
               ceph::buffer::list* bl,
               CompletionToken&& token, uint64_t* objver = nullptr,
               const blkin_trace_info* trace_info = nullptr) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
       [bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
                                     ReadOp op) {
@@ -1407,9 +1418,7 @@ public:
   auto execute(Object o, IOContext ioc, WriteOp op,
               CompletionToken&& token, uint64_t* objver = nullptr,
               const blkin_trace_info* trace_info = nullptr) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
       [objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
                                 WriteOp op) {
@@ -1425,9 +1434,7 @@ public:
   using LookupPoolComp = boost::asio::any_completion_handler<LookupPoolSig>;
   template<boost::asio::completion_token_for<LookupPoolSig> CompletionToken>
   auto lookup_pool(std::string name, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), LookupPoolSig>(
       [this](auto&& handler, std::string name) {
        lookup_pool_(std::move(name), std::move(handler));
@@ -1440,9 +1447,7 @@ public:
   using LSPoolsComp = boost::asio::any_completion_handler<LSPoolsSig>;
   template<boost::asio::completion_token_for<LSPoolsSig> CompletionToken>
   auto list_pools(CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), LSPoolsSig>(
       [this](auto&& handler) {
        list_pools_(std::move(handler));
@@ -1454,9 +1459,7 @@ public:
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto create_pool_snap(int64_t pool, std::string snap_name,
                        CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, this](auto&& handler, std::string snap_name) {
        create_pool_snap_(pool, std::move(snap_name),
@@ -1475,9 +1478,7 @@ public:
   using SMSnapComp = boost::asio::any_completion_handler<SMSnapSig>;
   template<boost::asio::completion_token_for<SMSnapSig> CompletionToken>
   auto allocate_selfmanaged_snap(int64_t pool, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SMSnapSig>(
       [pool, this](auto&& handler) {
        allocate_selfmanaged_snap_(pool, std::move(handler));
@@ -1487,9 +1488,7 @@ public:
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_pool_snap(int64_t pool, std::string snap_name,
                        CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, this](auto&& handler, std::string snap_name) {
        delete_pool_snap_(pool, std::move(snap_name),
@@ -1500,9 +1499,7 @@ public:
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_selfmanaged_snap(int64_t pool, std::uint64_t snap,
                               CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, snap, this](auto&& handler) {
        delete_selfmanaged_snap_(pool, snap, std::move(handler));
@@ -1545,9 +1542,7 @@ public:
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto create_pool(std::string name, std::optional<int> crush_rule,
                   CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [crush_rule, this](auto&& handler, std::string name) {
        create_pool_(std::move(name), crush_rule,
@@ -1557,9 +1552,7 @@ public:
 
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_pool(std::string name, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [this](auto&& handler, std::string name) {
        delete_pool_(std::move(name), std::move(handler));
@@ -1568,9 +1561,7 @@ public:
 
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_pool(int64_t pool, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, this](auto&& handler) {
        delete_pool_(pool, std::move(handler));
@@ -1583,9 +1574,7 @@ public:
   using PoolStatComp = boost::asio::any_completion_handler<PoolStatSig>;
   template<boost::asio::completion_token_for<PoolStatSig> CompletionToken>
   auto stat_pools(std::vector<std::string> pools, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), PoolStatSig>(
       [this](auto&& handler, std::vector<std::string> pools) {
        stat_pools_(std::move(pools), std::move(handler));
@@ -1597,9 +1586,7 @@ public:
   using StatFSComp = boost::asio::any_completion_handler<StatFSSig>;
   template<boost::asio::completion_token_for<StatFSSig> CompletionToken>
   auto statfs(std::optional<int64_t> pool, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), StatFSSig>(
       [pool, this](auto&& handler) {
        statfs_(pool, std::move(handler));
@@ -1619,9 +1606,7 @@ public:
   auto watch(Object o, IOContext ioc,
             std::optional<std::chrono::seconds> timeout,
             WatchCB cb, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), WatchSig>(
       [timeout, this](auto&& handler, Object o, IOContext ioc, WatchCB cb) {
        watch_(std::move(o), std::move(ioc), timeout, std::move(cb),
@@ -1633,9 +1618,7 @@ public:
   auto watch(Object o, IOContext ioc, CompletionToken&& token,
             std::optional<std::chrono::seconds> timeout = std::nullopt,
             std::uint32_t queue_size = 128u) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), WatchSig>(
       [timeout, queue_size, this]
       (auto&& handler, Object o, IOContext ioc) mutable {
@@ -1651,9 +1634,7 @@ public:
   template<boost::asio::completion_token_for<
             NextNotificationSig> CompletionToken>
   auto next_notification(uint64_t cookie, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<
       decltype(consigned), NextNotificationSig>(
        [cookie, this](auto&& handler) mutable {
@@ -1666,9 +1647,7 @@ public:
                  uint64_t notify_id, uint64_t cookie,
                  ceph::buffer::list bl,
                  CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [notify_id, cookie, this](auto&& handler, Object o, IOContext ioc,
                                buffer::list bl) {
@@ -1680,9 +1659,7 @@ public:
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto unwatch(std::uint64_t cookie, IOContext ioc,
               CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [cookie, this](auto&& handler, IOContext ioc) {
        unwatch_(cookie, std::move(ioc), std::move(handler));
@@ -1697,9 +1674,7 @@ public:
   using VoidOpComp = boost::asio::any_completion_handler<VoidOpSig>;
   template<boost::asio::completion_token_for<VoidOpSig> CompletionToken>
   auto flush_watch(CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), VoidOpSig>(
       [this](auto&& handler) {
        flush_watch_(std::move(handler));
@@ -1720,9 +1695,7 @@ public:
   auto notify(Object o, IOContext ioc, ceph::buffer::list bl,
              std::optional<std::chrono::seconds> timeout,
              CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), NotifySig>(
       [timeout, this](auto&& handler, Object o, IOContext ioc,
                      buffer::list bl) {
@@ -1742,9 +1715,7 @@ public:
                         Cursor end, std::uint32_t max,
                         ceph::buffer::list filter,
                         CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), EnumerateSig>(
       [max, this](auto&& handler, IOContext ioc, Cursor begin, Cursor end,
                  buffer::list filter) {
@@ -1760,9 +1731,7 @@ public:
   template<boost::asio::completion_token_for<CommandSig> CompletionToken>
   auto osd_command(int osd, std::vector<std::string> cmd,
                   ceph::buffer::list in, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), CommandSig>(
       [osd, this](auto&& handler, std::vector<std::string> cmd,
                  buffer::list in) {
@@ -1773,9 +1742,7 @@ public:
   template<boost::asio::completion_token_for<CommandSig> CompletionToken>
   auto pg_command(PG pg, std::vector<std::string> cmd,
                  ceph::buffer::list in, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), CommandSig>(
       [this](auto&& handler, PG pg, std::vector<std::string> cmd,
             buffer::list in) {
@@ -1789,9 +1756,7 @@ public:
                   ceph::buffer::list bl,
                   std::string* outs, ceph::buffer::list* outbl,
                   CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [outs, outbl, this](auto&& handler, std::vector<std::string> command,
                          buffer::list bl) {
@@ -1803,9 +1768,7 @@ public:
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto enable_application(std::string pool, std::string app_name,
                          bool force, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [force, this](auto&& handler, std::string pool, std::string app_name) {
        enable_application_(std::move(pool), std::move(app_name), force,
@@ -1817,9 +1780,7 @@ public:
   auto blocklist_add(std::string client_address,
                      std::optional<std::chrono::seconds> expire,
                      CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [expire, this](auto&& handler, std::string client_address) {
        blocklist_add_(std::move(client_address), expire,
@@ -1829,9 +1790,7 @@ public:
 
   template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto wait_for_latest_osd_map(CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-       boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [this](auto&& handler) {
        wait_for_latest_osd_map_(std::move(handler));
@@ -1846,7 +1805,7 @@ private:
 
   friend Builder;
 
-  RADOS(std::unique_ptr<detail::Client> impl);
+  RADOS(std::shared_ptr<detail::Client> impl);
   static void make_with_cct_(CephContext* cct,
                             boost::asio::io_context& ioctx,
                             BuildComp c);
@@ -1930,7 +1889,7 @@ private:
   void wait_for_latest_osd_map_(SimpleOpComp c);
 
   // Proxy object to provide access to low-level RADOS messaging clients
-  std::unique_ptr<detail::Client> impl;
+  std::shared_ptr<detail::Client> impl;
 };
 #pragma clang diagnostic pop
 
index f0b100bf96da677b6fec2011d0d3fbad1fd850e8..ff98d3a94885e6d837232149f38e73dc2e19d419 100644 (file)
@@ -897,9 +897,10 @@ void RADOS::make_with_cct_(CephContext* cct,
                           asio::io_context& ioctx,
                           BuildComp c) {
   try {
-    auto r = new detail::NeoClient{std::make_unique<detail::RADOS>(ioctx, cct)};
+    auto r = std::make_shared<detail::NeoClient>(
+      std::make_unique<detail::RADOS>(ioctx, cct));
     r->objecter->wait_for_osd_map(
-      [c = std::move(c), r = std::unique_ptr<detail::Client>(r)]() mutable {
+      [c = std::move(c), r = std::move(r)]() mutable {
        asio::dispatch(asio::append(std::move(c), bs::error_code{},
                                    RADOS{std::move(r)}));
       });
@@ -910,14 +911,17 @@ void RADOS::make_with_cct_(CephContext* cct,
 }
 
 RADOS RADOS::make_with_librados(librados::Rados& rados) {
-  return RADOS{std::make_unique<detail::RadosClient>(rados.client)};
+  return RADOS{std::make_shared<detail::RadosClient>(rados.client)};
 }
 
 RADOS::RADOS() = default;
 
-RADOS::RADOS(std::unique_ptr<detail::Client> impl)
+RADOS::RADOS(std::shared_ptr<detail::Client> impl)
   : impl(std::move(impl)) {}
 
+RADOS::RADOS(const RADOS&) = default;
+RADOS& RADOS::operator =(const RADOS&) = default;
+
 RADOS::RADOS(RADOS&&) = default;
 RADOS& RADOS::operator =(RADOS&&) = default;
 
@@ -1382,8 +1386,12 @@ class Notifier : public async::service_list_base_hook {
   std::deque<id_and_handler> handlers;
   std::mutex m;
   uint64_t next_id = 0;
+  std::shared_ptr<detail::Client> neoref;
 
   void service_shutdown() {
+    if (neoref) {
+      neoref = nullptr;
+    }
     if (linger_op) {
       linger_op->put();
     }
@@ -1394,10 +1402,11 @@ class Notifier : public async::service_list_base_hook {
 public:
 
   Notifier(asio::io_context::executor_type ex, Objecter::LingerOp* linger_op,
-          uint32_t capacity)
+          uint32_t capacity, std::shared_ptr<detail::Client> neoref)
     : ex(ex), linger_op(linger_op), capacity(capacity),
       svc(asio::use_service<async::service<Notifier>>(
-           asio::query(ex, boost::asio::execution::context))) {
+           asio::query(ex, boost::asio::execution::context))),
+      neoref(std::move(neoref)) {
     // register for service_shutdown() notifications
     svc.add(*this);
   }
@@ -1534,7 +1543,7 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
   uint64_t cookie = linger_op->get_cookie();
   // Shared pointer to avoid a potential race condition
   linger_op->user_data.emplace<std::shared_ptr<Notifier>>(
-    std::make_shared<Notifier>(get_executor(), linger_op, queue_size));
+    std::make_shared<Notifier>(get_executor(), linger_op, queue_size, impl));
   auto& n = ceph::any_cast<std::shared_ptr<Notifier>&>(
     linger_op->user_data);
   linger_op->handle = std::ref(*n);
index c40acf52d04d516acf6ea3eab3419355391d2f67..e4d98eea2504745020e0a7c6aed5dfd6a042cc8e 100644 (file)
 
 #include <boost/system/system_error.hpp>
 
-#include "common/common_init.h"
-
-#include "global/global_init.h"
-
 namespace neorados {
 namespace detail {
 
@@ -83,17 +79,7 @@ RADOS::RADOS(boost::asio::io_context& ioctx,
 }
 
 RADOS::~RADOS() {
-  if (objecter && objecter->initialized) {
-    objecter->shutdown();
-  }
-
-  mgrclient.shutdown();
-  monclient.shutdown();
-
-  if (messenger) {
-    messenger->shutdown();
-    messenger->wait();
-  }
+  shutdown();
 }
 
 bool RADOS::ms_dispatch(Message *m)
@@ -116,5 +102,19 @@ bool RADOS::ms_handle_refused(Connection *con) {
   return false;
 }
 
+void RADOS::shutdown() {
+  if (objecter && objecter->initialized) {
+    objecter->shutdown();
+  }
+
+  // These shutdowns are idempotent
+  mgrclient.shutdown();
+  monclient.shutdown();
+
+  if (messenger) {
+    messenger->shutdown();
+    messenger->wait();
+  }
+}
 } // namespace detail
 } // namespace neorados
index 9e3adea3a2bd0f27869093501b963437c0181774..b297037703112ff0bd97730c53dd467045e59080 100644 (file)
 #ifndef CEPH_NEORADOS_RADOSIMPL_H
 #define CEPH_NEORADOS_RADOSIMPL_H
 
+#include <atomic>
 #include <memory>
-#include <string>
 
 #include <boost/intrusive_ptr.hpp>
 
+#include "common/async/service.h"
+
 #include "common/ceph_context.h"
 #include "common/ceph_mutex.h"
 
@@ -40,15 +42,14 @@ namespace detail {
 
 class NeoClient;
 
-class RADOS : public Dispatcher
-{
+class RADOS : public Dispatcher {
   friend ::neorados::RADOS;
   friend NeoClient;
 
   boost::asio::io_context& ioctx;
   boost::intrusive_ptr<CephContext> cct;
 
-  ceph::mutex lock = ceph::make_mutex("RADOS_unleashed::_::RADOSImpl");
+  ceph::mutex lock = ceph::make_mutex("neorados::detail::RADOSImpl");
   int instance_id = -1;
 
   std::unique_ptr<Messenger> messenger;
@@ -57,6 +58,7 @@ class RADOS : public Dispatcher
   MgrClient mgrclient;
 
   std::unique_ptr<Objecter> objecter;
+  std::atomic<bool> finished = false;
 
 public:
 
@@ -70,9 +72,10 @@ public:
   mon_feature_t get_required_monitor_features() const {
     return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features));
   }
+  void shutdown();
 };
 
-class Client {
+class Client : public std::enable_shared_from_this<Client> {
 public:
   Client(boost::asio::io_context& ioctx,
          boost::intrusive_ptr<CephContext> cct,
@@ -97,19 +100,38 @@ public:
   virtual int get_instance_id() const = 0;
 };
 
-class NeoClient : public Client {
+class NeoClient : public Client,
+                 public ceph::async::service_list_base_hook {
 public:
+
   NeoClient(std::unique_ptr<RADOS>&& rados)
     : Client(rados->ioctx, rados->cct, rados->monclient,
-             rados->objecter.get()),
+            rados->objecter.get()),
+      svc(boost::asio::use_service<ceph::async::service<NeoClient>>(
+         boost::asio::query(ioctx.get_executor(),
+                            boost::asio::execution::context))),
       rados(std::move(rados)) {
+    svc.add(*this);
+  }
+
+  ~NeoClient() {
+    svc.remove(*this);
   }
 
   int get_instance_id() const override {
     return rados->instance_id;
   }
 
+  void service_shutdown() {
+    // In case the last owner of a reference is an op we're about to
+    // cancel. (This can happen if the `RADOS` object
+    auto service_ref = shared_from_this();
+    rados->shutdown();
+  }
+
 private:
+  friend ceph::async::service<RADOS>;
+  async::service<NeoClient>& svc;
   std::unique_ptr<RADOS> rados;
 };
 
index a1b61f2459b3a8a24a67209408628c9f0bb014f4..fa04bfa40a7c352df9154b98a8eaaca44a607a08 100644 (file)
@@ -558,7 +558,7 @@ RADOS::RADOS() = default;
 
 RADOS::RADOS(RADOS&&) = default;
 
-RADOS::RADOS(std::unique_ptr<detail::Client> impl)
+RADOS::RADOS(std::shared_ptr<detail::Client> impl)
   : impl(std::move(impl)) {
 }