]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add metric when send message with kafka and ampq
authorHoai-Thu Vuong <thuvh87@gmail.com>
Fri, 25 Apr 2025 08:59:30 +0000 (15:59 +0700)
committerHoai-Thu Vuong <thuvh87@gmail.com>
Sun, 12 Oct 2025 17:40:54 +0000 (00:40 +0700)
- l_rgw_pubsub_push_pending
- l_rgw_pubsub_push_failed

Fixes: https://tracker.ceph.com/issues/70256
Signed-off-by: Hoai-Thu Vuong <thuvh87@gmail.com>
(cherry picked from commit df4ef781f6ad33f64c2e5a9d158986623d2f8c89)

Conflicts:
    src/rgw/driver/rados/rgw_notify.cc
- keep current change and apply only increase metric

src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_pubsub_push.cc

index 77bba50d81a2f10685bb45bd8da77fbe0fe2cde3..8e345557f42ace9f0093638d775bad9dcb6d7aa8 100644 (file)
@@ -492,6 +492,7 @@ private:
               remove_entries = true;
               needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
+              if (result == EntryProcessingResult::Expired && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
             }  else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
                 ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
index f3baeeb0aa89bb720cdc3a8cc26d3fb392ad3af2..8048b840d3e8597c741fa5878fb936a113752d58 100644 (file)
@@ -207,10 +207,15 @@ public:
 
   int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+      const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return rc;
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
       if (y) {
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
         auto& yield = y.get_yield_context();
         ceph::async::yield_waiter<int> w;
         boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
@@ -222,7 +227,10 @@ public:
             w.complete(boost::system::error_code{}, rc);
           }
         });
-        return w.async_wait(yield);
+        const auto rc = w.async_wait(yield);
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return rc;
       }
       ceph::async::waiter<int> w;
       const auto rc = amqp::publish_with_confirm(
@@ -230,9 +238,14 @@ public:
             [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return rc;
       }
-      return w.wait();
+      const auto wait_rc = w.wait();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return wait_rc;
     }
   }
 
@@ -293,8 +306,13 @@ public:
   int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
            optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+      const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return rc;
     } else {
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
       if (y) {
         auto& yield = y.get_yield_context();
         ceph::async::yield_waiter<int> w;
@@ -307,7 +325,10 @@ public:
             w.complete(boost::system::error_code{}, rc);
           }
         });
-        return w.async_wait(yield);
+        const auto rc = w.async_wait(yield);
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return rc;
       }
       ceph::async::waiter<int> w;
       const auto rc = kafka::publish_with_confirm(
@@ -315,9 +336,14 @@ public:
             [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return rc;
       }
-      return w.wait();
+      const auto wait_rc = w.wait();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return wait_rc;
     }
   }