]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rework http client to handle multiple async requests
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 8 Jul 2015 23:11:14 +0000 (16:11 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 20:52:07 +0000 (12:52 -0800)
Currently we use the curl multi handler using only a single handle. Rework
things a bit to prepare for multiple handlers.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h

index 4fedff9d91348b76c731ec7896ee95e498917c3a..b0048361665140db0cd796aa47c46f6b87c3a7b1 100644 (file)
@@ -119,14 +119,10 @@ int RGWHTTPClient::process(const char *method, const char *url)
 
 struct multi_req_data {
   CURL *easy_handle;
-  CURLM *multi_handle;
   curl_slist *h;
 
-  multi_req_data() : easy_handle(NULL), multi_handle(NULL), h(NULL) {}
+  multi_req_data() : easy_handle(NULL), h(NULL) {}
   ~multi_req_data() {
-    if (multi_handle)
-      curl_multi_cleanup(multi_handle);
-
     if (easy_handle)
       curl_easy_cleanup(easy_handle);
 
@@ -138,25 +134,15 @@ struct multi_req_data {
 int RGWHTTPClient::init_async(const char *method, const char *url, void **handle)
 {
   CURL *easy_handle;
-  CURLM *multi_handle;
   multi_req_data *req_data = new multi_req_data;
   *handle = (void *)req_data;
 
   char error_buf[CURL_ERROR_SIZE];
 
-  multi_handle = curl_multi_init();
   easy_handle = curl_easy_init();
 
-  req_data->multi_handle = multi_handle;
   req_data->easy_handle = easy_handle;
 
-  CURLMcode mstatus = curl_multi_add_handle(multi_handle, easy_handle);
-  if (mstatus) {
-    dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
-    delete req_data;
-    return -EIO;
-  }
-
   dout(20) << "sending request to " << url << dendl;
 
   curl_slist *h = headers_to_slist(headers);
@@ -181,6 +167,7 @@ int RGWHTTPClient::init_async(const char *method, const char *url, void **handle
   if (has_send_len) {
     curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); 
   }
+  curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
 
   return 0;
 }
@@ -239,21 +226,47 @@ static int do_curl_wait(CephContext *cct, CURLM *handle)
 
 #endif
 
-int RGWHTTPClient::process_request(void *handle, bool wait_for_data, bool *done)
+RGWHTTPManager::RGWHTTPManager(CephContext *_cct) : cct(_cct) {
+  multi_handle = (void *)curl_multi_init();
+}
+
+RGWHTTPManager::~RGWHTTPManager() {
+  if (multi_handle)
+    curl_multi_cleanup((CURLM *)multi_handle);
+}
+
+int RGWHTTPManager::init_async(RGWHTTPClient *client, const char *method, const char *url, void **handle)
+{
+  int ret = client->init_async(method, url, handle);
+  if (ret < 0) {
+    return ret;
+  }
+
+  multi_req_data *req_data = static_cast<multi_req_data *>(*handle);
+
+  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+  if (mstatus) {
+    dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
+    delete req_data;
+    return -EIO;
+  }
+  return 0;
+}
+
+int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
 {
-  multi_req_data *req_data = static_cast<multi_req_data *>(handle);
   int still_running;
   int mstatus;
 
   do {
     if (wait_for_data) {
-      int ret = do_curl_wait(cct, req_data->multi_handle);
+      int ret = do_curl_wait(cct, (CURLM *)multi_handle);
       if (ret < 0) {
         return ret;
       }
     }
 
-    mstatus = curl_multi_perform(req_data->multi_handle, &still_running);
+    mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
     dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
     switch (mstatus) {
       case CURLM_OK:
@@ -264,8 +277,11 @@ int RGWHTTPClient::process_request(void *handle, bool wait_for_data, bool *done)
     }
     int msgs_left;
     CURLMsg *msg;
-    while ((msg = curl_multi_info_read(req_data->multi_handle, &msgs_left))) {
+    while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
       if (msg->msg == CURLMSG_DONE) {
+       CURL *e = msg->easy_handle;
+       multi_req_data *req_data;
+       curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
         switch (msg->data.result) {
           case CURLE_OK:
             break;
@@ -282,15 +298,15 @@ int RGWHTTPClient::process_request(void *handle, bool wait_for_data, bool *done)
   return 0;
 }
 
-int RGWHTTPClient::complete_request(void *handle)
+int RGWHTTPManager::complete_requests()
 {
   bool done;
   int ret;
   do {
-    ret = process_request(handle, true, &done);
+    ret = process_requests(true, &done);
   } while (!done && !ret);
-  multi_req_data *req_data = static_cast<multi_req_data *>(handle);
-  delete req_data;
 
   return ret;
 }
+
+
index 705b848d1fbce6d85c84ea8a6963593ebeef68ca..38b6b29bef971a5765c15ea1cba0a1c6dd063ca2 100644 (file)
@@ -8,6 +8,8 @@
 
 class RGWHTTPClient
 {
+  friend class RGWHTTPManager;
+
   bufferlist send_bl;
   bufferlist::iterator send_iter;
   size_t send_len;
@@ -16,6 +18,7 @@ protected:
   CephContext *cct;
 
   list<pair<string, string> > headers;
+  int init_async(const char *method, const char *url, void **handle);
 public:
   virtual ~RGWHTTPClient() {}
   explicit RGWHTTPClient(CephContext *_cct): send_len (0), has_send_len(false), cct(_cct) {}
@@ -35,10 +38,18 @@ public:
 
   int process(const char *method, const char *url);
   int process(const char *url) { return process("GET", url); }
+};
 
-  int init_async(const char *method, const char *url, void **handle);
-  int process_request(void *handle, bool wait_for_data, bool *done);
-  int complete_request(void *handle);
+class RGWHTTPManager {
+  CephContext *cct;
+  void *multi_handle;
+public:
+  RGWHTTPManager(CephContext *_cct);
+  ~RGWHTTPManager();
+
+  int init_async(RGWHTTPClient *client, const char *method, const char *url, void **handle);
+  int process_requests(bool wait_for_data, bool *done);
+  int complete_requests();
 };
 
 #endif
index 5996a4c12ff7e90cd2687c85aa34c18a71e98619..d67759c610928208d2041e634dd03cb121edeac9 100644 (file)
@@ -307,7 +307,7 @@ int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
   lock.Unlock();
 
   bool done;
-  return process_request(handle, false, &done);
+  return http_manager.process_requests(false, &done);
 }
 
 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
@@ -455,7 +455,7 @@ int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uin
 
   set_send_length(obj_size);
 
-  int r = init_async(new_info.method, new_url.c_str(), &handle);
+  int r = http_manager.init_async(this, new_info.method, new_url.c_str(), &handle);
   if (r < 0)
     return r;
 
@@ -520,7 +520,7 @@ void set_str_from_headers(map<string, string>& out_headers, const string& header
 
 int RGWRESTStreamWriteRequest::complete(string& etag, time_t *mtime)
 {
-  int ret = complete_request(handle);
+  int ret = http_manager.complete_requests();
   if (ret < 0)
     return ret;
 
@@ -602,9 +602,14 @@ int RGWRESTStreamReadRequest::get_resource(RGWAccessKey& key, map<string, string
     headers.push_back(pair<string, string>(iter->first, iter->second));
   }
 
-  int r = process(new_info.method, new_url.c_str());
+  void *handle;
+  int r = http_manager.init_async(this, new_info.method, new_url.c_str(), &handle);
   if (r < 0)
     return r;
+  r = http_manager.complete_requests();
+  if (r < 0)
+    return ret;
+
 
   return 0;
 }
index 694984a29e277ebc6dcedd6e185e76e61812c9e3..ad06308866fc5f78eebaee659fb4c7defb0f6c8e 100644 (file)
@@ -60,13 +60,14 @@ class RGWRESTStreamWriteRequest : public RGWRESTSimpleRequest {
   list<bufferlist> pending_send;
   void *handle;
   RGWGetDataCB *cb;
+  RGWHTTPManager http_manager;
 public:
   int add_output_data(bufferlist& bl);
   int send_data(void *ptr, size_t len);
 
   RGWRESTStreamWriteRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
                 list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
-                lock("RGWRESTStreamWriteRequest"), handle(NULL), cb(NULL) {}
+                lock("RGWRESTStreamWriteRequest"), handle(NULL), cb(NULL), http_manager(_cct) {}
   ~RGWRESTStreamWriteRequest();
   int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs);
   int complete(string& etag, time_t *mtime);
@@ -80,6 +81,7 @@ class RGWRESTStreamReadRequest : public RGWRESTSimpleRequest {
   bufferlist in_data;
   size_t chunk_ofs;
   size_t ofs;
+  RGWHTTPManager http_manager;
 protected:
   int handle_header(const string& name, const string& val);
 public:
@@ -89,7 +91,7 @@ public:
   RGWRESTStreamReadRequest(CephContext *_cct, string& _url, RGWGetDataCB *_cb, list<pair<string, string> > *_headers,
                 list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
                 lock("RGWRESTStreamReadRequest"), cb(_cb),
-                chunk_ofs(0), ofs(0) {}
+                chunk_ofs(0), ofs(0), http_manager(_cct) {}
   ~RGWRESTStreamReadRequest() {}
   int get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
   int get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource);