From: Yuval Lifshitz Date: Wed, 18 Jun 2025 14:56:01 +0000 (+0000) Subject: rgw/notifications: make queue idle when all notifications are in "sleep" state X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bbab0e6181a56a1083973a6697bdfe97dbe8cb3b;p=ceph.git rgw/notifications: make queue idle when all notifications are in "sleep" state this will prevent re-reading the queue when there is no work to do also, put into "idle" state in case of failure with -EBUSY error code Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 5170fa27de53..dec13d7630c7 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -205,7 +205,9 @@ private: const cls_queue_entry& entry, RGWPubSubEndpoint* const push_endpoint, const rgw_pubsub_topic& topic, - boost::asio::yield_context yield) { + boost::asio::yield_context yield, + int& ret) { + ret = 0; event_entry_t event_entry; auto iter = entry.data.cbegin(); try { @@ -252,7 +254,7 @@ private: << " retry_number: " << entry_persistency_tracker.retires_num << " current time: " << time_now << dendl; - const auto ret = push_endpoint->send(this, event_entry.event, yield); + ret = push_endpoint->send(this, event_entry.event, yield); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker << " failed. error: " << ret @@ -461,7 +463,6 @@ private: << " (will retry sending events) " << dendl; continue; } - is_idle = false; auto stop_processing = false; auto remove_entries = false; auto entry_idx = 1U; @@ -474,15 +475,16 @@ private: entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name]; boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(), - [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, + [this, &is_idle, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &stop_processing, token = waiter.make_token(), &entry, &needs_migration_vector, push_endpoint = push_endpoint.get(), &topic_info](boost::asio::yield_context yield) { auto& persistency_tracker = notifs_persistency_tracker[entry.marker]; + int result_code; auto result = process_entry(this->get_cct()->_conf, persistency_tracker, - entry, push_endpoint, topic_info, yield); + entry, push_endpoint, topic_info, yield, result_code); if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired || result == EntryProcessingResult::Migrating) { ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker @@ -491,6 +493,7 @@ private: remove_entries = true; needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating); notifs_persistency_tracker.erase(entry.marker); + is_idle = false; return; } if (set_min_marker(end_marker, entry.marker) < 0) { @@ -499,11 +502,13 @@ private: ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl; } if (result == EntryProcessingResult::Sleeping) { - ldpp_dout(this, 20) << "INFO: skip processing of entry: " << entry.marker + ldpp_dout(this, 20) << "INFO: skipped processing of entry: " << entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << dendl; } else { - ldpp_dout(this, 20) << "INFO: processing of entry: " << - entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl; + is_idle = (result_code == -EBUSY); + ldpp_dout(this, 20) << "INFO: failed processing of entry: " << + entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << + " result code: " << cpp_strerror(-result_code) << dendl; } stop_processing = true; }, [] (std::exception_ptr eptr) {