]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/multi: Give tasks a reference to RGWDataChangesLog
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 25 Apr 2025 21:40:05 +0000 (17:40 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 9 Sep 2025 00:47:07 +0000 (20:47 -0400)
Also run them in strands. Also `datalog_rados` is a `shared_ptr`,
now. Probably make it intrusive later.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit 3c2b587ead6b0cb5acfd84788958dd957d020875)

Conflicts:
src/rgw/driver/rados/rgw_service.cc
src/rgw/rgw_sal.cc
 - `#ifdef`s for standalone Rados
src/rgw/driver/rados/rgw_datalog.cc
 - Periodic re-run of recovery removed in main and pending backport

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/include/neorados/RADOS.hpp
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_service.cc
src/rgw/driver/rados/rgw_service.h
src/rgw/rgw_sal.cc
src/test/rgw/test_datalog.cc

index 7a129adbc2ad64e1037da6733a9f86f52e62357b..d28be2935ed52eb20566719f964991140ca4abfb 100644 (file)
@@ -1382,6 +1382,10 @@ public:
   executor_type get_executor() const;
   boost::asio::io_context& get_io_context();
 
+  operator bool() {
+    return bool(impl);
+  }
+
 private:
   template<typename CompletionToken>
   auto consign(CompletionToken&& token) {
index 9e65bf75e325c5d59d8d63faf31c198adccbb03d..6c0464dbf88288f6350eeea18c960e1f71b53cc1 100644 (file)
@@ -18,7 +18,6 @@
 #include <boost/system/system_error.hpp>
 #include <boost/system/generic_category.hpp>
 
-#include "common/async/parallel_for_each.h"
 #include "include/fs_types.h"
 #include "include/neorados/RADOS.hpp"
 
@@ -357,17 +356,18 @@ public:
   }
 };
 
-RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
-  : cct(cct),
+RGWDataChangesLog::RGWDataChangesLog(rgw::sal::RadosStore* driver)
+  : cct(driver->ctx()), rados(driver->get_neorados()),
+    executor(driver->get_io_context().get_executor()),
     num_shards(cct->_conf->rgw_data_log_num_shards),
     prefix(get_prefix()),
     changes(cct->_conf->rgw_data_log_changes_size) {}
 
 RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data,
-                                     neorados::RADOS *rados,
+                                     neorados::RADOS rados,
                                      std::optional<int> num_shards,
                                      std::optional<uint64_t> sem_max_keys)
-    : cct(cct), rados(rados), log_data(log_data),
+  : cct(cct), rados(rados), log_data(log_data), executor(rados.get_executor()), 
       num_shards(num_shards ? *num_shards :
                 cct->_conf->rgw_data_log_num_shards),
       prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
@@ -439,14 +439,11 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) {
 int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                             const RGWZone* zone,
                             const RGWZoneParams& zoneparams,
-                            rgw::sal::RadosStore* store,
                             bool background_tasks) noexcept
 {
   log_data = zone->log_data;
-  rados = &store->get_neorados();
   try {
-    // Blocking in startup code, not ideal, but won't hurt anything.
-    asio::co_spawn(store->get_io_context(),
+    asio::co_spawn(executor,
                   start(dpp, zoneparams.log_pool,
                         background_tasks, background_tasks,
                         background_tasks),
@@ -473,7 +470,6 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                         bool renew)
 {
   down_flag = false;
-  cancel_strand = asio::make_strand(rados->get_executor());
   ran_background = (recovery || watch || renew);
 
   auto defbacking = to_log_type(
@@ -509,10 +505,10 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
 
   if (renew) {
     asio::co_spawn(
-      co_await asio::this_coro::executor,
-      renew_run(renew_signal),
-      asio::bind_cancellation_slot(renew_signal->slot(),
-                                  asio::bind_executor(*cancel_strand,
+      renew_strand,
+      renew_run(shared_from_this()),
+      asio::bind_cancellation_slot(renew_signal.slot(),
+                                  asio::bind_executor(renew_strand,
                                                       asio::detached)));
   }
   if (watch) {
@@ -524,20 +520,20 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                              "Unable to establish recovery watch!"};
     }
     asio::co_spawn(
-      co_await asio::this_coro::executor,
-      watch_loop(watch_signal),
-      asio::bind_cancellation_slot(watch_signal->slot(),
-                                  asio::bind_executor(*cancel_strand,
+      watch_strand,
+      watch_loop(shared_from_this()),
+      asio::bind_cancellation_slot(watch_signal.slot(),
+                                  asio::bind_executor(watch_strand,
                                                       asio::detached)));
   }
   if (recovery) {
     // Recovery can run concurrent with normal operation, so we don't
     // have to block startup while we do all that I/O.
     asio::co_spawn(
-      co_await asio::this_coro::executor,
-      recover(dpp, recovery_signal),
-      asio::bind_cancellation_slot(recovery_signal->slot(),
-                                  asio::bind_executor(*cancel_strand,
+      recovery_strand,
+      recover(dpp, shared_from_this()),
+      asio::bind_cancellation_slot(recovery_signal.slot(),
+                                  asio::bind_executor(recovery_strand,
                                                       asio::detached)));
   }
   co_return;
@@ -664,7 +660,9 @@ RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
   }
 }
 
-asio::awaitable<void> RGWDataChangesLog::watch_loop(decltype(watch_signal)) {
+asio::awaitable<void>
+RGWDataChangesLog::watch_loop(std::shared_ptr<RGWDataChangesLog>)
+{
   const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
   const auto oid = get_sem_set_oid(0);
   bool need_rewatch = false;
@@ -1259,21 +1257,18 @@ asio::awaitable<void> RGWDataChangesLog::shutdown() {
   }
   renew_stop();
   // Revisit this later
-  if (renew_signal)
-    asio::dispatch(*cancel_strand,
-                  [this]() {
-                    renew_signal->emit(asio::cancellation_type::terminal);
-                  });
-  if (recovery_signal)
-    asio::dispatch(*cancel_strand,
-                  [this]() {
-                    recovery_signal->emit(asio::cancellation_type::terminal);
-                  });
-  if (watch_signal)
-    asio::dispatch(*cancel_strand,
-                  [this]() {
-                    watch_signal->emit(asio::cancellation_type::terminal);
-                  });
+  asio::dispatch(renew_strand,
+                [this]() {
+                  renew_signal.emit(asio::cancellation_type::terminal);
+                });
+  asio::dispatch(recovery_strand,
+                [this]() {
+                  recovery_signal.emit(asio::cancellation_type::terminal);
+                });
+  asio::dispatch(watch_strand,
+                [this]() {
+                  watch_signal.emit(asio::cancellation_type::terminal);
+                });
   if (watchcookie && rados->check_watch(watchcookie)) {
     auto wc = watchcookie;
     watchcookie = 0;
@@ -1286,15 +1281,6 @@ asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {
   using namespace asio::experimental::awaitable_operators;
   asio::steady_timer t(co_await asio::this_coro::executor, 3s);
   co_await (shutdown() || t.async_wait(asio::use_awaitable));
-  if (renew_signal) {
-    renew_signal->emit(asio::cancellation_type::terminal);
-  }
-  if (recovery_signal) {
-    recovery_signal->emit(asio::cancellation_type::terminal);
-  }
-  if (watch_signal) {
-    watch_signal->emit(asio::cancellation_type::terminal);
-  }
 }
 
 RGWDataChangesLog::~RGWDataChangesLog() {
@@ -1322,7 +1308,8 @@ void RGWDataChangesLog::blocking_shutdown() {
   }
 }
 
-asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
+asio::awaitable<void> RGWDataChangesLog::renew_run(
+  std::shared_ptr<RGWDataChangesLog>) {
   static constexpr auto runs_per_prune = 150;
   auto run = 0;
   renew_timer.emplace(co_await asio::this_coro::executor);
@@ -1595,14 +1582,14 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
   co_return;
 }
 
-asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
-                                                decltype(recovery_signal))
+asio::awaitable<void> RGWDataChangesLog::recover(
+  const DoutPrefixProvider* dpp,
+  std::shared_ptr<RGWDataChangesLog>)
 {
-  auto strand = asio::make_strand(co_await asio::this_coro::executor);
   co_await asio::co_spawn(
-    strand,
-    [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, decltype(strand)> {
-      auto ex = co_await boost::asio::this_coro::executor;
+    recovery_strand,
+    [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, strand_t> {
+      auto ex = recovery_strand;
       auto group = async::spawn_group{ex, static_cast<size_t>(num_shards)};
       for (auto i = 0; i < num_shards; ++i) {
        boost::asio::co_spawn(ex, recover_shard(dpp, i), group);
index 49193b44f617796a71c823247f84f43db60241d7..d47e4e84eda12fb6c5dde650bde04b92e688a321 100644 (file)
@@ -350,23 +350,27 @@ struct hash<BucketGen> {
 };
 }
 
-class RGWDataChangesLog {
+class RGWDataChangesLog
+  : public std::enable_shared_from_this<RGWDataChangesLog> {
   friend class DataLogTestBase;
   friend DataLogBackends;
   CephContext *cct;
-  neorados::RADOS* rados;
-  std::optional<asio::strand<asio::io_context::executor_type>> cancel_strand;
+  std::optional<neorados::RADOS> rados;
   neorados::IOContext loc;
   rgw::BucketChangeObserver *observer = nullptr;
   bool log_data = false;
   std::unique_ptr<DataLogBackends> bes;
 
-  std::shared_ptr<asio::cancellation_signal> renew_signal =
-    std::make_shared<asio::cancellation_signal>();
-  std::shared_ptr<asio::cancellation_signal> watch_signal =
-    std::make_shared<asio::cancellation_signal>();
-  std::shared_ptr<asio::cancellation_signal> recovery_signal =
-    std::make_shared<asio::cancellation_signal>();
+  using executor_t = asio::io_context::executor_type;
+  executor_t executor;
+  using strand_t = asio::strand<executor_t>;
+  strand_t renew_strand{executor};
+  asio::cancellation_signal renew_signal = asio::cancellation_signal();
+  strand_t watch_strand{executor};
+  asio::cancellation_signal watch_signal = asio::cancellation_signal();
+  strand_t recovery_strand{executor};
+  asio::cancellation_signal recovery_signal = asio::cancellation_signal();
+
   ceph::mono_time last_recovery = ceph::mono_clock::zero();
 
   const int num_shards;
@@ -410,7 +414,8 @@ class RGWDataChangesLog {
                      ceph::real_time expiration);
 
   std::optional<asio::steady_timer> renew_timer;
-  asio::awaitable<void> renew_run(decltype(renew_signal) renew_signal);
+  asio::awaitable<void> renew_run(
+    std::shared_ptr<RGWDataChangesLog> renew_signal);
   void renew_stop();
 
   std::function<bool(const rgw_bucket& bucket, optional_yield y,
@@ -425,10 +430,10 @@ class RGWDataChangesLog {
 
 public:
 
-  RGWDataChangesLog(CephContext* cct);
+  RGWDataChangesLog(rgw::sal::RadosStore* driver);
   // For testing.
   RGWDataChangesLog(CephContext* cct, bool log_data,
-                   neorados::RADOS* rados,
+                   neorados::RADOS rados,
                    std::optional<int> num_shards = std::nullopt,
                    std::optional<uint64_t> sem_max_keys = std::nullopt);
   ~RGWDataChangesLog();
@@ -441,13 +446,12 @@ public:
                              bool recovery, bool watch, bool renew);
 
   int start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
-           const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store,
-           bool background_tasks) noexcept;
+           const RGWZoneParams& zoneparams, bool background_tasks) noexcept;
   asio::awaitable<bool> establish_watch(const DoutPrefixProvider* dpp,
                                        std::string_view oid);
   asio::awaitable<void> process_notification(const DoutPrefixProvider* dpp,
                                             std::string_view oid);
-  asio::awaitable<void> watch_loop(decltype(watch_signal));
+  asio::awaitable<void> watch_loop(std::shared_ptr<RGWDataChangesLog>);
   int choose_oid(const rgw_bucket_shard& bs);
   asio::awaitable<void> add_entry(const DoutPrefixProvider *dpp,
                                  const RGWBucketInfo& bucket_info,
@@ -520,7 +524,7 @@ public:
                 bc::flat_map<std::string, uint64_t>&& semcount);
   asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
   asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
-                               decltype(recovery_signal));
+                               std::shared_ptr<RGWDataChangesLog>);
   asio::awaitable<void> shutdown();
   asio::awaitable<void> shutdown_or_timeout();
   void blocking_shutdown();
index bfb2dd3d66c278b9f27e6ed03a9653cd9b3de023..1922202ef93d1892cd24a64133ed14a2651d2c3a 100644 (file)
@@ -1463,12 +1463,7 @@ int RGWRados::init_begin(CephContext* _cct, const DoutPrefixProvider *dpp,
                          const rgw::SiteConfig& site)
 {
   set_context(_cct);
-  int ret = driver->init_neorados(dpp);
-  if (ret < 0) {
-    ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl;
-    return ret;
-  }
-  ret = init_rados();
+  auto ret = init_rados();
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed to initialize librados (ret=" << cpp_strerror(-ret) << ")" << dendl;
     return ret;
index a8e0a8e22926af9069a8cb6db78c4bb786eab8a0..589b9d0dfe0342d06d4d1f508b5e9bc128c1672f 100644 (file)
@@ -5445,11 +5445,20 @@ int RadosRole::delete_obj(const DoutPrefixProvider *dpp, optional_yield y)
 
 extern "C" {
 
-void* newRadosStore(void* io_context)
+  void* newRadosStore(void* io_context, void* dpp_)
 {
+  auto dpp = static_cast<DoutPrefixProvider*>(dpp_);
   rgw::sal::RadosStore* store = new rgw::sal::RadosStore(
     *static_cast<boost::asio::io_context*>(io_context));
   if (store) {
+    int ret = store->init_neorados(dpp);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl;
+      delete store;
+      store = nullptr;
+      return store;
+    }
+
     RGWRados* rados = new RGWRados();
 
     if (!rados) {
index 2918df7b62eb9080ca5384a143bbc8bfffb703ee..9e34cbf00f1eb30934931e8d99b86a9dfd733580 100644 (file)
@@ -62,7 +62,7 @@ int RGWServices_Def::init(CephContext *cct,
   bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
   cls = std::make_unique<RGWSI_Cls>(cct);
   config_key_rados = std::make_unique<RGWSI_ConfigKey_RADOS>(cct);
-  datalog_rados = std::make_unique<RGWDataChangesLog>(cct);
+  datalog_rados = std::make_shared<RGWDataChangesLog>(driver);
   mdlog = std::make_unique<RGWSI_MDLog>(cct, run_sync);
   notify = std::make_unique<RGWSI_Notify>(cct);
   zone = std::make_unique<RGWSI_Zone>(cct);
@@ -137,7 +137,7 @@ int RGWServices_Def::init(CephContext *cct,
 
     r = datalog_rados->start(dpp, &zone->get_zone(),
                             zone->get_zone_params(),
-                            driver, background_tasks);
+                            background_tasks);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
       return r;
index c9cb71c3e308eb9ec3b4466858df49232f54e31e..3d361351946f6f9a959bc6aca6e32d1939bdbc06 100644 (file)
@@ -98,7 +98,7 @@ struct RGWServices_Def
   std::unique_ptr<RGWSI_SysObj_Core> sysobj_core;
   std::unique_ptr<RGWSI_SysObj_Cache> sysobj_cache;
   std::unique_ptr<RGWSI_User_RADOS> user_rados;
-  std::unique_ptr<RGWDataChangesLog> datalog_rados;
+  std::shared_ptr<RGWDataChangesLog> datalog_rados;
   std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
 
   RGWServices_Def();
index acb4f14f352ed011b198e5a227dfbe7dc12b3745..a3646c296c861398253839da5f49afb12140f544 100644 (file)
@@ -48,7 +48,7 @@
 //#define dout_context g_ceph_context
 
 extern "C" {
-extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context);
+extern rgw::sal::Driver* newRadosStore(void* io_context, const void* dpp);
 #ifdef WITH_RADOSGW_DBSTORE
 extern rgw::sal::Driver* newDBStore(CephContext *cct);
 #endif
@@ -87,7 +87,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
   rgw::sal::Driver* driver{nullptr};
 
   if (cfg.store_name.compare("rados") == 0) {
-    driver = newRadosStore(&io_context);
+    driver = newRadosStore(&io_context, dpp);
     RGWRados* rados = static_cast<rgw::sal::RadosStore* >(driver)->getRados();
 
     if ((*rados).set_use_cache(use_cache)
@@ -241,7 +241,7 @@ rgw::sal::Driver* DriverManager::init_raw_storage_provider(const DoutPrefixProvi
 {
   rgw::sal::Driver* driver = nullptr;
   if (cfg.store_name.compare("rados") == 0) {
-    driver = newRadosStore(&io_context);
+    driver = newRadosStore(&io_context, dpp);
     RGWRados* rados = static_cast<rgw::sal::RadosStore* >(driver)->getRados();
 
     rados->set_context(cct);
index dd00db7ec38286a9ba43a806cd2e1b69d25391bd..cc873896ca83c4d9463f9ff12b3578377978d70c 100644 (file)
@@ -60,12 +60,12 @@ private:
                                boost::asio::use_awaitable);
   }
 
-  virtual asio::awaitable<std::unique_ptr<RGWDataChangesLog>>
+  virtual asio::awaitable<std::shared_ptr<RGWDataChangesLog>>
   create_datalog() = 0;
 
 protected:
 
-  std::unique_ptr<RGWDataChangesLog> datalog;
+  std::shared_ptr<RGWDataChangesLog> datalog;
 
   neorados::RADOS& rados() noexcept { return *rados_; }
   const std::string& pool_name() const noexcept { return pool_name_; }
@@ -170,7 +170,7 @@ protected:
   }
 
   auto recover(const DoutPrefixProvider* dpp) {
-    return datalog->recover(dpp, nullptr);
+    return datalog->recover(dpp, datalog->shared_from_this());
   }
 
   void add_to_cur_cycle(const BucketGen& bg) {
@@ -207,9 +207,9 @@ public:
 
 class DataLogTest : public DataLogTestBase {
 private:
-  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
-    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
-                                                      &rados());
+  asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
+    auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+                                                      rados());
     co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
     co_return std::move(datalog);
   }
@@ -217,9 +217,9 @@ private:
 
 class DataLogWatchless : public DataLogTestBase {
 private:
-  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
-    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
-                                                      &rados());
+  asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
+    auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+                                                      rados());
     co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false);
     co_return std::move(datalog);
   }
@@ -227,11 +227,11 @@ private:
 
 class DataLogBulky : public DataLogTestBase {
 private:
-  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
+  asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
     // Decrease max push/list and force everything into one shard so we
     // can test iterated increment/decrement/list code.
-    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
-                                                      &rados(), 1, 7);
+    auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+                                                      rados(), 1, 7);
     co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
     co_return std::move(datalog);
   }