#define dout_subsys ceph_subsys_rgw
-static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
+struct rgw_http_req_data : public RefCountedObject {
+ CURL *easy_handle;
+ curl_slist *h;
+ uint64_t id;
+ int ret;
+ atomic_t done;
+ RGWHTTPClient *client;
+ void *user_info;
+ bool registered;
+ RGWHTTPManager *mgr;
+ char error_buf[CURL_ERROR_SIZE];
+
+ Mutex lock;
+ Cond cond;
+
+ rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
+ client(nullptr), user_info(nullptr), registered(false),
+ mgr(NULL), lock("rgw_http_req_data::lock") {
+ memset(error_buf, 0, sizeof(error_buf));
+ }
+
+ int wait() {
+ Mutex::Locker l(lock);
+ cond.Wait(lock);
+ return ret;
+ }
+
+ void finish(int r) {
+ Mutex::Locker l(lock);
+ ret = r;
+ if (easy_handle)
+ curl_easy_cleanup(easy_handle);
+
+ if (h)
+ curl_slist_free_all(h);
+
+ easy_handle = NULL;
+ h = NULL;
+ done.set(1);
+ cond.Signal();
+ }
+
+ bool is_done() {
+ return done.read() != 0;
+ }
+
+ int get_retcode() {
+ Mutex::Locker l(lock);
+ return ret;
+ }
+};
+
+/*
+ * the simple set of callbacks will be called on RGWHTTPClient::process()
+ */
+static size_t simple_receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
{
RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
size_t len = size * nmemb;
return len;
}
-static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+static size_t simple_receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
{
RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
size_t len = size * nmemb;
return len;
}
-static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+static size_t simple_send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
{
RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
int ret = client->send_data(ptr, size * nmemb);
return ret;
}
+/*
+ * the following set of callbacks will be called either on RGWHTTPManager::process(),
+ * or via the RGWHTTPManager async processing.
+ */
+static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+ rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
+ size_t len = size * nmemb;
+
+ Mutex::Locker l(req_data->lock);
+
+ if (!req_data->registered) {
+ return len;
+ }
+
+ int ret = req_data->client->receive_header(ptr, size * nmemb);
+ if (ret < 0) {
+ dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
+ }
+
+ return len;
+}
+
+static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+ rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
+ size_t len = size * nmemb;
+
+ Mutex::Locker l(req_data->lock);
+
+ if (!req_data->registered) {
+ return len;
+ }
+
+ int ret = req_data->client->receive_data(ptr, size * nmemb);
+ if (ret < 0) {
+ dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
+ }
+
+ return len;
+}
+
+static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+ rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
+
+ Mutex::Locker l(req_data->lock);
+
+ if (!req_data->registered) {
+ return 0;
+ }
+
+ int ret = req_data->client->send_data(ptr, size * nmemb);
+ if (ret < 0) {
+ dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
+ }
+
+ return ret;
+}
+
static curl_slist *headers_to_slist(param_vec_t& headers)
{
curl_slist *h = NULL;
return h;
}
+/*
+ * process a single simple one off request, not going through RGWHTTPManager. Not using
+ * req_data.
+ */
int RGWHTTPClient::process(const char *method, const char *url)
{
int ret = 0;
curl_easy_setopt(curl_handle, CURLOPT_URL, url);
curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
- curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
+ curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
+ curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
if (h) {
curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
}
- curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, send_http_data);
+ curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
if (has_send_len) {
return ret;
}
-struct rgw_http_req_data : public RefCountedObject {
- CURL *easy_handle;
- curl_slist *h;
- uint64_t id;
- int ret;
- atomic_t done;
- RGWHTTPClient *client;
- void *user_info;
- RGWHTTPManager *mgr;
- char error_buf[CURL_ERROR_SIZE];
-
- Mutex lock;
- Cond cond;
-
- rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
- client(nullptr), user_info(nullptr),
- mgr(NULL), lock("rgw_http_req_data::lock") {
- memset(error_buf, 0, sizeof(error_buf));
- }
-
- int wait() {
- Mutex::Locker l(lock);
- cond.Wait(lock);
- return ret;
- }
-
- void finish(int r) {
- Mutex::Locker l(lock);
- ret = r;
- if (easy_handle)
- curl_easy_cleanup(easy_handle);
-
- if (h)
- curl_slist_free_all(h);
-
- easy_handle = NULL;
- h = NULL;
- done.set(1);
- cond.Signal();
- }
-
- bool is_done() {
- return done.read() != 0;
- }
-
- int get_retcode() {
- Mutex::Locker l(lock);
- return ret;
- }
-};
-
string RGWHTTPClient::to_str()
{
string method_str = (last_method.empty() ? "<no-method>" : last_method);
return req_data->get_retcode();
}
+/*
+ * init request, will be used later with RGWHTTPManager
+ */
int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data)
{
assert(!req_data);
curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
- curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)this);
+ curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
- curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)this);
+ curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
if (h) {
curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
}
curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
- curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)this);
+ curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
if (has_send_len) {
curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
return 0;
}
+/*
+ * wait for async request to complete
+ */
int RGWHTTPClient::wait()
{
if (!req_data->is_done()) {
RGWHTTPClient::~RGWHTTPClient()
{
if (req_data) {
- wait();
+ req_data->mgr->remove_request(this);
+
req_data->put();
}
}
return NULL;
}
+/*
+ * RGWHTTPManager has two modes of operation: threaded and non-threaded.
+ */
RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
completion_mgr(_cm), is_threaded(false),
reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
{
RWLock::WLocker rl(reqs_lock);
req_data->id = num_reqs;
+ req_data->registered = true;
reqs[num_reqs] = req_data;
num_reqs++;
ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
}
+void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
+{
+ RWLock::WLocker rl(reqs_lock);
+ req_data->get();
+ req_data->registered = false;
+ unregistered_reqs.push_back(req_data);
+ ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+}
+
void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
{
RWLock::WLocker rl(reqs_lock);
if (completion_mgr) {
completion_mgr->complete(NULL, req_data->user_info);
}
+
req_data->put();
}
_complete_request(req_data);
}
+/*
+ * hook request to the curl multi handle
+ */
int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
{
ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
return 0;
}
-void RGWHTTPManager::link_pending_requests()
+/*
+ * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
+ * there will be no more processing on this request
+ */
+void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
+{
+ if (req_data->easy_handle) {
+ curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+ }
+ if (!req_data->is_done()) {
+ _finish_request(req_data, -ECANCELED);
+ }
+}
+
+void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
+{
+ RWLock::WLocker wl(reqs_lock);
+ _unlink_request(req_data);
+}
+
+void RGWHTTPManager::manage_pending_requests()
{
reqs_lock.get_read();
- if (max_threaded_req == num_reqs) {
+ if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
reqs_lock.unlock();
return;
}
RWLock::WLocker wl(reqs_lock);
+ if (!unregistered_reqs.empty()) {
+ for (auto& r : unregistered_reqs) {
+ _unlink_request(r);
+ r->put();
+ }
+
+ unregistered_reqs.clear();
+ }
+
map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
list<std::pair<rgw_http_req_data *, int> > remove_reqs;
return ret;
}
+int RGWHTTPManager::remove_request(RGWHTTPClient *client)
+{
+ rgw_http_req_data *req_data = client->get_req_data();
+
+ if (!is_threaded) {
+ unlink_request(req_data);
+ return 0;
+ }
+ unregister_request(req_data);
+ int ret = signal_thread();
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
+/*
+ * the synchronous, non-threaded request processing method.
+ */
int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
{
assert(!is_threaded);
return 0;
}
+/*
+ * the synchronous, non-threaded request processing completion method.
+ */
int RGWHTTPManager::complete_requests()
{
bool done;
return NULL;
}
- link_pending_requests();
+ manage_pending_requests();
mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
switch (mstatus) {