From df66f697d14343462b4eadc716031cf0b17a3607 Mon Sep 17 00:00:00 2001 From: Hoai-Thu Vuong Date: Fri, 25 Apr 2025 15:59:30 +0700 Subject: [PATCH] rgw: add metric when send message with kafka and ampq - l_rgw_pubsub_push_pending - l_rgw_pubsub_push_failed Fixes: https://tracker.ceph.com/issues/70256 Signed-off-by: Hoai-Thu Vuong (cherry picked from commit df4ef781f6ad33f64c2e5a9d158986623d2f8c89) Conflicts: src/rgw/driver/rados/rgw_notify.cc - keep current change and apply only increase metric --- src/rgw/driver/rados/rgw_notify.cc | 1 + src/rgw/driver/rados/rgw_pubsub_push.cc | 38 +++++++++++++++++++++---- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 77bba50d81a..8e345557f42 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -492,6 +492,7 @@ private: remove_entries = true; needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating); notifs_persistency_tracker.erase(entry.marker); + if (result == EntryProcessingResult::Expired && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); } else { if (set_min_marker(end_marker, entry.marker) < 0) { ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl; diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index f3baeeb0aa8..8048b840d3e 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -207,10 +207,15 @@ public: int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { - return amqp::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); + const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism if (y) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); auto& yield = y.get_yield_context(); ceph::async::yield_waiter w; boost::asio::defer(yield.get_executor(),[&w, &event, this]() { @@ -222,7 +227,10 @@ public: w.complete(boost::system::error_code{}, rc); } }); - return w.async_wait(yield); + const auto rc = w.async_wait(yield); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } ceph::async::waiter w; const auto rc = amqp::publish_with_confirm( @@ -230,9 +238,14 @@ public: [&w](int r) {w(r);}); if (rc < 0) { // failed to publish, does not wait for reply + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return rc; } - return w.wait(); + const auto wait_rc = w.wait(); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return wait_rc; } } @@ -293,8 +306,13 @@ public: int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { - return kafka::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); + const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event)); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); if (y) { auto& yield = y.get_yield_context(); ceph::async::yield_waiter w; @@ -307,7 +325,10 @@ public: w.complete(boost::system::error_code{}, rc); } }); - return w.async_wait(yield); + const auto rc = w.async_wait(yield); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return rc; } ceph::async::waiter w; const auto rc = kafka::publish_with_confirm( @@ -315,9 +336,14 @@ public: [&w](int r) {w(r);}); if (rc < 0) { // failed to publish, does not wait for reply + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return rc; } - return w.wait(); + const auto wait_rc = w.wait(); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + return wait_rc; } } -- 2.39.5