]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: initial work for integrating streaming read/write with cr
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 4 Sep 2017 12:15:11 +0000 (05:15 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_cr_rest.cc [new file with mode: 0644]
src/rgw/rgw_cr_rest.h
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rest_client.h

index 772b72c8d57a3bf681fe72a3492f0d264261c130..5ac5ea8bff89fd8db82afd11d95ec69523c31977 100644 (file)
@@ -89,6 +89,7 @@ set(rgw_a_srcs
   rgw_reshard.cc
   rgw_coroutine.cc
   rgw_cr_rados.cc
+  rgw_cr_rest.cc
   rgw_object_expirer_core.cc
   rgw_op.cc
   rgw_os_lib.cc
index 0a8b4c270a1b164edfc3f7df984263780413e8f0..88e4c6071317b6f1a56bf58a40ccd0382eb66e57 100644 (file)
@@ -452,6 +452,12 @@ void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *s
   context_stacks.insert(stack);
 }
 
+void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
+{
+  RWLock::WLocker wl(lock);
+  cr->set_sleeping(flag);
+}
+
 int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
 {
   int ret = 0;
@@ -811,6 +817,11 @@ void RGWCoroutine::wakeup()
   stack->wakeup();
 }
 
+RGWCoroutinesEnv *RGWCoroutine::get_env() const
+{
+  return stack->get_env();
+}
+
 void RGWCoroutine::dump(Formatter *f) const {
   if (!description.str().empty()) {
     encode_json("description", description.str(), f);
index 9ccba4b15c36182ac739a898e7a6229378b948aa..d3006b59a147fce47e6d662dccecdb5430ce6180 100644 (file)
@@ -282,6 +282,8 @@ public:
     return stack;
   }
 
+  RGWCoroutinesEnv *get_env() const;
+
   void dump(Formatter *f) const;
 };
 
@@ -463,7 +465,7 @@ public:
 
   bool unblock_stack(RGWCoroutinesStack **s);
 
-  RGWCoroutinesEnv *get_env() { return env; }
+  RGWCoroutinesEnv *get_env() const { return env; }
 
   void dump(Formatter *f) const;
 };
@@ -561,6 +563,8 @@ public:
   void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
   RGWCoroutinesStack *allocate_stack();
 
+  void set_sleeping(RGWCoroutine *cr, bool flag);
+
   virtual string get_id();
   void dump(Formatter *f) const;
 };
diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc
new file mode 100644 (file)
index 0000000..fa5d99e
--- /dev/null
@@ -0,0 +1,235 @@
+#include "rgw_cr_rest.h"
+
+#include "rgw_coroutine.h"
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+class RGWCRHTTPGetDataCB : public RGWGetDataCB {
+  Mutex lock;
+  RGWCoroutinesEnv *env;
+  RGWCoroutine *cr;
+  bufferlist data;
+public:
+  RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr) {}
+
+  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+    {
+      Mutex::Locker l(lock);
+      if (bl_len == bl.length()) {
+        data.claim_append(bl);
+      } else {
+        bl.splice(0, bl_len, &data);
+      }
+    }
+
+    env->manager->io_complete(cr);
+    return 0;
+  }
+
+  void claim_data(bufferlist *dest, uint64_t max) {
+    Mutex::Locker l(lock);
+
+    if (data.length() == 0) {
+      return;
+    }
+
+    if (data.length() < max) {
+      max = data.length();
+    }
+
+    data.splice(0, max, dest);
+  }
+
+  bool has_data() {
+    return (data.length() > 0);
+  }
+};
+
+
+RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF()
+{
+  delete in_cb;
+}
+
+int RGWStreamRWHTTPResourceCRF::init()
+{
+  in_cb = new RGWCRHTTPGetDataCB(env, caller);
+
+  req->set_user_info(env->stack);
+  req->set_in_cb(in_cb);
+
+  int r = http_manager->add_request(req);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
+{
+    reenter(&read_state) {
+    while (!req->is_done()) {
+      *io_pending = true;
+      if (!in_cb->has_data()) {
+        yield caller->io_block();
+      }
+      *io_pending = false;
+      in_cb->claim_data(out, max_size);
+      if (!req->is_done()) {
+        yield;
+      }
+    }
+  }
+  return 0;
+}
+
+int RGWStreamRWHTTPResourceCRF::write(bufferlist& data)
+{
+#warning write need to throttle and block
+  reenter(&write_state) {
+    while (!req->is_done()) {
+      yield req->add_send_data(data);
+    }
+  }
+  return 0;
+}
+
+TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+                                                                                        req(_req) {}
+TestCR::~TestCR() {
+  delete crf;
+}
+
+int TestCR::operate() {
+  reenter(this) {
+    crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req);
+
+    {
+      int ret = crf->init();
+      if (ret < 0) {
+        return set_cr_error(ret);
+      }
+    }
+
+    do {
+
+      bl.clear();
+
+      do {
+        yield {
+          ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry);
+          if (ret < 0)  {
+            return set_cr_error(ret);
+          }
+        }
+      } while (need_retry);
+
+      if (retcode < 0) {
+        dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+        return set_cr_error(ret);
+      }
+
+      dout(0) << "read " << bl.length() << " bytes" << dendl;
+
+      if (bl.length() == 0) {
+        break;
+      }
+
+      yield {
+        ret = crf->write(bl);
+        if (ret < 0)  {
+          return set_cr_error(ret);
+        }
+      }
+
+      if (retcode < 0) {
+        dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+        return set_cr_error(ret);
+      }
+
+      dout(0) << "wrote " << bl.length() << " bytes" << dendl;
+    } while (true);
+
+    return set_cr_done();
+  }
+  return 0;
+}
+
+TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+                           RGWHTTPStreamRWRequest *_in_req,
+                           RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+                                                               in_req(_in_req), out_req(_out_req) {}
+TestSpliceCR::~TestSpliceCR() {
+  delete in_crf;
+  delete out_crf;
+}
+
+int TestSpliceCR::operate() {
+  reenter(this) {
+    in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
+    out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
+
+    {
+      int ret = in_crf->init();
+      if (ret < 0) {
+        return set_cr_error(ret);
+      }
+    }
+
+    {
+      int ret = out_crf->init();
+      if (ret < 0) {
+        return set_cr_error(ret);
+      }
+    }
+
+    do {
+
+      bl.clear();
+
+      do {
+        yield {
+          ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry);
+          if (ret < 0)  {
+            return set_cr_error(ret);
+          }
+        }
+
+        if (retcode < 0) {
+          dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+          return set_cr_error(ret);
+        }
+      } while (need_retry);
+
+      dout(0) << "read " << bl.length() << " bytes" << dendl;
+
+      if (bl.length() == 0) {
+        break;
+      }
+
+      yield {
+        ret = out_crf->write(bl);
+        if (ret < 0)  {
+          return set_cr_error(ret);
+        }
+      }
+
+      if (retcode < 0) {
+        dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+        return set_cr_error(ret);
+      }
+
+      dout(0) << "wrote " << bl.length() << " bytes" << dendl;
+    } while (true);
+
+    return set_cr_done();
+  }
+  return 0;
+}
index b47d7da1553f80b3c29c0cdd1282f9c6680cf22a..8d35b327a186f7ebe67e28e949d9fb8bb472c08a 100644 (file)
@@ -306,4 +306,71 @@ public:
   }
 };
 
+class RGWCRHTTPGetDataCB;
+
+class RGWStreamRWHTTPResourceCRF {
+  RGWCoroutinesEnv *env;
+  RGWCoroutine *caller;
+  RGWHTTPManager *http_manager;
+
+  RGWHTTPStreamRWRequest *req;
+
+  RGWCRHTTPGetDataCB *in_cb{nullptr};
+
+  boost::asio::coroutine read_state;
+  boost::asio::coroutine write_state;
+
+
+public:
+  RGWStreamRWHTTPResourceCRF(CephContext *_cct,
+                               RGWCoroutinesEnv *_env,
+                               RGWCoroutine *_caller,
+                               RGWHTTPManager *_http_manager,
+                               RGWHTTPStreamRWRequest *_req) : env(_env),
+                                                               caller(_caller),
+                                                               http_manager(_http_manager),
+                                                               req(_req) {}
+  ~RGWStreamRWHTTPResourceCRF();
+
+  int init();
+  int read(bufferlist *data, uint64_t max, bool *need_retry); /* reentrant */
+  int write(bufferlist& data); /* reentrant */
+};
+
+class TestCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  string url;
+  RGWHTTPStreamRWRequest *req{nullptr};
+  RGWStreamRWHTTPResourceCRF *crf{nullptr};
+  bufferlist bl;
+  bool need_retry{false};
+  int ret{0};
+public:
+  TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req);
+  ~TestCR();
+
+  int operate();
+};
+
+class TestSpliceCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  string url;
+  RGWHTTPStreamRWRequest *in_req{nullptr};
+  RGWHTTPStreamRWRequest *out_req{nullptr};
+  RGWStreamRWHTTPResourceCRF *in_crf{nullptr};
+  RGWStreamRWHTTPResourceCRF *out_crf{nullptr};
+  bufferlist bl;
+  bool need_retry{false};
+  int ret{0};
+public:
+  TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+               RGWHTTPStreamRWRequest *_in_req,
+               RGWHTTPStreamRWRequest *_out_req);
+  ~TestSpliceCR();
+
+  int operate();
+};
+
 #endif
index ccea463d2ec7ad17cc810914f03ccdc2cf155877..9867bcdf3122607fd0ef328103e9cdf30808ef05 100644 (file)
@@ -90,7 +90,12 @@ struct rgw_http_req_data : public RefCountedObject {
     cond.Signal();
   }
 
+  bool _is_done() {
+    return done;
+  }
+
   bool is_done() {
+    Mutex::Locker l(lock);
     return done;
   }
 
@@ -526,6 +531,11 @@ int RGWHTTPClient::init_request(rgw_http_req_data *_req_data, bool send_data_hin
   return 0;
 }
 
+bool RGWHTTPClient::is_done()
+{
+  return req_data->is_done();
+}
+
 /*
  * wait for async request to complete
  */
@@ -862,7 +872,7 @@ void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
   if (req_data->curl_handle) {
     curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   }
-  if (!req_data->is_done()) {
+  if (!req_data->_is_done()) {
     _finish_request(req_data, -ECANCELED);
   }
 }
index dcc78d19904a6296088ab2058241383e91a1aa7b..c43b018e27d9435348bf48a395f5ff5dc6140781 100644 (file)
@@ -145,6 +145,8 @@ public:
   int process();
 
   int wait();
+  bool is_done();
+
   rgw_http_req_data *get_req_data() { return req_data; }
 
   string to_str();
index 51a7ec4310b0be5d846473c65b9ac464dc79bda0..990ca976912f72b79a8d3104a7ae8ab62032e415 100644 (file)
@@ -70,7 +70,7 @@ public:
 class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
   Mutex lock;
   Mutex write_lock;
-  RGWGetDataCB *cb;
+  RGWGetDataCB *cb{nullptr};
   bufferlist outbl;
   bufferlist in_data;
   size_t chunk_ofs{0};
@@ -85,6 +85,10 @@ public:
   int send_data(void *ptr, size_t len, bool *pause) override;
   int receive_data(void *ptr, size_t len) override;
 
+  RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url,
+                         param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
+                                                                        lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") {
+  }
   RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
                          param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
                                                                         lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) {