]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: make queue idle when all notifications are in "sleep" state
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 18 Jun 2025 14:56:01 +0000 (14:56 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Wed, 25 Jun 2025 15:09:20 +0000 (15:09 +0000)
this will prevent re-reading the queue when there is no work to do
also, put into "idle" state in case of failure with -EBUSY error code

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_notify.cc

index 5170fa27de539966df27ba8049a468ee31bf835f..dec13d7630c78c8eb0c2510cca2cebe1695cd144 100644 (file)
@@ -205,7 +205,9 @@ private:
       const cls_queue_entry& entry,
       RGWPubSubEndpoint* const push_endpoint,
       const rgw_pubsub_topic& topic,
-      boost::asio::yield_context yield) {
+      boost::asio::yield_context yield,
+      int& ret) {
+    ret = 0;
     event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
@@ -252,7 +254,7 @@ private:
                         << " retry_number: "
                         << entry_persistency_tracker.retires_num
                         << " current time: " << time_now << dendl;
-    const auto ret = push_endpoint->send(this, event_entry.event, yield);
+    ret = push_endpoint->send(this, event_entry.event, yield);
     if (ret < 0) {
       ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
                          << " failed. error: " << ret
@@ -461,7 +463,6 @@ private:
                            << " (will retry sending events) " << dendl;
         continue;
       }
-      is_idle = false;
       auto stop_processing = false;
       auto remove_entries = false;
       auto entry_idx = 1U;
@@ -474,15 +475,16 @@ private:
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
         boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
-          [this, &notifs_persistency_tracker, &queue_name, entry_idx,
+          [this, &is_idle, &notifs_persistency_tracker, &queue_name, entry_idx,
            total_entries, &end_marker, &remove_entries, &stop_processing,
            token = waiter.make_token(), &entry, &needs_migration_vector,
            push_endpoint = push_endpoint.get(),
            &topic_info](boost::asio::yield_context yield) {
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
+            int result_code;
             auto result =
                 process_entry(this->get_cct()->_conf, persistency_tracker,
-                              entry, push_endpoint, topic_info, yield);
+                              entry, push_endpoint, topic_info, yield, result_code);
             if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
                 || result == EntryProcessingResult::Migrating) {
               ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
@@ -491,6 +493,7 @@ private:
               remove_entries = true;
               needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
+              is_idle = false;
               return;
             }
             if (set_min_marker(end_marker, entry.marker) < 0) {
@@ -499,11 +502,13 @@ private:
               ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
             }
             if (result == EntryProcessingResult::Sleeping) {
-              ldpp_dout(this, 20) << "INFO: skip processing of entry: " << entry.marker
+              ldpp_dout(this, 20) << "INFO: skipped processing of entry: " << entry.marker
                 << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << dendl;
             } else {
-              ldpp_dout(this, 20) << "INFO: processing of entry: " <<
-                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
+              is_idle = (result_code == -EBUSY);
+              ldpp_dout(this, 20) << "INFO: failed processing of entry: " <<
+                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name <<
+                " result code: " << cpp_strerror(-result_code) << dendl;
             }
             stop_processing = true;
         }, [] (std::exception_ptr eptr) {