]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: separte stream crfs for read and write
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 2 Oct 2017 22:55:37 +0000 (15:55 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
also add a base class

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h

index 5a5c8e944f2cd1cd972ba1517d5ef9905a11b017..626fdcbaf419df38a1aacb0140f403fcbf91ad97 100644 (file)
@@ -53,12 +53,12 @@ public:
 };
 
 
-RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF()
+RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF()
 {
   delete in_cb;
 }
 
-int RGWStreamRWHTTPResourceCRF::init()
+int RGWStreamReadHTTPResourceCRF::init()
 {
   env->stack->init_new_io(req);
 
@@ -74,7 +74,19 @@ int RGWStreamRWHTTPResourceCRF::init()
   return 0;
 }
 
-int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
+int RGWStreamWriteHTTPResourceCRF::init()
+{
+  env->stack->init_new_io(req);
+
+  int r = http_manager->add_request(req);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
 {
     reenter(&read_state) {
     while (!req->is_done()) {
@@ -92,7 +104,7 @@ int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *i
   return 0;
 }
 
-int RGWStreamRWHTTPResourceCRF::write(bufferlist& data)
+int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data)
 {
 #warning write need to throttle and block
   reenter(&write_state) {
@@ -103,7 +115,7 @@ int RGWStreamRWHTTPResourceCRF::write(bufferlist& data)
   return 0;
 }
 
-int RGWStreamRWHTTPResourceCRF::drain_writes(bool *need_retry)
+int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
 {
   reenter(&drain_state) {
     *need_retry = true;
@@ -118,74 +130,6 @@ int RGWStreamRWHTTPResourceCRF::drain_writes(bool *need_retry)
   return 0;
 }
 
-TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
-                                                                                        req(_req) {}
-TestCR::~TestCR() {
-  delete crf;
-}
-
-int TestCR::operate() {
-  reenter(this) {
-    crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req);
-
-    {
-      int ret = crf->init();
-      if (ret < 0) {
-        return set_cr_error(ret);
-      }
-    }
-
-    do {
-
-      bl.clear();
-
-      do {
-        yield {
-          ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry);
-          if (ret < 0)  {
-            return set_cr_error(ret);
-          }
-        }
-      } while (need_retry);
-
-      if (retcode < 0) {
-        return set_cr_error(ret);
-      }
-
-      dout(0) << "read " << bl.length() << " bytes" << dendl;
-
-      if (bl.length() == 0) {
-        break;
-      }
-
-      yield {
-        ret = crf->write(bl);
-        if (ret < 0)  {
-          return set_cr_error(ret);
-        }
-      }
-
-      if (retcode < 0) {
-        return set_cr_error(ret);
-      }
-
-      dout(0) << "wrote " << bl.length() << " bytes" << dendl;
-    } while (true);
-
-    do {
-      yield {
-        int ret = crf->drain_writes(&need_retry);
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-      }
-    } while (need_retry);
-
-    return set_cr_done();
-  }
-  return 0;
-}
-
 TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
                            RGWHTTPStreamRWRequest *_in_req,
                            RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
@@ -197,8 +141,8 @@ TestSpliceCR::~TestSpliceCR() {
 
 int TestSpliceCR::operate() {
   reenter(this) {
-    in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
-    out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
+    in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
+    out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
 
     {
       int ret = in_crf->init();
@@ -244,7 +188,6 @@ int TestSpliceCR::operate() {
         if (ret < 0) {
           return set_cr_error(ret);
         }
-dout(0) << __FILE__ << ":" << __LINE__ << ": headers=" << in_req->get_out_headers() << dendl;
       }
 
       total_read += bl.length();
index bc430b96de475987c30e81348a15ddeecc4025d4..df7c93fbfe4a6140f274a190bb7ce34ad54887e5 100644 (file)
@@ -308,7 +308,27 @@ public:
 
 class RGWCRHTTPGetDataCB;
 
-class RGWStreamRWHTTPResourceCRF {
+class RGWStreamReadResourceCRF {
+protected:
+  boost::asio::coroutine read_state;
+
+public:
+  virtual int init() = 0;
+  virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+};
+
+class RGWStreamWriteResourceCRF {
+protected:
+  boost::asio::coroutine write_state;
+  boost::asio::coroutine drain_state;
+
+public:
+  virtual int init() = 0;
+  virtual int write(bufferlist& data) = 0; /* reentrant */
+  virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
+};
+
+class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
   RGWCoroutinesEnv *env;
   RGWCoroutine *caller;
   RGWHTTPManager *http_manager;
@@ -317,13 +337,9 @@ class RGWStreamRWHTTPResourceCRF {
 
   RGWCRHTTPGetDataCB *in_cb{nullptr};
 
-  boost::asio::coroutine read_state;
-  boost::asio::coroutine write_state;
-  boost::asio::coroutine drain_state;
-
 
 public:
-  RGWStreamRWHTTPResourceCRF(CephContext *_cct,
+  RGWStreamReadHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
                                RGWHTTPManager *_http_manager,
@@ -331,28 +347,33 @@ public:
                                                                caller(_caller),
                                                                http_manager(_http_manager),
                                                                req(_req) {}
-  ~RGWStreamRWHTTPResourceCRF();
+  virtual ~RGWStreamReadHTTPResourceCRF();
 
   int init();
-  int read(bufferlist *data, uint64_t max, bool *need_retry); /* reentrant */
-  int write(bufferlist& data); /* reentrant */
-  int drain_writes(bool *need_retry); /* reentrant */
+  int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
 };
 
-class TestCR : public RGWCoroutine {
-  CephContext *cct;
+class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
+  RGWCoroutinesEnv *env;
+  RGWCoroutine *caller;
   RGWHTTPManager *http_manager;
-  string url;
-  RGWHTTPStreamRWRequest *req{nullptr};
-  RGWStreamRWHTTPResourceCRF *crf{nullptr};
-  bufferlist bl;
-  bool need_retry{false};
-  int ret{0};
+
+  RGWHTTPStreamRWRequest *req;
+
 public:
-  TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req);
-  ~TestCR();
+  RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
+                               RGWCoroutinesEnv *_env,
+                               RGWCoroutine *_caller,
+                               RGWHTTPManager *_http_manager,
+                               RGWHTTPStreamRWRequest *_req) : env(_env),
+                                                               caller(_caller),
+                                                               http_manager(_http_manager),
+                                                               req(_req) {}
+  virtual ~RGWStreamWriteHTTPResourceCRF() {}
 
-  int operate();
+  int init();
+  int write(bufferlist& data); /* reentrant */
+  int drain_writes(bool *need_retry); /* reentrant */
 };
 
 class TestSpliceCR : public RGWCoroutine {
@@ -361,8 +382,8 @@ class TestSpliceCR : public RGWCoroutine {
   string url;
   RGWHTTPStreamRWRequest *in_req{nullptr};
   RGWHTTPStreamRWRequest *out_req{nullptr};
-  RGWStreamRWHTTPResourceCRF *in_crf{nullptr};
-  RGWStreamRWHTTPResourceCRF *out_crf{nullptr};
+  RGWStreamReadHTTPResourceCRF *in_crf{nullptr};
+  RGWStreamWriteHTTPResourceCRF *out_crf{nullptr};
   bufferlist bl;
   bool need_retry{false};
   uint64_t total_read{0};