From 24e67fa8112221b452d8bd8dc8a9d689203d3054 Mon Sep 17 00:00:00 2001
From: "Adam C. Emerson"
Date: Fri, 30 May 2025 16:54:45 -0400
Subject: [PATCH] neorados: Hold reference to implementation across operations

Asynchrony combined with cancellations keeps leading to occasional
lifetime issues, so follow the best-practices of Asio I/O objects by
having completions keep a reference live.

The original NeoRados backing implements Asio's two-phase shutdown
properly. The RadosClient backing does not, because it shares an
Objecter with completions that do not belong to it. In practice I
don't think this will matter since librados and neorados get shut
down around the same time.

Signed-off-by: Adam C. Emerson
---
 src/include/neorados/RADOS.hpp                 | 131 ++++++------------
 src/neorados/RADOS.cc                          |  23 ++-
 src/neorados/RADOSImpl.cc                      |  30 ++--
 src/neorados/RADOSImpl.h                       |  36 ++++-
 .../librados_test_stub/NeoradosTestStub.cc     |   2 +-
 5 files changed, 106 insertions(+), 116 deletions(-)

diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp
index 27ca79f1a0d1e..c58cf5a9fd733 100644
--- a/src/include/neorados/RADOS.hpp
+++ b/src/include/neorados/RADOS.hpp
@@ -24,7 +24,6 @@
 #include 
 #include 
 #include 
-#include 
 #include 
 #include 
 
@@ -1373,8 +1372,8 @@ public:
 
   static RADOS make_with_librados(librados::Rados& rados);
 
-  RADOS(const RADOS&) = delete;
-  RADOS& operator =(const RADOS&) = delete;
+  RADOS(const RADOS&);
+  RADOS& operator =(const RADOS&);
 
   RADOS(RADOS&&);
   RADOS& operator =(RADOS&&);
@@ -1387,14 +1386,26 @@ public:
   executor_type get_executor() const;
   boost::asio::io_context& get_io_context();
 
+private:
+  template<typename CompletionToken>
+  auto consign(CompletionToken&& token) {
+    return boost::asio::consign(
+      std::forward<CompletionToken>(token),
+      std::make_pair(
+        boost::asio::make_work_guard(
+          boost::asio::get_associated_executor(token, get_executor())),
+        impl));
+  }
+
+public:
+
+
   template CompletionToken>
   auto execute(Object o, IOContext ioc, ReadOp op,
                ceph::buffer::list* bl,
                CompletionToken&& token, uint64_t* objver = nullptr,
                const blkin_trace_info* trace_info = nullptr) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
                                      ReadOp op) {
@@ -1407,9 +1418,7 @@ public:
   auto execute(Object o, IOContext ioc, WriteOp op,
               CompletionToken&& token, uint64_t* objver = nullptr,
               const blkin_trace_info* trace_info = nullptr) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
                                  WriteOp op) {
@@ -1425,9 +1434,7 @@ public:
   using LookupPoolComp = boost::asio::any_completion_handler;
   template CompletionToken>
   auto lookup_pool(std::string name, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler, std::string name) {
         lookup_pool_(std::move(name), std::move(handler));
@@ -1440,9 +1447,7 @@ public:
   using LSPoolsComp = boost::asio::any_completion_handler;
   template CompletionToken>
   auto list_pools(CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler) {
         list_pools_(std::move(handler));
@@ -1454,9 +1459,7 @@ public:
   template CompletionToken>
   auto create_pool_snap(int64_t pool, std::string snap_name,
                         CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [pool, this](auto&& handler, std::string snap_name) {
         create_pool_snap_(pool, std::move(snap_name),
@@ -1475,9 +1478,7 @@ public:
   using SMSnapComp = boost::asio::any_completion_handler;
   template CompletionToken>
   auto allocate_selfmanaged_snap(int64_t pool, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [pool, this](auto&& handler) {
         allocate_selfmanaged_snap_(pool, std::move(handler));
@@ -1487,9 +1488,7 @@ public:
   template CompletionToken>
   auto delete_pool_snap(int64_t pool, std::string snap_name,
                         CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [pool, this](auto&& handler, std::string snap_name) {
         delete_pool_snap_(pool, std::move(snap_name),
@@ -1500,9 +1499,7 @@ public:
   template CompletionToken>
   auto delete_selfmanaged_snap(int64_t pool, std::uint64_t snap,
                                CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [pool, snap, this](auto&& handler) {
         delete_selfmanaged_snap_(pool, snap, std::move(handler));
@@ -1545,9 +1542,7 @@ public:
   template CompletionToken>
   auto create_pool(std::string name, std::optional crush_rule,
                    CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [crush_rule, this](auto&& handler, std::string name) {
         create_pool_(std::move(name), crush_rule,
@@ -1557,9 +1552,7 @@ public:
 
   template CompletionToken>
   auto delete_pool(std::string name, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler, std::string name) {
         delete_pool_(std::move(name), std::move(handler));
@@ -1568,9 +1561,7 @@ public:
 
   template CompletionToken>
   auto delete_pool(int64_t pool, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [pool, this](auto&& handler) {
         delete_pool_(pool, std::move(handler));
@@ -1583,9 +1574,7 @@ public:
   using PoolStatComp = boost::asio::any_completion_handler;
   template CompletionToken>
   auto stat_pools(std::vector pools, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler, std::vector pools) {
         stat_pools_(std::move(pools), std::move(handler));
@@ -1597,9 +1586,7 @@ public:
   using StatFSComp = boost::asio::any_completion_handler;
   template CompletionToken>
   auto statfs(std::optional pool, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [pool, this](auto&& handler) {
         statfs_(pool, std::move(handler));
@@ -1619,9 +1606,7 @@
   auto watch(Object o, IOContext ioc,
              std::optional timeout, WatchCB cb,
              CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [timeout, this](auto&& handler, Object o, IOContext ioc, WatchCB cb) {
         watch_(std::move(o), std::move(ioc), timeout, std::move(cb),
@@ -1633,9 +1618,7 @@
   auto watch(Object o, IOContext ioc, CompletionToken&& token,
             std::optional timeout = std::nullopt,
             std::uint32_t queue_size = 128u) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [timeout, queue_size, this]
       (auto&& handler, Object o, IOContext ioc) mutable {
@@ -1651,9 +1634,7 @@
 
   template CompletionToken>
   auto next_notification(uint64_t cookie, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<
       decltype(consigned), NextNotificationSig>(
       [cookie, this](auto&& handler) mutable {
@@ -1666,9 +1647,7 @@
                  uint64_t notify_id, uint64_t cookie,
                  ceph::buffer::list bl,
                  CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [notify_id, cookie, this](auto&& handler, Object o, IOContext ioc,
                                 buffer::list bl) {
@@ -1680,9 +1659,7 @@
   template CompletionToken>
   auto unwatch(std::uint64_t cookie, IOContext ioc,
                CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [cookie, this](auto&& handler, IOContext ioc) {
         unwatch_(cookie, std::move(ioc), std::move(handler));
@@ -1697,9 +1674,7 @@ public:
   using VoidOpComp = boost::asio::any_completion_handler;
   template CompletionToken>
   auto flush_watch(CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler) {
         flush_watch_(std::move(handler));
@@ -1720,9 +1695,7 @@
   auto notify(Object o, IOContext ioc, ceph::buffer::list bl,
               std::optional timeout,
               CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [timeout, this](auto&& handler, Object o, IOContext ioc,
                       buffer::list bl) {
@@ -1742,9 +1715,7 @@
                          Cursor end, std::uint32_t max,
                          ceph::buffer::list filter,
                          CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [max, this](auto&& handler, IOContext ioc, Cursor begin, Cursor end,
                   buffer::list filter) {
@@ -1760,9 +1731,7 @@
   template CompletionToken>
   auto osd_command(int osd, std::vector cmd,
                    ceph::buffer::list in, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [osd, this](auto&& handler, std::vector cmd,
                   buffer::list in) {
@@ -1773,9 +1742,7 @@
   template CompletionToken>
   auto pg_command(PG pg, std::vector cmd,
                   ceph::buffer::list in, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler, PG pg, std::vector cmd,
              buffer::list in) {
@@ -1789,9 +1756,7 @@
                    ceph::buffer::list bl, std::string* outs,
                    ceph::buffer::list* outbl,
                    CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [outs, outbl, this](auto&& handler, std::vector command,
                           buffer::list bl) {
@@ -1803,9 +1768,7 @@
   template CompletionToken>
   auto enable_application(std::string pool, std::string app_name,
                           bool force, CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [force, this](auto&& handler, std::string pool, std::string app_name) {
         enable_application_(std::move(pool), std::move(app_name), force,
@@ -1817,9 +1780,7 @@
   auto blocklist_add(std::string client_address,
                      std::optional expire,
                      CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [expire, this](auto&& handler, std::string client_address) {
         blocklist_add_(std::move(client_address), expire,
@@ -1829,9 +1790,7 @@
 
   template CompletionToken>
   auto wait_for_latest_osd_map(CompletionToken&& token) {
-    auto consigned = boost::asio::consign(
-      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
-        boost::asio::get_associated_executor(token, get_executor())));
+    auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate(
       [this](auto&& handler) {
         wait_for_latest_osd_map_(std::move(handler));
@@ -1846,7 +1805,7 @@ private:
   friend Builder;
 
-  RADOS(std::unique_ptr<detail::Client> impl);
+  RADOS(std::shared_ptr<detail::Client> impl);
 
   static void make_with_cct_(CephContext* cct,
                              boost::asio::io_context& ioctx,
                              BuildComp c);
@@ -1930,7 +1889,7 @@ private:
   void wait_for_latest_osd_map_(SimpleOpComp c);
 
   // Proxy object to provide access to low-level RADOS messaging clients
-  std::unique_ptr<detail::Client> impl;
+  std::shared_ptr<detail::Client> impl;
 };
 
 #pragma clang diagnostic pop

diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc
index f0b100bf96da6..ff98d3a94885e 100644
--- a/src/neorados/RADOS.cc
+++ b/src/neorados/RADOS.cc
@@ -897,9 +897,10 @@ void RADOS::make_with_cct_(CephContext* cct,
                            asio::io_context& ioctx,
                            BuildComp c) {
   try {
-    auto r = new detail::NeoClient{std::make_unique<detail::RADOS>(ioctx, cct)};
+    auto r = std::make_shared<detail::NeoClient>(
+      std::make_unique<detail::RADOS>(ioctx, cct));
     r->objecter->wait_for_osd_map(
-      [c = std::move(c), r = std::unique_ptr<detail::Client>(r)]() mutable {
+      [c = std::move(c), r = std::move(r)]() mutable {
         asio::dispatch(asio::append(std::move(c), bs::error_code{},
                                     RADOS{std::move(r)}));
       });
@@ -910,14 +911,17 @@
 }
 
 RADOS RADOS::make_with_librados(librados::Rados& rados) {
-  return RADOS{std::make_unique<detail::RadosClient>(rados.client)};
+  return RADOS{std::make_shared<detail::RadosClient>(rados.client)};
 }
 
 RADOS::RADOS() = default;
 
-RADOS::RADOS(std::unique_ptr<detail::Client> impl)
+RADOS::RADOS(std::shared_ptr<detail::Client> impl)
   : impl(std::move(impl)) {}
 
+RADOS::RADOS(const RADOS&) = default;
+RADOS& RADOS::operator =(const RADOS&) = default;
+
 RADOS::RADOS(RADOS&&) = default;
 RADOS& RADOS::operator =(RADOS&&) = default;
 
@@ -1382,8 +1386,12 @@ class Notifier : public async::service_list_base_hook {
   std::deque handlers;
   std::mutex m;
   uint64_t next_id = 0;
+  std::shared_ptr<detail::Client> neoref;
 
   void service_shutdown() {
+    if (neoref) {
+      neoref = nullptr;
+    }
     if (linger_op) {
       linger_op->put();
     }
@@ -1394,10 +1402,11 @@
 public:
   Notifier(asio::io_context::executor_type ex, Objecter::LingerOp* linger_op,
-           uint32_t capacity)
+           uint32_t capacity, std::shared_ptr<detail::Client> neoref)
     : ex(ex), linger_op(linger_op), capacity(capacity),
       svc(asio::use_service>(
-            asio::query(ex, boost::asio::execution::context))) {
+            asio::query(ex, boost::asio::execution::context))),
+      neoref(std::move(neoref)) {
     // register for service_shutdown() notifications
     svc.add(*this);
   }
 
@@ -1534,7 +1543,7 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
   uint64_t cookie = linger_op->get_cookie();
   // Shared pointer to avoid a potential race condition
   linger_op->user_data.emplace>(
-    std::make_shared<Notifier>(get_executor(), linger_op, queue_size));
+    std::make_shared<Notifier>(get_executor(), linger_op, queue_size, impl));
   auto& n = ceph::any_cast&>(
     linger_op->user_data);
   linger_op->handle = std::ref(*n);

diff --git a/src/neorados/RADOSImpl.cc b/src/neorados/RADOSImpl.cc
index c40acf52d04d5..e4d98eea25047 100644
--- a/src/neorados/RADOSImpl.cc
+++ b/src/neorados/RADOSImpl.cc
@@ -16,10 +16,6 @@
 
 #include 
 
"common/common_init.h" - -#include "global/global_init.h" - namespace neorados { namespace detail { @@ -83,17 +79,7 @@ RADOS::RADOS(boost::asio::io_context& ioctx, } RADOS::~RADOS() { - if (objecter && objecter->initialized) { - objecter->shutdown(); - } - - mgrclient.shutdown(); - monclient.shutdown(); - - if (messenger) { - messenger->shutdown(); - messenger->wait(); - } + shutdown(); } bool RADOS::ms_dispatch(Message *m) @@ -116,5 +102,19 @@ bool RADOS::ms_handle_refused(Connection *con) { return false; } +void RADOS::shutdown() { + if (objecter && objecter->initialized) { + objecter->shutdown(); + } + + // These shutdowns are idempotent + mgrclient.shutdown(); + monclient.shutdown(); + + if (messenger) { + messenger->shutdown(); + messenger->wait(); + } +} } // namespace detail } // namespace neorados diff --git a/src/neorados/RADOSImpl.h b/src/neorados/RADOSImpl.h index 9e3adea3a2bd0..b297037703112 100644 --- a/src/neorados/RADOSImpl.h +++ b/src/neorados/RADOSImpl.h @@ -14,11 +14,13 @@ #ifndef CEPH_NEORADOS_RADOSIMPL_H #define CEPH_NEORADOS_RADOSIMPL_H +#include #include -#include #include +#include "common/async/service.h" + #include "common/ceph_context.h" #include "common/ceph_mutex.h" @@ -40,15 +42,14 @@ namespace detail { class NeoClient; -class RADOS : public Dispatcher -{ +class RADOS : public Dispatcher { friend ::neorados::RADOS; friend NeoClient; boost::asio::io_context& ioctx; boost::intrusive_ptr cct; - ceph::mutex lock = ceph::make_mutex("RADOS_unleashed::_::RADOSImpl"); + ceph::mutex lock = ceph::make_mutex("neorados::detail::RADOSImpl"); int instance_id = -1; std::unique_ptr messenger; @@ -57,6 +58,7 @@ class RADOS : public Dispatcher MgrClient mgrclient; std::unique_ptr objecter; + std::atomic finished = false; public: @@ -70,9 +72,10 @@ public: mon_feature_t get_required_monitor_features() const { return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features)); } + void shutdown(); }; -class Client { +class Client : public std::enable_shared_from_this { public: Client(boost::asio::io_context& ioctx, boost::intrusive_ptr cct, @@ -97,19 +100,38 @@ public: virtual int get_instance_id() const = 0; }; -class NeoClient : public Client { +class NeoClient : public Client, + public ceph::async::service_list_base_hook { public: + NeoClient(std::unique_ptr&& rados) : Client(rados->ioctx, rados->cct, rados->monclient, - rados->objecter.get()), + rados->objecter.get()), + svc(boost::asio::use_service>( + boost::asio::query(ioctx.get_executor(), + boost::asio::execution::context))), rados(std::move(rados)) { + svc.add(*this); + } + + ~NeoClient() { + svc.remove(*this); } int get_instance_id() const override { return rados->instance_id; } + void service_shutdown() { + // In case the last owner of a reference is an op we're about to + // cancel. (This can happen if the `RADOS` object + auto service_ref = shared_from_this(); + rados->shutdown(); + } + private: + friend ceph::async::service; + async::service& svc; std::unique_ptr rados; }; diff --git a/src/test/librados_test_stub/NeoradosTestStub.cc b/src/test/librados_test_stub/NeoradosTestStub.cc index a1b61f2459b3a..fa04bfa40a7c3 100644 --- a/src/test/librados_test_stub/NeoradosTestStub.cc +++ b/src/test/librados_test_stub/NeoradosTestStub.cc @@ -558,7 +558,7 @@ RADOS::RADOS() = default; RADOS::RADOS(RADOS&&) = default; -RADOS::RADOS(std::unique_ptr impl) +RADOS::RADOS(std::shared_ptr impl) : impl(std::move(impl)) { } -- 2.39.5