continue;
}
is_idle = false;
- auto has_error = false;
+ auto stop_processing = false;
auto remove_entries = false;
auto entry_idx = 1U;
tokens_waiter waiter(io_context);
std::vector<bool> needs_migration_vector(entries.size(), false);
for (auto& entry : entries) {
- if (has_error) {
- // bail out on first error
+ if (stop_processing) {
break;
}
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
[this, ¬ifs_persistency_tracker, &queue_name, entry_idx,
- total_entries, &end_marker, &remove_entries, &has_error,
+ total_entries, &end_marker, &remove_entries, &stop_processing,
token = waiter.make_token(), &entry, &needs_migration_vector,
push_endpoint = push_endpoint.get(),
&topic_info](boost::asio::yield_context yield) {
remove_entries = true;
needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
notifs_persistency_tracker.erase(entry.marker);
- } else {
- if (set_min_marker(end_marker, entry.marker) < 0) {
- ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
- } else {
- ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
- }
- has_error = (result == EntryProcessingResult::Failure);
- ldpp_dout(this, 20) << "INFO: processing of entry: " <<
+ return;
+ }
+ if (set_min_marker(end_marker, entry.marker) < 0) {
+ ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+ } else {
+ ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
+ }
+ if (result == EntryProcessingResult::Sleeping) {
+ ldpp_dout(this, 20) << "INFO: skip processing of entry: " << entry.marker
+ << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << dendl;
+ } else {
+ ldpp_dout(this, 20) << "INFO: processing of entry: " <<
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
- }
+ }
+ stop_processing = true;
}, [] (std::exception_ptr eptr) {
if (eptr) std::rethrow_exception(eptr);
});