struct multi_req_data {
CURL *easy_handle;
- CURLM *multi_handle;
curl_slist *h;
- multi_req_data() : easy_handle(NULL), multi_handle(NULL), h(NULL) {}
+ multi_req_data() : easy_handle(NULL), h(NULL) {}
~multi_req_data() {
- if (multi_handle)
- curl_multi_cleanup(multi_handle);
-
if (easy_handle)
curl_easy_cleanup(easy_handle);
int RGWHTTPClient::init_async(const char *method, const char *url, void **handle)
{
CURL *easy_handle;
- CURLM *multi_handle;
multi_req_data *req_data = new multi_req_data;
*handle = (void *)req_data;
char error_buf[CURL_ERROR_SIZE];
- multi_handle = curl_multi_init();
easy_handle = curl_easy_init();
- req_data->multi_handle = multi_handle;
req_data->easy_handle = easy_handle;
- CURLMcode mstatus = curl_multi_add_handle(multi_handle, easy_handle);
- if (mstatus) {
- dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
- delete req_data;
- return -EIO;
- }
-
dout(20) << "sending request to " << url << dendl;
curl_slist *h = headers_to_slist(headers);
if (has_send_len) {
curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
}
+ curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
return 0;
}
#endif
-int RGWHTTPClient::process_request(void *handle, bool wait_for_data, bool *done)
+RGWHTTPManager::RGWHTTPManager(CephContext *_cct) : cct(_cct) {
+ multi_handle = (void *)curl_multi_init();
+}
+
+RGWHTTPManager::~RGWHTTPManager() {
+ if (multi_handle)
+ curl_multi_cleanup((CURLM *)multi_handle);
+}
+
+int RGWHTTPManager::init_async(RGWHTTPClient *client, const char *method, const char *url, void **handle)
+{
+ int ret = client->init_async(method, url, handle);
+ if (ret < 0) {
+ return ret;
+ }
+
+ multi_req_data *req_data = static_cast<multi_req_data *>(*handle);
+
+ CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+ if (mstatus) {
+ dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
+ delete req_data;
+ return -EIO;
+ }
+ return 0;
+}
+
+int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
{
- multi_req_data *req_data = static_cast<multi_req_data *>(handle);
int still_running;
int mstatus;
do {
if (wait_for_data) {
- int ret = do_curl_wait(cct, req_data->multi_handle);
+ int ret = do_curl_wait(cct, (CURLM *)multi_handle);
if (ret < 0) {
return ret;
}
}
- mstatus = curl_multi_perform(req_data->multi_handle, &still_running);
+ mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
switch (mstatus) {
case CURLM_OK:
}
int msgs_left;
CURLMsg *msg;
- while ((msg = curl_multi_info_read(req_data->multi_handle, &msgs_left))) {
+ while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
+ CURL *e = msg->easy_handle;
+ multi_req_data *req_data;
+ curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
switch (msg->data.result) {
case CURLE_OK:
break;
return 0;
}
-int RGWHTTPClient::complete_request(void *handle)
+int RGWHTTPManager::complete_requests()
{
bool done;
int ret;
do {
- ret = process_request(handle, true, &done);
+ ret = process_requests(true, &done);
} while (!done && !ret);
- multi_req_data *req_data = static_cast<multi_req_data *>(handle);
- delete req_data;
return ret;
}
+
+
class RGWHTTPClient
{
+ friend class RGWHTTPManager;
+
bufferlist send_bl;
bufferlist::iterator send_iter;
size_t send_len;
CephContext *cct;
list<pair<string, string> > headers;
+ int init_async(const char *method, const char *url, void **handle);
public:
virtual ~RGWHTTPClient() {}
explicit RGWHTTPClient(CephContext *_cct): send_len (0), has_send_len(false), cct(_cct) {}
int process(const char *method, const char *url);
int process(const char *url) { return process("GET", url); }
+};
- int init_async(const char *method, const char *url, void **handle);
- int process_request(void *handle, bool wait_for_data, bool *done);
- int complete_request(void *handle);
+class RGWHTTPManager {
+ CephContext *cct;
+ void *multi_handle;
+public:
+ RGWHTTPManager(CephContext *_cct);
+ ~RGWHTTPManager();
+
+ int init_async(RGWHTTPClient *client, const char *method, const char *url, void **handle);
+ int process_requests(bool wait_for_data, bool *done);
+ int complete_requests();
};
#endif
lock.Unlock();
bool done;
- return process_request(handle, false, &done);
+ return http_manager.process_requests(false, &done);
}
static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
set_send_length(obj_size);
- int r = init_async(new_info.method, new_url.c_str(), &handle);
+ int r = http_manager.init_async(this, new_info.method, new_url.c_str(), &handle);
if (r < 0)
return r;
int RGWRESTStreamWriteRequest::complete(string& etag, time_t *mtime)
{
- int ret = complete_request(handle);
+ int ret = http_manager.complete_requests();
if (ret < 0)
return ret;
headers.push_back(pair<string, string>(iter->first, iter->second));
}
- int r = process(new_info.method, new_url.c_str());
+ void *handle;
+ int r = http_manager.init_async(this, new_info.method, new_url.c_str(), &handle);
if (r < 0)
return r;
+ r = http_manager.complete_requests();
+ if (r < 0)
+ return ret;
+
return 0;
}
list<bufferlist> pending_send;
void *handle;
RGWGetDataCB *cb;
+ RGWHTTPManager http_manager;
public:
int add_output_data(bufferlist& bl);
int send_data(void *ptr, size_t len);
RGWRESTStreamWriteRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
- lock("RGWRESTStreamWriteRequest"), handle(NULL), cb(NULL) {}
+ lock("RGWRESTStreamWriteRequest"), handle(NULL), cb(NULL), http_manager(_cct) {}
~RGWRESTStreamWriteRequest();
int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs);
int complete(string& etag, time_t *mtime);
bufferlist in_data;
size_t chunk_ofs;
size_t ofs;
+ RGWHTTPManager http_manager;
protected:
int handle_header(const string& name, const string& val);
public:
RGWRESTStreamReadRequest(CephContext *_cct, string& _url, RGWGetDataCB *_cb, list<pair<string, string> > *_headers,
list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
lock("RGWRESTStreamReadRequest"), cb(_cb),
- chunk_ofs(0), ofs(0) {}
+ chunk_ofs(0), ofs(0), http_manager(_cct) {}
~RGWRESTStreamReadRequest() {}
int get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
int get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource);