git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite/datalog: Parallelize recovery
author Adam Emerson <aemerson@redhat.com>
Mon, 29 Apr 2024 23:49:16 +0000 (19:49 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h

index 31ef9c350f428aa1729acb844fa3df2d5bfe41ba..c292d724d1f31e31d4791683e48128146b1b9974 100644 (file)
 #include <boost/container/flat_map.hpp>
 #include <boost/system/system_error.hpp>
 
+#include "common/async/parallel_for_each.h"
 #include "include/fs_types.h"
 #include "include/neorados/RADOS.hpp"
 
 #include "common/async/blocked_completion.h"
+#include "common/async/co_throttle.h"
 #include "common/async/librados_completion.h"
 #include "common/async/yield_context.h"
 
@@ -41,6 +43,9 @@ static constexpr auto dout_subsys = ceph_subsys_rgw;
 
 using namespace std::literals;
 
+namespace ranges = std::ranges;
+namespace views = ranges::views;
+
 namespace sys = boost::system;
 
 namespace nlog = ::neorados::cls::log;
@@ -1529,7 +1534,7 @@ RGWDataChangesLog::synthesize_entries(
   } catch (const std::exception& e) {
     push_failed = true;
     ldpp_dout(dpp, 5) << "RGWDataChangesLog::synthesize_entries(): Backend push "
                      << "failed with exception: " << e.what() << dendl;
   }
   co_return !push_failed;
 }
@@ -1597,40 +1602,55 @@ RGWDataChangesLog::decrement_sems(
 }
 
 asio::awaitable<void>
-RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
-                          decltype(recovery_signal)) {
-  using neorados::WriteOp;
-  bc::flat_map<uint64_t, bc::flat_map<std::string, uint64_t>> semcounts;
-  for (int index = 0; index < num_shards; ++index) {
-    // Gather entries in the shard
-    auto& semcount = semcounts[index];
-    co_await read_all_sems(index, &semcount);
-    // If we have none, no point doing the rest
-    if (semcount.empty()) {
-      continue;
-    }
-    // Synthesize entries to push
-    auto pushed = co_await synthesize_entries(dpp, index, semcount);
-    if (!pushed) {
-      // If pushing failed, don't decrement any semaphores
-      ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
-                       << index << " failed, skipping decrement" << dendl;
-      continue;
-    }
+RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
+{
+  bc::flat_map<std::string, uint64_t> semcount;
 
-    // Check with other running RGWs, make sure not to decrement
-    // anything they have in flight. This doesn't cause an issue for
-    // partial upgrades, since older versions won't be using the
-    // semaphores at all.
-    auto notified = co_await gather_working_sets(dpp, index, semcount);
-    if (!notified) {
-      ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
-                       << "working sets for shard " << index
-                       << " failed, skipping decrement" << dendl;
-      continue;
-    }
-    co_await decrement_sems(index, std::move(semcount));
+  // Gather entries in the shard
+  co_await read_all_sems(index, &semcount);
+  // If we have none, no point doing the rest
+  if (semcount.empty()) {
+    co_return;
+  }
+  // Synthesize entries to push
+  auto pushed = co_await synthesize_entries(dpp, index, semcount);
+  if (!pushed) {
+    // If pushing failed, don't decrement any semaphores
+    ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
+                     << index << " failed, skipping decrement" << dendl;
+    co_return;
   }
+
+  // Check with other running RGWs, make sure not to decrement
+  // anything they have in flight. This doesn't cause an issue for
+  // partial upgrades, since older versions won't be using the
+  // semaphores at all.
+  auto notified = co_await gather_working_sets(dpp, index, semcount);
+  if (!notified) {
+    ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
+                     << "working sets for shard " << index
+                     << " failed, skipping decrement" << dendl;
+    co_return;
+  }
+  co_await decrement_sems(index, std::move(semcount));
+  co_return;
+}
+
+asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
+                                                decltype(recovery_signal))
+{
+  auto strand = asio::make_strand(co_await asio::this_coro::executor);
+  co_await asio::co_spawn(
+    strand,
+    [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, decltype(strand)> {
+      auto ex = co_await boost::asio::this_coro::executor;
+      auto group = async::spawn_group{ex, static_cast<size_t>(num_shards)};
+      for (auto i = 0; i < num_shards; ++i) {
+       boost::asio::co_spawn(ex, recover_shard(dpp, i), group);
+      }
+      co_await group.wait();
+    }(dpp),
+    asio::use_awaitable);
 }
 
 void RGWDataChangesLogInfo::dump(Formatter *f) const
index a97e4f381a4d90851c7874bed8c1547214402297..f690d0010b5940f57b1dbbeaaff2a39e7fb36bd5 100644 (file)
@@ -502,6 +502,7 @@ public:
   asio::awaitable<void>
   decrement_sems(int index,
                 bc::flat_map<std::string, uint64_t>&& semcount);
+  asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
   asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
                                decltype(recovery_signal));
 };