event on each notification, but ``pubsub_push_ok`` and ``pubsub_push_fail``
are incremented per push action on each notification.
+Configuration Options
+------------------------------
+The following are global configuration options for the different endpoints:
+
+HTTP
+~~~~
+.. confval:: rgw_http_notif_message_timeout
+.. confval:: rgw_http_notif_connection_timeout
+.. confval:: rgw_http_notif_max_inflight
+
+
Bucket Notification REST API
----------------------------
services:
- rgw
with_legacy: true
+- name: rgw_http_notif_message_timeout
+ type: uint
+ level: advanced
+ desc: This is the maximum time in seconds to deliver a notification
+ long_desc: This is the maximum time in seconds to deliver a notification.
+ Delivery error occurs when the message timeout is exceeded.
+ This value includes the connection time, and hence must be larger than rgw_http_notif_connection_timeout.
+ If set to zero the http client will wait indefinitely.
+ see https://curl.se/libcurl/c/CURLOPT_TIMEOUT.html
+ default: 10
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_http_notif_connection_timeout
+ type: uint
+ level: advanced
+ desc: This is the maximum time in seconds to connect to an endpoint
+ long_desc: This is the maximum time in seconds to connect to an endpoint.
+ Delivery error occurs when the message timeout is exceeded.
+ If set to zero the default value of 300 seconds will be used.
+ see https://curl.se/libcurl/c/CURLOPT_CONNECTTIMEOUT.html
+ default: 5
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_http_notif_max_inflight
+ type: uint
+ level: advanced
+ desc: This is the maximum number of messages in-flight (across all http endpoints)
+ long_desc: This is the maximum number of messages in-flight (across all http endpoints).
+ Delivery error (BUSY) occurs when the number of messages is exceeded.
+ If set to zero there is no limit on the number of messages in-flight.
+ default: 8192
+ services:
+ - rgw
+ with_legacy: true
- name: rgw_d4n_l1_datacache_address
type: str
level: advanced
static std::unique_ptr<RGWHTTPManager> s_http_manager;
static std::shared_mutex s_http_manager_mutex;
+static std::atomic<unsigned> s_http_manager_inflight(0);
class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
return -ESRCH;
}
+ const auto max_inflight = cct->_conf->rgw_http_notif_max_inflight;
+ if (max_inflight != 0 &&
+ s_http_manager_inflight >= max_inflight) {
+ ldout(cct, 1) << "ERROR: send failed. http endpoint manager busy. in-flight requests: " <<
+ s_http_manager_inflight << " >= " << max_inflight << dendl;
+ return -EBUSY;
+ }
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
- //default to 3 seconds for wrong url hits - if wrong endpoint configured
- request.set_req_connect_timeout(3);
+ request.set_req_connect_timeout(cct->_conf->rgw_http_notif_connection_timeout);
+ request.set_req_timeout(cct->_conf->rgw_http_notif_message_timeout);
const auto post_data = json_format_pubsub_event(event);
if (cloudevents) {
// following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
request.set_post_data(post_data);
request.set_send_length(post_data.length());
request.append_header("Content-Type", "application/json");
+ ++s_http_manager_inflight;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
auto rc = s_http_manager->add_request(&request);
if (rc == 0) {
rc = request.wait(dpp, y);
}
+ --s_http_manager_inflight;
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
// TODO: use read_bl to process return code and handle according to ack level
return rc;