#include "common/dout.h"
#include <chrono>
-#define dout_subsys ceph_subsys_rgw
+#define dout_subsys ceph_subsys_rgw_notification
namespace rgw::notify {
+
+static inline std::ostream& operator<<(std::ostream& out,
+ const event_entry_t& e) {
+ return out << "notification id: '" << e.event.configurationId
+ << "', topic: '" << e.arn_topic
+ << "', endpoint: '" << e.push_endpoint
+ << "', bucket_owner: '" << e.event.bucket_ownerIdentity
+ << "', bucket: '" << e.event.bucket_name
+ << "', object: '" << e.event.object_key
+ << "', event type: '" << e.event.eventName << "'";
+}
struct persistency_tracker {
ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
}
};
-static inline std::ostream& operator<<(std::ostream& out,
- const event_entry_t& e) {
- return out << "notification id: '" << e.event.configurationId
- << "', topic: '" << e.arn_topic
- << "', endpoint: '" << e.push_endpoint
- << "', bucket_owner: '" << e.event.bucket_ownerIdentity
- << "', bucket: '" << e.event.bucket_name
- << "', object: '" << e.event.object_key
- << "', event type: '" << e.event.eventName << "'";
-}
-
-
class Manager : public DoutPrefixProvider {
bool shutdown = false;
const uint32_t queues_update_period_ms;
if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
|| ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) {
- ldpp_dout(this, 1) << "Expiring entry for topic= "
- << event_entry.arn_topic << " bucket_owner= "
- << event_entry.event.bucket_ownerIdentity
- << " bucket= " << event_entry.event.bucket_name
- << " object_name= " << event_entry.event.object_key
- << " entry retry_number="
+ ldpp_dout(this, 1) << "WARNING: Expiring entry marker: " << entry.marker
+ << " for event with " << event_entry
+ << " entry retry_number: "
<< entry_persistency_tracker.retires_num
- << " creation_time=" << event_entry.creation_time
- << " time_now=" << time_now << dendl;
+ << " creation_time: " << event_entry.creation_time
+ << " time_now: " << time_now << dendl;
return EntryProcessingResult::Expired;
}
if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, yield);
if (ret == -ENOENT) {
// queue was deleted
- ldpp_dout(this, 5) << "INFO: queue: "
- << queue_name << ". was removed. cleanup will stop" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. cleanup will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 10)
+ << "WARNING: queue: " << queue_name
+ << " ownership moved to another daemon. processing will stop"
+ << dendl;
return;
}
if (ret < 0) {
if (ret == -ENOENT) {
// queue was deleted
topics_persistency_tracker.erase(queue_name);
- ldpp_dout(this, 5) << "INFO: queue: "
- << queue_name << ". was removed. processing will stop" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
topics_persistency_tracker.erase(queue_name);
- ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 10)
+ << "WARNING: queue: " << queue_name
+ << " ownership moved to another daemon. processing will stop"
+ << dendl;
return;
}
if (ret < 0) {
auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
if (ret == -ENOENT) {
// queue was deleted
- ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
- ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+ ldpp_dout(this, 10)
+ << "WARNING: queue: " << queue_name
+ << " ownership moved to another daemon. processing will stop"
+ << dendl;
return;
}
if (ret < 0) {
const auto ret = push_endpoint->send(event_entry.event, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1)
- << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint
- << " bucket: " << event_entry.event.bucket_name
- << " bucket_owner: " << event_entry.event.bucket_ownerIdentity
- << " object_name: " << event_entry.event.object_key
- << " failed. error: " << ret << dendl;
+ << "ERROR: failed to push sync notification event with error: "
+ << ret << " for event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
} catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
- << topic.cfg.dest.push_endpoint
- << " bucket: " << event_entry.event.bucket_name
- << " bucket_owner: "
- << event_entry.event.bucket_ownerIdentity
- << " object_name: " << event_entry.event.object_key
- << ". error: " << e.what() << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint for sync "
+ "notification event with error: "
+ << e.what() << " event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return -EINVAL;
}