int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
- return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+ const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event));
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return rc;
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
if (y) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
auto& yield = y.get_yield_context();
ceph::async::yield_waiter<int> w;
boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
w.complete(boost::system::error_code{}, rc);
}
});
- return w.async_wait(yield);
+ const auto rc = w.async_wait(yield);
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return rc;
}
ceph::async::waiter<int> w;
const auto rc = amqp::publish_with_confirm(
[&w](int r) {w(r);});
if (rc < 0) {
// failed to publish, does not wait for reply
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return rc;
}
- return w.wait();
+ const auto wait_rc = w.wait();
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return wait_rc;
}
}
int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
optional_yield y) override {
if (ack_level == ack_level_t::None) {
- return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+ const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event));
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return rc;
} else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
if (y) {
auto& yield = y.get_yield_context();
ceph::async::yield_waiter<int> w;
w.complete(boost::system::error_code{}, rc);
}
});
- return w.async_wait(yield);
+ const auto rc = w.async_wait(yield);
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return rc;
}
ceph::async::waiter<int> w;
const auto rc = kafka::publish_with_confirm(
[&w](int r) {w(r);});
if (rc < 0) {
// failed to publish, does not wait for reply
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return rc;
}
- return w.wait();
+ const auto wait_rc = w.wait();
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return wait_rc;
}
}