is_idle = false;
auto has_error = false;
auto remove_entries = false;
- uint64_t entries_to_remove = 0;
auto entry_idx = 1U;
tokens_waiter waiter(io_context);
for (auto& entry : entries) {
}
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
- spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries,
- &entries_to_remove, &has_error, &waiter, &entry](yield_context yield) {
+ spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
+ &remove_entries, &has_error, &waiter, &entry](yield_context yield) {
const auto token = waiter.make_token();
auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
<< (result == EntryProcessingResult::Successful? " ok": " expired") << dendl;
remove_entries = true;
- ++entries_to_remove;
notifs_persistency_tracker.erase(entry.marker);
} else {
if (set_min_marker(end_marker, entry.marker) < 0) {
// delete all published entries from queue
if (remove_entries) {
+ uint64_t entries_to_remove = 0;
+ for (const auto& entry: entries) {
+ if (end_marker == entry.marker) {
+ break;
+ }
+ entries_to_remove++;
+ }
+
librados::ObjectWriteOperation op;
op.assert_exists();
rados::cls::lock::assert_locked(&op, queue_name+"_lock",
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- delay = 20
+ delay = 30
time.sleep(delay)
http_server.close()
assert_equal(result[1], 0)
# create objects in the bucket (async)
- number_of_objects = 10
+ number_of_objects = 100
client_threads = []
start_time = time.time()
for i in range(number_of_objects):