git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite/datalog: Orderly shutdown
author: Adam Emerson <aemerson@redhat.com>
Wed, 1 May 2024 15:41:55 +0000 (11:41 -0400)
committer: Adam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/driver/rados/rgw_sal_rados.cc

index c292d724d1f31e31d4791683e48128146b1b9974..f3a2b59fcbb4b2e51e16c799c12e67c7ab52e780 100644 (file)
 #include <boost/asio/awaitable.hpp>
 #include <boost/asio/bind_cancellation_slot.hpp>
 #include <boost/asio/co_spawn.hpp>
+#include <boost/asio/experimental/awaitable_operators.hpp>
+
 #include <boost/container/flat_set.hpp>
 #include <boost/container/flat_map.hpp>
+
 #include <boost/system/system_error.hpp>
 
 #include "common/async/parallel_for_each.h"
@@ -467,9 +470,9 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                         bool watch,
                         bool renew)
 {
-  if (!log_data) {
-    co_return;
-  }
+  down_flag = false;
+  cancel_strand = asio::make_strand(rados->get_executor());
+
   auto defbacking = to_log_type(
     cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
   // Should be guaranteed by `set_enum_allowed`
@@ -497,11 +500,17 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
     throw;
   }
 
+  if (!log_data) {
+    co_return;
+  }
+
   if (renew) {
-    asio::co_spawn(co_await asio::this_coro::executor,
-                   renew_run(renew_signal),
-                   asio::bind_cancellation_slot(renew_signal->slot(),
-                                                asio::detached));
+    asio::co_spawn(
+      co_await asio::this_coro::executor,
+      renew_run(renew_signal),
+      asio::bind_cancellation_slot(renew_signal->slot(),
+                                  asio::bind_executor(*cancel_strand,
+                                                      asio::detached)));
   }
   if (watch) {
     // Establish watch here so we won't be 'started up' until we're watching.
@@ -511,18 +520,22 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
       throw sys::system_error{ENOTCONN, sys::generic_category(),
                              "Unable to establish recovery watch!"};
     }
-    asio::co_spawn(co_await asio::this_coro::executor,
-                   watch_loop(watch_signal),
-                   asio::bind_cancellation_slot(watch_signal->slot(),
-                                                asio::detached));
+    asio::co_spawn(
+      co_await asio::this_coro::executor,
+      watch_loop(watch_signal),
+      asio::bind_cancellation_slot(watch_signal->slot(),
+                                  asio::bind_executor(*cancel_strand,
+                                                      asio::detached)));
   }
   if (recovery) {
     // Recovery can run concurrent with normal operation, so we don't
     // have to block startup while we do all that I/O.
-    asio::co_spawn(co_await asio::this_coro::executor,
-                   recover(dpp, recovery_signal),
-                   asio::bind_cancellation_slot(recovery_signal->slot(),
-                                                asio::detached));
+    asio::co_spawn(
+      co_await asio::this_coro::executor,
+      recover(dpp, recovery_signal),
+      asio::bind_cancellation_slot(recovery_signal->slot(),
+                                  asio::bind_executor(*cancel_strand,
+                                                      asio::detached)));
   }
   co_return;
 }
@@ -663,7 +676,8 @@ asio::awaitable<void> RGWDataChangesLog::watch_loop(decltype(watch_signal)) {
                           << "recovery won't decrement semaphores." << dendl;
        continue;
       }
-      if (going_down()) {
+      if (going_down() || e.code() == asio::error::operation_aborted){
+       need_rewatch = false;
        break;
       } else {
        need_rewatch = true;
@@ -1328,23 +1342,78 @@ bool RGWDataChangesLog::going_down() const
   return down_flag;
 }
 
-RGWDataChangesLog::~RGWDataChangesLog() {
-  shutdown();
-}
-
-void RGWDataChangesLog::shutdown() {
+asio::awaitable<void> RGWDataChangesLog::shutdown() {  // Coroutine: sets down_flag, cancels the background tasks, unwatches, then renews entries.
+  DoutPrefix dp{cct, ceph_subsys_rgw, "Datalog Shutdown"};
   if (down_flag) {
-    return;
+    co_return;  // flag already set: nothing to do
   }
   down_flag = true;
   renew_stop();
   // Revisit this later
   if (renew_signal)
-    renew_signal->emit(asio::cancellation_type::terminal);
-  if (recovery_signal)
-    recovery_signal->emit(asio::cancellation_type::terminal);
+    asio::dispatch(*cancel_strand,  // the emit is run through cancel_strand instead of being called directly
+                  [this]() {
+                    renew_signal->emit(asio::cancellation_type::terminal);
+                  });
   if (recovery_signal)
+    asio::dispatch(*cancel_strand,
+                  [this]() {
+                    recovery_signal->emit(asio::cancellation_type::terminal);
+                  });
+  if (watch_signal)
+    asio::dispatch(*cancel_strand,
+                  [this]() {
+                    watch_signal->emit(asio::cancellation_type::terminal);
+                  });
+  if (watchcookie && rados->check_watch(watchcookie)) {
+    auto wc = watchcookie;
+    watchcookie = 0;  // member is cleared before the unwatch is awaited
+    co_await rados->unwatch(wc, loc, asio::use_awaitable);
+  }
+  co_await renew_entries(&dp);  // last step, after the unwatch (if any) has completed
+}
+
+asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {  // Races shutdown() against a 3s timer, then emits terminal cancellation on every signal that is set.
+  using namespace asio::experimental::awaitable_operators;  // brings operator|| for awaitables into scope
+  asio::steady_timer t(co_await asio::this_coro::executor, 3s);
+  co_await (shutdown() || t.async_wait(asio::use_awaitable));  // resumes once either shutdown() or the timer finishes
+  if (renew_signal) {
+    renew_signal->emit(asio::cancellation_type::terminal);  // emitted directly here, not via cancel_strand
+  }
+  if (recovery_signal) {
     recovery_signal->emit(asio::cancellation_type::terminal);
+  }
+  if (watch_signal) {
+    watch_signal->emit(asio::cancellation_type::terminal);
+  }
+}
+
+// shutdown() is a coroutine, so the destructor no longer calls it. It only
+// logs an error when data logging was enabled (`log_data`) and the object
+// is destroyed while `down_flag` is still clear.
+RGWDataChangesLog::~RGWDataChangesLog() {
+  if (log_data && !down_flag) {
+    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+              << ": RGWDataChangesLog destructed without shutdown." << dendl;
+  }
+}
+
+// Synchronous entry point for shutdown. If `down_flag` is not already set,
+// spawns shutdown_or_timeout() on the RADOS io_context with the
+// `async::use_blocked` completion token and rethrows any exception pointer
+// it yields. Exceptions derived from std::exception are logged rather than
+// propagated to the caller.
+void RGWDataChangesLog::blocking_shutdown() {
+  if (!down_flag) {
+    try {
+      auto eptr = asio::co_spawn(rados->get_io_context(),
+                                shutdown_or_timeout(),
+                                async::use_blocked);
+      if (eptr) {
+       std::rethrow_exception(eptr);
+      }
+    } catch (const std::exception& e) {
+      // sys::system_error derives from std::exception, so this single
+      // handler covers both cases that were previously handled by two
+      // identical catch blocks.
+      lderr(cct) << __PRETTY_FUNCTION__
+                << ": Failed to shut down: " << e.what()
+                << dendl;
+    }
+  }
 }
 
 asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
index f690d0010b5940f57b1dbbeaaff2a39e7fb36bd5..588a79fb76c8d9af29286218b1a8888947b3165b 100644 (file)
@@ -337,10 +337,11 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) {
 }
 
 class RGWDataChangesLog {
-  friend class DataLogTest;
+  friend class DataLogTestBase;
   friend DataLogBackends;
   CephContext *cct;
   neorados::RADOS* rados;
+  std::optional<asio::strand<asio::io_context::executor_type>> cancel_strand;
   neorados::IOContext loc;
   rgw::BucketChangeObserver *observer = nullptr;
   bool log_data = false;
@@ -364,7 +365,7 @@ class RGWDataChangesLog {
   std::shared_mutex modified_lock;
   bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards;
 
-  std::atomic<bool> down_flag = { false };
+  std::atomic<bool> down_flag = { true };
 
   struct ChangeStatus {
     std::shared_ptr<const rgw_sync_policy_info> sync_policy;
@@ -489,7 +490,6 @@ public:
   int trim_generations(const DoutPrefixProvider *dpp,
                       std::optional<uint64_t>& through,
                       optional_yield y);
-  void shutdown();
   asio::awaitable<void> read_all_sems(int index,
                                      bc::flat_map<std::string, uint64_t>* out);
   asio::awaitable<bool>
@@ -505,6 +505,9 @@ public:
   asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
   asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
                                decltype(recovery_signal));
+  asio::awaitable<void> shutdown();
+  asio::awaitable<void> shutdown_or_timeout();
+  void blocking_shutdown();
 };
 
 class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
index 5e82d5b0a2306ce11f6f9f51616e2af48d3a018d..c7097ae69bd15f2312bda65575b3dfe4811d1385 100644 (file)
@@ -2286,7 +2286,7 @@ int RadosStore::meta_remove(const DoutPrefixProvider* dpp, std::string& metadata
 }
 
 void RadosStore::shutdown(void) {
-  svc()->datalog_rados->shutdown();
+  svc()->datalog_rados->blocking_shutdown();  // datalog shutdown() now returns an awaitable; call the void wrapper instead
   return;
 }