git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: cr rest splice, work towards write throttling
author Yehuda Sadeh <yehuda@redhat.com>
Tue, 31 Oct 2017 16:44:15 +0000 (09:44 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Writes need to be throttled so that, when the write endpoint is too
slow, we don't just accumulate all of the data read from the source
endpoint in memory.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h

index ecc30fad808253fc4d7704a102a120e076c8e5d6..cbdeecd84cdd28cb30397999ccf4b7b0a4cca007 100644 (file)
@@ -62,6 +62,7 @@ void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_
     cns.erase(cn);
   }
 
+#warning shouldn't have more than one entry in complete_reqs per io_id
   complete_reqs.push_back(io_completion{io_id, user_info});
   cond.Signal();
 }
index f489c239ad27cacf54d03f07780bef3168df7505..3d5fae456da86ee5659e779b214322ebfd8c0d57 100644 (file)
@@ -86,7 +86,7 @@ int RGWStreamReadHTTPResourceCRF::init()
 {
   env->stack->init_new_io(req);
 
-  in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ));
+  in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
 
   req->set_in_cb(in_cb);
 
@@ -102,6 +102,8 @@ int RGWStreamWriteHTTPResourceCRF::send()
 {
   env->stack->init_new_io(req);
 
+  req->set_write_drain_cb(&write_drain_notify_cb);
+
   int r = http_manager->add_request(req);
   if (r < 0) {
     return r;
@@ -164,7 +166,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
          */
         continue;
       }
-      if (!req->is_done()) {
+      if (!req->is_done() || out->length() >= max_size) {
         yield;
       }
     }
@@ -185,11 +187,38 @@ void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj)
   }
 }
 
-int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data)
+#define PENDING_WRITES_WINDOW (1 * 1024 * 1024)
+
+void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size)
+{
+  /* pending_size is the number of bytes still queued on the request. The
+   * caller's io completion is signalled only while it is marked blocked and
+   * once the queue has dropped below half of PENDING_WRITES_WINDOW; the
+   * blocked flag is cleared in the same critical section. */
+  lock_guard l(blocked_lock);
+  if (!is_blocked) {
+    return;
+  }
+  if (pending_size >= PENDING_WRITES_WINDOW / 2) {
+    return;
+  }
+  env->manager->io_complete(caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
+  is_blocked = false;
+}
+
+void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size)
+{
+  /* pure forwarder: hand the pending byte count to the owning CRF */
+  RGWStreamWriteHTTPResourceCRF& owner = *crf;
+  owner.write_drain_notify(pending_size);
+}
+
+/*
+ * Reentrant: queue `data` on the outgoing request, throttling on the amount
+ * of data that is still waiting to be sent.
+ *
+ * Each pass through the loop first clears *io_pending. If the request already
+ * has PENDING_WRITES_WINDOW or more bytes queued, *io_pending is set,
+ * is_blocked is raised under blocked_lock, and the caller is blocked on the
+ * request's write|control io id -- the id that write_drain_notify() completes --
+ * before the data is handed to add_send_data().
+ */
+int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending)
 {
-#warning write need to throttle and block
   reenter(&write_state) {
     while (!req->is_done()) {
+      *io_pending = false;
+      if (req->get_pending_send_size() >= PENDING_WRITES_WINDOW) {
+        *io_pending = true;
+        {
+          lock_guard l(blocked_lock);
+          is_blocked = true;
+
+          /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup
+           * correctly */
+        }
+        /* wait on the same io id that write_drain_notify() signals: WRITE, not READ */
+        yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
+      }
       yield req->add_send_data(data);
     }
   }
@@ -275,18 +304,20 @@ int RGWStreamSpliceCR::operate() {
 
       total_read += bl.length();
 
-      yield {
-        ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
-        ret = out_crf->write(bl);
-        if (ret < 0)  {
-          return set_cr_error(ret);
+      do {
+        yield {
+          ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
+          ret = out_crf->write(bl, &need_retry);
+          if (ret < 0)  {
+            return set_cr_error(ret);
+          }
         }
-      }
 
-      if (retcode < 0) {
-        ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
-        return set_cr_error(ret);
-      }
+        if (retcode < 0) {
+          ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
+          return set_cr_error(ret);
+        }
+      } while (need_retry);
     } while (true);
 
     do {
index c8bf4763ae5e15d1c37a0e928308eb9c3cd7cbbd..66bb663ecf9dcaf9d8b5131420874ef63d02f83c 100644 (file)
@@ -2,6 +2,7 @@
 #define CEPH_RGW_CR_REST_H
 
 #include <boost/intrusive_ptr.hpp>
+#include <mutex>
 #include "include/assert.h" // boost header clobbers our assert.h
 
 #include "rgw_coroutine.h"
@@ -349,7 +350,7 @@ public:
   virtual int init() = 0;
   virtual void send_ready(const rgw_rest_obj& rest_obj) = 0;
   virtual int send() = 0;
-  virtual int write(bufferlist& data) = 0; /* reentrant */
+  virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */
   virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
 };
 
@@ -421,6 +422,11 @@ protected:
   RGWCoroutine *caller;
   RGWHTTPManager *http_manager;
 
+  using lock_guard = std::lock_guard<std::mutex>;
+
+  /* blocked_lock guards is_blocked: write() sets the flag, write_drain_notify() tests and clears it */
+  std::mutex blocked_lock;
+  /* must start out false: write_drain_notify() can run (and read the flag)
+   * before write() has ever assigned it */
+  bool is_blocked{false};
+
   RGWHTTPStreamRWRequest *req{nullptr};
 
   struct multipart_info {
@@ -430,13 +436,21 @@ protected:
     uint64_t part_size;
   } multipart;
 
+  /*
+   * Adapter that lets the HTTP request report write-drain progress back to
+   * this CRF: notify() forwards to crf->write_drain_notify(). crf is a
+   * non-owning back-pointer; the single instance below is a member of the CRF
+   * it points to and is constructed with `this`.
+   */
+  class WriteDrainNotify : public RGWWriteDrainCB {
+    RGWStreamWriteHTTPResourceCRF *crf;
+  public:
+    /* explicit: a CRF pointer should never silently convert into a callback object */
+    explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {}
+    void notify(uint64_t pending_size) override;
+  } write_drain_notify_cb;
+
 public:
   RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
                                RGWHTTPManager *_http_manager) : env(_env),
                                                                caller(_caller),
-                                                               http_manager(_http_manager) {}
+                                                               http_manager(_http_manager),
+                                                               write_drain_notify_cb(this) {}
   virtual ~RGWStreamWriteHTTPResourceCRF() {}
 
   int init() override {
@@ -444,7 +458,8 @@ public:
   }
   void send_ready(const rgw_rest_obj& rest_obj) override;
   int send() override;
-  int write(bufferlist& data) override; /* reentrant */
+  int write(bufferlist& data, bool *need_retry) override; /* reentrant */
+  void write_drain_notify(uint64_t pending_size);
   int drain_writes(bool *need_retry) override; /* reentrant */
 
   virtual void handle_headers(const std::map<string, string>& headers) {}
index 9a791866622bb5062741c7fefcba04f78dc13e2d..ca63e09fb1d4bb843c67d41f37c85a6b2d236149 100644 (file)
@@ -774,6 +774,12 @@ void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
   _set_write_paused(false);
 }
 
+uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
+{
+  /* outbl is read under write_lock; the result is the number of bytes
+   * currently queued in it */
+  Mutex::Locker wl(write_lock);
+  const uint64_t pending = outbl.length();
+  return pending;
+}
+
 void RGWHTTPStreamRWRequest::finish_write()
 {
   Mutex::Locker req_locker(get_req_lock());
@@ -784,23 +790,34 @@ void RGWHTTPStreamRWRequest::finish_write()
 
 int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
 {
-  Mutex::Locker wl(write_lock);
+  uint64_t out_len;   /* bytes left in outbl after this call; handed to the drain callback */
+  uint64_t send_size; /* bytes copied into ptr; this is the return value */
+  { /* write_lock is held only inside this scope, covering every outbl / write_ofs access */
+    Mutex::Locker wl(write_lock);
 
-  if (outbl.length() == 0) {
-    if (stream_writes && !write_stream_complete) {
-      *pause = true;
+    if (outbl.length() == 0) { /* nothing queued */
+      if (stream_writes && !write_stream_complete) {
+        *pause = true; /* streaming write not marked complete yet: report a pause */
+      }
+      return 0; /* note: the drain callback is not invoked on this path */
     }
-    return 0;
-  }
 
-  len = std::min(len, (size_t)outbl.length());
+    len = std::min(len, (size_t)outbl.length()); /* never take more than is queued */
+
+    bufferlist bl;
+    outbl.splice(0, len, &bl); /* detach the first len bytes of outbl into bl */
+    send_size = bl.length();
+    if (send_size > 0) {
+      memcpy(ptr, bl.c_str(), send_size);
+      write_ofs += send_size;
+    }
 
-  bufferlist bl;
-  outbl.splice(0, len, &bl);
-  uint64_t send_size = bl.length();
-  if (send_size > 0) {
-    memcpy(ptr, bl.c_str(), send_size);
-    write_ofs += send_size;
+    out_len = outbl.length(); /* sampled while still locked, used after the lock is dropped */
+  }
+  /* don't need to be under write_lock here, avoid deadlocks in case notify callback
+   * needs to lock */
+  if (write_drain_cb) {
+    write_drain_cb->notify(out_len);
   }
   return send_size;
 }
index 277263bc9723a322b3a01556eed2e7620c89d3d0..964aa0dff7c3ffed9ab4511390a5a92474977a99 100644 (file)
@@ -69,11 +69,19 @@ public:
   int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
 };
 
+class RGWWriteDrainCB { /* callback interface through which a request reports how much outgoing data is still queued */
+public:
+  RGWWriteDrainCB() = default;
+  virtual ~RGWWriteDrainCB() = default;
+  virtual void notify(uint64_t pending_size) = 0; /* pending_size: bytes still waiting to be sent */
+};
+
 
 class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
   Mutex lock;
   Mutex write_lock;
   RGWGetDataCB *cb{nullptr};
+  RGWWriteDrainCB *write_drain_cb{nullptr};
   bufferlist outbl;
   bufferlist in_data;
   size_t chunk_ofs{0};
@@ -103,11 +111,14 @@ public:
   }
 
   void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+  void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
 
   void add_send_data(bufferlist& bl);
 
   void set_stream_write(bool s);
 
+  uint64_t get_pending_send_size();
+
   /* finish streaming writes */
   void finish_write();
 };