]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: cr rest splice, both reads and writes are throttled
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 4 Nov 2017 03:26:31 +0000 (20:26 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h

index da812a9bd5c3728d53b77d1e6e5262cd3c25188a..cfb1241ffebb43b2938e88489ab777c1d3ded4b7 100644 (file)
@@ -14,12 +14,17 @@ class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
   Mutex lock;
   RGWCoroutinesEnv *env;
   RGWCoroutine *cr;
+  RGWHTTPStreamRWRequest *req;
   rgw_io_id io_id;
   bufferlist data;
   bufferlist extra_data;
   bool got_all_extra_data{false};
+  bool paused{false};
 public:
-  RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
+  RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), req(_req) {
+    io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
+    req->set_in_cb(this);
+  }
 
   int handle_data(bufferlist& bl, bool *pause) override {
     {
@@ -40,25 +45,39 @@ public:
       data.append(bl);
     }
 
-#define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024
-    if (data.length() >= GET_DATA_WINDOW_SIZE) {
+#define GET_DATA_WINDOW_SIZE 2 * 1024 * 1024
+    uint64_t data_len = data.length();
+    if (data_len >= GET_DATA_WINDOW_SIZE) {
       env->manager->io_complete(cr, io_id);
     }
+    if (data_len >= 2 * GET_DATA_WINDOW_SIZE) {
+      *pause = true;
+      paused = true;
+    }
     return 0;
   }
 
   void claim_data(bufferlist *dest, uint64_t max) {
-    Mutex::Locker l(lock);
+    bool need_to_unpause = false;
 
-    if (data.length() == 0) {
-      return;
-    }
+    {
+      Mutex::Locker l(lock);
 
-    if (data.length() < max) {
-      max = data.length();
+      if (data.length() == 0) {
+        return;
+      }
+
+      if (data.length() < max) {
+        max = data.length();
+      }
+
+      data.splice(0, max, dest);
+      need_to_unpause = (paused && data.length() <= GET_DATA_WINDOW_SIZE);
     }
 
-    data.splice(0, max, dest);
+    if (need_to_unpause) {
+      req->unpause_receive();
+    }
   }
 
   bufferlist& get_extra_data() {
@@ -84,9 +103,7 @@ int RGWStreamReadHTTPResourceCRF::init()
 {
   env->stack->init_new_io(req);
 
-  in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
-
-  req->set_in_cb(in_cb);
+  in_cb = new RGWCRHTTPGetDataCB(env, caller, req);
 
   int r = http_manager->add_request(req);
   if (r < 0) {
@@ -215,7 +232,7 @@ int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending)
           /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup
            * correctly */
         }
-        yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
+        yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
       }
       yield req->add_send_data(data);
     }
index 39d585bd5b5cc7f4a7e92f5277f4b22ced6e1714..d4eadaac03dd02f46086ed3eefbb0c71ea3b5acb 100644 (file)
@@ -55,28 +55,7 @@ struct rgw_http_req_data : public RefCountedObject {
     return ret;
   }
 
-  void set_state(int bitmask) {
-    Mutex::Locker l(lock);
-    CURLcode rc;
-    int bitmask;
-    switch (state) {
-      case SET_WRITE_PAUSED:
-        bitmask = CURLPAUSE_SEND;
-        break;
-      case SET_WRITE_RESUME:
-        bitmask = CURLPAUSE_CONT;
-        break;
-      default:
-        /* shouldn't really be here */
-        return;
-    }
-
-    rc = curl_easy_pause(**curl_handle, bitmask);
-    if (rc != CURLE_OK) {
-      dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
-    }
-  }
-
+  void set_state(int bitmask);
 
   void finish(int r) {
     Mutex::Locker l(lock);
@@ -313,7 +292,7 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr,
 
   bool pause = false;
 
-  size_t skip_bytes = req_data->client->receive_pause_skip;
+  size_t& skip_bytes = req_data->client->receive_pause_skip;
 
   if (skip_bytes >= len) {
     skip_bytes -= len;
@@ -334,7 +313,9 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr,
   skip_bytes = 0;
  
   if (pause) {
+    dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
     skip_bytes = len;
+    req_data->read_paused = true;
     return CURL_WRITEFUNC_PAUSE;
   }
 
@@ -983,8 +964,9 @@ int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetSt
     return -EINVAL;
   }
 
-  bool suggested_wr_paused;
-  bool suggested_rd_paused;
+  bool suggested_wr_paused = req_data->write_paused;
+  bool suggested_rd_paused = req_data->read_paused;
+
   switch (state) {
     case SET_WRITE_PAUSED:
       suggested_wr_paused = true;
index cdd33e01c5bd2b63ba085f9cff0b9e7cbaf0999b..9c64feb19aa73b0631ee3d5de9dec9c2e9182496 100644 (file)
@@ -65,7 +65,6 @@ class RGWHTTPClient : public RGWIOProvider
 
   bufferlist send_bl;
   bufferlist::iterator send_iter;
-  size_t send_len;
   bool has_send_len;
   long http_status;
   size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
@@ -85,6 +84,8 @@ protected:
   string method;
   string url;
 
+  size_t send_len{0};
+
   param_vec_t headers;
 
   RGWHTTPManager *get_manager();
@@ -139,8 +140,7 @@ public:
   explicit RGWHTTPClient(CephContext *cct,
                          const string& _method,
                          const string& _url)
-    : send_len(0),
-      has_send_len(false),
+    : has_send_len(false),
       http_status(HTTP_STATUS_NOSTATUS),
       req_data(nullptr),
       verify_ssl(cct->_conf->rgw_verify_ssl),
index c802e81bee4e2de5bf31888e2dfd659defd163c4..9baf60aa079c3eb16cd335febf5e808e4e3f78cc 100644 (file)
@@ -752,8 +752,10 @@ int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
   size_t orig_len = len;
 
   if (cb) {
-    bufferptr bp((const char *)ptr, len);
-    in_data.append(bp);
+    in_data.append((const char *)ptr, len);
+
+    size_t orig_in_data_len = in_data.length();
+
     int ret = cb->handle_data(in_data, pause);
     if (ret < 0)
       return ret;
@@ -761,9 +763,13 @@ int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
       in_data.clear();
     } else {
       /* partial read */
+      assert(in_data.length() <= orig_in_data_len);
       len = ret;
       bufferlist bl;
-      in_data.splice(0, len, &bl);
+      size_t left_to_read = orig_in_data_len - len;
+      if (in_data.length() > left_to_read) {
+        in_data.splice(0, in_data.length() - left_to_read, &bl);
+      }
     }
   }
   ofs += len;
@@ -775,6 +781,14 @@ void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
   stream_writes = s;
 }
 
+void RGWHTTPStreamRWRequest::unpause_receive()
+{
+  Mutex::Locker req_locker(get_req_lock());
+  if (!read_paused) {
+    _set_read_paused(false);
+  }
+}
+
 void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
 {
   Mutex::Locker req_locker(get_req_lock());
@@ -805,7 +819,8 @@ int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
     Mutex::Locker wl(write_lock);
 
     if (outbl.length() == 0) {
-      if (stream_writes && !write_stream_complete) {
+      if ((stream_writes && !write_stream_complete) ||
+          (write_ofs < send_len)) {
         *pause = true;
       }
       return 0;
index 816c87474eddb66d723ddb180765c64b8d83902e..5d033ab41e5a7d822999b2a85561c7a506df1f5f 100644 (file)
@@ -130,6 +130,8 @@ public:
   void set_in_cb(ReceiveCB *_cb) { cb = _cb; }
   void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
 
+  void unpause_receive();
+
   void add_send_data(bufferlist& bl);
 
   void set_stream_write(bool s);