]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: http_client, add the ability to pause/unpause incoming data
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 3 Nov 2017 23:30:52 +0000 (16:30 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h

index fc884668dd126d734cb5f7f50b50718421a02199..39d585bd5b5cc7f4a7e92f5277f4b22ced6e1714 100644 (file)
@@ -40,6 +40,7 @@ struct rgw_http_req_data : public RefCountedObject {
   RGWHTTPManager *mgr{nullptr};
   char error_buf[CURL_ERROR_SIZE];
   bool write_paused{false};
+  bool read_paused{false};
 
   Mutex lock;
   Cond cond;
@@ -54,7 +55,7 @@ struct rgw_http_req_data : public RefCountedObject {
     return ret;
   }
 
-  void set_state(RGWHTTPRequestSetState state) {
+  void set_state(int bitmask) {
     Mutex::Locker l(lock);
     CURLcode rc;
     int bitmask;
@@ -69,6 +70,7 @@ struct rgw_http_req_data : public RefCountedObject {
         /* shouldn't really be here */
         return;
     }
+
     rc = curl_easy_pause(**curl_handle, bitmask);
     if (rc != CURLE_OK) {
       dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
@@ -333,10 +335,11 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr,
  
   if (pause) {
     skip_bytes = len;
-  } else {
-    skip_bytes = 0;
+    return CURL_WRITEFUNC_PAUSE;
   }
 
+  skip_bytes = 0;
+
   return len;
 }
 
@@ -389,6 +392,21 @@ void RGWHTTPClient::_set_write_paused(bool pause)
   }
 }
 
+void RGWHTTPClient::_set_read_paused(bool pause)
+{
+  assert(req_data->lock.is_locked());
+  
+  RGWHTTPManager *mgr = req_data->mgr;
+  if (pause == req_data->read_paused) {
+    return;
+  }
+  if (pause) {
+    mgr->set_request_state(this, SET_READ_PAUSED);
+  } else {
+    mgr->set_request_state(this, SET_READ_RESUME);
+  }
+}
+
 static curl_slist *headers_to_slist(param_vec_t& headers)
 {
   curl_slist *h = NULL;
@@ -815,7 +833,7 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
 
 void RGWHTTPManager::_set_req_state(set_state& ss)
 {
-  ss.req->set_state(ss.state);
+  ss.req->set_state(ss.bitmask);
 }
 /*
  * hook request to the curl multi handle
@@ -965,25 +983,44 @@ int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetSt
     return -EINVAL;
   }
 
-  bool suggested_paused;
+  bool suggested_wr_paused;
+  bool suggested_rd_paused;
   switch (state) {
     case SET_WRITE_PAUSED:
-      suggested_paused = true;
+      suggested_wr_paused = true;
       break;
     case SET_WRITE_RESUME:
-      suggested_paused = false;
+      suggested_wr_paused = false;
+      break;
+    case SET_READ_PAUSED:
+      suggested_rd_paused = true;
+      break;
+    case SET_READ_RESUME:
+      suggested_rd_paused = false;
       break;
     default:
       /* shouldn't really be here */
       return -EIO;
   }
-  if (suggested_paused == req_data->write_paused) {
+  if (suggested_wr_paused == req_data->write_paused &&
+      suggested_rd_paused == req_data->read_paused) {
     return 0;
   }
 
-  req_data->write_paused = suggested_paused;
+  req_data->write_paused = suggested_wr_paused;
+  req_data->read_paused = suggested_rd_paused;
+
+  int bitmask = CURLPAUSE_CONT;
+
+  if (req_data->write_paused) {
+    bitmask |= CURLPAUSE_SEND;
+  }
+
+  if (req_data->read_paused) {
+    bitmask |= CURLPAUSE_RECV;
+  }
 
-  reqs_change_state.push_back(set_state(req_data, state));
+  reqs_change_state.push_back(set_state(req_data, bitmask));
   int ret = signal_thread();
   if (ret < 0) {
     return ret;
index e276d4d6fe074618e7eacf3eb0344bed014f4129..cdd33e01c5bd2b63ba085f9cff0b9e7cbaf0999b 100644 (file)
@@ -125,6 +125,7 @@ protected:
 
   /* needs to be called under req_lock() */
   void _set_write_paused(bool pause);
+  void _set_read_paused(bool pause);
 public:
   static const long HTTP_STATUS_NOSTATUS     = 0;
   static const long HTTP_STATUS_UNAUTHORIZED = 401;
@@ -277,14 +278,16 @@ enum RGWHTTPRequestSetState {
   SET_NOP = 0,
   SET_WRITE_PAUSED = 1,
   SET_WRITE_RESUME = 2,
+  SET_READ_PAUSED  = 3,
+  SET_READ_RESUME  = 4,
 };
 
 class RGWHTTPManager {
   struct set_state {
     rgw_http_req_data *req;
-    RGWHTTPRequestSetState state;
+    int bitmask;
 
-    set_state(rgw_http_req_data *_req, RGWHTTPRequestSetState _state) : req(_req), state(_state) {}
+    set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
   };
   CephContext *cct;
   RGWCompletionManager *completion_mgr;