]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement RGWStreamIOEngine::{send,recv}_body().
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Thu, 11 Aug 2016 12:01:20 +0000 (14:01 +0200)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Fri, 21 Oct 2016 20:57:20 +0000 (22:57 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_asio_client.h
src/rgw/rgw_civetweb.h
src/rgw/rgw_client_io.cc
src/rgw/rgw_client_io.h
src/rgw/rgw_client_io_decoimpl.h
src/rgw/rgw_fcgi.h
src/rgw/rgw_loadgen.h

index e2d12bfeeea8ea7cb27e289505eab64a4246db7a..13bda9d7f20d5a894b68ff9cf45b948ce6ffd6b0 100644 (file)
@@ -39,8 +39,8 @@ class RGWAsioClientIO : public RGWStreamIOEngine {
   RGWEnv env;
 
   void init_env(CephContext *cct) override;
-  std::size_t write_data(const char *buf, std::size_t len) override;
-  std::size_t read_data(char *buf, std::size_t max) override;
+  std::size_t write_data(const char *buf, std::size_t len);
+  std::size_t read_data(char *buf, std::size_t max);
 
  public:
   RGWAsioClientIO(tcp::socket&& socket, request_type&& request);
@@ -55,6 +55,14 @@ class RGWAsioClientIO : public RGWStreamIOEngine {
   std::size_t send_content_length(uint64_t len) override;
   std::size_t complete_header() override;
 
+  std::size_t recv_body(char* buf, std::size_t max) override {
+    return read_data(buf, max);
+  }
+
+  std::size_t send_body(const char* buf, std::size_t len) override {
+    return write_data(buf, len);
+  }
+
   RGWEnv& get_env() noexcept override {
     return env;
   }
index 46e0d0044eec3a46006e7485b7f83b5ee85b14d4..7837f2102ee193aecfed547912cb128d12794005 100644 (file)
@@ -20,19 +20,28 @@ class RGWCivetWeb : public RGWStreamIOEngine
   bool explicit_keepalive;
   bool explicit_conn_close;
 
+  std::size_t write_data(const char *buf, std::size_t len);
+  std::size_t read_data(char *buf, std::size_t len);
   std::size_t dump_date_header();
+
 public:
   void init_env(CephContext *cct);
 
-  std::size_t write_data(const char *buf, std::size_t len) override;
-  std::size_t read_data(char *buf, std::size_t len) override;
-
   std::size_t send_status(int status, const char *status_name) override;
   std::size_t send_100_continue() override;
   std::size_t send_header(const boost::string_ref& name,
                           const boost::string_ref& value) override;
   std::size_t send_content_length(uint64_t len) override;
   std::size_t complete_header() override;
+
+  std::size_t recv_body(char* buf, std::size_t max) override {
+    return read_data(buf, max);
+  }
+
+  std::size_t send_body(const char* buf, std::size_t len) override {
+    return write_data(buf, len);
+  }
+
   int complete_request() override;
 
   void flush() override;
index ddac4b3d981403987bda8eda11e673022c69c82d..eb20047730d75ef1c5c6c139905b79f2f33c357b 100644 (file)
@@ -29,7 +29,7 @@ int RGWStreamIOFacade::write(const char *buf, int len)
     return 0;
   }
 
-  const auto ret = engine.write_data(buf, len);
+  const auto ret = engine.send_body(buf, len);
   if (ret < 0) {
     return ret;
   } else if (ret < len) {
@@ -42,7 +42,7 @@ int RGWStreamIOFacade::write(const char *buf, int len)
 
 int RGWStreamIOFacade::read(char *buf, int max, int *actual)
 {
-  int ret = engine.read_data(buf, max);
+  int ret = engine.recv_body(buf, max);
   if (ret < 0) {
     return ret;
   }
index d3a587b926de7540f1aea23a67a547658ddeaaf5..38640084b8d10aab56133f10ae1086cc7a3241f7 100644 (file)
@@ -43,14 +43,9 @@ class RGWStreamIOEngine : public RGWClientIO {
   friend class RGWStreamIOFacade;
   friend class RGWStreamIOLegacyWrapper;
 
-protected:
-  virtual std::size_t read_data(char *buf, std::size_t max) = 0;
-  virtual std::size_t write_data(const char *buf, std::size_t len) = 0;
-
 public:
   class Exception : public std::exception {
     int err;
-
   public:
     Exception(const int err)
       : err(err) {
@@ -72,6 +67,12 @@ public:
 
   virtual std::size_t send_content_length(uint64_t len) = 0;
   virtual std::size_t complete_header() = 0;
+
+  /* Receive body. On success Returns number of bytes sent to the direct
+   * client of RadosGW. On failure throws int containing errno. */
+  virtual std::size_t recv_body(char* buf, std::size_t max) = 0;
+  virtual std::size_t send_body(const char* buf, std::size_t len) = 0;
+
   virtual void flush() = 0;
 };
 
@@ -113,16 +114,19 @@ protected:
   RGWEnv env;
 
 public:
-  virtual int read_data(char *buf, int max) = 0;
-  virtual int write_data(const char *buf, int len) = 0;
   virtual int send_status(int status, const char *status_name) = 0;
   virtual int send_100_continue() = 0;
   virtual std::size_t send_header(const boost::string_ref& name,
                                   const boost::string_ref& value) noexcept = 0;
-  virtual int complete_header() = 0;
   virtual int send_content_length(uint64_t len) = 0;
+  virtual int complete_header() = 0;
+
+  virtual int recv_body(char* buf, std::size_t max) = 0;
+  virtual int send_body(const char* buf, std::size_t len) = 0;
   virtual void flush() = 0;
+
   virtual ~RGWStreamIO() {}
+
   RGWStreamIO()
     : RGWStreamIOFacade(this),
       _account(false),
@@ -186,14 +190,6 @@ protected:
     EXCPT_TO_VOID(get_decoratee().init_env(cct));
   }
 
-  int read_data(char* const buf, const int max) override {
-    EXCPT_TO_RC(get_decoratee().read_data(buf, max));
-  }
-
-  int write_data(const char* const buf, const int len) override {
-    EXCPT_TO_RC(get_decoratee().write_data(buf, len));
-  }
-
 public:
   RGWStreamIOLegacyWrapper(RGWStreamIOEngine * const engine)
     : engine(engine) {
@@ -220,6 +216,15 @@ public:
     EXCPT_TO_RC(get_decoratee().complete_header());
   }
 
+  int recv_body(char* buf, const std::size_t max) override {
+    EXCPT_TO_RC(get_decoratee().recv_body(buf, max));
+  }
+
+  int send_body(const char* const buf, const std::size_t len) override {
+    EXCPT_TO_RC(get_decoratee().send_body(buf, len));
+  }
+
+
   void flush() override {
     EXCPT_TO_VOID(get_decoratee().flush());
   }
index 24f8020d728085d7587041d3a0a13d030fd519f9..ba50a7a60008fd63e30693d6942b4ed3e4c34dca 100644 (file)
@@ -46,19 +46,9 @@ protected:
     return get_decoratee().init_env(cct);
   }
 
-  std::size_t read_data(char* const buf,
-                        const std::size_t max) override {
-    return get_decoratee().read_data(buf, max);
-  }
-
-  std::size_t write_data(const char* const buf,
-                         const std::size_t len) override {
-    return get_decoratee().write_data(buf, len);
-  }
-
 public:
-  RGWDecoratedStreamIO(const DecorateeT& decoratee)
-    : decoratee(decoratee) {
+  RGWDecoratedStreamIO(DecorateeT&& decoratee)
+    : decoratee(std::move(decoratee)) {
   }
 
   std::size_t send_status(const int status,
@@ -83,6 +73,15 @@ public:
     return get_decoratee().complete_header();
   }
 
+  std::size_t recv_body(char* const buf, const std::size_t max) override {
+    return get_decoratee().recv_body(buf, max);
+  }
+
+  std::size_t send_body(const char* const buf,
+                        const std::size_t len) override {
+    return get_decoratee().send_body(buf, len);
+  }
+
   void flush() override {
     return get_decoratee().flush();
   }
@@ -104,24 +103,6 @@ class RGWStreamIOAccountingEngine : public RGWDecoratedStreamIO<T>,
   uint64_t total_sent;
   uint64_t total_received;
 
-protected:
-  std::size_t read_data(char* const buf, const std::size_t max) override {
-    const auto received = RGWDecoratedStreamIO<T>::read_data(buf, max);
-    if (enabled) {
-      total_received += received;
-    }
-    return received;
-  }
-
-  std::size_t write_data(const char* const buf,
-                         const std::size_t len) override {
-    const auto sent = RGWDecoratedStreamIO<T>::write_data(buf, len);
-    if (enabled) {
-      total_sent += sent;
-    }
-    return sent;
-  }
-
 public:
   template <typename U>
   RGWStreamIOAccountingEngine(U&& decoratee)
@@ -173,6 +154,23 @@ public:
     return sent;
   }
 
+  std::size_t recv_body(char* buf, std::size_t max) override {
+    const auto received = RGWDecoratedStreamIO<T>::recv_body(buf, max);
+    if (enabled) {
+      total_received += received;
+    }
+    return received;
+  }
+
+  std::size_t send_body(const char* const buf,
+                        const std::size_t len) override {
+    const auto sent = RGWDecoratedStreamIO<T>::send_body(buf, len);
+    if (enabled) {
+      total_sent += sent;
+    }
+    return sent;
+  }
+
   uint64_t get_bytes_sent() const override {
     return total_sent;
   }
@@ -198,8 +196,6 @@ protected:
   bool has_content_length;
   bool buffer_data;
 
-  std::size_t write_data(const char* buf, const std::size_t len) override;
-
 public:
   template <typename U>
   RGWStreamIOBufferingEngine(U&& decoratee)
@@ -210,19 +206,20 @@ public:
 
   std::size_t send_content_length(const uint64_t len) override;
   std::size_t complete_header() override;
+  std::size_t send_body(const char* buf, std::size_t len) override;
   int complete_request() override;
 };
 
 template <typename T>
-std::size_t RGWStreamIOBufferingEngine<T>::write_data(const char* buf,
-                                                      const std::size_t len)
+std::size_t RGWStreamIOBufferingEngine<T>::send_body(const char* const buf,
+                                                     const std::size_t len)
 {
   if (buffer_data) {
     data.append(buf, len);
     return 0;
   }
 
-  return RGWDecoratedStreamIO<T>::write_data(buf, len);
+  return RGWDecoratedStreamIO<T>::send_body(buf, len);
 }
 
 template <typename T>
@@ -258,8 +255,8 @@ int RGWStreamIOBufferingEngine<T>::complete_request()
     /* We are sending each buffer separately to avoid extra memory shuffling
      * that would occur on data.c_str() to provide a continuous memory area. */
     for (const auto& ptr : data.buffers()) {
-      sent += RGWDecoratedStreamIO<T>::write_data(ptr.c_str(),
-                                                  ptr.length());
+      sent += RGWDecoratedStreamIO<T>::send_body(ptr.c_str(),
+                                                 ptr.length());
     }
     data.clear();
     buffer_data = false;
@@ -276,21 +273,6 @@ protected:
   bool has_content_length;
   bool chunking_enabled;
 
-  std::size_t write_data(const char* const buf,
-                         const std::size_t len) override {
-    if (! chunking_enabled) {
-      return RGWDecoratedStreamIO<T>::write_data(buf, len);
-    } else {
-      constexpr char HEADER_END[] = "\r\n";
-      char sizebuf[32];
-      snprintf(sizebuf, sizeof(buf), "%" PRIx64 "\r\n", len);
-
-      RGWDecoratedStreamIO<T>::write_data(sizebuf, strlen(sizebuf));
-      RGWDecoratedStreamIO<T>::write_data(buf, len);
-      return RGWDecoratedStreamIO<T>::write_data(HEADER_END, sizeof(HEADER_END) - 1);
-    }
-  }
-
 public:
   template <typename U>
   RGWStreamIOChunkingEngine(U&& decoratee)
@@ -315,6 +297,24 @@ public:
 
     return sent + RGWDecoratedStreamIO<T>::complete_header();
   }
+
+  std::size_t send_body(const char* buf,
+                        const std::size_t len) override {
+    if (! chunking_enabled) {
+      return RGWDecoratedStreamIO<T>::send_body(buf, len);
+    } else {
+      static constexpr char HEADER_END[] = "\r\n";
+      char sizebuf[32];
+      const auto slen = snprintf(sizebuf, sizeof(buf), "%" PRIx64 "\r\n", len);
+      std::size_t sent = 0;
+
+      sent += RGWDecoratedStreamIO<T>::send_body(sizebuf, slen);
+      sent += RGWDecoratedStreamIO<T>::send_body(buf, len);
+      sent += RGWDecoratedStreamIO<T>::send_body(HEADER_END,
+                                                  sizeof(HEADER_END) - 1);
+      return sent;
+    }
+  }
 };
 
 template <typename T>
index 8d8454b0b70d3074791dba055f88e74ab267ff6e..0835cecdbebc5d2c23a18fc00c76c3e3d1dbedb6 100644 (file)
@@ -16,15 +16,15 @@ class RGWFCGX : public RGWStreamIOEngine
   FCGX_Request *fcgx;
   RGWEnv env;
 
+  std::size_t read_data(char* buf, std::size_t len);
+  std::size_t write_data(const char* buf, std::size_t len);
+
 public:
   explicit RGWFCGX(FCGX_Request* const fcgx)
     : fcgx(fcgx) {
   }
 
   void init_env(CephContext* cct) override;
-  std::size_t read_data(char* buf, std::size_t len) override;
-  std::size_t write_data(const char* buf, std::size_t len) override;
-
   std::size_t send_status(int status, const char* status_name) override;
   std::size_t send_100_continue() override;
   std::size_t send_header(const boost::string_ref& name,
@@ -32,6 +32,14 @@ public:
   std::size_t send_content_length(uint64_t len) override;
   std::size_t complete_header() override;
 
+  std::size_t recv_body(char* buf, std::size_t max) override {
+    return read_data(buf, max);
+  }
+
+  std::size_t send_body(const char* buf, std::size_t len) override {
+    return write_data(buf, len);
+  }
+
   void flush();
 
   RGWEnv& get_env() noexcept override {
index 2154cdea5076bdf92fd5c9312f94196f25f437e1..ae271e78c341122de3328a55b2ecf1fff2cdfcf9 100644 (file)
@@ -38,30 +38,38 @@ class RGWLoadGenIO : public RGWStreamIOEngine
   RGWLoadGenRequestEnv* req;
   RGWEnv env;
 
+  void init_env(CephContext *cct) override;
+  std::size_t read_data(char *buf, std::size_t len);
+  std::size_t write_data(const char *buf, std::size_t len);
+
 public:
   explicit RGWLoadGenIO(RGWLoadGenRequestEnv* const req)
     : left_to_read(0),
       req(req) {
   }
 
-  void init_env(CephContext *cct);
-  std::size_t read_data(char *buf, std::size_t len);
-  std::size_t write_data(const char *buf, std::size_t len);
-
-  std::size_t send_status(int status, const char *status_name);
-  std::size_t send_100_continue();
+  std::size_t send_status(int status, const char *status_name) override;
+  std::size_t send_100_continue() override;
   std::size_t send_header(const boost::string_ref& name,
                           const boost::string_ref& value) override;
-  std::size_t complete_header();
-  std::size_t send_content_length(uint64_t len);
+  std::size_t complete_header() override;
+  std::size_t send_content_length(uint64_t len) override;
+
+  std::size_t recv_body(char* buf, std::size_t max) override {
+    return read_data(buf, max);
+  }
+
+  std::size_t send_body(const char* buf, std::size_t len) override {
+    return write_data(buf, len);
+  }
 
-  void flush();
+  void flush() override;
 
   RGWEnv& get_env() noexcept override {
     return env;
   }
 
-  int complete_request();
+  int complete_request() override;
 };
 
 #endif