]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multi: Give tasks a reference to RGWDataChangesLog
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 25 Apr 2025 21:40:05 +0000 (17:40 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 5 Aug 2025 20:18:36 +0000 (16:18 -0400)
Also run them in strands. Also `datalog_rados` is a `shared_ptr`,
now. Probably make it intrusive later.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/include/neorados/RADOS.hpp
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_service.cc
src/rgw/driver/rados/rgw_service.h
src/rgw/rgw_sal.cc
src/test/rgw/test_datalog.cc

index c58cf5a9fd73355de3ad7db4268c0a6d91a65a35..8986f3948c032868674c097e36c1187a1a263ad0 100644 (file)
@@ -1386,6 +1386,10 @@ public:
   executor_type get_executor() const;
   boost::asio::io_context& get_io_context();
 
+  operator bool() {
+    return bool(impl);
+  }
+
 private:
   template<typename CompletionToken>
   auto consign(CompletionToken&& token) {
index d68ce48aec794a22cfdaad40b4b550aa4c2c370f..9aaec5d16fc7326f7cf092f00fa46e8379aa058b 100644 (file)
@@ -17,7 +17,6 @@
 
 #include <boost/system/system_error.hpp>
 
-#include "common/async/parallel_for_each.h"
 #include "include/fs_types.h"
 #include "include/neorados/RADOS.hpp"
 
@@ -356,17 +355,18 @@ public:
   }
 };
 
-RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
-  : cct(cct),
+RGWDataChangesLog::RGWDataChangesLog(rgw::sal::RadosStore* driver)
+  : cct(driver->ctx()), rados(driver->get_neorados()),
+    executor(driver->get_io_context().get_executor()),
     num_shards(cct->_conf->rgw_data_log_num_shards),
     prefix(get_prefix()),
     changes(cct->_conf->rgw_data_log_changes_size) {}
 
 RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data,
-                                     neorados::RADOS *rados,
+                                     neorados::RADOS rados,
                                      std::optional<int> num_shards,
                                      std::optional<uint64_t> sem_max_keys)
-    : cct(cct), rados(rados), log_data(log_data),
+  : cct(cct), rados(rados), log_data(log_data), executor(rados.get_executor()), 
       num_shards(num_shards ? *num_shards :
                 cct->_conf->rgw_data_log_num_shards),
       prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
@@ -438,15 +438,13 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) {
 int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                             const RGWZone* zone,
                             const RGWZoneParams& zoneparams,
-                            rgw::sal::RadosStore* store,
                             bool background_tasks)
 {
   log_data = zone->log_data;
-  rados = &store->get_neorados();
   try {
     // Blocking in startup code, not ideal, but won't hurt anything.
     std::exception_ptr eptr
-      = asio::co_spawn(store->get_io_context(),
+      = asio::co_spawn(executor,
                       start(dpp, zoneparams.log_pool,
                             background_tasks, background_tasks,
                             background_tasks),
@@ -476,7 +474,6 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                         bool renew)
 {
   down_flag = false;
-  cancel_strand = asio::make_strand(rados->get_executor());
   ran_background = (recovery || watch || renew);
 
   auto defbacking = to_log_type(
@@ -512,10 +509,10 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
 
   if (renew) {
     asio::co_spawn(
-      co_await asio::this_coro::executor,
-      renew_run(renew_signal),
-      asio::bind_cancellation_slot(renew_signal->slot(),
-                                  asio::bind_executor(*cancel_strand,
+      renew_strand,
+      renew_run(shared_from_this()),
+      asio::bind_cancellation_slot(renew_signal.slot(),
+                                  asio::bind_executor(renew_strand,
                                                       asio::detached)));
   }
   if (watch) {
@@ -527,20 +524,20 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                              "Unable to establish recovery watch!"};
     }
     asio::co_spawn(
-      co_await asio::this_coro::executor,
-      watch_loop(watch_signal),
-      asio::bind_cancellation_slot(watch_signal->slot(),
-                                  asio::bind_executor(*cancel_strand,
+      watch_strand,
+      watch_loop(shared_from_this()),
+      asio::bind_cancellation_slot(watch_signal.slot(),
+                                  asio::bind_executor(watch_strand,
                                                       asio::detached)));
   }
   if (recovery) {
     // Recovery can run concurrent with normal operation, so we don't
     // have to block startup while we do all that I/O.
     asio::co_spawn(
-      co_await asio::this_coro::executor,
-      recover(dpp, recovery_signal),
-      asio::bind_cancellation_slot(recovery_signal->slot(),
-                                  asio::bind_executor(*cancel_strand,
+      recovery_strand,
+      recover(dpp, shared_from_this()),
+      asio::bind_cancellation_slot(recovery_signal.slot(),
+                                  asio::bind_executor(recovery_strand,
                                                       asio::detached)));
   }
   co_return;
@@ -667,7 +664,9 @@ RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
   }
 }
 
-asio::awaitable<void> RGWDataChangesLog::watch_loop(decltype(watch_signal)) {
+asio::awaitable<void>
+RGWDataChangesLog::watch_loop(std::shared_ptr<RGWDataChangesLog>)
+{
   const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
   const auto oid = get_sem_set_oid(0);
   bool need_rewatch = false;
@@ -1363,21 +1362,18 @@ asio::awaitable<void> RGWDataChangesLog::shutdown() {
   }
   renew_stop();
   // Revisit this later
-  if (renew_signal)
-    asio::dispatch(*cancel_strand,
-                  [this]() {
-                    renew_signal->emit(asio::cancellation_type::terminal);
-                  });
-  if (recovery_signal)
-    asio::dispatch(*cancel_strand,
-                  [this]() {
-                    recovery_signal->emit(asio::cancellation_type::terminal);
-                  });
-  if (watch_signal)
-    asio::dispatch(*cancel_strand,
-                  [this]() {
-                    watch_signal->emit(asio::cancellation_type::terminal);
-                  });
+  asio::dispatch(renew_strand,
+                [this]() {
+                  renew_signal.emit(asio::cancellation_type::terminal);
+                });
+  asio::dispatch(recovery_strand,
+                [this]() {
+                  recovery_signal.emit(asio::cancellation_type::terminal);
+                });
+  asio::dispatch(watch_strand,
+                [this]() {
+                  watch_signal.emit(asio::cancellation_type::terminal);
+                });
   if (watchcookie && rados->check_watch(watchcookie)) {
     auto wc = watchcookie;
     watchcookie = 0;
@@ -1390,15 +1386,6 @@ asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {
   using namespace asio::experimental::awaitable_operators;
   asio::steady_timer t(co_await asio::this_coro::executor, 3s);
   co_await (shutdown() || t.async_wait(asio::use_awaitable));
-  if (renew_signal) {
-    renew_signal->emit(asio::cancellation_type::terminal);
-  }
-  if (recovery_signal) {
-    recovery_signal->emit(asio::cancellation_type::terminal);
-  }
-  if (watch_signal) {
-    watch_signal->emit(asio::cancellation_type::terminal);
-  }
 }
 
 RGWDataChangesLog::~RGWDataChangesLog() {
@@ -1429,7 +1416,8 @@ void RGWDataChangesLog::blocking_shutdown() {
   }
 }
 
-asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
+asio::awaitable<void> RGWDataChangesLog::renew_run(
+  std::shared_ptr<RGWDataChangesLog>) {
   static constexpr auto runs_per_prune = 150;
   auto run = 0;
   renew_timer.emplace(co_await asio::this_coro::executor);
@@ -1733,14 +1721,14 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
   co_return;
 }
 
-asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
-                                                decltype(recovery_signal))
+asio::awaitable<void> RGWDataChangesLog::recover(
+  const DoutPrefixProvider* dpp,
+  std::shared_ptr<RGWDataChangesLog>)
 {
-  auto strand = asio::make_strand(co_await asio::this_coro::executor);
   co_await asio::co_spawn(
-    strand,
-    [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, decltype(strand)> {
-      auto ex = co_await boost::asio::this_coro::executor;
+    recovery_strand,
+    [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, strand_t> {
+      auto ex = recovery_strand;
       auto group = async::spawn_group{ex, static_cast<size_t>(num_shards)};
       for (auto i = 0; i < num_shards; ++i) {
        boost::asio::co_spawn(ex, recover_shard(dpp, i), group);
index 161b2d91776e0948eff42e857ba03aecbf145f76..e1846ba505e0e442206f7c04677fdabcf4554569 100644 (file)
@@ -350,23 +350,27 @@ struct hash<BucketGen> {
 };
 }
 
-class RGWDataChangesLog {
+class RGWDataChangesLog
+  : public std::enable_shared_from_this<RGWDataChangesLog> {
   friend class DataLogTestBase;
   friend DataLogBackends;
   CephContext *cct;
-  neorados::RADOS* rados;
-  std::optional<asio::strand<asio::io_context::executor_type>> cancel_strand;
+  std::optional<neorados::RADOS> rados;
   neorados::IOContext loc;
   rgw::BucketChangeObserver *observer = nullptr;
   bool log_data = false;
   std::unique_ptr<DataLogBackends> bes;
 
-  std::shared_ptr<asio::cancellation_signal> renew_signal =
-    std::make_shared<asio::cancellation_signal>();
-  std::shared_ptr<asio::cancellation_signal> watch_signal =
-    std::make_shared<asio::cancellation_signal>();
-  std::shared_ptr<asio::cancellation_signal> recovery_signal =
-    std::make_shared<asio::cancellation_signal>();
+  using executor_t = asio::io_context::executor_type;
+  executor_t executor;
+  using strand_t = asio::strand<executor_t>;
+  strand_t renew_strand{executor};
+  asio::cancellation_signal renew_signal = asio::cancellation_signal();
+  strand_t watch_strand{executor};
+  asio::cancellation_signal watch_signal = asio::cancellation_signal();
+  strand_t recovery_strand{executor};
+  asio::cancellation_signal recovery_signal = asio::cancellation_signal();
+
   ceph::mono_time last_recovery = ceph::mono_clock::zero();
 
   const int num_shards;
@@ -410,7 +414,8 @@ class RGWDataChangesLog {
                      ceph::real_time expiration);
 
   std::optional<asio::steady_timer> renew_timer;
-  asio::awaitable<void> renew_run(decltype(renew_signal) renew_signal);
+  asio::awaitable<void> renew_run(
+    std::shared_ptr<RGWDataChangesLog> renew_signal);
   void renew_stop();
 
   std::function<bool(const rgw_bucket& bucket, optional_yield y,
@@ -425,10 +430,10 @@ class RGWDataChangesLog {
 
 public:
 
-  RGWDataChangesLog(CephContext* cct);
+  RGWDataChangesLog(rgw::sal::RadosStore* driver);
   // For testing.
   RGWDataChangesLog(CephContext* cct, bool log_data,
-                   neorados::RADOS* rados,
+                   neorados::RADOS rados,
                    std::optional<int> num_shards = std::nullopt,
                    std::optional<uint64_t> sem_max_keys = std::nullopt);
   ~RGWDataChangesLog();
@@ -441,13 +446,12 @@ public:
                              bool recovery, bool watch, bool renew);
 
   int start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
-           const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store,
-           bool background_tasks);
+           const RGWZoneParams& zoneparams, bool background_tasks);
   asio::awaitable<bool> establish_watch(const DoutPrefixProvider* dpp,
                                        std::string_view oid);
   asio::awaitable<void> process_notification(const DoutPrefixProvider* dpp,
                                             std::string_view oid);
-  asio::awaitable<void> watch_loop(decltype(watch_signal));
+  asio::awaitable<void> watch_loop(std::shared_ptr<RGWDataChangesLog>);
   int choose_oid(const rgw_bucket_shard& bs);
   asio::awaitable<void> add_entry(const DoutPrefixProvider *dpp,
                                  const RGWBucketInfo& bucket_info,
@@ -529,7 +533,7 @@ public:
                 bc::flat_map<std::string, uint64_t>&& semcount);
   asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
   asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
-                               decltype(recovery_signal));
+                               std::shared_ptr<RGWDataChangesLog>);
   asio::awaitable<void> shutdown();
   asio::awaitable<void> shutdown_or_timeout();
   void blocking_shutdown();
index 4ab7fe06e94f2343f6d1b39c3d1403ebda14f746..cfd897aabc014f0e95ba183f52870c28c417adaa 100644 (file)
@@ -1467,12 +1467,7 @@ int RGWRados::init_begin(CephContext* _cct, const DoutPrefixProvider *dpp,
        const rgw::SiteConfig& site, rgw::sal::ConfigStore* cfgstore)
 {
   set_context(_cct);
-  int ret = driver->init_neorados(dpp);
-  if (ret < 0) {
-    ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl;
-    return ret;
-  }
-  ret = init_rados();
+  auto ret = init_rados();
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed to initialize librados (ret=" << cpp_strerror(-ret) << ")" << dendl;
     return ret;
index b8c1d7c26d21e2195a68ef86e826ea51613af949..2221d30f7efbc0fe9703e8f09994b85d23575c49 100644 (file)
@@ -5483,11 +5483,20 @@ int RadosRole::delete_obj(const DoutPrefixProvider *dpp, optional_yield y)
 
 extern "C" {
 
-void* newRadosStore(void* io_context)
+  void* newRadosStore(void* io_context, void* dpp_)
 {
+  auto dpp = static_cast<DoutPrefixProvider*>(dpp_);
   rgw::sal::RadosStore* store = new rgw::sal::RadosStore(
     *static_cast<boost::asio::io_context*>(io_context));
   if (store) {
+    int ret = store->init_neorados(dpp);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl;
+      delete store;
+      store = nullptr;
+      return store;
+    }
+
     RGWRados* rados = new RGWRados();
 
     if (!rados) {
index b1c9a40bf0e13bb29a23808c2a301f555e879fef..23d1473badca712c90cadc549ee82a064e1afde5 100644 (file)
@@ -64,7 +64,7 @@ int RGWServices_Def::init(CephContext *cct,
   bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
   cls = std::make_unique<RGWSI_Cls>(cct);
   config_key_rados = std::make_unique<RGWSI_ConfigKey_RADOS>(cct);
-  datalog_rados = std::make_unique<RGWDataChangesLog>(cct);
+  datalog_rados = std::make_shared<RGWDataChangesLog>(driver);
   mdlog = std::make_unique<RGWSI_MDLog>(cct, run_sync, cfgstore);
   notify = std::make_unique<RGWSI_Notify>(cct);
   zone = std::make_unique<RGWSI_Zone>(cct, cfgstore, site);
@@ -139,7 +139,7 @@ int RGWServices_Def::init(CephContext *cct,
 
     r = datalog_rados->start(dpp, &zone->get_zone(),
                             zone->get_zone_params(),
-                            driver, background_tasks);
+                            background_tasks);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
       return r;
index e4d3df4f911051bb012878b1a8b157c42060ae5e..06643cae543fbbde5ecc7cb5cd9ce1dd98272718 100644 (file)
@@ -101,7 +101,7 @@ struct RGWServices_Def
   std::unique_ptr<RGWSI_SysObj_Core> sysobj_core;
   std::unique_ptr<RGWSI_SysObj_Cache> sysobj_cache;
   std::unique_ptr<RGWSI_User_RADOS> user_rados;
-  std::unique_ptr<RGWDataChangesLog> datalog_rados;
+  std::shared_ptr<RGWDataChangesLog> datalog_rados;
   std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
 
   RGWServices_Def();
index fe85704d3c8447c877293e0a0354b881982cdabe..5e7e20a796bf9f65fa82129d85eebab9219da84f 100644 (file)
@@ -49,7 +49,7 @@
 
 extern "C" {
 #ifdef WITH_RADOSGW_RADOS
-extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context);
+extern rgw::sal::Driver* newRadosStore(void* io_context, const void* dpp);
 #endif
 #ifdef WITH_RADOSGW_DBSTORE
 extern rgw::sal::Driver* newDBStore(CephContext *cct);
@@ -90,7 +90,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
 
   if (cfg.store_name.compare("rados") == 0) {
 #ifdef WITH_RADOSGW_RADOS
-    driver = newRadosStore(&io_context);
+    driver = newRadosStore(&io_context, dpp);
     RGWRados* rados = static_cast<rgw::sal::RadosStore* >(driver)->getRados();
 
     if ((*rados).set_use_cache(use_cache)
@@ -248,7 +248,7 @@ rgw::sal::Driver* DriverManager::init_raw_storage_provider(const DoutPrefixProvi
   rgw::sal::Driver* driver = nullptr;
   if (cfg.store_name.compare("rados") == 0) {
 #ifdef WITH_RADOSGW_RADOS
-    driver = newRadosStore(&io_context);
+    driver = newRadosStore(&io_context, dpp);
     RGWRados* rados = static_cast<rgw::sal::RadosStore* >(driver)->getRados();
 
     rados->set_context(cct);
index 2fea326c81a620af571df5dadcf2853c47bba568..a125019c269605349315c888b629a73b1bd30788 100644 (file)
@@ -60,12 +60,12 @@ private:
                                boost::asio::use_awaitable);
   }
 
-  virtual asio::awaitable<std::unique_ptr<RGWDataChangesLog>>
+  virtual asio::awaitable<std::shared_ptr<RGWDataChangesLog>>
   create_datalog() = 0;
 
 protected:
 
-  std::unique_ptr<RGWDataChangesLog> datalog;
+  std::shared_ptr<RGWDataChangesLog> datalog;
 
   neorados::RADOS& rados() noexcept { return *rados_; }
   const std::string& pool_name() const noexcept { return pool_name_; }
@@ -170,7 +170,7 @@ protected:
   }
 
   auto recover(const DoutPrefixProvider* dpp) {
-    return datalog->recover(dpp, nullptr);
+    return datalog->recover(dpp, datalog->shared_from_this());
   }
 
   void add_to_cur_cycle(const BucketGen& bg) {
@@ -207,9 +207,9 @@ public:
 
 class DataLogTest : public DataLogTestBase {
 private:
-  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
-    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
-                                                      &rados());
+  asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
+    auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+                                                      rados());
     co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
     co_return std::move(datalog);
   }
@@ -217,9 +217,9 @@ private:
 
 class DataLogWatchless : public DataLogTestBase {
 private:
-  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
-    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
-                                                      &rados());
+  asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
+    auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+                                                      rados());
     co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false);
     co_return std::move(datalog);
   }
@@ -227,11 +227,11 @@ private:
 
 class DataLogBulky : public DataLogTestBase {
 private:
-  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
+  asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
     // Decrease max push/list and force everything into one shard so we
     // can test iterated increment/decrement/list code.
-    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
-                                                      &rados(), 1, 7);
+    auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+                                                      rados(), 1, 7);
     co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
     co_return std::move(datalog);
   }