]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: more streaming crf abstraction
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 3 Oct 2017 00:26:57 +0000 (17:26 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h

index 626fdcbaf419df38a1aacb0140f403fcbf91ad97..e179e74fb3328cb372f64fcf0062efef04956a6d 100644 (file)
@@ -86,6 +86,17 @@ int RGWStreamWriteHTTPResourceCRF::init()
   return 0;
 }
 
+bool RGWStreamReadHTTPResourceCRF::has_attrs()
+{
+  return got_attrs;
+}
+
+void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs)
+{
+#warning need to lock in_req->headers
+  *attrs = req->get_out_headers();
+}
+
 int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
 {
     reenter(&read_state) {
@@ -94,6 +105,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
       if (!in_cb->has_data()) {
         yield caller->io_block(0, req->get_io_id());
       }
+      got_attrs = true;
       *io_pending = false;
       in_cb->claim_data(out, max_size);
       if (!req->is_done()) {
@@ -104,6 +116,17 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
   return 0;
 }
 
+void RGWStreamWriteHTTPResourceCRF::set_attrs(const map<string, string>& attrs)
+{
+  for (auto h : attrs) {
+    if (h.first == "CONTENT_LENGTH") {
+      req->set_send_length(atoi(h.second.c_str()));
+    } else {
+      req->append_header(h.first, h.second);
+    }
+  }
+}
+
 int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data)
 {
 #warning write need to throttle and block
@@ -130,20 +153,14 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
   return 0;
 }
 
-TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
-                           RGWHTTPStreamRWRequest *_in_req,
-                           RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
-                                                               in_req(_in_req), out_req(_out_req) {}
-TestSpliceCR::~TestSpliceCR() {
-  delete in_crf;
-  delete out_crf;
-}
+RGWStreamSpliceCR::RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+                           RGWStreamReadHTTPResourceCRF *_in_crf,
+                           RGWStreamWriteHTTPResourceCRF *_out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+                                                               in_crf(_in_crf), out_crf(_out_crf) {}
+RGWStreamSpliceCR::~RGWStreamSpliceCR() { }
 
-int TestSpliceCR::operate() {
+int RGWStreamSpliceCR::operate() {
   reenter(this) {
-    in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
-    out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
-
     {
       int ret = in_crf->init();
       if (ret < 0) {
@@ -169,25 +186,27 @@ int TestSpliceCR::operate() {
         }
       } while (need_retry);
 
-      dout(0) << "read " << bl.length() << " bytes" << dendl;
+      ldout(cct, 20) << "read " << bl.length() << " bytes" << dendl;
 
       if (bl.length() == 0) {
         break;
       }
 
-      if (total_read == 0) {
-#warning need to lock in_req->headers
-        for (auto h : in_req->get_out_headers()) {
-          if (h.first == "CONTENT_LENGTH") {
-            out_req->set_send_length(atoi(h.second.c_str()));
-          } else {
-            out_req->append_header(h.first, h.second);
-          }
-        }
+      if (!in_crf->has_attrs()) {
+        /* shouldn't happen */
+        ldout(cct, 0) << "ERROR: " << __func__ << ": can't handle !in_ctf->has_attrs" << dendl;
+        return set_cr_error(-EIO);
+      }
+
+      if (!sent_attrs) {
+        map<string, string> attrs;
+        in_crf->get_attrs(&attrs);
+        out_crf->set_attrs(attrs);
         int ret = out_crf->init();
         if (ret < 0) {
           return set_cr_error(ret);
         }
+        sent_attrs = true;
       }
 
       total_read += bl.length();
@@ -204,7 +223,7 @@ int TestSpliceCR::operate() {
         return set_cr_error(ret);
       }
 
-      dout(0) << "wrote " << bl.length() << " bytes" << dendl;
+      ldout(cct, 20) << "wrote " << bl.length() << " bytes" << dendl;
     } while (true);
 
     do {
@@ -220,3 +239,25 @@ int TestSpliceCR::operate() {
   }
   return 0;
 }
+
+TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+                           RGWHTTPStreamRWRequest *_in_req,
+                           RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+                                                               in_req(_in_req), out_req(_out_req) {}
+int TestSpliceCR::operate() {
+  reenter(this) {
+    in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
+    out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
+
+    yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf));
+
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+
+    return set_cr_done();
+  }
+
+  return 0;
+};
+
index df7c93fbfe4a6140f274a190bb7ce34ad54887e5..07296a0a36fe9db85f93aebb0e96755cee969778 100644 (file)
@@ -315,6 +315,8 @@ protected:
 public:
   virtual int init() = 0;
   virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+  virtual bool has_attrs() = 0;
+  virtual void get_attrs(std::map<string, string> *attrs) = 0;
 };
 
 class RGWStreamWriteResourceCRF {
@@ -324,6 +326,7 @@ protected:
 
 public:
   virtual int init() = 0;
+  virtual void set_attrs(const std::map<string, string>& attrs) = 0;
   virtual int write(bufferlist& data) = 0; /* reentrant */
   virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
 };
@@ -337,6 +340,7 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
 
   RGWCRHTTPGetDataCB *in_cb{nullptr};
 
+  bool got_attrs{false};
 
 public:
   RGWStreamReadHTTPResourceCRF(CephContext *_cct,
@@ -349,8 +353,10 @@ public:
                                                                req(_req) {}
   virtual ~RGWStreamReadHTTPResourceCRF();
 
-  int init();
+  int init() override;
   int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
+  bool has_attrs() override;
+  void get_attrs(std::map<string, string> *pattrs) override;
 };
 
 class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
@@ -371,28 +377,43 @@ public:
                                                                req(_req) {}
   virtual ~RGWStreamWriteHTTPResourceCRF() {}
 
-  int init();
-  int write(bufferlist& data); /* reentrant */
-  int drain_writes(bool *need_retry); /* reentrant */
+  int init() override;
+  void set_attrs(const std::map<string, string>& attrs) override;
+  int write(bufferlist& data) override; /* reentrant */
+  int drain_writes(bool *need_retry) override; /* reentrant */
 };
 
-class TestSpliceCR : public RGWCoroutine {
+class RGWStreamSpliceCR : public RGWCoroutine {
   CephContext *cct;
   RGWHTTPManager *http_manager;
   string url;
-  RGWHTTPStreamRWRequest *in_req{nullptr};
-  RGWHTTPStreamRWRequest *out_req{nullptr};
   RGWStreamReadHTTPResourceCRF *in_crf{nullptr};
   RGWStreamWriteHTTPResourceCRF *out_crf{nullptr};
   bufferlist bl;
   bool need_retry{false};
+  bool sent_attrs{false};
   uint64_t total_read{0};
   int ret{0};
+public:
+  RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+                    RGWStreamReadHTTPResourceCRF *_in_crf,
+                    RGWStreamWriteHTTPResourceCRF *_out_crf);
+  ~RGWStreamSpliceCR();
+
+  int operate();
+};
+
+class TestSpliceCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  RGWHTTPStreamRWRequest *in_req{nullptr};
+  RGWHTTPStreamRWRequest *out_req{nullptr};
+  RGWStreamReadHTTPResourceCRF *in_crf{nullptr};
+  RGWStreamWriteHTTPResourceCRF *out_crf{nullptr};
 public:
   TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
                RGWHTTPStreamRWRequest *_in_req,
                RGWHTTPStreamRWRequest *_out_req);
-  ~TestSpliceCR();
 
   int operate();
 };