]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add metric when send message with kafka and ampq
authorHoai-Thu Vuong <thuvh87@gmail.com>
Fri, 25 Apr 2025 08:59:30 +0000 (15:59 +0700)
committerHoai-Thu Vuong <thuvh87@gmail.com>
Sun, 12 Oct 2025 17:26:43 +0000 (00:26 +0700)
- l_rgw_pubsub_push_pending
- l_rgw_pubsub_push_failed

Fixes: https://tracker.ceph.com/issues/70256
Signed-off-by: Hoai-Thu Vuong <thuvh87@gmail.com>
(cherry picked from commit df4ef781f6ad33f64c2e5a9d158986623d2f8c89)

Conflicts:
    src/rgw/driver/rados/rgw_notify.cc

- keep current change and apply only increase metric

src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_pubsub_push.cc

index 48bd7522632e637b276c2e0a50c5c78a6a1ca6fc..99615f38c3bf93183a1b7059d310194dea71a0c7 100644 (file)
@@ -485,6 +485,7 @@ private:
               remove_entries = true;
               needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
               notifs_persistency_tracker.erase(entry.marker);
+              if (result == EntryProcessingResult::Expired && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
             }  else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
                 ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
index 4c2dd430cf619f79c92afde54a201ea8b8b7e2e8..4164e74b4efce900847781cae6af24efb62aa222 100644 (file)
@@ -202,10 +202,15 @@ public:
 
   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+      const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return rc;
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
       if (y) {
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
         auto& yield = y.get_yield_context();
         ceph::async::yield_waiter<int> w;
         boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
@@ -217,7 +222,10 @@ public:
             w.complete(boost::system::error_code{}, rc);
           }
         });
-        return w.async_wait(yield);
+        const auto rc = w.async_wait(yield);
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return rc;
       }
       ceph::async::waiter<int> w;
       const auto rc = amqp::publish_with_confirm(
@@ -225,9 +233,14 @@ public:
             [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return rc;
       }
-      return w.wait();
+      const auto wait_rc = w.wait();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return wait_rc;
     }
   }
 
@@ -287,8 +300,13 @@ public:
 
   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+      const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return rc;
     } else {
+      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
       if (y) {
         auto& yield = y.get_yield_context();
         ceph::async::yield_waiter<int> w;
@@ -301,7 +319,10 @@ public:
             w.complete(boost::system::error_code{}, rc);
           }
         });
-        return w.async_wait(yield);
+        const auto rc = w.async_wait(yield);
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return rc;
       }
       ceph::async::waiter<int> w;
       const auto rc = kafka::publish_with_confirm(
@@ -309,9 +330,14 @@ public:
             [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
+        if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return rc;
       }
-      return w.wait();
+      const auto wait_rc = w.wait();
+      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+      if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+      return wait_rc;
     }
   }