rgw/multisite/datalog: Semaphores and Recovery
author Adam Emerson <aemerson@redhat.com>
Tue, 2 Apr 2024 19:47:29 +0000 (15:47 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Increment semaphores in add_entry(), decrement them in renew_entries()
once entries are safely pushed, and recover leaked semaphores on startup.
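
To make the scheme concrete, here is a minimal sketch of the semaphore
lifecycle (illustrative only; it reuses the neorados and cls sem_set calls
from the diff below, with the rados/loc/oid/key parameters standing in for
the members and arguments used there):

    // add_entry(): raise the semaphore before the change enters the
    // in-memory working set, so a crash cannot silently lose it.
    asio::awaitable<void> on_add_entry(neorados::RADOS& rados,
                                       const neorados::IOContext& loc,
                                       std::string oid, std::string key)
    {
      using neorados::WriteOp;
      using neorados::cls::sem_set::increment;
      co_await rados.execute(oid, loc,
                             WriteOp{}.exec(increment(std::move(key))),
                             asio::use_awaitable);
    }

    // renew_entries(): only after the entries are durably pushed to the
    // log backend are the matching semaphores lowered again.
    asio::awaitable<void> after_renew_push(neorados::RADOS& rados,
                                           const neorados::IOContext& loc,
                                           std::string oid,
                                           bc::flat_set<std::string> keys)
    {
      using neorados::WriteOp;
      using neorados::cls::sem_set::decrement;
      co_await rados.execute(oid, loc,
                             WriteOp{}.exec(decrement(std::move(keys))),
                             asio::use_awaitable);
    }

    // recover() (startup): list surviving semaphores, synthesize and push
    // the matching entries, then decrement, so nothing is lost.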

Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
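
A condensed sketch of the peer handshake during recovery (mirroring
gather_working_sets() in the diff below; subtract_working_sets is a
hypothetical name, and the sketch assumes the recovery_check and
recovery_reply payload structs this patch adds, so it is not meant to
compile standalone):

    asio::awaitable<bool>
    subtract_working_sets(neorados::RADOS& rados,
                          const neorados::IOContext& loc,
                          std::string oid, int shard,
                          bc::flat_map<std::string, uint64_t>& semcount)
    {
      buffer::list bl;
      recovery_check rc = shard;
      encode(rc, bl);
      // Ask every watcher which keys it still has in flight for this shard.
      auto [replies, missed] = co_await rados.notify(oid, loc, bl, 60s,
                                                     asio::use_awaitable);
      if (!missed.empty()) {
        // A watcher exists but did not answer; decrementing now could race
        // with its in-flight entries, so skip this shard.
        co_return false;
      }
      for (const auto& [source, reply] : replies) {
        recovery_reply keys;
        decode(keys, reply);  // the payload watch_loop acks with
        for (const auto& key : keys.reply_set) {
          auto it = semcount.find(key);
          if (it == semcount.end()) {
            continue;
          }
          // Subtract in-flight keys; only truly leaked counts remain.
          if (it->second <= 1) {
            semcount.erase(it);
          } else {
            --it->second;
          }
        }
      }
      co_return true;  // safe to decrement what remains in semcount
    }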

index 9a498666a05946a86d63101645322efc8d65fa56..31ef9c350f428aa1729acb844fa3df2d5bfe41ba 100644 (file)
@@ -8,7 +8,10 @@
 #include <vector>
 
 #include <boost/asio/awaitable.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
 #include <boost/asio/co_spawn.hpp>
+#include <boost/container/flat_set.hpp>
+#include <boost/container/flat_map.hpp>
 #include <boost/system/system_error.hpp>
 
 #include "include/fs_types.h"
@@ -24,6 +27,7 @@
 
 #include "neorados/cls/fifo.h"
 #include "neorados/cls/log.h"
+#include "neorados/cls/sem_set.h"
 
 #include "rgw_asio_thread.h"
 #include "rgw_bucket.h"
@@ -351,6 +355,13 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
     prefix(get_prefix()),
     changes(cct->_conf->rgw_data_log_changes_size) {}
 
+RGWDataChangesLog::RGWDataChangesLog(CephContext* cct, bool log_data,
+                                    neorados::RADOS* rados)
+  : cct(cct), rados(rados), log_data(log_data),
+    num_shards(cct->_conf->rgw_data_log_num_shards),
+    prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {}
+
+
 void DataLogBackends::handle_init(entries_t e) {
   std::unique_lock l(m);
   for (const auto& [gen_id, gen] : e) {
@@ -413,72 +424,274 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) {
 }
 
 
-int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
+int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
+                            const RGWZone* zone,
                             const RGWZoneParams& zoneparams,
-                            rgw::sal::RadosStore* _store)
+                            rgw::sal::RadosStore* store)
 {
-  zone = _zone;
-  store = _store;
-  ceph_assert(zone);
-  auto defbacking = to_log_type(
-    cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
-  // Should be guaranteed by `set_enum_allowed`
-  ceph_assert(defbacking);
-  auto log_pool = zoneparams.log_pool;
+  log_data = zone->log_data;
+  rados = &store->get_neorados();
 
   try {
-    std::exception_ptr eptr;
-    std::tie(eptr, loc) =
-      asio::co_spawn(store->get_io_context(),
-                    rgw::init_iocontext(dpp, store->get_neorados(),
-                                        log_pool, rgw::create,
-                                        asio::use_awaitable),
-                    async::use_blocked);
+    // Blocking in startup code, not ideal, but won't hurt anything.
+    std::exception_ptr eptr
+      = asio::co_spawn(store->get_io_context(),
+                      start(dpp, zoneparams.log_pool),
+                      async::use_blocked);
     if (eptr) {
       std::rethrow_exception(eptr);
     }
   } catch (const sys::system_error& e) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
-                      << ": Failed to initialized ioctx: " << e.what()
-                      << ", pool=" << log_pool << dendl;
+                      << ": Failed to start datalog: " << e.what()
+                      << dendl;
     return ceph::from_error_code(e.code());
+  } catch (const std::exception& e) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                      << ": Failed to start datalog: " << e.what()
+                      << dendl;
+    return -EIO;
+  }
+  return 0;
+}
+
+asio::awaitable<void>
+RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
+                        const rgw_pool& log_pool,
+                        bool recovery,
+                        bool watch,
+                        bool renew)
+{
+  if (!log_data) {
+    co_return;
+  }
+  auto defbacking = to_log_type(
+    cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
+  // Should be guaranteed by `set_enum_allowed`
+  ceph_assert(defbacking);
+  try {
+    loc = co_await rgw::init_iocontext(dpp, *rados, log_pool,
+                                      rgw::create, asio::use_awaitable);
   } catch (const std::exception& e) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Failed to initialized ioctx: " << e.what()
                       << ", pool=" << log_pool << dendl;
-    return -EIO;
+    throw;
   }
 
-  // Blocking in startup code, not ideal, but won't hurt anything.
   try {
-    std::exception_ptr eptr;
-    std::tie(eptr, bes) =
-      asio::co_spawn(
-       store->get_io_context().get_executor(),
-       logback_generations::init<DataLogBackends>(
-         dpp, store->get_neorados(), metadata_log_oid(), loc,
-         [this](uint64_t gen_id, int shard) {
-           return get_oid(gen_id, shard);
-         }, num_shards, *defbacking, *this),
-       async::use_blocked);
-    if (eptr) {
-      std::rethrow_exception(eptr);
-    }
-  } catch (const sys::system_error& e) {
-    lderr(cct) << __PRETTY_FUNCTION__
-              << ": Error initializing backends: "
-              << e.what() << dendl;
-    return ceph::from_error_code(e.code());
+    bes = co_await logback_generations::init<DataLogBackends>(
+      dpp, *rados, metadata_log_oid(), loc,
+      [this](uint64_t gen_id, int shard) {
+       return get_oid(gen_id, shard);
+      }, num_shards, *defbacking, *this);
   } catch (const std::exception& e) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Error initializing backends: " << e.what()
                       << dendl;
-    return -EIO;
+    throw;
   }
 
-  asio::co_spawn(store->get_io_context().get_executor(),
-                renew_run(), asio::detached);
-  return 0;
+  if (renew) {
+    asio::co_spawn(co_await asio::this_coro::executor,
+                   renew_run(renew_signal),
+                   asio::bind_cancellation_slot(renew_signal->slot(),
+                                                asio::detached));
+  }
+  if (watch) {
+    // Establish watch here so we won't be 'started up' until we're watching.
+    const auto oid = get_sem_set_oid(0);
+    auto established = co_await establish_watch(dpp, oid);
+    if (!established) {
+      throw sys::system_error{ENOTCONN, sys::generic_category(),
+                             "Unable to establish recovery watch!"};
+    }
+    asio::co_spawn(co_await asio::this_coro::executor,
+                   watch_loop(watch_signal),
+                   asio::bind_cancellation_slot(watch_signal->slot(),
+                                                asio::detached));
+  }
+  if (recovery) {
+    // Recovery can run concurrently with normal operation, so we don't
+    // have to block startup while we do all that I/O.
+    asio::co_spawn(co_await asio::this_coro::executor,
+                   recover(dpp, recovery_signal),
+                   asio::bind_cancellation_slot(recovery_signal->slot(),
+                                                asio::detached));
+  }
+  co_return;
+}
+
+asio::awaitable<bool>
+RGWDataChangesLog::establish_watch(const DoutPrefixProvider* dpp,
+                                  std::string_view oid) {
+  const auto queue_depth = num_shards * 128;
+  try {
+    co_await rados->execute(oid, loc, neorados::WriteOp{}.create(false),
+                           asio::use_awaitable);
+    watchcookie = co_await rados->watch(oid, loc, asio::use_awaitable,
+                                       std::nullopt, queue_depth);
+  } catch (const std::exception& e) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                      << ": Unable to start watch! Error: "
+                      << e.what() << dendl;
+    watchcookie = 0;
+  }
+
+  if (watchcookie == 0) {
+    // Couldn't establish the watch; flush our current working set now.
+    co_await renew_entries(dpp);
+  }
+
+  co_return watchcookie != 0;
+}
+
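+// Watch/notify payloads for recovery: recovery_check names the shard being
+// recovered; recovery_reply returns the keys a peer still has in flight.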
+struct recovery_check {
+  uint64_t shard = 0;
+
+  recovery_check() = default;
+
+  recovery_check(uint64_t shard) : shard(shard) {}
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(shard, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(shard, bl);
+    DECODE_FINISH(bl);
+  }
+
+  operator uint64_t() const {
+    return shard;
+  }
+};
+WRITE_CLASS_ENCODER(recovery_check);
+
+
+struct recovery_reply {
+  std::unordered_set<std::string> reply_set;
+
+  recovery_reply() = default;
+
+  recovery_reply(std::unordered_set<std::string> reply_set)
+    : reply_set(std::move(reply_set)) {}
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(reply_set, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(reply_set, bl);
+    DECODE_FINISH(bl);
+  }
+
+  operator std::unordered_set<std::string>&() {
+    return reply_set;
+  }
+};
+WRITE_CLASS_ENCODER(recovery_reply);
+
+
+asio::awaitable<void>
+RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
+                                       std::string_view oid) {
+  auto notification = co_await rados->next_notification(watchcookie,
+                                                       asio::use_awaitable);
+  int shard = 0;
+  // Don't send a reply if we get a bogus notification; we don't
+  // want recovery to delete semaphores improperly.
+  try {
+    recovery_check rc;
+    decode(rc, notification.bl);
+    shard = rc;
+  } catch (const std::exception& e) {
+    ldpp_dout(dpp, 2) << "Got malformed notification!" << dendl;
+    co_return;
+  }
+  if (shard >= num_shards) {
+    ldpp_dout(dpp, 2) << "Got unknown shard " << shard << dendl;
+    co_return;
+  }
+  recovery_reply reply;
+  std::unique_lock l(lock);
+  for (const auto& bg : cur_cycle) {
+    if (choose_oid(bg.shard) == shard) {
+      reply.reply_set.insert(bg.get_key());
+    }
+  }
+  std::copy(semaphores[shard].begin(),
+           semaphores[shard].end(),
+           std::inserter(reply.reply_set, reply.reply_set.end()));
+  l.unlock();
+  buffer::list replybl;
+  encode(reply, replybl);
+  try {
+    co_await rados->notify_ack(oid, loc, notification.notify_id, watchcookie,
+                              std::move(replybl), asio::use_awaitable);
+  } catch (const std::exception& e) {
+    ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__
+                      << ": Failed ack. Whatever server is in "
+                      << "recovery won't decrement semaphores: "
+                      << e.what() << dendl;
+  }
+}
+
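+// The unnamed shared_ptr parameter keeps the cancellation signal alive
+// for the lifetime of the coroutine.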
+asio::awaitable<void> RGWDataChangesLog::watch_loop(decltype(watch_signal)) {
+  const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
+  const auto oid = get_sem_set_oid(0);
+  bool need_rewatch = false;
+
+  while (!going_down()) {
+    try {
+      co_await process_notification(&dp, oid);
+    } catch (const sys::system_error& e) {
+      if (e.code() == neorados::errc::notification_overflow) {
+       ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__
+                          << ": Notification overflow. Whatever server is in "
+                          << "recovery won't decrement semaphores." << dendl;
+       continue;
+      }
+      if (going_down()) {
+       break;
+      } else {
+       need_rewatch = true;
+      }
+    }
+    if (need_rewatch) {
+      try {
+       if (watchcookie) {
+         auto wc = watchcookie;
+         watchcookie = 0;
+         co_await rados->unwatch(wc, loc, asio::use_awaitable);
+       }
+      } catch (const std::exception& e) {
+       // Watch may not exist, don't care.
+      }
+      bool rewatched = false;
+      ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__
+                        << ": Trying to re-establish watch" << dendl;
+
+      rewatched = co_await establish_watch(&dp, oid);
+      while (!rewatched) {
+       asio::steady_timer t(co_await asio::this_coro::executor, 500ms);
+       co_await t.async_wait(asio::use_awaitable);
+       ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__
+                          << ": Trying to re-establish watch" << dendl;
+       rewatched = co_await establish_watch(&dp, oid);
+      }
+    }
+  }
 }
 
 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
@@ -492,21 +701,27 @@ int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
 asio::awaitable<void>
 RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
 {
-  if (!zone->log_data)
+  if (!log_data) {
     co_return;
+  }
 
-  /* we can't keep the bucket name as part of the cls::log::entry, and we need
-   * it later, so we keep two lists under the map */
+  /* we can't keep the bucket name as part of the datalog entry, and
+   * we need it later, so we keep two lists under the map */
   bc::flat_map<int, std::pair<std::vector<BucketGen>,
                              RGWDataChangesBE::entries>> m;
 
   std::unique_lock l(lock);
   decltype(cur_cycle) entries;
   entries.swap(cur_cycle);
+  for (const auto& [bs, gen] : entries) {
+    unsigned index = choose_oid(bs);
+    semaphores[index].insert(BucketGen{bs, gen}.get_key());
+  }
   l.unlock();
 
   auto ut = real_clock::now();
   auto be = bes->head();
+
   for (const auto& [bs, gen] : entries) {
     auto index = choose_oid(bs);
 
@@ -522,17 +737,54 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
     be->prepare(ut, change.key, std::move(bl), m[index].second);
   }
 
+  auto push_failed = false;
   for (auto& [index, p] : m) {
     auto& [buckets, entries] = p;
 
     auto now = real_clock::now();
-    co_await be->push(dpp, index, std::move(entries));
+    // Failure on push isn't fatal.
+    try {
+      co_await be->push(dpp, index, std::move(entries));
+    } catch (const std::exception& e) {
+      push_failed = true;
+      ldpp_dout(dpp, 5) << "RGWDataChangesLog::renew_entries(): Backend push failed "
+                       << "with exception: " << e.what() << dendl;
+    }
+
     auto expiration = now;
     expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
     for (auto& [bs, gen] : buckets) {
       update_renewed(bs, gen, expiration);
     }
   }
+  if (push_failed) {
+    co_return;
+  }
+
+  namespace sem_set = neorados::cls::sem_set;
+  // If we didn't error in pushing, we can now decrement the semaphores
+  l.lock();
+  for (auto index = 0u; index < unsigned(num_shards); ++index) {
+    using neorados::WriteOp;
+    auto& keys = semaphores[index];
+    while (!keys.empty()) {
+      bc::flat_set<std::string> batch;
+      // Can't use a move iterator here, since the keys have to stay
+      // until they're safely on the OSD to avoid the risk of
+      // double-decrement from recovery.
+      auto to_copy = std::min(sem_max_keys, keys.size());
+      std::copy_n(keys.begin(), to_copy,
+                 std::inserter(batch, batch.end()));
+      auto op = WriteOp{}.exec(sem_set::decrement(std::move(batch)));
+      l.unlock();
+      co_await rados->execute(get_sem_set_oid(index), loc, std::move(op),
+                             asio::use_awaitable);
+      l.lock();
+      auto iter = keys.cbegin();
+      std::advance(iter, to_copy);
+      keys.erase(keys.cbegin(), iter);
+    }
+  }
   co_return;
 }
 
@@ -542,18 +794,16 @@ auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
 {
   ChangeStatusPtr status;
   if (!changes.find({bs, gen}, status)) {
-    status = std::make_shared<ChangeStatus>(store->get_io_context()
-                                           .get_executor());
+    status = std::make_shared<ChangeStatus>(rados->get_executor());
     changes.add({bs, gen}, status);
   }
   return status;
 }
 
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
-                                      const rgw::bucket_log_layout_generation& gen)
+bool RGWDataChangesLog::register_renew(BucketGen bg)
 {
   std::scoped_lock l{lock};
-  cur_cycle.insert({bs, gen.gen});
+  return cur_cycle.insert(bg).second;
 }
 
 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
@@ -591,7 +841,6 @@ RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
       optional_yield y(yc);
       return bucket_filter(bucket, y, dpp);
     }, asio::use_awaitable);
-  co_return true;
 }
 
 std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
@@ -600,13 +849,17 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
          fmt::format("{}.{}", prefix, i));
 }
 
+std::string RGWDataChangesLog::get_sem_set_oid(int i) const {
+  return fmt::format("_sem_set{}.{}", prefix, i);
+}
+
 asio::awaitable<void>
 RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
                             const RGWBucketInfo& bucket_info,
                             const rgw::bucket_log_layout_generation& gen,
                             int shard_id)
 {
-  if (!zone->log_data) {
+  if (!log_data) {
     co_return;
   }
 
@@ -623,6 +876,27 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
   rgw_bucket_shard bs(bucket, shard_id);
 
   int index = choose_oid(bs);
+  if (!(watchcookie && rados->check_watch(watchcookie))) {
+    auto now = real_clock::now();
+    ldpp_dout(dpp, 2) << "RGWDataChangesLog::add_entry(): "
+                     << "Bypassing window optimization and pushing directly: "
+                     << "bucket.name=" << bucket.name
+                     << " shard_id=" << shard_id << " now="
+                     << now << " cur_expiration=" << dendl;
+
+    buffer::list bl;
+    rgw_data_change change;
+    change.entity_type = ENTITY_TYPE_BUCKET;
+    change.key = bs.get_key();
+    change.timestamp = now;
+    change.gen = gen.gen;
+    encode(change, bl);
+
+    auto be = bes->head();
+    // Failure on push is fatal if we're bypassing semaphores.
+    co_await be->push(dpp, index, now, change.key, std::move(bl));
+    co_return;
+  }
 
   mark_modified(index, bs, gen.gen);
 
@@ -639,17 +913,26 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
                     << " shard_id=" << shard_id << " now=" << now
                     << " cur_expiration=" << status->cur_expiration << dendl;
 
+
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     sl.unlock();
-    register_renew(bs, gen);
+    auto bg = BucketGen{bs, gen.gen};
+    auto key = bg.get_key();
+    auto need_sem_set = register_renew(std::move(bg));
+    if (need_sem_set) {
+      using neorados::WriteOp;
+      using neorados::cls::sem_set::increment;
+      co_await rados->execute(get_sem_set_oid(index), loc,
+                             WriteOp{}.exec(increment(std::move(key))),
+                             asio::use_awaitable);
+    }
     co_return;
   }
 
   if (status->pending) {
     co_await status->cond.async_wait(sl, asio::use_awaitable);
     sl.unlock();
-    register_renew(bs, gen);
     co_return;
   }
 
@@ -675,7 +958,14 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
   ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
 
   auto be = bes->head();
-  co_await be->push(dpp, index, now, change.key, std::move(bl));
+  // Failure on push isn't fatal.
+  try {
+    co_await be->push(dpp, index, now, change.key, std::move(bl));
+  } catch (const std::exception& e) {
+    ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed "
+                     << "with exception: " << e.what() << dendl;
+  }
+
 
   now = real_clock::now();
 
@@ -708,7 +998,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
     }
   } else {
     maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(store->get_io_context().get_executor(),
+    eptr = asio::co_spawn(rados->get_executor(),
                          add_entry(dpp, bucket_info, gen, shard_id),
                          async::use_blocked);
   }
@@ -808,7 +1098,7 @@ int RGWDataChangesLog::list_entries(
     }
   } else {
     maybe_warn_about_blocking(dpp);
-    std::tie(eptr, out) = asio::co_spawn(store->get_io_context().get_executor(),
+    std::tie(eptr, out) = asio::co_spawn(rados->get_executor(),
                                         bes->list(dpp, shard, entries,
                                                   std::string{marker}),
                                         async::use_blocked);
@@ -887,10 +1177,10 @@ int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entrie
   } else {
     maybe_warn_about_blocking(dpp);
     std::tie(eptr, out) =
-      asio::co_spawn(store->get_io_context().get_executor(),
-                     list_entries(dpp, max_entries,
-                                  RGWDataChangesLogMarker{marker}),
-                     async::use_blocked);
+      asio::co_spawn(rados->get_executor(),
+                    list_entries(dpp, max_entries,
+                                 RGWDataChangesLogMarker{marker}),
+                    async::use_blocked);
   }
   if (eptr) {
     return ceph::from_exception(eptr);
@@ -923,7 +1213,7 @@ int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id,
     }
   } else {
     maybe_warn_about_blocking(dpp);
-    std::tie(eptr, *info) = asio::co_spawn(store->get_io_context().get_executor(),
+    std::tie(eptr, *info) = asio::co_spawn(rados->get_executor(),
                                           be->get_info(dpp, shard_id),
                                           async::use_blocked);
   }
@@ -977,7 +1267,7 @@ int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
     }
   } else {
     maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(store->get_io_context().get_executor(),
+    eptr = asio::co_spawn(rados->get_executor(),
                          bes->trim_entries(dpp, shard_id, marker),
                          async::use_blocked);
   }
@@ -987,7 +1277,7 @@ int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
 int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
                                    std::string_view marker,
                                    librados::AioCompletion* c) {
-  asio::co_spawn(store->get_io_context().get_executor(),
+  asio::co_spawn(rados->get_executor(),
                 bes->trim_entries(dpp, shard_id, marker),
                 c);
   return 0;
@@ -1028,7 +1318,6 @@ asio::awaitable<void> DataLogBackends::trim_generations(
   co_return;
 }
 
-
 bool RGWDataChangesLog::going_down() const
 {
   return down_flag;
@@ -1039,11 +1328,21 @@ RGWDataChangesLog::~RGWDataChangesLog() {
 }
 
 void RGWDataChangesLog::shutdown() {
+  if (down_flag) {
+    return;
+  }
   down_flag = true;
   renew_stop();
+  // Revisit this later
+  if (renew_signal)
+    renew_signal->emit(asio::cancellation_type::terminal);
+  if (watch_signal)
+    watch_signal->emit(asio::cancellation_type::terminal);
+  if (recovery_signal)
+    recovery_signal->emit(asio::cancellation_type::terminal);
 }
 
-asio::awaitable<void> RGWDataChangesLog::renew_run() {
+asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
   static constexpr auto runs_per_prune = 150;
   auto run = 0;
   renew_timer.emplace(co_await asio::this_coro::executor);
@@ -1128,7 +1427,8 @@ std::string RGWDataChangesLog::max_marker() const {
                   "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
 }
 
-int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) {
+int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
+                                    log_type type, optional_yield y) {
   std::exception_ptr eptr;
   if (y) {
     auto& yield = y.get_yield_context();
@@ -1141,7 +1441,7 @@ int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type typ
     }
   } else {
     maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(store->get_io_context().get_executor(),
+    eptr = asio::co_spawn(rados->get_executor(),
                          bes->new_backing(dpp, type),
                          async::use_blocked);
   }
@@ -1164,13 +1464,175 @@ int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
 
   } else {
     maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(store->get_io_context().get_executor(),
+    eptr = asio::co_spawn(rados->get_executor(),
                          bes->trim_generations(dpp, through),
                          async::use_blocked);
   }
   return ceph::from_exception(eptr);
 }
 
+asio::awaitable<void>
+RGWDataChangesLog::read_all_sems(int index,
+                                bc::flat_map<std::string, uint64_t>* out)
+{
+  namespace sem_set = neorados::cls::sem_set;
+  std::string cursor;
+  do {
+    try {
+      co_await rados->execute(
+       get_sem_set_oid(index), loc,
+       neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, cursor, out,
+                                             &cursor)),
+       nullptr, asio::use_awaitable);
+    } catch (const sys::system_error& e) {
+      if (e.code() == sys::errc::no_such_file_or_directory) {
+       break;
+      } else {
+       throw;
+      }
+    }
+  } while (!cursor.empty());
+  co_return;
+}
+
+asio::awaitable<bool>
+RGWDataChangesLog::synthesize_entries(
+  const DoutPrefixProvider* dpp,
+  int index,
+  const bc::flat_map<std::string, uint64_t>& semcount)
+{
+  const auto timestamp = real_clock::now();
+  auto be = bes->head();
+  auto push_failed = false;
+
+  RGWDataChangesBE::entries batch;
+  for (const auto& [key, sem] : semcount) {
+    try {
+      BucketGen bg{key};
+      rgw_data_change change;
+      buffer::list bl;
+      change.entity_type = ENTITY_TYPE_BUCKET;
+      change.key = bg.shard.get_key();
+      change.timestamp = timestamp;
+      change.gen = bg.gen;
+      encode(change, bl);
+      be->prepare(timestamp, change.key, std::move(bl), batch);
+    } catch (const std::exception& e) {
+      push_failed = true;
+      ldpp_dout(dpp, -1) << "RGWDataChangesLog::synthesize_entries(): Unable to "
+                        << "parse BucketGen key: " << key << ". Got exception: "
+                        << e.what() << dendl;
+    }
+  }
+  try {
+    co_await be->push(dpp, index, std::move(batch));
+  } catch (const std::exception& e) {
+    push_failed = true;
+    ldpp_dout(dpp, 5) << "RGWDataChangesLog::synthesize_entries(): Backend push "
+                     << "failed with exception: " << e.what() << dendl;
+  }
+  co_return !push_failed;
+}
+
+asio::awaitable<bool>
+RGWDataChangesLog::gather_working_sets(
+  const DoutPrefixProvider* dpp,
+  int index,
+  bc::flat_map<std::string, uint64_t>& semcount)
+{
+  buffer::list bl;
+  recovery_check rc = index;
+  encode(rc, bl);
+  auto [reply_map, missed_set] = co_await rados->notify(
+      get_sem_set_oid(0), loc, bl, 60s, asio::use_awaitable);
+  // If we didn't get an answer from someone, don't decrement anything.
+  if (!missed_set.empty()) {
+    ldpp_dout(dpp, 5) << "RGWDataChangesLog::gather_working_sets(): Missed responses: "
+                     << missed_set << dendl;
+    co_return false;
+  }
+  for (const auto& [source, reply] : reply_map) {
+    recovery_reply keys;
+    try {
+      decode(keys, reply);
+    } catch (const std::exception& e) {
+      ldpp_dout(dpp, -1)
+       << "RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: "
+       << source << dendl;
+      co_return false;
+    }
+    for (const auto& key : keys.reply_set) {
+      auto iter = semcount.find(key);
+      if (iter == semcount.end()) {
+       continue;
+      }
+      if (iter->second == 1) {
+       semcount.erase(iter);
+      } else {
+       --(iter->second);
+      }
+    }
+  }
+  co_return true;
+}
+
+asio::awaitable<void>
+RGWDataChangesLog::decrement_sems(
+  int index,
+  bc::flat_map<std::string, uint64_t>&& semcount)
+{
+  namespace sem_set = neorados::cls::sem_set;
+  while (!semcount.empty()) {
+    bc::flat_set<std::string> batch;
+    for (auto j = 0u; j < sem_max_keys && !semcount.empty(); ++j) {
+      auto iter = std::begin(semcount);
+      batch.insert(iter->first);
+      semcount.erase(std::move(iter));
+    }
+    co_await rados->execute(
+      get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
+       sem_set::decrement(std::move(batch))),
+      asio::use_awaitable);
+  }
+}
+
+asio::awaitable<void>
+RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
+                          decltype(recovery_signal)) {
+  using neorados::WriteOp;
+  bc::flat_map<uint64_t, bc::flat_map<std::string, uint64_t>> semcounts;
+  for (int index = 0; index < num_shards; ++index) {
+    // Gather entries in the shard
+    auto& semcount = semcounts[index];
+    co_await read_all_sems(index, &semcount);
+    // If we have none, no point doing the rest
+    if (semcount.empty()) {
+      continue;
+    }
+    // Synthesize entries to push
+    auto pushed = co_await synthesize_entries(dpp, index, semcount);
+    if (!pushed) {
+      // If pushing failed, don't decrement any semaphores
+      ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
+                       << index << " failed, skipping decrement" << dendl;
+      continue;
+    }
+
+    // Check with other running RGWs, make sure not to decrement
+    // anything they have in flight. This doesn't cause an issue for
+    // partial upgrades, since older versions won't be using the
+    // semaphores at all.
+    auto notified = co_await gather_working_sets(dpp, index, semcount);
+    if (!notified) {
+      ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
+                       << "working sets for shard " << index
+                       << " failed, skipping decrement" << dendl;
+      continue;
+    }
+    co_await decrement_sems(index, std::move(semcount));
+  }
+}
+
 void RGWDataChangesLogInfo::dump(Formatter *f) const
 {
   encode_json("marker", marker, f);
index 550612844e311217ee665ea513d5ae9f0dbb25d1..a97e4f381a4d90851c7874bed8c1547214402297 100644 (file)
 #include "common/lru_map.h"
 
 #include "cls/log/cls_log_types.h"
+#include "neorados/cls/sem_set.h"
 #include "rgw_basic_types.h"
 #include "rgw_log_backing.h"
 #include "rgw_sync_policy.h"
 #include "rgw_trim_bilog.h"
 #include "rgw_zone.h"
 
+#include "common/async/spawn_group.h"
+
 namespace asio = boost::asio;
 namespace bc = boost::container;
 
@@ -334,14 +337,22 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) {
 }
 
 class RGWDataChangesLog {
+  friend class DataLogTest;
   friend DataLogBackends;
   CephContext *cct;
-  rgw::sal::RadosStore* store = nullptr;
+  neorados::RADOS* rados;
   neorados::IOContext loc;
   rgw::BucketChangeObserver *observer = nullptr;
-  const RGWZone* zone;
+  bool log_data = false;
   std::unique_ptr<DataLogBackends> bes;
 
+  std::shared_ptr<asio::cancellation_signal> renew_signal =
+    std::make_shared<asio::cancellation_signal>();
+  std::shared_ptr<asio::cancellation_signal> watch_signal =
+    std::make_shared<asio::cancellation_signal>();
+  std::shared_ptr<asio::cancellation_signal> recovery_signal =
+    std::make_shared<asio::cancellation_signal>();
+
   const int num_shards;
   std::string get_prefix() { return "data_log"; }
   std::string metadata_log_oid() {
@@ -370,18 +381,19 @@ class RGWDataChangesLog {
   using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
 
   lru_map<BucketGen, ChangeStatusPtr> changes;
+  const uint64_t sem_max_keys = neorados::cls::sem_set::max_keys;
 
   bc::flat_set<BucketGen> cur_cycle;
+  std::vector<bc::flat_set<std::string>> semaphores{unsigned(num_shards)};
 
   ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen);
-  void register_renew(const rgw_bucket_shard& bs,
-                     const rgw::bucket_log_layout_generation& gen);
+  bool register_renew(BucketGen bg);
   void update_renewed(const rgw_bucket_shard& bs,
                      uint64_t gen,
                      ceph::real_time expiration);
 
   std::optional<asio::steady_timer> renew_timer;
-  asio::awaitable<void> renew_run();
+  asio::awaitable<void> renew_run(decltype(renew_signal) renew_signal);
   void renew_stop();
 
   std::function<bool(const rgw_bucket& bucket, optional_yield y,
@@ -391,13 +403,31 @@ class RGWDataChangesLog {
                                      const rgw_bucket& bucket) const;
   asio::awaitable<void> renew_entries(const DoutPrefixProvider *dpp);
 
+  uint64_t watchcookie = 0;
+
 public:
 
   RGWDataChangesLog(CephContext* cct);
+  // For testing.
+  RGWDataChangesLog(CephContext* cct, bool log_data,
+                   neorados::RADOS* rados);
   ~RGWDataChangesLog();
 
-  int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
+  asio::awaitable<void> start(const DoutPrefixProvider* dpp,
+                             const rgw_pool& log_pool,
+                             // For testing
+                             bool recovery = true,
+                             bool watch = true,
+                             bool renew = true);
+
+  int start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
+           const RGWZoneParams& zoneparams,
            rgw::sal::RadosStore* store);
+  asio::awaitable<bool> establish_watch(const DoutPrefixProvider* dpp,
+                                       std::string_view oid);
+  asio::awaitable<void> process_notification(const DoutPrefixProvider* dpp,
+                                            std::string_view oid);
+  asio::awaitable<void> watch_loop(decltype(watch_signal));
   int choose_oid(const rgw_bucket_shard& bs);
   asio::awaitable<void> add_entry(const DoutPrefixProvider *dpp,
                                  const RGWBucketInfo& bucket_info,
@@ -451,13 +481,29 @@ public:
   // a marker that compares greater than any other
   std::string max_marker() const;
   std::string get_oid(uint64_t gen_id, int shard_id) const;
+  std::string get_sem_set_oid(int shard_id) const;
 
 
-  int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y);
+  int change_format(const DoutPrefixProvider *dpp, log_type type,
+                   optional_yield y);
   int trim_generations(const DoutPrefixProvider *dpp,
                       std::optional<uint64_t>& through,
                       optional_yield y);
   void shutdown();
+  asio::awaitable<void> read_all_sems(int index,
+                                     bc::flat_map<std::string, uint64_t>* out);
+  asio::awaitable<bool>
+  synthesize_entries(const DoutPrefixProvider* dpp, int index,
+                    const bc::flat_map<std::string, uint64_t>& semcount);
+  asio::awaitable<bool>
+  gather_working_sets(const DoutPrefixProvider* dpp,
+                     int index,
+                     bc::flat_map<std::string, uint64_t>& semcount);
+  asio::awaitable<void>
+  decrement_sems(int index,
+                bc::flat_map<std::string, uint64_t>&& semcount);
+  asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
+                               decltype(recovery_signal));
 };
 
 class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {