]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: stop processing when we reach a skipped notifications
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 18 Jun 2025 12:09:12 +0000 (12:09 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Wed, 25 Jun 2025 15:09:20 +0000 (15:09 +0000)
if a notification retry should be skipped, we should stop processing
all notifications. if we successfully processing another notification
it will not be removed (as we will remove only up to the marker of the
skipped notification). as a result, the successfull notification will be
processed again.

Fixes: https://tracker.ceph.com/issues/70756
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_notify.cc

index 77bba50d81a2f10685bb45bd8da77fbe0fe2cde3..5170fa27de539966df27ba8049a468ee31bf835f 100644 (file)
@@ -462,21 +462,20 @@ private:
         continue;
       }
       is_idle = false;
-      auto has_error = false;
+      auto stop_processing = false;
       auto remove_entries = false;
       auto entry_idx = 1U;
       tokens_waiter waiter(io_context);
       std::vector<bool> needs_migration_vector(entries.size(), false);
       for (auto& entry : entries) {
-        if (has_error) {
-          // bail out on first error
+        if (stop_processing) {
           break;
         }
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
         boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
           [this, &notifs_persistency_tracker, &queue_name, entry_idx,
-           total_entries, &end_marker, &remove_entries, &has_error,
+           total_entries, &end_marker, &remove_entries, &stop_processing,
            token = waiter.make_token(), &entry, &needs_migration_vector,
            push_endpoint = push_endpoint.get(),
            &topic_info](boost::asio::yield_context yield) {
@@ -492,16 +491,21 @@ private:
               remove_entries = true;
               needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
-            }  else {
-              if (set_min_marker(end_marker, entry.marker) < 0) {
-                ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
-              } else {
-                ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
-              }
-              has_error = (result == EntryProcessingResult::Failure);
-              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
+              return;
+            }
+            if (set_min_marker(end_marker, entry.marker) < 0) {
+              ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+            } else {
+              ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
+            }
+            if (result == EntryProcessingResult::Sleeping) {
+              ldpp_dout(this, 20) << "INFO: skip processing of entry: " << entry.marker
+                << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << dendl;
+            } else {
+              ldpp_dout(this, 20) << "INFO: processing of entry: " <<
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
-            } 
+            }
+            stop_processing = true;
         }, [] (std::exception_ptr eptr) {
           if (eptr) std::rethrow_exception(eptr);
         });