From: Radoslaw Zarzynski Date: Thu, 11 Aug 2016 12:01:20 +0000 (+0200) Subject: rgw: implement RGWStreamIOEngine::{send,recv}_body(). X-Git-Tag: v11.1.0~454^2~29 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1b28da378e84787aaf7d5c19beb150b6986acc58;p=ceph.git rgw: implement RGWStreamIOEngine::{send,recv}_body(). Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h index e2d12bfeeea..13bda9d7f20 100644 --- a/src/rgw/rgw_asio_client.h +++ b/src/rgw/rgw_asio_client.h @@ -39,8 +39,8 @@ class RGWAsioClientIO : public RGWStreamIOEngine { RGWEnv env; void init_env(CephContext *cct) override; - std::size_t write_data(const char *buf, std::size_t len) override; - std::size_t read_data(char *buf, std::size_t max) override; + std::size_t write_data(const char *buf, std::size_t len); + std::size_t read_data(char *buf, std::size_t max); public: RGWAsioClientIO(tcp::socket&& socket, request_type&& request); @@ -55,6 +55,14 @@ class RGWAsioClientIO : public RGWStreamIOEngine { std::size_t send_content_length(uint64_t len) override; std::size_t complete_header() override; + std::size_t recv_body(char* buf, std::size_t max) override { + return read_data(buf, max); + } + + std::size_t send_body(const char* buf, std::size_t len) override { + return write_data(buf, len); + } + RGWEnv& get_env() noexcept override { return env; } diff --git a/src/rgw/rgw_civetweb.h b/src/rgw/rgw_civetweb.h index 46e0d0044ee..7837f2102ee 100644 --- a/src/rgw/rgw_civetweb.h +++ b/src/rgw/rgw_civetweb.h @@ -20,19 +20,28 @@ class RGWCivetWeb : public RGWStreamIOEngine bool explicit_keepalive; bool explicit_conn_close; + std::size_t write_data(const char *buf, std::size_t len); + std::size_t read_data(char *buf, std::size_t len); std::size_t dump_date_header(); + public: void init_env(CephContext *cct); - std::size_t write_data(const char *buf, std::size_t len) override; - std::size_t read_data(char *buf, std::size_t len) override; - std::size_t send_status(int status, const char *status_name) override; std::size_t send_100_continue() override; std::size_t send_header(const boost::string_ref& name, const boost::string_ref& value) override; std::size_t send_content_length(uint64_t len) override; std::size_t complete_header() override; + + std::size_t recv_body(char* buf, std::size_t max) override { + return read_data(buf, max); + } + + std::size_t send_body(const char* buf, std::size_t len) override { + return write_data(buf, len); + } + int complete_request() override; void flush() override; diff --git a/src/rgw/rgw_client_io.cc b/src/rgw/rgw_client_io.cc index ddac4b3d981..eb20047730d 100644 --- a/src/rgw/rgw_client_io.cc +++ b/src/rgw/rgw_client_io.cc @@ -29,7 +29,7 @@ int RGWStreamIOFacade::write(const char *buf, int len) return 0; } - const auto ret = engine.write_data(buf, len); + const auto ret = engine.send_body(buf, len); if (ret < 0) { return ret; } else if (ret < len) { @@ -42,7 +42,7 @@ int RGWStreamIOFacade::write(const char *buf, int len) int RGWStreamIOFacade::read(char *buf, int max, int *actual) { - int ret = engine.read_data(buf, max); + int ret = engine.recv_body(buf, max); if (ret < 0) { return ret; } diff --git a/src/rgw/rgw_client_io.h b/src/rgw/rgw_client_io.h index d3a587b926d..38640084b8d 100644 --- a/src/rgw/rgw_client_io.h +++ b/src/rgw/rgw_client_io.h @@ -43,14 +43,9 @@ class RGWStreamIOEngine : public RGWClientIO { friend class RGWStreamIOFacade; friend class RGWStreamIOLegacyWrapper; -protected: - virtual std::size_t read_data(char *buf, std::size_t max) = 0; - virtual std::size_t write_data(const char *buf, std::size_t len) = 0; - public: class Exception : public std::exception { int err; - public: Exception(const int err) : err(err) { @@ -72,6 +67,12 @@ public: virtual std::size_t send_content_length(uint64_t len) = 0; virtual std::size_t complete_header() = 0; + + /* Receive body. On success Returns number of bytes sent to the direct + * client of RadosGW. On failure throws int containing errno. */ + virtual std::size_t recv_body(char* buf, std::size_t max) = 0; + virtual std::size_t send_body(const char* buf, std::size_t len) = 0; + virtual void flush() = 0; }; @@ -113,16 +114,19 @@ protected: RGWEnv env; public: - virtual int read_data(char *buf, int max) = 0; - virtual int write_data(const char *buf, int len) = 0; virtual int send_status(int status, const char *status_name) = 0; virtual int send_100_continue() = 0; virtual std::size_t send_header(const boost::string_ref& name, const boost::string_ref& value) noexcept = 0; - virtual int complete_header() = 0; virtual int send_content_length(uint64_t len) = 0; + virtual int complete_header() = 0; + + virtual int recv_body(char* buf, std::size_t max) = 0; + virtual int send_body(const char* buf, std::size_t len) = 0; virtual void flush() = 0; + virtual ~RGWStreamIO() {} + RGWStreamIO() : RGWStreamIOFacade(this), _account(false), @@ -186,14 +190,6 @@ protected: EXCPT_TO_VOID(get_decoratee().init_env(cct)); } - int read_data(char* const buf, const int max) override { - EXCPT_TO_RC(get_decoratee().read_data(buf, max)); - } - - int write_data(const char* const buf, const int len) override { - EXCPT_TO_RC(get_decoratee().write_data(buf, len)); - } - public: RGWStreamIOLegacyWrapper(RGWStreamIOEngine * const engine) : engine(engine) { @@ -220,6 +216,15 @@ public: EXCPT_TO_RC(get_decoratee().complete_header()); } + int recv_body(char* buf, const std::size_t max) override { + EXCPT_TO_RC(get_decoratee().recv_body(buf, max)); + } + + int send_body(const char* const buf, const std::size_t len) override { + EXCPT_TO_RC(get_decoratee().send_body(buf, len)); + } + + void flush() override { EXCPT_TO_VOID(get_decoratee().flush()); } diff --git a/src/rgw/rgw_client_io_decoimpl.h b/src/rgw/rgw_client_io_decoimpl.h index 24f8020d728..ba50a7a6000 100644 --- a/src/rgw/rgw_client_io_decoimpl.h +++ b/src/rgw/rgw_client_io_decoimpl.h @@ -46,19 +46,9 @@ protected: return get_decoratee().init_env(cct); } - std::size_t read_data(char* const buf, - const std::size_t max) override { - return get_decoratee().read_data(buf, max); - } - - std::size_t write_data(const char* const buf, - const std::size_t len) override { - return get_decoratee().write_data(buf, len); - } - public: - RGWDecoratedStreamIO(const DecorateeT& decoratee) - : decoratee(decoratee) { + RGWDecoratedStreamIO(DecorateeT&& decoratee) + : decoratee(std::move(decoratee)) { } std::size_t send_status(const int status, @@ -83,6 +73,15 @@ public: return get_decoratee().complete_header(); } + std::size_t recv_body(char* const buf, const std::size_t max) override { + return get_decoratee().recv_body(buf, max); + } + + std::size_t send_body(const char* const buf, + const std::size_t len) override { + return get_decoratee().send_body(buf, len); + } + void flush() override { return get_decoratee().flush(); } @@ -104,24 +103,6 @@ class RGWStreamIOAccountingEngine : public RGWDecoratedStreamIO, uint64_t total_sent; uint64_t total_received; -protected: - std::size_t read_data(char* const buf, const std::size_t max) override { - const auto received = RGWDecoratedStreamIO::read_data(buf, max); - if (enabled) { - total_received += received; - } - return received; - } - - std::size_t write_data(const char* const buf, - const std::size_t len) override { - const auto sent = RGWDecoratedStreamIO::write_data(buf, len); - if (enabled) { - total_sent += sent; - } - return sent; - } - public: template RGWStreamIOAccountingEngine(U&& decoratee) @@ -173,6 +154,23 @@ public: return sent; } + std::size_t recv_body(char* buf, std::size_t max) override { + const auto received = RGWDecoratedStreamIO::recv_body(buf, max); + if (enabled) { + total_received += received; + } + return received; + } + + std::size_t send_body(const char* const buf, + const std::size_t len) override { + const auto sent = RGWDecoratedStreamIO::send_body(buf, len); + if (enabled) { + total_sent += sent; + } + return sent; + } + uint64_t get_bytes_sent() const override { return total_sent; } @@ -198,8 +196,6 @@ protected: bool has_content_length; bool buffer_data; - std::size_t write_data(const char* buf, const std::size_t len) override; - public: template RGWStreamIOBufferingEngine(U&& decoratee) @@ -210,19 +206,20 @@ public: std::size_t send_content_length(const uint64_t len) override; std::size_t complete_header() override; + std::size_t send_body(const char* buf, std::size_t len) override; int complete_request() override; }; template -std::size_t RGWStreamIOBufferingEngine::write_data(const char* buf, - const std::size_t len) +std::size_t RGWStreamIOBufferingEngine::send_body(const char* const buf, + const std::size_t len) { if (buffer_data) { data.append(buf, len); return 0; } - return RGWDecoratedStreamIO::write_data(buf, len); + return RGWDecoratedStreamIO::send_body(buf, len); } template @@ -258,8 +255,8 @@ int RGWStreamIOBufferingEngine::complete_request() /* We are sending each buffer separately to avoid extra memory shuffling * that would occur on data.c_str() to provide a continuous memory area. */ for (const auto& ptr : data.buffers()) { - sent += RGWDecoratedStreamIO::write_data(ptr.c_str(), - ptr.length()); + sent += RGWDecoratedStreamIO::send_body(ptr.c_str(), + ptr.length()); } data.clear(); buffer_data = false; @@ -276,21 +273,6 @@ protected: bool has_content_length; bool chunking_enabled; - std::size_t write_data(const char* const buf, - const std::size_t len) override { - if (! chunking_enabled) { - return RGWDecoratedStreamIO::write_data(buf, len); - } else { - constexpr char HEADER_END[] = "\r\n"; - char sizebuf[32]; - snprintf(sizebuf, sizeof(buf), "%" PRIx64 "\r\n", len); - - RGWDecoratedStreamIO::write_data(sizebuf, strlen(sizebuf)); - RGWDecoratedStreamIO::write_data(buf, len); - return RGWDecoratedStreamIO::write_data(HEADER_END, sizeof(HEADER_END) - 1); - } - } - public: template RGWStreamIOChunkingEngine(U&& decoratee) @@ -315,6 +297,24 @@ public: return sent + RGWDecoratedStreamIO::complete_header(); } + + std::size_t send_body(const char* buf, + const std::size_t len) override { + if (! chunking_enabled) { + return RGWDecoratedStreamIO::send_body(buf, len); + } else { + static constexpr char HEADER_END[] = "\r\n"; + char sizebuf[32]; + const auto slen = snprintf(sizebuf, sizeof(buf), "%" PRIx64 "\r\n", len); + std::size_t sent = 0; + + sent += RGWDecoratedStreamIO::send_body(sizebuf, slen); + sent += RGWDecoratedStreamIO::send_body(buf, len); + sent += RGWDecoratedStreamIO::send_body(HEADER_END, + sizeof(HEADER_END) - 1); + return sent; + } + } }; template diff --git a/src/rgw/rgw_fcgi.h b/src/rgw/rgw_fcgi.h index 8d8454b0b70..0835cecdbeb 100644 --- a/src/rgw/rgw_fcgi.h +++ b/src/rgw/rgw_fcgi.h @@ -16,15 +16,15 @@ class RGWFCGX : public RGWStreamIOEngine FCGX_Request *fcgx; RGWEnv env; + std::size_t read_data(char* buf, std::size_t len); + std::size_t write_data(const char* buf, std::size_t len); + public: explicit RGWFCGX(FCGX_Request* const fcgx) : fcgx(fcgx) { } void init_env(CephContext* cct) override; - std::size_t read_data(char* buf, std::size_t len) override; - std::size_t write_data(const char* buf, std::size_t len) override; - std::size_t send_status(int status, const char* status_name) override; std::size_t send_100_continue() override; std::size_t send_header(const boost::string_ref& name, @@ -32,6 +32,14 @@ public: std::size_t send_content_length(uint64_t len) override; std::size_t complete_header() override; + std::size_t recv_body(char* buf, std::size_t max) override { + return read_data(buf, max); + } + + std::size_t send_body(const char* buf, std::size_t len) override { + return write_data(buf, len); + } + void flush(); RGWEnv& get_env() noexcept override { diff --git a/src/rgw/rgw_loadgen.h b/src/rgw/rgw_loadgen.h index 2154cdea507..ae271e78c34 100644 --- a/src/rgw/rgw_loadgen.h +++ b/src/rgw/rgw_loadgen.h @@ -38,30 +38,38 @@ class RGWLoadGenIO : public RGWStreamIOEngine RGWLoadGenRequestEnv* req; RGWEnv env; + void init_env(CephContext *cct) override; + std::size_t read_data(char *buf, std::size_t len); + std::size_t write_data(const char *buf, std::size_t len); + public: explicit RGWLoadGenIO(RGWLoadGenRequestEnv* const req) : left_to_read(0), req(req) { } - void init_env(CephContext *cct); - std::size_t read_data(char *buf, std::size_t len); - std::size_t write_data(const char *buf, std::size_t len); - - std::size_t send_status(int status, const char *status_name); - std::size_t send_100_continue(); + std::size_t send_status(int status, const char *status_name) override; + std::size_t send_100_continue() override; std::size_t send_header(const boost::string_ref& name, const boost::string_ref& value) override; - std::size_t complete_header(); - std::size_t send_content_length(uint64_t len); + std::size_t complete_header() override; + std::size_t send_content_length(uint64_t len) override; + + std::size_t recv_body(char* buf, std::size_t max) override { + return read_data(buf, max); + } + + std::size_t send_body(const char* buf, std::size_t len) override { + return write_data(buf, len); + } - void flush(); + void flush() override; RGWEnv& get_env() noexcept override { return env; } - int complete_request(); + int complete_request() override; }; #endif