From: Adam Emerson Date: Mon, 29 Apr 2024 23:49:16 +0000 (-0400) Subject: rgw/multisite/datalog: Parallelize recovery X-Git-Tag: v20.3.0~169^2~11 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=65ebb7cd2866edb2a8e8c7550b240abbb046a2ba;p=ceph.git rgw/multisite/datalog: Parallelize recovery Signed-off-by: Adam Emerson --- diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 31ef9c350f428..c292d724d1f31 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -14,10 +14,12 @@ #include #include +#include "common/async/parallel_for_each.h" #include "include/fs_types.h" #include "include/neorados/RADOS.hpp" #include "common/async/blocked_completion.h" +#include "common/async/co_throttle.h" #include "common/async/librados_completion.h" #include "common/async/yield_context.h" @@ -41,6 +43,9 @@ static constexpr auto dout_subsys = ceph_subsys_rgw; using namespace std::literals; +namespace ranges = std::ranges; +namespace views = ranges::views; + namespace sys = boost::system; namespace nlog = ::neorados::cls::log; @@ -1529,7 +1534,7 @@ RGWDataChangesLog::synthesize_entries( } catch (const std::exception& e) { push_failed = true; ldpp_dout(dpp, 5) << "RGWDataChangesLog::synthesize_entries(): Backend push " - << "failed with exception: " << e.what() << dendl; + << " failed with exception: " << e.what() << dendl; } co_return !push_failed; } @@ -1597,40 +1602,55 @@ RGWDataChangesLog::decrement_sems( } asio::awaitable -RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, - decltype(recovery_signal)) { - using neorados::WriteOp; - bc::flat_map> semcounts; - for (int index = 0; index < num_shards; ++index) { - // Gather entries in the shard - auto& semcount = semcounts[index]; - co_await read_all_sems(index, &semcount); - // If we have none, no point doing the rest - if (semcount.empty()) { - continue; - } - // Synthesize entries to push - auto pushed = co_await synthesize_entries(dpp, index, semcount); - if (!pushed) { - // If pushing failed, don't decrement any semaphores - ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard " - << index << " failed, skipping decrement" << dendl; - continue; - } +RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index) +{ + bc::flat_map semcount; - // Check with other running RGWs, make sure not to decrement - // anything they have in flight. This doesn't cause an issue for - // partial upgrades, since older versions won't be using the - // semaphores at all. - auto notified = co_await gather_working_sets(dpp, index, semcount); - if (!notified) { - ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering " - << "working sets for shard " << index - << " failed, skipping decrement" << dendl; - continue; - } - co_await decrement_sems(index, std::move(semcount)); + // Gather entries in the shard + co_await read_all_sems(index, &semcount); + // If we have none, no point doing the rest + if (semcount.empty()) { + co_return; + } + // Synthesize entries to push + auto pushed = co_await synthesize_entries(dpp, index, semcount); + if (!pushed) { + // If pushing failed, don't decrement any semaphores + ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard " + << index << " failed, skipping decrement" << dendl; + co_return; } + + // Check with other running RGWs, make sure not to decrement + // anything they have in flight. This doesn't cause an issue for + // partial upgrades, since older versions won't be using the + // semaphores at all. + auto notified = co_await gather_working_sets(dpp, index, semcount); + if (!notified) { + ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering " + << "working sets for shard " << index + << "failed, skipping decrement" << dendl; + co_return; + } + co_await decrement_sems(index, std::move(semcount)); + co_return; +} + +asio::awaitable RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, + decltype(recovery_signal)) +{ + auto strand = asio::make_strand(co_await asio::this_coro::executor); + co_await asio::co_spawn( + strand, + [this](const DoutPrefixProvider* dpp)-> asio::awaitable { + auto ex = co_await boost::asio::this_coro::executor; + auto group = async::spawn_group{ex, static_cast(num_shards)}; + for (auto i = 0; i < num_shards; ++i) { + boost::asio::co_spawn(ex, recover_shard(dpp, i), group); + } + co_await group.wait(); + }(dpp), + asio::use_awaitable); } void RGWDataChangesLogInfo::dump(Formatter *f) const diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index a97e4f381a4d9..f690d0010b594 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -502,6 +502,7 @@ public: asio::awaitable decrement_sems(int index, bc::flat_map&& semcount); + asio::awaitable recover_shard(const DoutPrefixProvider* dpp, int index); asio::awaitable recover(const DoutPrefixProvider* dpp, decltype(recovery_signal)); };