]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: RGWHTTPClient requests can unregister themselves early
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 5 May 2016 17:57:05 +0000 (10:57 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 9 May 2016 21:38:22 +0000 (14:38 -0700)
No need to wait for req_state to complete anymore.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h

index bcb6250b51299c1ed204ce60cf8d67df6f053006..fe24f163e89cbbad8d923ddcc6ca8702ca935bf9 100644 (file)
 
 #define dout_subsys ceph_subsys_rgw
 
-static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
+struct rgw_http_req_data : public RefCountedObject {
+  CURL *easy_handle;
+  curl_slist *h;
+  uint64_t id;
+  int ret;
+  atomic_t done;
+  RGWHTTPClient *client;
+  void *user_info;
+  bool registered;
+  RGWHTTPManager *mgr;
+  char error_buf[CURL_ERROR_SIZE];
+
+  Mutex lock;
+  Cond cond;
+
+  rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
+                        client(nullptr), user_info(nullptr), registered(false),
+                        mgr(NULL), lock("rgw_http_req_data::lock") {
+    memset(error_buf, 0, sizeof(error_buf));
+  }
+
+  int wait() {
+    Mutex::Locker l(lock);
+    cond.Wait(lock);
+    return ret;
+  }
+
+  void finish(int r) {
+    Mutex::Locker l(lock);
+    ret = r;
+    if (easy_handle)
+      curl_easy_cleanup(easy_handle);
+
+    if (h)
+      curl_slist_free_all(h);
+
+    easy_handle = NULL;
+    h = NULL;
+    done.set(1);
+    cond.Signal();
+  }
+
+  bool is_done() {
+    return done.read() != 0;
+  }
+
+  int get_retcode() {
+    Mutex::Locker l(lock);
+    return ret;
+  }
+};
+
+/*
+ * the simple set of callbacks will be called on RGWHTTPClient::process()
+ */
+static size_t simple_receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
 {
   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
   size_t len = size * nmemb;
@@ -26,7 +81,7 @@ static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_i
   return len;
 }
 
-static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+static size_t simple_receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
 {
   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
   size_t len = size * nmemb;
@@ -38,7 +93,7 @@ static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_inf
   return len;
 }
 
-static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+static size_t simple_send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
 {
   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
   int ret = client->send_data(ptr, size * nmemb);
@@ -49,6 +104,66 @@ static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
   return ret;
 }
 
+/*
+ * the following set of callbacks will be called either on RGWHTTPManager::process(),
+ * or via the RGWHTTPManager async processing.
+ */
+static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+  rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
+  size_t len = size * nmemb;
+
+  Mutex::Locker l(req_data->lock);
+  
+  if (!req_data->registered) {
+    return len;
+  }
+
+  int ret = req_data->client->receive_header(ptr, size * nmemb);
+  if (ret < 0) {
+    dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
+  }
+
+  return len;
+}
+
+static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+  rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
+  size_t len = size * nmemb;
+
+  Mutex::Locker l(req_data->lock);
+  
+  if (!req_data->registered) {
+    return len;
+  }
+  
+  int ret = req_data->client->receive_data(ptr, size * nmemb);
+  if (ret < 0) {
+    dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
+  }
+
+  return len;
+}
+
+static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+  rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
+
+  Mutex::Locker l(req_data->lock);
+  
+  if (!req_data->registered) {
+    return 0;
+  }
+
+  int ret = req_data->client->send_data(ptr, size * nmemb);
+  if (ret < 0) {
+    dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
+  }
+
+  return ret;
+}
+
 static curl_slist *headers_to_slist(param_vec_t& headers)
 {
   curl_slist *h = NULL;
@@ -79,6 +194,10 @@ static curl_slist *headers_to_slist(param_vec_t& headers)
   return h;
 }
 
+/*
+ * process a single simple one off request, not going through RGWHTTPManager. Not using
+ * req_data.
+ */
 int RGWHTTPClient::process(const char *method, const char *url)
 {
   int ret = 0;
@@ -99,15 +218,15 @@ int RGWHTTPClient::process(const char *method, const char *url)
   curl_easy_setopt(curl_handle, CURLOPT_URL, url);
   curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
   curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
+  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
   curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
   curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
   curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
   if (h) {
     curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
   }
-  curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, send_http_data);
+  curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
   curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
   curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); 
   if (has_send_len) {
@@ -131,57 +250,6 @@ int RGWHTTPClient::process(const char *method, const char *url)
   return ret;
 }
 
-struct rgw_http_req_data : public RefCountedObject {
-  CURL *easy_handle;
-  curl_slist *h;
-  uint64_t id;
-  int ret;
-  atomic_t done;
-  RGWHTTPClient *client;
-  void *user_info;
-  RGWHTTPManager *mgr;
-  char error_buf[CURL_ERROR_SIZE];
-
-  Mutex lock;
-  Cond cond;
-
-  rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
-                        client(nullptr), user_info(nullptr),
-                        mgr(NULL), lock("rgw_http_req_data::lock") {
-    memset(error_buf, 0, sizeof(error_buf));
-  }
-
-  int wait() {
-    Mutex::Locker l(lock);
-    cond.Wait(lock);
-    return ret;
-  }
-
-  void finish(int r) {
-    Mutex::Locker l(lock);
-    ret = r;
-    if (easy_handle)
-      curl_easy_cleanup(easy_handle);
-
-    if (h)
-      curl_slist_free_all(h);
-
-    easy_handle = NULL;
-    h = NULL;
-    done.set(1);
-    cond.Signal();
-  }
-
-  bool is_done() {
-    return done.read() != 0;
-  }
-
-  int get_retcode() {
-    Mutex::Locker l(lock);
-    return ret;
-  }
-};
-
 string RGWHTTPClient::to_str()
 {
   string method_str = (last_method.empty() ? "<no-method>" : last_method);
@@ -198,6 +266,9 @@ int RGWHTTPClient::get_req_retcode()
   return req_data->get_retcode();
 }
 
+/*
+ * init request, will be used later with RGWHTTPManager
+ */
 int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data)
 {
   assert(!req_data);
@@ -224,15 +295,15 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
   curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
   curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
   curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
-  curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)this);
+  curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
   curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
-  curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)this);
+  curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
   curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
   if (h) {
     curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
   }
   curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
-  curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)this);
+  curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
   curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); 
   if (has_send_len) {
     curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); 
@@ -242,6 +313,9 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
   return 0;
 }
 
+/*
+ * wait for async request to complete
+ */
 int RGWHTTPClient::wait()
 {
   if (!req_data->is_done()) {
@@ -254,7 +328,8 @@ int RGWHTTPClient::wait()
 RGWHTTPClient::~RGWHTTPClient()
 {
   if (req_data) {
-    wait();
+    req_data->mgr->remove_request(this);
+
     req_data->put();
   }
 }
@@ -350,6 +425,9 @@ void *RGWHTTPManager::ReqsThread::entry()
   return NULL;
 }
 
+/*
+ * RGWHTTPManager has two modes of operation: threaded and non-threaded.
+ */
 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
                                                     completion_mgr(_cm), is_threaded(false),
                                                     reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
@@ -370,11 +448,21 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
 {
   RWLock::WLocker rl(reqs_lock);
   req_data->id = num_reqs;
+  req_data->registered = true;
   reqs[num_reqs] = req_data;
   num_reqs++;
   ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
 }
 
+void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
+{
+  RWLock::WLocker rl(reqs_lock);
+  req_data->get();
+  req_data->registered = false;
+  unregistered_reqs.push_back(req_data);
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+}
+
 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
 {
   RWLock::WLocker rl(reqs_lock);
@@ -390,6 +478,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
   if (completion_mgr) {
     completion_mgr->complete(NULL, req_data->user_info);
   }
+
   req_data->put();
 }
 
@@ -405,6 +494,9 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
   _complete_request(req_data);
 }
 
+/*
+ * hook request to the curl multi handle
+ */
 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
 {
   ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
@@ -416,10 +508,30 @@ int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
   return 0;
 }
 
-void RGWHTTPManager::link_pending_requests()
+/*
+ * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
+ * there will be no more processing on this request
+ */
+void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
+{
+  if (req_data->easy_handle) {
+    curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+  }
+  if (!req_data->is_done()) {
+    _finish_request(req_data, -ECANCELED);
+  }
+}
+
+void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
+{
+  RWLock::WLocker wl(reqs_lock);
+  _unlink_request(req_data);
+}
+
+void RGWHTTPManager::manage_pending_requests()
 {
   reqs_lock.get_read();
-  if (max_threaded_req == num_reqs) {
+  if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
     reqs_lock.unlock();
     return;
   }
@@ -427,6 +539,15 @@ void RGWHTTPManager::link_pending_requests()
 
   RWLock::WLocker wl(reqs_lock);
 
+  if (!unregistered_reqs.empty()) {
+    for (auto& r : unregistered_reqs) {
+      _unlink_request(r);
+      r->put();
+    }
+
+    unregistered_reqs.clear();
+  }
+
   map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
 
   list<std::pair<rgw_http_req_data *, int> > remove_reqs;
@@ -483,6 +604,26 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const
   return ret;
 }
 
+int RGWHTTPManager::remove_request(RGWHTTPClient *client)
+{
+  rgw_http_req_data *req_data = client->get_req_data();
+
+  if (!is_threaded) {
+    unlink_request(req_data);
+    return 0;
+  }
+  unregister_request(req_data);
+  int ret = signal_thread();
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
+/*
+ * the synchronous, non-threaded request processing method.
+ */
 int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
 {
   assert(!is_threaded);
@@ -537,6 +678,9 @@ int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
   return 0;
 }
 
+/*
+ * the synchronous, non-threaded request processing completion method.
+ */
 int RGWHTTPManager::complete_requests()
 {
   bool done;
@@ -605,7 +749,7 @@ void *RGWHTTPManager::reqs_thread_entry()
       return NULL;
     }
 
-    link_pending_requests();
+    manage_pending_requests();
 
     mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
     switch (mstatus) {
index 1d69cedfe39f6f5580d1f831be07f66430fc003c..3587f227efbfc2fcfebdf335ba945b27c8cb4446 100644 (file)
@@ -114,6 +114,7 @@ class RGWHTTPManager {
 
   RWLock reqs_lock;
   map<uint64_t, rgw_http_req_data *> reqs;
+  list<rgw_http_req_data *> unregistered_reqs;
   map<uint64_t, rgw_http_req_data *> complete_reqs;
   int64_t num_reqs;
   int64_t max_threaded_req;
@@ -122,11 +123,14 @@ class RGWHTTPManager {
   void register_request(rgw_http_req_data *req_data);
   void complete_request(rgw_http_req_data *req_data);
   void _complete_request(rgw_http_req_data *req_data);
+  void unregister_request(rgw_http_req_data *req_data);
+  void _unlink_request(rgw_http_req_data *req_data);
+  void unlink_request(rgw_http_req_data *req_data);
   void finish_request(rgw_http_req_data *req_data, int r);
   void _finish_request(rgw_http_req_data *req_data, int r);
   int link_request(rgw_http_req_data *req_data);
 
-  void link_pending_requests();
+  void manage_pending_requests();
 
   class ReqsThread : public Thread {
     RGWHTTPManager *manager;
@@ -150,6 +154,7 @@ public:
   void stop();
 
   int add_request(RGWHTTPClient *client, const char *method, const char *url);
+  int remove_request(RGWHTTPClient *client);
 
   /* only for non threaded case */
   int process_requests(bool wait_for_data, bool *done);