From 4c0f4220e6f764a81912dc13f6e5040b081449ad Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 25 Apr 2025 17:40:05 -0400 Subject: [PATCH] rgw/multi: Give tasks a reference to RGWDataChangesLog Also run them in strands. Also `datalog_rados` is a `shared_ptr`, now. Probably make it intrusive later. Signed-off-by: Adam C. Emerson --- src/include/neorados/RADOS.hpp | 4 ++ src/rgw/driver/rados/rgw_datalog.cc | 94 ++++++++++++--------------- src/rgw/driver/rados/rgw_datalog.h | 36 +++++----- src/rgw/driver/rados/rgw_rados.cc | 7 +- src/rgw/driver/rados/rgw_sal_rados.cc | 11 +++- src/rgw/driver/rados/rgw_service.cc | 4 +- src/rgw/driver/rados/rgw_service.h | 2 +- src/rgw/rgw_sal.cc | 6 +- src/test/rgw/test_datalog.cc | 24 +++---- 9 files changed, 94 insertions(+), 94 deletions(-) diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp index c58cf5a9fd733..8986f3948c032 100644 --- a/src/include/neorados/RADOS.hpp +++ b/src/include/neorados/RADOS.hpp @@ -1386,6 +1386,10 @@ public: executor_type get_executor() const; boost::asio::io_context& get_io_context(); + operator bool() { + return bool(impl); + } + private: template auto consign(CompletionToken&& token) { diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index d68ce48aec794..9aaec5d16fc73 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -17,7 +17,6 @@ #include -#include "common/async/parallel_for_each.h" #include "include/fs_types.h" #include "include/neorados/RADOS.hpp" @@ -356,17 +355,18 @@ public: } }; -RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) - : cct(cct), +RGWDataChangesLog::RGWDataChangesLog(rgw::sal::RadosStore* driver) + : cct(driver->ctx()), rados(driver->get_neorados()), + executor(driver->get_io_context().get_executor()), num_shards(cct->_conf->rgw_data_log_num_shards), prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data, - neorados::RADOS *rados, + neorados::RADOS rados, std::optional num_shards, std::optional sem_max_keys) - : cct(cct), rados(rados), log_data(log_data), + : cct(cct), rados(rados), log_data(log_data), executor(rados.get_executor()), num_shards(num_shards ? *num_shards : cct->_conf->rgw_data_log_num_shards), prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size), @@ -438,15 +438,13 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) { int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* zone, const RGWZoneParams& zoneparams, - rgw::sal::RadosStore* store, bool background_tasks) { log_data = zone->log_data; - rados = &store->get_neorados(); try { // Blocking in startup code, not ideal, but won't hurt anything. std::exception_ptr eptr - = asio::co_spawn(store->get_io_context(), + = asio::co_spawn(executor, start(dpp, zoneparams.log_pool, background_tasks, background_tasks, background_tasks), @@ -476,7 +474,6 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, bool renew) { down_flag = false; - cancel_strand = asio::make_strand(rados->get_executor()); ran_background = (recovery || watch || renew); auto defbacking = to_log_type( @@ -512,10 +509,10 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, if (renew) { asio::co_spawn( - co_await asio::this_coro::executor, - renew_run(renew_signal), - asio::bind_cancellation_slot(renew_signal->slot(), - asio::bind_executor(*cancel_strand, + renew_strand, + renew_run(shared_from_this()), + asio::bind_cancellation_slot(renew_signal.slot(), + asio::bind_executor(renew_strand, asio::detached))); } if (watch) { @@ -527,20 +524,20 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, "Unable to establish recovery watch!"}; } asio::co_spawn( - co_await asio::this_coro::executor, - watch_loop(watch_signal), - asio::bind_cancellation_slot(watch_signal->slot(), - asio::bind_executor(*cancel_strand, + watch_strand, + watch_loop(shared_from_this()), + asio::bind_cancellation_slot(watch_signal.slot(), + asio::bind_executor(watch_strand, asio::detached))); } if (recovery) { // Recovery can run concurrent with normal operation, so we don't // have to block startup while we do all that I/O. asio::co_spawn( - co_await asio::this_coro::executor, - recover(dpp, recovery_signal), - asio::bind_cancellation_slot(recovery_signal->slot(), - asio::bind_executor(*cancel_strand, + recovery_strand, + recover(dpp, shared_from_this()), + asio::bind_cancellation_slot(recovery_signal.slot(), + asio::bind_executor(recovery_strand, asio::detached))); } co_return; @@ -667,7 +664,9 @@ RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp, } } -asio::awaitable RGWDataChangesLog::watch_loop(decltype(watch_signal)) { +asio::awaitable +RGWDataChangesLog::watch_loop(std::shared_ptr) +{ const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: "); const auto oid = get_sem_set_oid(0); bool need_rewatch = false; @@ -1363,21 +1362,18 @@ asio::awaitable RGWDataChangesLog::shutdown() { } renew_stop(); // Revisit this later - if (renew_signal) - asio::dispatch(*cancel_strand, - [this]() { - renew_signal->emit(asio::cancellation_type::terminal); - }); - if (recovery_signal) - asio::dispatch(*cancel_strand, - [this]() { - recovery_signal->emit(asio::cancellation_type::terminal); - }); - if (watch_signal) - asio::dispatch(*cancel_strand, - [this]() { - watch_signal->emit(asio::cancellation_type::terminal); - }); + asio::dispatch(renew_strand, + [this]() { + renew_signal.emit(asio::cancellation_type::terminal); + }); + asio::dispatch(recovery_strand, + [this]() { + recovery_signal.emit(asio::cancellation_type::terminal); + }); + asio::dispatch(watch_strand, + [this]() { + watch_signal.emit(asio::cancellation_type::terminal); + }); if (watchcookie && rados->check_watch(watchcookie)) { auto wc = watchcookie; watchcookie = 0; @@ -1390,15 +1386,6 @@ asio::awaitable RGWDataChangesLog::shutdown_or_timeout() { using namespace asio::experimental::awaitable_operators; asio::steady_timer t(co_await asio::this_coro::executor, 3s); co_await (shutdown() || t.async_wait(asio::use_awaitable)); - if (renew_signal) { - renew_signal->emit(asio::cancellation_type::terminal); - } - if (recovery_signal) { - recovery_signal->emit(asio::cancellation_type::terminal); - } - if (watch_signal) { - watch_signal->emit(asio::cancellation_type::terminal); - } } RGWDataChangesLog::~RGWDataChangesLog() { @@ -1429,7 +1416,8 @@ void RGWDataChangesLog::blocking_shutdown() { } } -asio::awaitable RGWDataChangesLog::renew_run(decltype(renew_signal)) { +asio::awaitable RGWDataChangesLog::renew_run( + std::shared_ptr) { static constexpr auto runs_per_prune = 150; auto run = 0; renew_timer.emplace(co_await asio::this_coro::executor); @@ -1733,14 +1721,14 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index) co_return; } -asio::awaitable RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, - decltype(recovery_signal)) +asio::awaitable RGWDataChangesLog::recover( + const DoutPrefixProvider* dpp, + std::shared_ptr) { - auto strand = asio::make_strand(co_await asio::this_coro::executor); co_await asio::co_spawn( - strand, - [this](const DoutPrefixProvider* dpp)-> asio::awaitable { - auto ex = co_await boost::asio::this_coro::executor; + recovery_strand, + [this](const DoutPrefixProvider* dpp)-> asio::awaitable { + auto ex = recovery_strand; auto group = async::spawn_group{ex, static_cast(num_shards)}; for (auto i = 0; i < num_shards; ++i) { boost::asio::co_spawn(ex, recover_shard(dpp, i), group); diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index 161b2d91776e0..e1846ba505e0e 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -350,23 +350,27 @@ struct hash { }; } -class RGWDataChangesLog { +class RGWDataChangesLog + : public std::enable_shared_from_this { friend class DataLogTestBase; friend DataLogBackends; CephContext *cct; - neorados::RADOS* rados; - std::optional> cancel_strand; + std::optional rados; neorados::IOContext loc; rgw::BucketChangeObserver *observer = nullptr; bool log_data = false; std::unique_ptr bes; - std::shared_ptr renew_signal = - std::make_shared(); - std::shared_ptr watch_signal = - std::make_shared(); - std::shared_ptr recovery_signal = - std::make_shared(); + using executor_t = asio::io_context::executor_type; + executor_t executor; + using strand_t = asio::strand; + strand_t renew_strand{executor}; + asio::cancellation_signal renew_signal = asio::cancellation_signal(); + strand_t watch_strand{executor}; + asio::cancellation_signal watch_signal = asio::cancellation_signal(); + strand_t recovery_strand{executor}; + asio::cancellation_signal recovery_signal = asio::cancellation_signal(); + ceph::mono_time last_recovery = ceph::mono_clock::zero(); const int num_shards; @@ -410,7 +414,8 @@ class RGWDataChangesLog { ceph::real_time expiration); std::optional renew_timer; - asio::awaitable renew_run(decltype(renew_signal) renew_signal); + asio::awaitable renew_run( + std::shared_ptr renew_signal); void renew_stop(); std::function num_shards = std::nullopt, std::optional sem_max_keys = std::nullopt); ~RGWDataChangesLog(); @@ -441,13 +446,12 @@ public: bool recovery, bool watch, bool renew); int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, - const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store, - bool background_tasks); + const RGWZoneParams& zoneparams, bool background_tasks); asio::awaitable establish_watch(const DoutPrefixProvider* dpp, std::string_view oid); asio::awaitable process_notification(const DoutPrefixProvider* dpp, std::string_view oid); - asio::awaitable watch_loop(decltype(watch_signal)); + asio::awaitable watch_loop(std::shared_ptr); int choose_oid(const rgw_bucket_shard& bs); asio::awaitable add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, @@ -529,7 +533,7 @@ public: bc::flat_map&& semcount); asio::awaitable recover_shard(const DoutPrefixProvider* dpp, int index); asio::awaitable recover(const DoutPrefixProvider* dpp, - decltype(recovery_signal)); + std::shared_ptr); asio::awaitable shutdown(); asio::awaitable shutdown_or_timeout(); void blocking_shutdown(); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 4ab7fe06e94f2..cfd897aabc014 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1467,12 +1467,7 @@ int RGWRados::init_begin(CephContext* _cct, const DoutPrefixProvider *dpp, const rgw::SiteConfig& site, rgw::sal::ConfigStore* cfgstore) { set_context(_cct); - int ret = driver->init_neorados(dpp); - if (ret < 0) { - ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl; - return ret; - } - ret = init_rados(); + auto ret = init_rados(); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to initialize librados (ret=" << cpp_strerror(-ret) << ")" << dendl; return ret; diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index b8c1d7c26d21e..2221d30f7efbc 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -5483,11 +5483,20 @@ int RadosRole::delete_obj(const DoutPrefixProvider *dpp, optional_yield y) extern "C" { -void* newRadosStore(void* io_context) + void* newRadosStore(void* io_context, void* dpp_) { + auto dpp = static_cast(dpp_); rgw::sal::RadosStore* store = new rgw::sal::RadosStore( *static_cast(io_context)); if (store) { + int ret = store->init_neorados(dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl; + delete store; + store = nullptr; + return store; + } + RGWRados* rados = new RGWRados(); if (!rados) { diff --git a/src/rgw/driver/rados/rgw_service.cc b/src/rgw/driver/rados/rgw_service.cc index b1c9a40bf0e13..23d1473badca7 100644 --- a/src/rgw/driver/rados/rgw_service.cc +++ b/src/rgw/driver/rados/rgw_service.cc @@ -64,7 +64,7 @@ int RGWServices_Def::init(CephContext *cct, bilog_rados = std::make_unique(cct); cls = std::make_unique(cct); config_key_rados = std::make_unique(cct); - datalog_rados = std::make_unique(cct); + datalog_rados = std::make_shared(driver); mdlog = std::make_unique(cct, run_sync, cfgstore); notify = std::make_unique(cct); zone = std::make_unique(cct, cfgstore, site); @@ -139,7 +139,7 @@ int RGWServices_Def::init(CephContext *cct, r = datalog_rados->start(dpp, &zone->get_zone(), zone->get_zone_params(), - driver, background_tasks); + background_tasks); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl; return r; diff --git a/src/rgw/driver/rados/rgw_service.h b/src/rgw/driver/rados/rgw_service.h index e4d3df4f91105..06643cae543fb 100644 --- a/src/rgw/driver/rados/rgw_service.h +++ b/src/rgw/driver/rados/rgw_service.h @@ -101,7 +101,7 @@ struct RGWServices_Def std::unique_ptr sysobj_core; std::unique_ptr sysobj_cache; std::unique_ptr user_rados; - std::unique_ptr datalog_rados; + std::shared_ptr datalog_rados; std::unique_ptr async_processor; RGWServices_Def(); diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index fe85704d3c844..5e7e20a796bf9 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -49,7 +49,7 @@ extern "C" { #ifdef WITH_RADOSGW_RADOS -extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context); +extern rgw::sal::Driver* newRadosStore(void* io_context, const void* dpp); #endif #ifdef WITH_RADOSGW_DBSTORE extern rgw::sal::Driver* newDBStore(CephContext *cct); @@ -90,7 +90,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* if (cfg.store_name.compare("rados") == 0) { #ifdef WITH_RADOSGW_RADOS - driver = newRadosStore(&io_context); + driver = newRadosStore(&io_context, dpp); RGWRados* rados = static_cast(driver)->getRados(); if ((*rados).set_use_cache(use_cache) @@ -248,7 +248,7 @@ rgw::sal::Driver* DriverManager::init_raw_storage_provider(const DoutPrefixProvi rgw::sal::Driver* driver = nullptr; if (cfg.store_name.compare("rados") == 0) { #ifdef WITH_RADOSGW_RADOS - driver = newRadosStore(&io_context); + driver = newRadosStore(&io_context, dpp); RGWRados* rados = static_cast(driver)->getRados(); rados->set_context(cct); diff --git a/src/test/rgw/test_datalog.cc b/src/test/rgw/test_datalog.cc index 2fea326c81a62..a125019c26960 100644 --- a/src/test/rgw/test_datalog.cc +++ b/src/test/rgw/test_datalog.cc @@ -60,12 +60,12 @@ private: boost::asio::use_awaitable); } - virtual asio::awaitable> + virtual asio::awaitable> create_datalog() = 0; protected: - std::unique_ptr datalog; + std::shared_ptr datalog; neorados::RADOS& rados() noexcept { return *rados_; } const std::string& pool_name() const noexcept { return pool_name_; } @@ -170,7 +170,7 @@ protected: } auto recover(const DoutPrefixProvider* dpp) { - return datalog->recover(dpp, nullptr); + return datalog->recover(dpp, datalog->shared_from_this()); } void add_to_cur_cycle(const BucketGen& bg) { @@ -207,9 +207,9 @@ public: class DataLogTest : public DataLogTestBase { private: - asio::awaitable> create_datalog() override { - auto datalog = std::make_unique(rados().cct(), true, - &rados()); + asio::awaitable> create_datalog() override { + auto datalog = std::make_shared(rados().cct(), true, + rados()); co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false); co_return std::move(datalog); } @@ -217,9 +217,9 @@ private: class DataLogWatchless : public DataLogTestBase { private: - asio::awaitable> create_datalog() override { - auto datalog = std::make_unique(rados().cct(), true, - &rados()); + asio::awaitable> create_datalog() override { + auto datalog = std::make_shared(rados().cct(), true, + rados()); co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false); co_return std::move(datalog); } @@ -227,11 +227,11 @@ private: class DataLogBulky : public DataLogTestBase { private: - asio::awaitable> create_datalog() override { + asio::awaitable> create_datalog() override { // Decrease max push/list and force everything into one shard so we // can test iterated increment/decrement/list code. - auto datalog = std::make_unique(rados().cct(), true, - &rados(), 1, 7); + auto datalog = std::make_shared(rados().cct(), true, + rados(), 1, 7); co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false); co_return std::move(datalog); } -- 2.39.5