#include <boost/container/flat_map.hpp>
#include <boost/system/system_error.hpp>
+#include "common/async/parallel_for_each.h"
#include "include/fs_types.h"
#include "include/neorados/RADOS.hpp"
#include "common/async/blocked_completion.h"
+#include "common/async/co_throttle.h"
#include "common/async/librados_completion.h"
#include "common/async/yield_context.h"
using namespace std::literals;
+namespace ranges = std::ranges;
+namespace views = ranges::views;
+
namespace sys = boost::system;
namespace nlog = ::neorados::cls::log;
} catch (const std::exception& e) {
push_failed = true;
ldpp_dout(dpp, 5) << "RGWDataChangesLog::synthesize_entries(): Backend push "
- << "failed with exception: " << e.what() << dendl;
+ << " failed with exception: " << e.what() << dendl;
}
co_return !push_failed;
}
}
asio::awaitable<void>
-RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
- decltype(recovery_signal)) {
- using neorados::WriteOp;
- bc::flat_map<uint64_t, bc::flat_map<std::string, uint64_t>> semcounts;
- for (int index = 0; index < num_shards; ++index) {
- // Gather entries in the shard
- auto& semcount = semcounts[index];
- co_await read_all_sems(index, &semcount);
- // If we have none, no point doing the rest
- if (semcount.empty()) {
- continue;
- }
- // Synthesize entries to push
- auto pushed = co_await synthesize_entries(dpp, index, semcount);
- if (!pushed) {
- // If pushing failed, don't decrement any semaphores
- ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
- << index << " failed, skipping decrement" << dendl;
- continue;
- }
+RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
+{
+ bc::flat_map<std::string, uint64_t> semcount;
- // Check with other running RGWs, make sure not to decrement
- // anything they have in flight. This doesn't cause an issue for
- // partial upgrades, since older versions won't be using the
- // semaphores at all.
- auto notified = co_await gather_working_sets(dpp, index, semcount);
- if (!notified) {
- ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
- << "working sets for shard " << index
- << " failed, skipping decrement" << dendl;
- continue;
- }
- co_await decrement_sems(index, std::move(semcount));
+ // Gather entries in the shard
+ co_await read_all_sems(index, &semcount);
+ // If we have none, no point doing the rest
+ if (semcount.empty()) {
+ co_return;
+ }
+ // Synthesize entries to push
+ auto pushed = co_await synthesize_entries(dpp, index, semcount);
+ if (!pushed) {
+ // If pushing failed, don't decrement any semaphores
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
+ << index << " failed, skipping decrement" << dendl;
+ co_return;
}
+
+ // Check with other running RGWs, make sure not to decrement
+ // anything they have in flight. This doesn't cause an issue for
+ // partial upgrades, since older versions won't be using the
+ // semaphores at all.
+ auto notified = co_await gather_working_sets(dpp, index, semcount);
+ if (!notified) {
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
+ << "working sets for shard " << index
+ << "failed, skipping decrement" << dendl;
+ co_return;
+ }
+ co_await decrement_sems(index, std::move(semcount));
+ co_return;
+}
+
+asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
+ decltype(recovery_signal))
+{
+ auto strand = asio::make_strand(co_await asio::this_coro::executor);
+ co_await asio::co_spawn(
+ strand,
+ [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, decltype(strand)> {
+ auto ex = co_await boost::asio::this_coro::executor;
+ auto group = async::spawn_group{ex, static_cast<size_t>(num_shards)};
+ for (auto i = 0; i < num_shards; ++i) {
+ boost::asio::co_spawn(ex, recover_shard(dpp, i), group);
+ }
+ co_await group.wait();
+ }(dpp),
+ asio::use_awaitable);
}
void RGWDataChangesLogInfo::dump(Formatter *f) const