#include "common/dout.h"
#include <chrono>
-#define dout_subsys ceph_subsys_rgw
+#define dout_subsys ceph_subsys_rgw_notification
namespace rgw::notify {
};
WRITE_CLASS_ENCODER(event_entry_t)
+static inline std::ostream& operator<<(std::ostream& out,
+ const event_entry_t& e) {
+ return out << "notification id: '" << e.event.configurationId
+ << "', topic: '" << e.arn_topic
+ << "', endpoint: '" << e.push_endpoint
+ << "', bucket_owner: '" << e.event.bucket_ownerIdentity
+ << "', bucket: '" << e.event.bucket_name
+ << "', object: '" << e.event.object_key
+ << "', event type: '" << e.event.eventName << "'";
+}
struct persistency_tracker {
ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
|| ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) {
- ldpp_dout(this, 1) << "Expiring entry for topic= "
- << event_entry.arn_topic << " bucket_owner= "
- << event_entry.event.bucket_ownerIdentity
- << " bucket= " << event_entry.event.bucket_name
- << " object_name= " << event_entry.event.object_key
- << " entry retry_number="
+ ldpp_dout(this, 1) << "WARNING: Expiring entry marker: " << entry.marker
+ << " for event with " << event_entry
+ << " entry retry_number: "
<< entry_persistency_tracker.retires_num
- << " creation_time=" << event_entry.creation_time
- << " time_now=" << time_now << dendl;
+ << " creation_time: " << event_entry.creation_time
+ << " time_now: " << time_now << dendl;
return EntryProcessingResult::Expired;
}
if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
++entry_persistency_tracker.retires_num;
entry_persistency_tracker.last_retry_time = time_now;
- ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
+ ldpp_dout(this, 20) << "Processing event entry with " << event_entry
+ << " retry_number: "
+ << entry_persistency_tracker.retires_num
+ << " current time: " << time_now << dendl;
try {
// TODO move endpoint creation to queue level
const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
" for entry: " << entry.marker << dendl;
const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
if (ret < 0) {
- ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
- << " failed. error: " << ret << " (will retry)" << dendl;
+ ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
+ << " failed. error: " << ret
+ << " (will retry) for event with " << event_entry
+ << dendl;
return EntryProcessingResult::Failure;
} else {
- ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
- << " ok" << dendl;
+ ldpp_dout(this, 5) << "INFO: push entry marker: " << entry.marker
+ << " ok for event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
return EntryProcessingResult::Successful;
}
auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
- ldpp_dout(this, 5) << "INFO: queue: "
- << queue_name << ". was removed. cleanup will stop" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. cleanup will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 10)
+ << "WARNING: queue: " << queue_name
+ << " ownership moved to another daemon. processing will stop"
+ << dendl;
return;
}
if (ret < 0) {
if (ret == -ENOENT) {
// queue was deleted
topics_persistency_tracker.erase(queue_name);
- ldpp_dout(this, 5) << "INFO: queue: "
- << queue_name << ". was removed. processing will stop" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
topics_persistency_tracker.erase(queue_name);
- ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 10)
+ << "WARNING: queue: " << queue_name
+ << " ownership moved to another daemon. processing will stop"
+ << dendl;
return;
}
if (ret < 0) {
auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
- ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 10)
+ << "WARNING: queue: " << queue_name
+ << " ownership moved to another daemon. processing will stop"
+ << dendl;
return;
}
if (ret < 0) {
dpp->get_cct(), event_entry.event, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1)
- << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint
- << " bucket: " << event_entry.event.bucket_name
- << " bucket_owner: " << event_entry.event.bucket_ownerIdentity
- << " object_name: " << event_entry.event.object_key
- << " failed. error: " << ret << dendl;
+ << "ERROR: failed to push sync notification event with error: "
+ << ret << " for event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
} catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
- << topic.cfg.dest.push_endpoint
- << " bucket: " << event_entry.event.bucket_name
- << " bucket_owner: "
- << event_entry.event.bucket_ownerIdentity
- << " object_name: " << event_entry.event.object_key
- << ". error: " << e.what() << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint for sync "
+ "notification event with error: "
+ << e.what() << " event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return -EINVAL;
}