]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/notifications: replace timer waiter with async waiter
authorYuval Lifshitz <ylifshit@ibm.com>
Tue, 17 Jun 2025 12:51:47 +0000 (12:51 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 3 Jul 2025 15:46:48 +0000 (15:46 +0000)
this should allow for proper shutdown of the queue handling
code of persistent notifications.

Fixes: https://tracker.ceph.com/issues/71963
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_notify.cc

index dd735ebf005cc01240d250659219f4aebf0a6cf8..695e5af6c119215e665fd484e5e6a4ff55edb9e4 100644 (file)
@@ -23,6 +23,7 @@
 #include <chrono>
 #include <fmt/format.h>
 #include "librados/AioCompletionImpl.h"
+#include "common/async/yield_waiter.h"
 
 #include <unordered_map>
 
@@ -143,53 +144,49 @@ private:
     return 0;
   }
 
-  using Clock = ceph::coarse_mono_clock;
-  using Executor = boost::asio::io_context::executor_type;
-  using Timer = boost::asio::basic_waitable_timer<Clock,
-        boost::asio::wait_traits<Clock>, Executor>;
-
   class tokens_waiter {
-    const std::chrono::hours infinite_duration;
-    size_t pending_tokens;
-    Timer timer;
-    struct token {
-      tokens_waiter& waiter;
-      token(const token& other) : waiter(other.waiter) {
-        ++waiter.pending_tokens;
+    size_t pending_tokens = 0;
+    DoutPrefixProvider* const dpp;
+    ceph::async::yield_waiter<void> waiter;
+
+  public:
+    class token{
+      tokens_waiter* tw;
+    public:
+      token(const token& other) = delete;
+      token(token&& other) : tw(other.tw) {
+        other.tw = nullptr; // mark as moved
       }
-      token(tokens_waiter& _waiter) : waiter(_waiter) {
-        ++waiter.pending_tokens;
+      token& operator=(const token& other) = delete;
+      token(tokens_waiter* _tw) : tw(_tw) {
+        ++tw->pending_tokens;
       }
-      
+
       ~token() {
-        --waiter.pending_tokens;
-        if (waiter.pending_tokens == 0) {
-          waiter.timer.cancel();
-        }   
-      }   
+        if (!tw) {
+          return; // already moved
+        }
+        --tw->pending_tokens;
+        if (tw->pending_tokens == 0 && tw->waiter) {
+          tw->waiter.complete(boost::system::error_code{});
+        }
+      }
     };
-  
-  public:
 
-    tokens_waiter(boost::asio::io_context& io_context) :
-      infinite_duration(1000),
-      pending_tokens(0),
-      timer(io_context) {}  
+    tokens_waiter(DoutPrefixProvider* _dpp) : dpp(_dpp) {}
+    tokens_waiter(const tokens_waiter& other) = delete;
+    tokens_waiter& operator=(const tokens_waiter& other) = delete;
+
     void async_wait(boost::asio::yield_context yield) {
       if (pending_tokens == 0) {
         return;
       }
-      timer.expires_after(infinite_duration);
-      boost::system::error_code ec; 
-      timer.async_wait(yield[ec]);
-      ceph_assert(ec == boost::system::errc::operation_canceled);
-    }   
-    token make_token() {    
-      return token(*this);
-    }   
+      ldpp_dout(dpp, 20) << "INFO: tokens waiter is waiting on " <<
+        pending_tokens << " tokens" << dendl;
+      boost::system::error_code ec;
+      waiter.async_wait(yield[ec]);
+      ldpp_dout(dpp, 20) << "INFO: tokens waiter finished waiting for all tokens" << dendl;
+    }
   };
 
   enum class EntryProcessingResult {
@@ -270,6 +267,11 @@ private:
     return EntryProcessingResult::Successful;
   }
 
+  using Clock = ceph::coarse_mono_clock;
+  using Executor = boost::asio::io_context::executor_type;
+  using Timer = boost::asio::basic_waitable_timer<Clock,
+        boost::asio::wait_traits<Clock>, Executor>;
+
   // clean stale reservation from queue
   void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     while (!shutdown) {
@@ -467,7 +469,7 @@ private:
       auto stop_processing = false;
       auto remove_entries = false;
       auto entry_idx = 1U;
-      tokens_waiter waiter(io_context);
+      tokens_waiter tw(this);
       std::vector<bool> needs_migration_vector(entries.size(), false);
       for (auto& entry : entries) {
         if (stop_processing) {
@@ -475,10 +477,11 @@ private:
         }
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
+        tokens_waiter::token token(&tw);
         boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
           [this, &is_idle, &notifs_persistency_tracker, &queue_name, entry_idx,
            total_entries, &end_marker, &remove_entries, &stop_processing,
-           token = waiter.make_token(), &entry, &needs_migration_vector,
+           token = std::move(token), &entry, &needs_migration_vector,
            push_endpoint = push_endpoint.get(),
            &topic_info](boost::asio::yield_context yield) {
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
@@ -518,8 +521,10 @@ private:
         ++entry_idx;
       }
 
-      // wait for all pending work to finish
-      waiter.async_wait(yield);
+      if (!entries.empty()) {
+        // wait for all pending work to finish
+        tw.async_wait(yield);
+      }
 
       // delete all published entries from queue
       if (remove_entries) {