const cls_queue_entry& entry,
RGWPubSubEndpoint* const push_endpoint,
const rgw_pubsub_topic& topic,
- boost::asio::yield_context yield) {
+ boost::asio::yield_context yield,
+ int& ret) {
+ ret = 0;
event_entry_t event_entry;
auto iter = entry.data.cbegin();
try {
<< " retry_number: "
<< entry_persistency_tracker.retires_num
<< " current time: " << time_now << dendl;
- const auto ret = push_endpoint->send(this, event_entry.event, yield);
+ ret = push_endpoint->send(this, event_entry.event, yield);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
<< " failed. error: " << ret
<< " (will retry sending events) " << dendl;
continue;
}
- is_idle = false;
auto stop_processing = false;
auto remove_entries = false;
auto entry_idx = 1U;
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
- [this, ¬ifs_persistency_tracker, &queue_name, entry_idx,
+ [this, &is_idle, ¬ifs_persistency_tracker, &queue_name, entry_idx,
total_entries, &end_marker, &remove_entries, &stop_processing,
token = waiter.make_token(), &entry, &needs_migration_vector,
push_endpoint = push_endpoint.get(),
&topic_info](boost::asio::yield_context yield) {
auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
+ int result_code;
auto result =
process_entry(this->get_cct()->_conf, persistency_tracker,
- entry, push_endpoint, topic_info, yield);
+ entry, push_endpoint, topic_info, yield, result_code);
if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
|| result == EntryProcessingResult::Migrating) {
ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
remove_entries = true;
needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
notifs_persistency_tracker.erase(entry.marker);
+ is_idle = false;
return;
}
if (set_min_marker(end_marker, entry.marker) < 0) {
ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
}
if (result == EntryProcessingResult::Sleeping) {
- ldpp_dout(this, 20) << "INFO: skip processing of entry: " << entry.marker
+ ldpp_dout(this, 20) << "INFO: skipped processing of entry: " << entry.marker
<< " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << dendl;
} else {
- ldpp_dout(this, 20) << "INFO: processing of entry: " <<
- entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
+ is_idle = (result_code == -EBUSY);
+ ldpp_dout(this, 20) << "INFO: failed processing of entry: " <<
+ entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name <<
+ " result code: " << cpp_strerror(-result_code) << dendl;
}
stop_processing = true;
}, [] (std::exception_ptr eptr) {