std::string push_endpoint;
std::string push_endpoint_args;
std::string arn_topic;
+ ceph::coarse_real_time creation_time;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(event, bl);
encode(push_endpoint, bl);
encode(push_endpoint_args, bl);
encode(arn_topic, bl);
+ encode(creation_time, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(event, bl);
decode(push_endpoint, bl);
decode(push_endpoint_args, bl);
decode(arn_topic, bl);
+ if (struct_v > 1) {
+ decode(creation_time, bl);
+ } else {
+ creation_time = ceph::coarse_real_clock::zero();
+ }
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(event_entry_t)
+
+struct persistency_tracker {
+ ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
+ uint64_t retires_num {0};
+};
+
using queues_t = std::set<std::string>;
+using entries_persistency_tracker = ceph::unordered_map<std::string, persistency_tracker>;
+using queues_persistency_tracker = ceph::unordered_map<std::string, entries_persistency_tracker>;
// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
std::vector<std::thread> workers;
const uint32_t stale_reservations_period_s;
const uint32_t reservations_cleanup_period_s;
+ queues_persistency_tracker topics_persistency_tracker;
public:
librados::IoCtx& rados_ioctx;
+
private:
CephContext *get_cct() const override { return cct; }
}
};
+ enum class EntryProcessingResult {
+ Failure, Successful, Sleeping, Expired
+ };
+
// processing of a specific entry
// return whether processing was successfull (true) or not (false)
- bool process_entry(const cls_queue_entry& entry, yield_context yield) {
+ EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker,
+ const cls_queue_entry& entry, yield_context yield) {
event_entry_t event_entry;
auto iter = entry.data.cbegin();
try {
decode(event_entry, iter);
} catch (buffer::error& err) {
ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
- return false;
+ return EntryProcessingResult::Failure;
+ }
+
+ const auto topic_persistency_ttl = conf->rgw_topic_persistency_time_to_live;
+ const auto topic_persistency_max_retries = conf->rgw_topic_persistency_max_retries;
+ const auto time_now = ceph::coarse_real_clock::now();
+ if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
+ time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
+ || ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) {
+ ldpp_dout(this, 20) << "Expiring entry retry_number=" << entry_persistency_tracker.retires_num << " creation_time="
+ << event_entry.creation_time << " time_now:" << time_now << dendl;
+ return EntryProcessingResult::Expired;
+ }
+ if (time_now - entry_persistency_tracker.last_retry_time
+ < std::chrono::seconds(conf->rgw_topic_persistency_sleep_duration) ) {
+ return EntryProcessingResult::Sleeping;
}
+ // TODO: write back the entry with creation time as if now
+ // if event_entry.creation_time == zero
+
+ ++entry_persistency_tracker.retires_num;
+ entry_persistency_tracker.last_retry_time = time_now;
+ ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
try {
// TODO move endpoint creation to queue level
const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " failed. error: " << ret << " (will retry)" << dendl;
- return false;
+ return EntryProcessingResult::Failure;
} else {
ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " ok" << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
- return true;
+ return EntryProcessingResult::Successful;
}
} catch (const RGWPubSubEndpoint::configuration_error& e) {
ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
<< event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
- return false;
+ return EntryProcessingResult::Failure;
}
}
auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
+ topics_persistency_tracker.erase(queue_name);
ldpp_dout(this, 5) << "INFO: queue: "
<< queue_name << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
+ topics_persistency_tracker.erase(queue_name);
ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
return;
}
// bail out on first error
break;
}
- // TODO pass entry pointer instead of by-value
- spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &entries_to_remove, &has_error, &waiter, entry](yield_context yield) {
+
+ entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
+ spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries,
+ &entries_to_remove, &has_error, &waiter, &entry](yield_context yield) {
const auto token = waiter.make_token();
- if (process_entry(entry, yield)) {
+ auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
+ auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
+ if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired) {
ldpp_dout(this, 20) << "INFO: processing of entry: " <<
- entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
+ entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+ << (result == EntryProcessingResult::Successful? " ok": " expired") << dendl;
remove_entries = true;
++entries_to_remove;
+ notifs_persistency_tracker.erase(entry.marker);
} else {
if (set_min_marker(end_marker, entry.marker) < 0) {
ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
{
std::lock_guard lock_guard(queue_gc_lock);
std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
+ topics_persistency_tracker.erase(queue_name);
owned_queues.erase(queue_name);
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
});
event_entry.push_endpoint_args =
std::move(topic.cfg.dest.push_endpoint_args);
event_entry.arn_topic = topic.cfg.dest.arn_topic;
+ event_entry.creation_time = ceph::coarse_real_clock::now();
bufferlist bl;
encode(event_entry, bl);
const auto& queue_name = topic.cfg.dest.arn_topic;