using namespace std::literals;
namespace ranges = std::ranges;
-namespace views = ranges::views;
namespace sys = boost::system;
}
struct recovery_check {
- uint64_t shard = 0;
+ int64_t shard = 0;
+ std::vector<std::string> keys;
recovery_check() = default;
- recovery_check(uint64_t shard) : shard(shard) {}
+ recovery_check(uint64_t shard, std::vector<std::string> keys)
+ : shard(shard), keys(std::move(keys)) {}
void encode(buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode(shard, bl);
+ encode(keys, bl);
ENCODE_FINISH(bl);
}
void decode(buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
decode(shard, bl);
+ decode(keys, bl);
DECODE_FINISH(bl);
}
-
- operator uint64_t() {
- return shard;
- }
};
WRITE_CLASS_ENCODER(recovery_check);
struct recovery_reply {
- std::unordered_set<std::string> reply_set;
+ std::vector<unsigned> reply_set;
recovery_reply() = default;
- recovery_reply(std::unordered_set<std::string> reply_set)
+ recovery_reply(std::vector<unsigned> reply_set)
: reply_set(std::move(reply_set)) {}
void encode(buffer::list& bl) const {
decode(reply_set, bl);
DECODE_FINISH(bl);
}
-
- operator std::unordered_set<std::string>&() {
- return reply_set;
- }
};
WRITE_CLASS_ENCODER(recovery_reply);
-
asio::awaitable<void>
RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
std::string_view oid) {
auto notification = co_await rados->next_notification(watchcookie,
asio::use_awaitable);
- int shard = 0;
+ recovery_check rc;
// Don't send a reply if we get a bogus notification, we don't
// want recovery to delete semaphores improperly.
try {
- recovery_check rc;
decode(rc, notification.bl);
- shard = rc;
} catch (const std::exception& e) {
- ldpp_dout(dpp, 2) << "Got malformed notification!" << dendl;
+ ldpp_dout(dpp, 2) << "Got malformed notification: " << e.what() << dendl;
co_return;
}
- if (shard >= num_shards) {
- ldpp_dout(dpp, 2) << "Got unknown shard " << shard << dendl;
+ if (rc.shard >= num_shards) {
+ ldpp_dout(dpp, 2) << "Got unknown shard " << rc.shard << dendl;
co_return;
}
recovery_reply reply;
+ reply.reply_set.resize(rc.keys.size(), 0);
std::unique_lock l(lock);
- for (const auto& bg : cur_cycle) {
- if (choose_oid(bg.shard) == shard) {
- reply.reply_set.insert(bg.get_key());
+ for (auto i = 0u; i < rc.keys.size(); ++i) {
+ const auto& key = rc.keys[i];
+ try {
+ if (cur_cycle.contains(BucketGen{key})) {
+ ++reply.reply_set[i];
+ }
+ } catch (const std::exception&) {
+ ldpp_dout(dpp, 2) << "Got invalid BucketGen key: " << key << dendl;
+ co_return;
+ }
+ if (semaphores[rc.shard].contains(key)) {
+ ++reply.reply_set[i];
}
}
- std::copy(semaphores[shard].begin(),
- semaphores[shard].end(),
- std::inserter(reply.reply_set, reply.reply_set.end()));
l.unlock();
buffer::list replybl;
encode(reply, replybl);
return ceph::from_exception(eptr);
}
-asio::awaitable<void>
-RGWDataChangesLog::read_all_sems(int index,
- bc::flat_map<std::string, uint64_t>* out)
-{
+asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
+ std::string>>
+RGWDataChangesLog::read_sems(int index, std::string cursor) {
namespace sem_set = neorados::cls::sem_set;
- std::string cursor;
- do {
- try {
- co_await rados->execute(
- get_sem_set_oid(index), loc,
- neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, cursor, out,
- &cursor)),
- nullptr, asio::use_awaitable);
- } catch (const sys::system_error& e) {
- if (e.code() == sys::errc::no_such_file_or_directory) {
- break;
- } else {
- throw;
- }
+ bc::flat_map<std::string, uint64_t> out;
+ try {
+ co_await rados->execute(
+ get_sem_set_oid(index), loc,
+ neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, std::move(cursor),
+ &out, &cursor)),
+ nullptr, asio::use_awaitable);
+ } catch (const sys::system_error& e) {
+ if (e.code() != sys::errc::no_such_file_or_directory) {
+ throw;
}
- } while (!cursor.empty());
- co_return;
+ }
+ co_return std::make_pair(std::move(out), std::move(cursor));
}
asio::awaitable<bool>
asio::awaitable<bool>
RGWDataChangesLog::gather_working_sets(
const DoutPrefixProvider* dpp,
- int index,
+ int shard,
bc::flat_map<std::string, uint64_t>& semcount)
{
buffer::list bl;
- recovery_check rc = index;
+ recovery_check rc;
+ rc.shard = shard;
+ rc.keys.reserve(semcount.size());
+ for (const auto& [key, count] : semcount) {
+ rc.keys.emplace_back(key);
+ }
encode(rc, bl);
auto [reply_map, missed_set] = co_await rados->notify(
get_sem_set_oid(0), loc, bl, 60s, asio::use_awaitable);
co_return false;
}
for (const auto& [source, reply] : reply_map) {
- recovery_reply keys;
+ recovery_reply counts;
try {
- decode(keys, reply);
+ decode(counts, reply);
} catch (const std::exception& e) {
ldpp_dout(dpp, -1)
<< "RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: "
<< source << dendl;
co_return false;
}
- for (const auto& key : keys.reply_set) {
+ if (rc.keys.size() != counts.reply_set.size()) {
+ ldpp_dout(dpp, -1)
+ << "RGWDataChangesLog::gather_working_sets(): reply set does not match: "
+ << source << dendl;
+ co_return false;
+ }
+ for (auto i = 0u; i < rc.keys.size(); ++i) {
+ const auto& key = rc.keys[i];
+ const auto& count = counts.reply_set[i];
auto iter = semcount.find(key);
if (iter == semcount.end()) {
continue;
}
- if (iter->second == 1) {
+ if (iter->second <= count) {
semcount.erase(iter);
} else {
- --(iter->second);
+ (iter->second) -= count;
}
}
}
asio::awaitable<void>
RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
{
- bc::flat_map<std::string, uint64_t> semcount;
+ std::string cursor;
+ do {
+ bc::flat_map<std::string, uint64_t> semcount;
- // Gather entries in the shard
- co_await read_all_sems(index, &semcount);
- // If we have none, no point doing the rest
- if (semcount.empty()) {
- co_return;
- }
- // Synthesize entries to push
- auto pushed = co_await synthesize_entries(dpp, index, semcount);
- if (!pushed) {
- // If pushing failed, don't decrement any semaphores
- ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
- << index << " failed, skipping decrement" << dendl;
- co_return;
- }
+ // Gather entries in the shard
+ std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor));
+ // If we have none, no point doing the rest
+ if (semcount.empty()) {
+ break;
+ }
- // Check with other running RGWs, make sure not to decrement
- // anything they have in flight. This doesn't cause an issue for
- // partial upgrades, since older versions won't be using the
- // semaphores at all.
- auto notified = co_await gather_working_sets(dpp, index, semcount);
- if (!notified) {
- ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
- << "working sets for shard " << index
- << "failed, skipping decrement" << dendl;
- co_return;
- }
- co_await decrement_sems(index, std::move(semcount));
+ // Synthesize entries to push
+ auto pushed = co_await synthesize_entries(dpp, index, semcount);
+ if (!pushed) {
+ // If pushing failed, don't decrement any semaphores
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover_shard(): Pushing shard "
+ << index << " failed, skipping decrement" << dendl;
+ continue;
+ }
+
+ // Check with other running RGWs, make sure not to decrement
+ // anything they have in flight. This doesn't cause an issue for
+ // partial upgrades, since older versions won't be using the
+ // semaphores at all.
+ auto notified = co_await gather_working_sets(dpp, index, semcount);
+ if (!notified) {
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover_shard(): Gathering "
+ << "working sets for shard " << index
+ << "failed, skipping decrement" << dendl;
+ continue;
+ }
+ co_await decrement_sems(index, std::move(semcount));
+ } while (!cursor.empty());
co_return;
}