From b22867383a2d9c7b13d3eb53b3f12be830835fa1 Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Sat, 1 Jun 2024 02:38:23 -0400 Subject: [PATCH] rgw/multisite/datalog: Don't read the entire sem_set per shard Process shards piecewise. Send each set of shards in a window to the other RGWs and have them acknowledge having it or not. Signed-off-by: Adam Emerson --- src/rgw/driver/rados/rgw_datalog.cc | 171 +++++++++++++++------------- src/rgw/driver/rados/rgw_datalog.h | 5 +- src/test/rgw/test_datalog.cc | 24 +++- 3 files changed, 117 insertions(+), 83 deletions(-) diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 603afa30bad..3d51fa23ae9 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -47,7 +47,6 @@ static constexpr auto dout_subsys = ceph_subsys_rgw; using namespace std::literals; namespace ranges = std::ranges; -namespace views = ranges::views; namespace sys = boost::system; @@ -572,37 +571,37 @@ RGWDataChangesLog::establish_watch(const DoutPrefixProvider* dpp, } struct recovery_check { - uint64_t shard = 0; + int64_t shard = 0; + std::vector keys; recovery_check() = default; - recovery_check(uint64_t shard) : shard(shard) {} + recovery_check(uint64_t shard, std::vector keys) + : shard(shard), keys(std::move(keys)) {} void encode(buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(shard, bl); + encode(keys, bl); ENCODE_FINISH(bl); } void decode(buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(shard, bl); + decode(keys, bl); DECODE_FINISH(bl); } - - operator uint64_t() { - return shard; - } }; WRITE_CLASS_ENCODER(recovery_check); struct recovery_reply { - std::unordered_set reply_set; + std::vector reply_set; recovery_reply() = default; - recovery_reply(std::unordered_set reply_set) + recovery_reply(std::vector reply_set) : reply_set(std::move(reply_set)) {} void encode(buffer::list& bl) const { @@ -616,44 +615,44 @@ struct recovery_reply { decode(reply_set, bl); DECODE_FINISH(bl); } - - operator std::unordered_set&() { - return reply_set; - } }; WRITE_CLASS_ENCODER(recovery_reply); - asio::awaitable RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp, std::string_view oid) { auto notification = co_await rados->next_notification(watchcookie, asio::use_awaitable); - int shard = 0; + recovery_check rc; // Don't send a reply if we get a bogus notification, we don't // want recovery to delete semaphores improperly. try { - recovery_check rc; decode(rc, notification.bl); - shard = rc; } catch (const std::exception& e) { - ldpp_dout(dpp, 2) << "Got malformed notification!" << dendl; + ldpp_dout(dpp, 2) << "Got malformed notification: " << e.what() << dendl; co_return; } - if (shard >= num_shards) { - ldpp_dout(dpp, 2) << "Got unknown shard " << shard << dendl; + if (rc.shard >= num_shards) { + ldpp_dout(dpp, 2) << "Got unknown shard " << rc.shard << dendl; co_return; } recovery_reply reply; + reply.reply_set.resize(rc.keys.size(), 0); std::unique_lock l(lock); - for (const auto& bg : cur_cycle) { - if (choose_oid(bg.shard) == shard) { - reply.reply_set.insert(bg.get_key()); + for (auto i = 0u; i < rc.keys.size(); ++i) { + const auto& key = rc.keys[i]; + try { + if (cur_cycle.contains(BucketGen{key})) { + ++reply.reply_set[i]; + } + } catch (const std::exception&) { + ldpp_dout(dpp, 2) << "Got invalid BucketGen key: " << key << dendl; + co_return; + } + if (semaphores[rc.shard].contains(key)) { + ++reply.reply_set[i]; } } - std::copy(semaphores[shard].begin(), - semaphores[shard].end(), - std::inserter(reply.reply_set, reply.reply_set.end())); l.unlock(); buffer::list replybl; encode(reply, replybl); @@ -1561,28 +1560,23 @@ int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, return ceph::from_exception(eptr); } -asio::awaitable -RGWDataChangesLog::read_all_sems(int index, - bc::flat_map* out) -{ +asio::awaitable, + std::string>> +RGWDataChangesLog::read_sems(int index, std::string cursor) { namespace sem_set = neorados::cls::sem_set; - std::string cursor; - do { - try { - co_await rados->execute( - get_sem_set_oid(index), loc, - neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, cursor, out, - &cursor)), - nullptr, asio::use_awaitable); - } catch (const sys::system_error& e) { - if (e.code() == sys::errc::no_such_file_or_directory) { - break; - } else { - throw; - } + bc::flat_map out; + try { + co_await rados->execute( + get_sem_set_oid(index), loc, + neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, std::move(cursor), + &out, &cursor)), + nullptr, asio::use_awaitable); + } catch (const sys::system_error& e) { + if (e.code() != sys::errc::no_such_file_or_directory) { + throw; } - } while (!cursor.empty()); - co_return; + } + co_return std::make_pair(std::move(out), std::move(cursor)); } asio::awaitable @@ -1627,11 +1621,16 @@ RGWDataChangesLog::synthesize_entries( asio::awaitable RGWDataChangesLog::gather_working_sets( const DoutPrefixProvider* dpp, - int index, + int shard, bc::flat_map& semcount) { buffer::list bl; - recovery_check rc = index; + recovery_check rc; + rc.shard = shard; + rc.keys.reserve(semcount.size()); + for (const auto& [key, count] : semcount) { + rc.keys.emplace_back(key); + } encode(rc, bl); auto [reply_map, missed_set] = co_await rados->notify( get_sem_set_oid(0), loc, bl, 60s, asio::use_awaitable); @@ -1642,24 +1641,32 @@ RGWDataChangesLog::gather_working_sets( co_return false; } for (const auto& [source, reply] : reply_map) { - recovery_reply keys; + recovery_reply counts; try { - decode(keys, reply); + decode(counts, reply); } catch (const std::exception& e) { ldpp_dout(dpp, -1) << "RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: " << source << dendl; co_return false; } - for (const auto& key : keys.reply_set) { + if (rc.keys.size() != counts.reply_set.size()) { + ldpp_dout(dpp, -1) + << "RGWDataChangesLog::gather_working_sets(): reply set does not match: " + << source << dendl; + co_return false; + } + for (auto i = 0u; i < rc.keys.size(); ++i) { + const auto& key = rc.keys[i]; + const auto& count = counts.reply_set[i]; auto iter = semcount.find(key); if (iter == semcount.end()) { continue; } - if (iter->second == 1) { + if (iter->second <= count) { semcount.erase(iter); } else { - --(iter->second); + (iter->second) -= count; } } } @@ -1689,35 +1696,39 @@ RGWDataChangesLog::decrement_sems( asio::awaitable RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index) { - bc::flat_map semcount; + std::string cursor; + do { + bc::flat_map semcount; - // Gather entries in the shard - co_await read_all_sems(index, &semcount); - // If we have none, no point doing the rest - if (semcount.empty()) { - co_return; - } - // Synthesize entries to push - auto pushed = co_await synthesize_entries(dpp, index, semcount); - if (!pushed) { - // If pushing failed, don't decrement any semaphores - ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard " - << index << " failed, skipping decrement" << dendl; - co_return; - } + // Gather entries in the shard + std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor)); + // If we have none, no point doing the rest + if (semcount.empty()) { + break; + } - // Check with other running RGWs, make sure not to decrement - // anything they have in flight. This doesn't cause an issue for - // partial upgrades, since older versions won't be using the - // semaphores at all. - auto notified = co_await gather_working_sets(dpp, index, semcount); - if (!notified) { - ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering " - << "working sets for shard " << index - << "failed, skipping decrement" << dendl; - co_return; - } - co_await decrement_sems(index, std::move(semcount)); + // Synthesize entries to push + auto pushed = co_await synthesize_entries(dpp, index, semcount); + if (!pushed) { + // If pushing failed, don't decrement any semaphores + ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover_shard(): Pushing shard " + << index << " failed, skipping decrement" << dendl; + continue; + } + + // Check with other running RGWs, make sure not to decrement + // anything they have in flight. This doesn't cause an issue for + // partial upgrades, since older versions won't be using the + // semaphores at all. + auto notified = co_await gather_working_sets(dpp, index, semcount); + if (!notified) { + ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover_shard(): Gathering " + << "working sets for shard " << index + << "failed, skipping decrement" << dendl; + continue; + } + co_await decrement_sems(index, std::move(semcount)); + } while (!cursor.empty()); co_return; } diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index 43e9ca446ab..fdd09973391 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -512,8 +512,9 @@ public: int trim_generations(const DoutPrefixProvider *dpp, std::optional& through, optional_yield y); - asio::awaitable read_all_sems(int index, - bc::flat_map* out); + asio::awaitable, + std::string>> + read_sems(int index, std::string cursor); asio::awaitable synthesize_entries(const DoutPrefixProvider* dpp, int index, const bc::flat_map& semcount); diff --git a/src/test/rgw/test_datalog.cc b/src/test/rgw/test_datalog.cc index 4f32d0b9d27..2fea326c81a 100644 --- a/src/test/rgw/test_datalog.cc +++ b/src/test/rgw/test_datalog.cc @@ -93,12 +93,34 @@ protected: boost::asio::use_awaitable, ver); } + asio::awaitable + read_all_sems(int index, + bc::flat_map* out) { + std::string cursor; + do { + try { + co_await rados().execute( + datalog->get_sem_set_oid(index), datalog->loc, + neorados::ReadOp{}.exec(ss::list(datalog->sem_max_keys, cursor, out, + &cursor)), + nullptr, asio::use_awaitable); + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + break; + } else { + throw; + } + } + } while (!cursor.empty()); + co_return; + } + asio::awaitable> read_all_sems_all_shards() { bc::flat_map all_sems; for (auto i = 0; i < datalog->num_shards; ++i) { - co_await datalog->read_all_sems(i, &all_sems); + co_await read_all_sems(i, &all_sems); } co_return std::move(all_sems); } -- 2.39.5