From df4ef781f6ad33f64c2e5a9d158986623d2f8c89 Mon Sep 17 00:00:00 2001 From: Hoai-Thu Vuong Date: Fri, 25 Apr 2025 15:59:30 +0700 Subject: [PATCH] rgw: add metrics when sending messages with kafka and amqp - l_rgw_pubsub_push_pending - l_rgw_pubsub_push_failed Fixes: https://tracker.ceph.com/issues/70256 Signed-off-by: Hoai-Thu Vuong --- src/rgw/driver/rados/rgw_notify.cc | 1 + src/rgw/driver/rados/rgw_pubsub_push.cc | 38 +++++++++++++++++++++---- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index c60c8c7a4f57f..d9410e5f6e62e 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -509,6 +509,7 @@ private: needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating); notifs_persistency_tracker.erase(entry.marker); is_idle = false; + if (result == EntryProcessingResult::Expired && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return; } if (set_min_marker(end_marker, entry.marker) < 0) { diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 91526f8d76f35..50f5ef3c170b7 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -218,10 +218,15 @@ public: int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { - return amqp::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); + const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism if (y) { + if (perfcounter) 
perfcounter->inc(l_rgw_pubsub_push_pending); auto& yield = y.get_yield_context(); ceph::async::yield_waiter w; boost::asio::defer(yield.get_executor(),[&w, &event, this]() { @@ -233,7 +238,10 @@ public: w.complete(boost::system::error_code{}, rc); } }); - return w.async_wait(yield); + const auto rc = w.async_wait(yield); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } ceph::async::waiter w; const auto rc = amqp::publish_with_confirm( @@ -241,9 +249,14 @@ public: [&w](int r) {w(r);}); if (rc < 0) { // failed to publish, does not wait for reply + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return rc; } - return w.wait(); + const auto wait_rc = w.wait(); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return wait_rc; } } @@ -304,8 +317,13 @@ public: int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { - return kafka::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); + const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); if (y) { auto& yield = y.get_yield_context(); ceph::async::yield_waiter w; @@ -318,7 +336,10 @@ public: w.complete(boost::system::error_code{}, rc); } }); - return w.async_wait(yield); + const auto rc = w.async_wait(yield); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } 
ceph::async::waiter w; const auto rc = kafka::publish_with_confirm( @@ -326,9 +347,14 @@ public: [&w](int r) {w(r);}); if (rc < 0) { // failed to publish, does not wait for reply + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return rc; } - return w.wait(); + const auto wait_rc = w.wait(); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return wait_rc; } } -- 2.39.5