]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add metric when send message with kafka and ampq 62971/head
authorHoai-Thu Vuong <thuvh87@gmail.com>
Fri, 25 Apr 2025 08:59:30 +0000 (15:59 +0700)
committerHoai-Thu Vuong <thuvh87@gmail.com>
Wed, 8 Oct 2025 09:38:47 +0000 (16:38 +0700)
- l_rgw_pubsub_push_pending
- l_rgw_pubsub_push_failed

Fixes: https://tracker.ceph.com/issues/70256
Signed-off-by: Hoai-Thu Vuong <thuvh87@gmail.com>
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_pubsub_push.cc

index c60c8c7a4f57fcd69be697dc43434c7c697533de..d9410e5f6e62e7ec9f3695841e218114c3e7be07 100644 (file)
@@ -509,6 +509,7 @@ private:
               needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
               is_idle = false;
+              if (result == EntryProcessingResult::Expired && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
               return;
             }
             if (set_min_marker(end_marker, entry.marker) < 0) {
index 91526f8d76f356a939f8aa561c8530171075947f..50f5ef3c170b734deb7b8da6fccad11f3722e53b 100644 (file)
@@ -218,10 +218,15 @@ public:
 
   int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+      const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return rc;
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
       if (y) {
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
         auto& yield = y.get_yield_context();
         ceph::async::yield_waiter<int> w;
         boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
@@ -233,7 +238,10 @@ public:
             w.complete(boost::system::error_code{}, rc);
           }
         });
-        return w.async_wait(yield);
+        const auto rc = w.async_wait(yield);
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return rc;
       }
       ceph::async::waiter<int> w;
       const auto rc = amqp::publish_with_confirm(
@@ -241,9 +249,14 @@ public:
             [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return rc;
       }
-      return w.wait();
+      const auto wait_rc = w.wait();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return wait_rc;
     }
   }
 
@@ -304,8 +317,13 @@ public:
   int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
            optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+      const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return rc;
     } else {
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
       if (y) {
         auto& yield = y.get_yield_context();
         ceph::async::yield_waiter<int> w;
@@ -318,7 +336,10 @@ public:
             w.complete(boost::system::error_code{}, rc);
           }
         });
-        return w.async_wait(yield);
+        const auto rc = w.async_wait(yield);
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return rc;
       }
       ceph::async::waiter<int> w;
       const auto rc = kafka::publish_with_confirm(
@@ -326,9 +347,14 @@ public:
             [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return rc;
       }
-      return w.wait();
+      const auto wait_rc = w.wait();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return wait_rc;
     }
   }