]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/multisite/datalog: Don't read the entire sem_set per shard
authorAdam Emerson <aemerson@redhat.com>
Sat, 1 Jun 2024 06:38:23 +0000 (02:38 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Process shards piecewise. Send each set of shards in a window to the
other RGWs and have them acknowledge having it or not.

Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/test/rgw/test_datalog.cc

index 603afa30badb7aa01760fa7b1c0edc8951951d1b..3d51fa23ae92ae0f8c8cbce9201b393d9c3cd959 100644 (file)
@@ -47,7 +47,6 @@ static constexpr auto dout_subsys = ceph_subsys_rgw;
 using namespace std::literals;
 
 namespace ranges = std::ranges;
-namespace views = ranges::views;
 
 namespace sys = boost::system;
 
@@ -572,37 +571,37 @@ RGWDataChangesLog::establish_watch(const DoutPrefixProvider* dpp,
 }
 
 struct recovery_check {
-  uint64_t shard = 0;
+  int64_t shard = 0;
+  std::vector<std::string> keys;
 
   recovery_check() = default;
 
-  recovery_check(uint64_t shard) : shard(shard) {}
+  recovery_check(uint64_t shard, std::vector<std::string> keys)
+    : shard(shard), keys(std::move(keys)) {}
 
   void encode(buffer::list& bl) const {
     ENCODE_START(1, 1, bl);
     encode(shard, bl);
+    encode(keys, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(buffer::list::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(shard, bl);
+    decode(keys, bl);
     DECODE_FINISH(bl);
   }
-
-  operator uint64_t() {
-    return shard;
-  }
 };
 WRITE_CLASS_ENCODER(recovery_check);
 
 
 struct recovery_reply {
-  std::unordered_set<std::string> reply_set;
+  std::vector<unsigned> reply_set;
 
   recovery_reply() = default;
 
-  recovery_reply(std::unordered_set<std::string> reply_set)
+  recovery_reply(std::vector<unsigned> reply_set)
     : reply_set(std::move(reply_set)) {}
 
   void encode(buffer::list& bl) const {
@@ -616,44 +615,44 @@ struct recovery_reply {
     decode(reply_set, bl);
     DECODE_FINISH(bl);
   }
-
-  operator std::unordered_set<std::string>&() {
-    return reply_set;
-  }
 };
 WRITE_CLASS_ENCODER(recovery_reply);
 
-
 asio::awaitable<void>
 RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
                                        std::string_view oid) {
   auto notification = co_await rados->next_notification(watchcookie,
                                                        asio::use_awaitable);
-  int shard = 0;
+  recovery_check rc;
   // Don't send a reply if we get a bogus notification, we don't
   // want recovery to delete semaphores improperly.
   try {
-    recovery_check rc;
     decode(rc, notification.bl);
-    shard = rc;
   } catch (const std::exception& e) {
-    ldpp_dout(dpp, 2) << "Got malformed notification!" << dendl;
+    ldpp_dout(dpp, 2) << "Got malformed notification: " << e.what() << dendl;
     co_return;
   }
-  if (shard >= num_shards) {
-    ldpp_dout(dpp, 2) << "Got unknown shard " << shard << dendl;
+  if (rc.shard >= num_shards) {
+    ldpp_dout(dpp, 2) << "Got unknown shard " << rc.shard << dendl;
     co_return;
   }
   recovery_reply reply;
+  reply.reply_set.resize(rc.keys.size(), 0);
   std::unique_lock l(lock);
-  for (const auto& bg : cur_cycle) {
-    if (choose_oid(bg.shard) == shard) {
-      reply.reply_set.insert(bg.get_key());
+  for (auto i = 0u; i < rc.keys.size(); ++i) {
+    const auto& key = rc.keys[i];
+    try {
+      if (cur_cycle.contains(BucketGen{key})) {
+       ++reply.reply_set[i];
+      }
+    } catch (const std::exception&) {
+      ldpp_dout(dpp, 2) << "Got invalid BucketGen key: " << key << dendl;
+      co_return;
+    }
+    if (semaphores[rc.shard].contains(key)) {
+      ++reply.reply_set[i];
     }
   }
-  std::copy(semaphores[shard].begin(),
-           semaphores[shard].end(),
-           std::inserter(reply.reply_set, reply.reply_set.end()));
   l.unlock();
   buffer::list replybl;
   encode(reply, replybl);
@@ -1561,28 +1560,23 @@ int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
   return ceph::from_exception(eptr);
 }
 
-asio::awaitable<void>
-RGWDataChangesLog::read_all_sems(int index,
-                                bc::flat_map<std::string, uint64_t>* out)
-{
+asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
+                         std::string>>
+RGWDataChangesLog::read_sems(int index, std::string cursor) {
   namespace sem_set = neorados::cls::sem_set;
-  std::string cursor;
-  do {
-    try {
-      co_await rados->execute(
-       get_sem_set_oid(index), loc,
-       neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, cursor, out,
-                                             &cursor)),
-       nullptr, asio::use_awaitable);
-    } catch (const sys::system_error& e) {
-      if (e.code() == sys::errc::no_such_file_or_directory) {
-       break;
-      } else {
-       throw;
-      }
+  bc::flat_map<std::string, uint64_t> out;
+  try {
+    co_await rados->execute(
+      get_sem_set_oid(index), loc,
+      neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, std::move(cursor),
+                                           &out, &cursor)),
+      nullptr, asio::use_awaitable);
+  } catch (const sys::system_error& e) {
+    if (e.code() != sys::errc::no_such_file_or_directory) {
+      throw;
     }
-  } while (!cursor.empty());
-  co_return;
+  }
+  co_return std::make_pair(std::move(out), std::move(cursor));
 }
 
 asio::awaitable<bool>
@@ -1627,11 +1621,16 @@ RGWDataChangesLog::synthesize_entries(
 asio::awaitable<bool>
 RGWDataChangesLog::gather_working_sets(
   const DoutPrefixProvider* dpp,
-  int index,
+  int shard,
   bc::flat_map<std::string, uint64_t>& semcount)
 {
   buffer::list bl;
-  recovery_check rc = index;
+  recovery_check rc;
+  rc.shard = shard;
+  rc.keys.reserve(semcount.size());
+  for (const auto& [key, count] : semcount) {
+    rc.keys.emplace_back(key);
+  }
   encode(rc, bl);
   auto [reply_map, missed_set] = co_await rados->notify(
       get_sem_set_oid(0), loc, bl, 60s, asio::use_awaitable);
@@ -1642,24 +1641,32 @@ RGWDataChangesLog::gather_working_sets(
     co_return false;
   }
   for (const auto& [source, reply] : reply_map) {
-    recovery_reply keys;
+    recovery_reply counts;
     try {
-      decode(keys, reply);
+      decode(counts, reply);
     } catch (const std::exception& e) {
       ldpp_dout(dpp, -1)
        << "RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: "
        << source << dendl;
       co_return false;
     }
-    for (const auto& key : keys.reply_set) {
+    if (rc.keys.size() != counts.reply_set.size()) {
+      ldpp_dout(dpp, -1)
+       << "RGWDataChangesLog::gather_working_sets(): reply set does not match: "
+       << source << dendl;
+      co_return false;
+    }
+    for (auto i = 0u; i < rc.keys.size(); ++i) {
+      const auto& key = rc.keys[i];
+      const auto& count = counts.reply_set[i];
       auto iter = semcount.find(key);
       if (iter == semcount.end()) {
        continue;
       }
-      if (iter->second == 1) {
+      if (iter->second <= count) {
        semcount.erase(iter);
       } else {
-       --(iter->second);
+       (iter->second) -= count;
       }
     }
   }
@@ -1689,35 +1696,39 @@ RGWDataChangesLog::decrement_sems(
 asio::awaitable<void>
 RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
 {
-  bc::flat_map<std::string, uint64_t> semcount;
+  std::string cursor;
+  do {
+    bc::flat_map<std::string, uint64_t> semcount;
 
-  // Gather entries in the shard
-  co_await read_all_sems(index, &semcount);
-  // If we have none, no point doing the rest
-  if (semcount.empty()) {
-    co_return;
-  }
-  // Synthesize entries to push
-  auto pushed = co_await synthesize_entries(dpp, index, semcount);
-  if (!pushed) {
-    // If pushing failed, don't decrement any semaphores
-    ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
-                     << index << " failed, skipping decrement" << dendl;
-    co_return;
-  }
+    // Gather entries in the shard
+    std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor));
+    // If we have none, no point doing the rest
+    if (semcount.empty()) {
+      break;
+    }
 
-  // Check with other running RGWs, make sure not to decrement
-  // anything they have in flight. This doesn't cause an issue for
-  // partial upgrades, since older versions won't be using the
-  // semaphores at all.
-  auto notified = co_await gather_working_sets(dpp, index, semcount);
-  if (!notified) {
-    ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
-                     << "working sets for shard " << index
-                     << "failed, skipping decrement" << dendl;
-    co_return;
-  }
-  co_await decrement_sems(index, std::move(semcount));
+    // Synthesize entries to push
+    auto pushed = co_await synthesize_entries(dpp, index, semcount);
+    if (!pushed) {
+      // If pushing failed, don't decrement any semaphores
+      ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover_shard(): Pushing shard "
+                       << index << " failed, skipping decrement" << dendl;
+      continue;
+    }
+
+    // Check with other running RGWs, make sure not to decrement
+    // anything they have in flight. This doesn't cause an issue for
+    // partial upgrades, since older versions won't be using the
+    // semaphores at all.
+    auto notified = co_await gather_working_sets(dpp, index, semcount);
+    if (!notified) {
+      ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover_shard(): Gathering "
+                       << "working sets for shard " << index
+                       << "failed, skipping decrement" << dendl;
+      continue;
+    }
+    co_await decrement_sems(index, std::move(semcount));
+  } while (!cursor.empty());
   co_return;
 }
 
index 43e9ca446ab03ca05beeb6f05dffb74867bdd6d2..fdd099733911573349691f1446c8ec20fb58603e 100644 (file)
@@ -512,8 +512,9 @@ public:
   int trim_generations(const DoutPrefixProvider *dpp,
                       std::optional<uint64_t>& through,
                       optional_yield y);
-  asio::awaitable<void> read_all_sems(int index,
-                                     bc::flat_map<std::string, uint64_t>* out);
+  asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
+                           std::string>>
+  read_sems(int index, std::string cursor);
   asio::awaitable<bool>
   synthesize_entries(const DoutPrefixProvider* dpp, int index,
                     const bc::flat_map<std::string, uint64_t>& semcount);
index 4f32d0b9d27a13c3ff3b282185de9be5fa14a8f0..2fea326c81a620af571df5dadcf2853c47bba568 100644 (file)
@@ -93,12 +93,34 @@ protected:
                           boost::asio::use_awaitable, ver);
   }
 
+  asio::awaitable<void>
+  read_all_sems(int index,
+               bc::flat_map<std::string, uint64_t>* out) {
+    std::string cursor;
+    do {
+      try {
+       co_await rados().execute(
+         datalog->get_sem_set_oid(index), datalog->loc,
+         neorados::ReadOp{}.exec(ss::list(datalog->sem_max_keys, cursor, out,
+                                          &cursor)),
+         nullptr, asio::use_awaitable);
+      } catch (const sys::system_error& e) {
+       if (e.code() == sys::errc::no_such_file_or_directory) {
+         break;
+       } else {
+         throw;
+       }
+      }
+    } while (!cursor.empty());
+    co_return;
+  }
+
   asio::awaitable<bc::flat_map<std::string, uint64_t>>
   read_all_sems_all_shards() {
     bc::flat_map<std::string, uint64_t> all_sems;
 
     for (auto i = 0; i < datalog->num_shards; ++i) {
-      co_await datalog->read_all_sems(i, &all_sems);
+      co_await read_all_sems(i, &all_sems);
     }
     co_return std::move(all_sems);
   }