From: Yuval Lifshitz Date: Wed, 18 Jun 2025 12:09:12 +0000 (+0000) Subject: rgw/notifications: stop processing when we reach a skipped notifications X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d732db6e2b66ff115722d8b05a3a878a4cbb645a;p=ceph.git rgw/notifications: stop processing when we reach a skipped notifications if a notification retry should be skipped, we should stop processing all notifications. if we successfully processing another notification it will not be removed (as we will remove only up to the marker of the skipped notification). as a result, the successfull notification will be processed again. Fixes: https://tracker.ceph.com/issues/70756 Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 77bba50d81a..5170fa27de5 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -462,21 +462,20 @@ private: continue; } is_idle = false; - auto has_error = false; + auto stop_processing = false; auto remove_entries = false; auto entry_idx = 1U; tokens_waiter waiter(io_context); std::vector needs_migration_vector(entries.size(), false); for (auto& entry : entries) { - if (has_error) { - // bail out on first error + if (stop_processing) { break; } entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name]; boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(), [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, - total_entries, &end_marker, &remove_entries, &has_error, + total_entries, &end_marker, &remove_entries, &stop_processing, token = waiter.make_token(), &entry, &needs_migration_vector, push_endpoint = push_endpoint.get(), &topic_info](boost::asio::yield_context yield) { @@ -492,16 +491,21 @@ private: remove_entries = true; needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating); notifs_persistency_tracker.erase(entry.marker); - } else { - if (set_min_marker(end_marker, entry.marker) < 0) { - ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl; - } else { - ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl; - } - has_error = (result == EntryProcessingResult::Failure); - ldpp_dout(this, 20) << "INFO: processing of entry: " << + return; + } + if (set_min_marker(end_marker, entry.marker) < 0) { + ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl; + } else { + ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl; + } + if (result == EntryProcessingResult::Sleeping) { + ldpp_dout(this, 20) << "INFO: skip processing of entry: " << entry.marker + << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << dendl; + } else { + ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl; - } + } + stop_processing = true; }, [] (std::exception_ptr eptr) { if (eptr) std::rethrow_exception(eptr); });