From 3b62080aa4fa79f58ea71c6732c5532d84745149 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 25 Apr 2025 17:40:05 -0400 Subject: [PATCH] rgw/multi: Give tasks a reference to RGWDataChangesLog Also run them in strands. Also `datalog_rados` is a `shared_ptr`, now. Probably make it intrusive later. Signed-off-by: Adam C. Emerson (cherry picked from commit 3c2b587ead6b0cb5acfd84788958dd957d020875) Conflicts: src/rgw/driver/rados/rgw_service.cc src/rgw/rgw_sal.cc - `#ifdef`s for standalone Rados src/rgw/driver/rados/rgw_datalog.cc - Periodic re-run of recovery removed in main and pending backport Signed-off-by: Adam C. Emerson --- src/include/neorados/RADOS.hpp | 4 ++ src/rgw/driver/rados/rgw_datalog.cc | 95 ++++++++++++--------------- src/rgw/driver/rados/rgw_datalog.h | 36 +++++----- src/rgw/driver/rados/rgw_rados.cc | 7 +- src/rgw/driver/rados/rgw_sal_rados.cc | 11 +++- src/rgw/driver/rados/rgw_service.cc | 4 +- src/rgw/driver/rados/rgw_service.h | 2 +- src/rgw/rgw_sal.cc | 6 +- src/test/rgw/test_datalog.cc | 24 +++---- 9 files changed, 94 insertions(+), 95 deletions(-) diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp index 7a129adbc2ad6..d28be2935ed52 100644 --- a/src/include/neorados/RADOS.hpp +++ b/src/include/neorados/RADOS.hpp @@ -1382,6 +1382,10 @@ public: executor_type get_executor() const; boost::asio::io_context& get_io_context(); + operator bool() { + return bool(impl); + } + private: template auto consign(CompletionToken&& token) { diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 9e65bf75e325c..6c0464dbf8828 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -18,7 +18,6 @@ #include #include -#include "common/async/parallel_for_each.h" #include "include/fs_types.h" #include "include/neorados/RADOS.hpp" @@ -357,17 +356,18 @@ public: } }; -RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) - : cct(cct), +RGWDataChangesLog::RGWDataChangesLog(rgw::sal::RadosStore* driver) + : cct(driver->ctx()), rados(driver->get_neorados()), + executor(driver->get_io_context().get_executor()), num_shards(cct->_conf->rgw_data_log_num_shards), prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data, - neorados::RADOS *rados, + neorados::RADOS rados, std::optional num_shards, std::optional sem_max_keys) - : cct(cct), rados(rados), log_data(log_data), + : cct(cct), rados(rados), log_data(log_data), executor(rados.get_executor()), num_shards(num_shards ? *num_shards : cct->_conf->rgw_data_log_num_shards), prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size), @@ -439,14 +439,11 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) { int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* zone, const RGWZoneParams& zoneparams, - rgw::sal::RadosStore* store, bool background_tasks) noexcept { log_data = zone->log_data; - rados = &store->get_neorados(); try { - // Blocking in startup code, not ideal, but won't hurt anything. - asio::co_spawn(store->get_io_context(), + asio::co_spawn(executor, start(dpp, zoneparams.log_pool, background_tasks, background_tasks, background_tasks), @@ -473,7 +470,6 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, bool renew) { down_flag = false; - cancel_strand = asio::make_strand(rados->get_executor()); ran_background = (recovery || watch || renew); auto defbacking = to_log_type( @@ -509,10 +505,10 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, if (renew) { asio::co_spawn( - co_await asio::this_coro::executor, - renew_run(renew_signal), - asio::bind_cancellation_slot(renew_signal->slot(), - asio::bind_executor(*cancel_strand, + renew_strand, + renew_run(shared_from_this()), + asio::bind_cancellation_slot(renew_signal.slot(), + asio::bind_executor(renew_strand, asio::detached))); } if (watch) { @@ -524,20 +520,20 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, "Unable to establish recovery watch!"}; } asio::co_spawn( - co_await asio::this_coro::executor, - watch_loop(watch_signal), - asio::bind_cancellation_slot(watch_signal->slot(), - asio::bind_executor(*cancel_strand, + watch_strand, + watch_loop(shared_from_this()), + asio::bind_cancellation_slot(watch_signal.slot(), + asio::bind_executor(watch_strand, asio::detached))); } if (recovery) { // Recovery can run concurrent with normal operation, so we don't // have to block startup while we do all that I/O. asio::co_spawn( - co_await asio::this_coro::executor, - recover(dpp, recovery_signal), - asio::bind_cancellation_slot(recovery_signal->slot(), - asio::bind_executor(*cancel_strand, + recovery_strand, + recover(dpp, shared_from_this()), + asio::bind_cancellation_slot(recovery_signal.slot(), + asio::bind_executor(recovery_strand, asio::detached))); } co_return; @@ -664,7 +660,9 @@ RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp, } } -asio::awaitable RGWDataChangesLog::watch_loop(decltype(watch_signal)) { +asio::awaitable +RGWDataChangesLog::watch_loop(std::shared_ptr) +{ const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: "); const auto oid = get_sem_set_oid(0); bool need_rewatch = false; @@ -1259,21 +1257,18 @@ asio::awaitable RGWDataChangesLog::shutdown() { } renew_stop(); // Revisit this later - if (renew_signal) - asio::dispatch(*cancel_strand, - [this]() { - renew_signal->emit(asio::cancellation_type::terminal); - }); - if (recovery_signal) - asio::dispatch(*cancel_strand, - [this]() { - recovery_signal->emit(asio::cancellation_type::terminal); - }); - if (watch_signal) - asio::dispatch(*cancel_strand, - [this]() { - watch_signal->emit(asio::cancellation_type::terminal); - }); + asio::dispatch(renew_strand, + [this]() { + renew_signal.emit(asio::cancellation_type::terminal); + }); + asio::dispatch(recovery_strand, + [this]() { + recovery_signal.emit(asio::cancellation_type::terminal); + }); + asio::dispatch(watch_strand, + [this]() { + watch_signal.emit(asio::cancellation_type::terminal); + }); if (watchcookie && rados->check_watch(watchcookie)) { auto wc = watchcookie; watchcookie = 0; @@ -1286,15 +1281,6 @@ asio::awaitable RGWDataChangesLog::shutdown_or_timeout() { using namespace asio::experimental::awaitable_operators; asio::steady_timer t(co_await asio::this_coro::executor, 3s); co_await (shutdown() || t.async_wait(asio::use_awaitable)); - if (renew_signal) { - renew_signal->emit(asio::cancellation_type::terminal); - } - if (recovery_signal) { - recovery_signal->emit(asio::cancellation_type::terminal); - } - if (watch_signal) { - watch_signal->emit(asio::cancellation_type::terminal); - } } RGWDataChangesLog::~RGWDataChangesLog() { @@ -1322,7 +1308,8 @@ void RGWDataChangesLog::blocking_shutdown() { } } -asio::awaitable RGWDataChangesLog::renew_run(decltype(renew_signal)) { +asio::awaitable RGWDataChangesLog::renew_run( + std::shared_ptr) { static constexpr auto runs_per_prune = 150; auto run = 0; renew_timer.emplace(co_await asio::this_coro::executor); @@ -1595,14 +1582,14 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index) co_return; } -asio::awaitable RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, - decltype(recovery_signal)) +asio::awaitable RGWDataChangesLog::recover( + const DoutPrefixProvider* dpp, + std::shared_ptr) { - auto strand = asio::make_strand(co_await asio::this_coro::executor); co_await asio::co_spawn( - strand, - [this](const DoutPrefixProvider* dpp)-> asio::awaitable { - auto ex = co_await boost::asio::this_coro::executor; + recovery_strand, + [this](const DoutPrefixProvider* dpp)-> asio::awaitable { + auto ex = recovery_strand; auto group = async::spawn_group{ex, static_cast(num_shards)}; for (auto i = 0; i < num_shards; ++i) { boost::asio::co_spawn(ex, recover_shard(dpp, i), group); diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index 49193b44f6177..d47e4e84eda12 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -350,23 +350,27 @@ struct hash { }; } -class RGWDataChangesLog { +class RGWDataChangesLog + : public std::enable_shared_from_this { friend class DataLogTestBase; friend DataLogBackends; CephContext *cct; - neorados::RADOS* rados; - std::optional> cancel_strand; + std::optional rados; neorados::IOContext loc; rgw::BucketChangeObserver *observer = nullptr; bool log_data = false; std::unique_ptr bes; - std::shared_ptr renew_signal = - std::make_shared(); - std::shared_ptr watch_signal = - std::make_shared(); - std::shared_ptr recovery_signal = - std::make_shared(); + using executor_t = asio::io_context::executor_type; + executor_t executor; + using strand_t = asio::strand; + strand_t renew_strand{executor}; + asio::cancellation_signal renew_signal = asio::cancellation_signal(); + strand_t watch_strand{executor}; + asio::cancellation_signal watch_signal = asio::cancellation_signal(); + strand_t recovery_strand{executor}; + asio::cancellation_signal recovery_signal = asio::cancellation_signal(); + ceph::mono_time last_recovery = ceph::mono_clock::zero(); const int num_shards; @@ -410,7 +414,8 @@ class RGWDataChangesLog { ceph::real_time expiration); std::optional renew_timer; - asio::awaitable renew_run(decltype(renew_signal) renew_signal); + asio::awaitable renew_run( + std::shared_ptr renew_signal); void renew_stop(); std::function num_shards = std::nullopt, std::optional sem_max_keys = std::nullopt); ~RGWDataChangesLog(); @@ -441,13 +446,12 @@ public: bool recovery, bool watch, bool renew); int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, - const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store, - bool background_tasks) noexcept; + const RGWZoneParams& zoneparams, bool background_tasks) noexcept; asio::awaitable establish_watch(const DoutPrefixProvider* dpp, std::string_view oid); asio::awaitable process_notification(const DoutPrefixProvider* dpp, std::string_view oid); - asio::awaitable watch_loop(decltype(watch_signal)); + asio::awaitable watch_loop(std::shared_ptr); int choose_oid(const rgw_bucket_shard& bs); asio::awaitable add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, @@ -520,7 +524,7 @@ public: bc::flat_map&& semcount); asio::awaitable recover_shard(const DoutPrefixProvider* dpp, int index); asio::awaitable recover(const DoutPrefixProvider* dpp, - decltype(recovery_signal)); + std::shared_ptr); asio::awaitable shutdown(); asio::awaitable shutdown_or_timeout(); void blocking_shutdown(); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index bfb2dd3d66c27..1922202ef93d1 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1463,12 +1463,7 @@ int RGWRados::init_begin(CephContext* _cct, const DoutPrefixProvider *dpp, const rgw::SiteConfig& site) { set_context(_cct); - int ret = driver->init_neorados(dpp); - if (ret < 0) { - ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl; - return ret; - } - ret = init_rados(); + auto ret = init_rados(); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to initialize librados (ret=" << cpp_strerror(-ret) << ")" << dendl; return ret; diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index a8e0a8e22926a..589b9d0dfe034 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -5445,11 +5445,20 @@ int RadosRole::delete_obj(const DoutPrefixProvider *dpp, optional_yield y) extern "C" { -void* newRadosStore(void* io_context) + void* newRadosStore(void* io_context, void* dpp_) { + auto dpp = static_cast(dpp_); rgw::sal::RadosStore* store = new rgw::sal::RadosStore( *static_cast(io_context)); if (store) { + int ret = store->init_neorados(dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl; + delete store; + store = nullptr; + return store; + } + RGWRados* rados = new RGWRados(); if (!rados) { diff --git a/src/rgw/driver/rados/rgw_service.cc b/src/rgw/driver/rados/rgw_service.cc index 2918df7b62eb9..9e34cbf00f1eb 100644 --- a/src/rgw/driver/rados/rgw_service.cc +++ b/src/rgw/driver/rados/rgw_service.cc @@ -62,7 +62,7 @@ int RGWServices_Def::init(CephContext *cct, bilog_rados = std::make_unique(cct); cls = std::make_unique(cct); config_key_rados = std::make_unique(cct); - datalog_rados = std::make_unique(cct); + datalog_rados = std::make_shared(driver); mdlog = std::make_unique(cct, run_sync); notify = std::make_unique(cct); zone = std::make_unique(cct); @@ -137,7 +137,7 @@ int RGWServices_Def::init(CephContext *cct, r = datalog_rados->start(dpp, &zone->get_zone(), zone->get_zone_params(), - driver, background_tasks); + background_tasks); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl; return r; diff --git a/src/rgw/driver/rados/rgw_service.h b/src/rgw/driver/rados/rgw_service.h index c9cb71c3e308e..3d361351946f6 100644 --- a/src/rgw/driver/rados/rgw_service.h +++ b/src/rgw/driver/rados/rgw_service.h @@ -98,7 +98,7 @@ struct RGWServices_Def std::unique_ptr sysobj_core; std::unique_ptr sysobj_cache; std::unique_ptr user_rados; - std::unique_ptr datalog_rados; + std::shared_ptr datalog_rados; std::unique_ptr async_processor; RGWServices_Def(); diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index acb4f14f352ed..a3646c296c861 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -48,7 +48,7 @@ //#define dout_context g_ceph_context extern "C" { -extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context); +extern rgw::sal::Driver* newRadosStore(void* io_context, const void* dpp); #ifdef WITH_RADOSGW_DBSTORE extern rgw::sal::Driver* newDBStore(CephContext *cct); #endif @@ -87,7 +87,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* rgw::sal::Driver* driver{nullptr}; if (cfg.store_name.compare("rados") == 0) { - driver = newRadosStore(&io_context); + driver = newRadosStore(&io_context, dpp); RGWRados* rados = static_cast(driver)->getRados(); if ((*rados).set_use_cache(use_cache) @@ -241,7 +241,7 @@ rgw::sal::Driver* DriverManager::init_raw_storage_provider(const DoutPrefixProvi { rgw::sal::Driver* driver = nullptr; if (cfg.store_name.compare("rados") == 0) { - driver = newRadosStore(&io_context); + driver = newRadosStore(&io_context, dpp); RGWRados* rados = static_cast(driver)->getRados(); rados->set_context(cct); diff --git a/src/test/rgw/test_datalog.cc b/src/test/rgw/test_datalog.cc index dd00db7ec3828..cc873896ca83c 100644 --- a/src/test/rgw/test_datalog.cc +++ b/src/test/rgw/test_datalog.cc @@ -60,12 +60,12 @@ private: boost::asio::use_awaitable); } - virtual asio::awaitable> + virtual asio::awaitable> create_datalog() = 0; protected: - std::unique_ptr datalog; + std::shared_ptr datalog; neorados::RADOS& rados() noexcept { return *rados_; } const std::string& pool_name() const noexcept { return pool_name_; } @@ -170,7 +170,7 @@ protected: } auto recover(const DoutPrefixProvider* dpp) { - return datalog->recover(dpp, nullptr); + return datalog->recover(dpp, datalog->shared_from_this()); } void add_to_cur_cycle(const BucketGen& bg) { @@ -207,9 +207,9 @@ public: class DataLogTest : public DataLogTestBase { private: - asio::awaitable> create_datalog() override { - auto datalog = std::make_unique(rados().cct(), true, - &rados()); + asio::awaitable> create_datalog() override { + auto datalog = std::make_shared(rados().cct(), true, + rados()); co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false); co_return std::move(datalog); } @@ -217,9 +217,9 @@ private: class DataLogWatchless : public DataLogTestBase { private: - asio::awaitable> create_datalog() override { - auto datalog = std::make_unique(rados().cct(), true, - &rados()); + asio::awaitable> create_datalog() override { + auto datalog = std::make_shared(rados().cct(), true, + rados()); co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false); co_return std::move(datalog); } @@ -227,11 +227,11 @@ private: class DataLogBulky : public DataLogTestBase { private: - asio::awaitable> create_datalog() override { + asio::awaitable> create_datalog() override { // Decrease max push/list and force everything into one shard so we // can test iterated increment/decrement/list code. - auto datalog = std::make_unique(rados().cct(), true, - &rados(), 1, 7); + auto datalog = std::make_shared(rados().cct(), true, + rados(), 1, 7); co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false); co_return std::move(datalog); } -- 2.39.5