RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket,
request_type&& request)
: socket(std::move(socket)),
- request(std::move(request))
-{}
+ request(std::move(request)),
+ /* txbuf drains into this very object: we hand ourselves over as the
+  * BuffererSink whose write_data() override performs the real output. */
+ txbuf(*this) {
+}
RGWAsioClientIO::~RGWAsioClientIO() = default;
void RGWAsioClientIO::flush()
{
- return;
+ /* Synchronize the output bufferer: everything accumulated in txbuf
+  * is pushed out through write_data(). */
+ txbuf.pubsync();
}
size_t RGWAsioClientIO::send_status(const int status,
const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
"HTTP/1.1 %d %s\r\n", status, status_name);
- return write_data(statusbuf, statuslen);
+ return txbuf.sputn(statusbuf, statuslen);
}
size_t RGWAsioClientIO::send_100_continue()
{
const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
- return write_data(HTTTP_100_CONTINUE, sizeof(HTTTP_100_CONTINUE) - 1);
+ const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
+ sizeof(HTTTP_100_CONTINUE) - 1);
+ /* Flush eagerly: the peer waits for this interim response before it
+  * continues with the request body. */
+ flush();
+ return sent;
}
static constexpr size_t TIME_BUF_SIZE = 128;
char timestr[TIME_BUF_SIZE];
if (dump_date_header(timestr)) {
- sent += write_data(timestr, strlen(timestr));
+ sent += txbuf.sputn(timestr, strlen(timestr));
}
if (conn_keepalive) {
constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n";
- sent += write_data(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
+ sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
} else if (conn_close) {
constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n";
- sent += write_data(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
+ sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
}
constexpr char HEADER_END[] = "\r\n";
- return sent + write_data(HEADER_END, sizeof(HEADER_END) - 1);
+ sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+ flush();
+ return sent;
}
size_t RGWAsioClientIO::send_header(const boost::string_ref& name,
const boost::string_ref& value)
{
- char hdrbuf[name.size() + 2 + value.size() + 2 + 1];
- const auto hdrlen = snprintf(hdrbuf, sizeof(hdrbuf), "%.*s: %.*s\r\n",
- static_cast<int>(name.length()),
- name.data(),
- static_cast<int>(value.length()),
- value.data());
- return write_data(hdrbuf, hdrlen);
+ /* Emit "name: value\r\n" through the tx buffer. Unlike the removed
+  * snprintf() variant this needs no variable-length array on the stack. */
+ static constexpr char HEADER_SEP[] = ": ";
+ static constexpr char HEADER_END[] = "\r\n";
+
+ size_t sent = 0;
+
+ sent += txbuf.sputn(name.data(), name.length());
+ sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
+ sent += txbuf.sputn(value.data(), value.length());
+ sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+ return sent;
}
size_t RGWAsioClientIO::send_content_length(const uint64_t len)
const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
"Content-Length: %" PRIu64 "\r\n", len);
- return write_data(sizebuf, sizelen);
+ return txbuf.sputn(sizebuf, sizelen);
}
Headers>;
};
-class RGWAsioClientIO : public rgw::io::RestfulClient {
+class RGWAsioClientIO : public rgw::io::RestfulClient,
+ public rgw::io::BuffererSink {
using tcp = boost::asio::ip::tcp;
tcp::socket socket;
bool conn_close{false};
RGWEnv env;
- size_t write_data(const char *buf, size_t len);
+ rgw::io::StaticOutputBufferer<> txbuf;
+
+ size_t write_data(const char *buf, size_t len) override;
size_t read_data(char *buf, size_t max);
public:
: conn(conn),
port(port),
explicit_keepalive(false),
- explicit_conn_close(false)
+ explicit_conn_close(false),
+ txbuf(*this)
{
}
void RGWCivetWeb::flush()
{
+ /* Drain the header tx buffer through write_data(). */
+ txbuf.pubsync();
}
size_t RGWCivetWeb::complete_request()
}
}
-template <class... Args>
-static inline size_t safe_mg_printf(Args&&... args)
-{
- const int ret = mg_printf(std::forward<Args>(args)...);
- if (ret == 0) {
- /* didn't send anything, error out */
- throw rgw::io::Exception(EIO, std::system_category());
- } else if (ret < 0) {
- throw rgw::io::Exception(-ret, std::system_category());
- }
- return static_cast<size_t>(ret);
-}
-
size_t RGWCivetWeb::send_status(int status, const char *status_name)
{
mg_set_http_status(conn, status);
- return safe_mg_printf(conn, "HTTP/1.1 %d %s\r\n", status,
- status_name ? status_name : "");
+ static constexpr size_t STATUS_BUF_SIZE = 128;
+
+ /* Format the status line locally and hand it to txbuf; it is flushed
+  * later together with the rest of the header. */
+ char statusbuf[STATUS_BUF_SIZE];
+ const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
+ "HTTP/1.1 %d %s\r\n", status,
+ /* Keep the NULL guard the removed safe_mg_printf() call had:
+  * passing NULL for "%s" is undefined behavior. */
+ status_name ? status_name : "");
+
+ return txbuf.sputn(statusbuf, statuslen);
}
size_t RGWCivetWeb::send_100_continue()
{
const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
- return write_data(HTTTP_100_CONTINUE, sizeof(HTTTP_100_CONTINUE) - 1);
+ const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
+ sizeof(HTTTP_100_CONTINUE) - 1);
+ /* Flush eagerly: the peer waits for this interim response before it
+  * continues with the request body. */
+ flush();
+ return sent;
}
size_t RGWCivetWeb::send_header(const boost::string_ref& name,
const boost::string_ref& value)
{
- return safe_mg_printf(conn, "%.*s: %.*s\r\n", name.length(), name.data(),
- value.length(), value.data());
+ /* Emit "name: value\r\n" through the tx buffer. The removed variant
+  * also passed size_t lengths where "%.*s" expects int - a latent bug. */
+ static constexpr char HEADER_SEP[] = ": ";
+ static constexpr char HEADER_END[] = "\r\n";
+
+ size_t sent = 0;
+
+ sent += txbuf.sputn(name.data(), name.length());
+ sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
+ sent += txbuf.sputn(value.data(), value.length());
+ sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+ return sent;
}
size_t RGWCivetWeb::dump_date_header()
return 0;
}
- return write_data(timestr, strlen(timestr));
+ return txbuf.sputn(timestr, strlen(timestr));
}
size_t RGWCivetWeb::complete_header()
if (explicit_keepalive) {
constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n";
- sent += write_data(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
+ sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
} else if (explicit_conn_close) {
constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n";
- sent += write_data(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
+ sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
}
- constexpr char HEADER_END[] = "\r\n";
- return sent + write_data(HEADER_END, sizeof(HEADER_END) - 1);
+ static constexpr char HEADER_END[] = "\r\n";
+ sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+ flush();
+ return sent;
}
size_t RGWCivetWeb::send_content_length(uint64_t len)
{
- return safe_mg_printf(conn, "Content-Length: %" PRIu64 "\r\n", len);
+ static constexpr size_t CONLEN_BUF_SIZE = 128;
+
+ char sizebuf[CONLEN_BUF_SIZE];
+ const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
+ "Content-Length: %" PRIu64 "\r\n", len);
+ /* Buffered only; complete_header() performs the flush. */
+ return txbuf.sputn(sizebuf, sizelen);
}
struct mg_connection;
-class RGWCivetWeb : public rgw::io::RestfulClient
-{
+class RGWCivetWeb : public rgw::io::RestfulClient,
+ public rgw::io::BuffererSink {
RGWEnv env;
mg_connection *conn;
bool explicit_keepalive;
bool explicit_conn_close;
- size_t write_data(const char *buf, size_t len);
+ rgw::io::StaticOutputBufferer<> txbuf;
+
+ size_t write_data(const char *buf, size_t len) override;
size_t read_data(char *buf, size_t len);
size_t dump_date_header();
/* Hold a read lock over access to env.store for reconfiguration. */
RWLock::RLocker lock(env.mutex);
- RGWRequest req(env.store->get_new_req_id());
+ RGWCivetWeb cw_client(conn, env.port);
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(
rgw::io::add_chunking(
rgw::io::add_conlen_controlling(
- RGWCivetWeb(conn, env.port)))));
+ &cw_client))));
RGWRestfulIO client_io(&real_client_io);
+ RGWRequest req(env.store->get_new_req_id());
int ret = process_request(env.store, env.rest, &req, env.uri_prefix,
&client_io, env.olog);
if (ret < 0) {
}
} /* rgw::io::DecoratedRestfulClient */;
+
+/* Interface that should be provided by a front-end class wanting to use
+ * the low-level buffering offered by e.g. StaticOutputBufferer. */
+class BuffererSink {
+public:
+  /* Virtual dtor: implementations are referenced polymorphically through
+   * a BuffererSink& held by the bufferer. */
+  virtual ~BuffererSink() = default;
+
+  /* Send exactly @len bytes from the memory location pointed by @buf.
+   * On success returns @len. On failure throws rgw::io::Exception. */
+  virtual size_t write_data(const char *buf, size_t len) = 0;
+};
+
+/* Utility class providing RestfulClient's implementations with facilities
+ * for low-level buffering without relying on dynamic memory allocations.
+ * The buffer is carried entirely on the stack, which narrows applicability
+ * to situations where buffers are relatively small. This perfectly fits
+ * the needs of composing an HTTP header: without buffering, a front-end
+ * might need to issue many small IO operations, leading to increased
+ * syscall overhead and fragmentation of a message when Nagle's algorithm
+ * cannot coalesce the writes into a single TCP segment (usually when
+ * running on extremely fast network interfaces like the loopback). */
+template <size_t BufferSizeV = 4096>
+class StaticOutputBufferer : public std::streambuf {
+  static_assert(BufferSizeV >= sizeof(std::streambuf::char_type),
+                "Buffer size must be bigger than a single char_type.");
+
+  using std::streambuf::int_type;
+
+  /* Called when the put area is full. The ctor reserved one slack slot
+   * past epptr(), so storing @c here is always in bounds. */
+  int_type overflow(const int_type c) override {
+    *pptr() = c;
+    pbump(sizeof(std::streambuf::char_type));
+
+    if (! sync()) {
+      /* No error, the buffer has been successfully synchronized. */
+      return c;
+    } else {
+      return std::streambuf::traits_type::eof();
+    }
+  }
+
+  int sync() override {
+    /* Cast the pointer difference to int (pbump()'s parameter type)
+     * instead of negating a size_t, which would wrap around and then
+     * be narrowed implicitly. */
+    const auto len = static_cast<int>(std::streambuf::pptr() -
+                                      std::streambuf::pbase());
+    std::streambuf::pbump(-len);
+    sink.write_data(std::streambuf::pbase(), len);
+    /* Always return success here. In case of failure write_data() will throw
+     * rgw::io::Exception. */
+    return 0;
+  }
+
+  BuffererSink& sink;
+  std::streambuf::char_type buffer[BufferSizeV];
+
+public:
+  /* explicit: a sink must never convert into a bufferer implicitly. */
+  explicit StaticOutputBufferer(BuffererSink& sink)
+    : sink(sink) {
+    /* Keep one char of slack so overflow() can store the overflowing
+     * character before syncing. */
+    constexpr size_t len = sizeof(buffer) - sizeof(std::streambuf::char_type);
+    std::streambuf::setp(buffer, buffer + len);
+  }
+};
+
} /* namespace rgw */
} /* namespace io */
void RGWFCGX::flush()
{
+ /* Drain our txbuf through write_data() first, then let FastCGI flush
+  * its own output stream. */
+ txbuf.pubsync();
FCGX_FFlush(fcgx->out);
}
const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
"Status: %d %s\r\n", status, status_name);
- return write_data(statusbuf, statuslen);
+ return txbuf.sputn(statusbuf, statuslen);
}
size_t RGWFCGX::send_100_continue()
size_t RGWFCGX::send_header(const boost::string_ref& name,
const boost::string_ref& value)
{
- char hdrbuf[name.size() + 2 + value.size() + 2 + 1];
- const auto hdrlen = snprintf(hdrbuf, sizeof(hdrbuf),
- "%.*s: %.*s\r\n",
- static_cast<int>(name.length()),
- name.data(),
- static_cast<int>(value.length()),
- value.data());
-
- return write_data(hdrbuf, hdrlen);
+ /* Emit "name: value\r\n" through the tx buffer. Unlike the removed
+  * snprintf() variant this needs no variable-length array on the stack. */
+ static constexpr char HEADER_SEP[] = ": ";
+ static constexpr char HEADER_END[] = "\r\n";
+
+ size_t sent = 0;
+
+ sent += txbuf.sputn(name.data(), name.length());
+ sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
+ sent += txbuf.sputn(value.data(), value.length());
+ sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+ return sent;
}
size_t RGWFCGX::send_content_length(const uint64_t len)
const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
"Content-Length: %" PRIu64 "\r\n", len);
- return write_data(sizebuf, sizelen);
+ return txbuf.sputn(sizebuf, sizelen);
}
size_t RGWFCGX::complete_header()
{
static constexpr char HEADER_END[] = "\r\n";
- return write_data(HEADER_END, sizeof(HEADER_END) - 1);
+ const size_t sent = txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+ /* The header is complete; push everything buffered so far to the peer. */
+ flush();
+ return sent;
}
struct FCGX_Request;
-class RGWFCGX : public rgw::io::RestfulClient
-{
+class RGWFCGX : public rgw::io::RestfulClient,
+ public rgw::io::BuffererSink {
FCGX_Request *fcgx;
RGWEnv env;
+ rgw::io::StaticOutputBufferer<> txbuf;
+
size_t read_data(char* buf, size_t len);
- size_t write_data(const char* buf, size_t len);
+ size_t write_data(const char* buf, size_t len) override;
public:
explicit RGWFCGX(FCGX_Request* const fcgx)
- : fcgx(fcgx) {
+ : fcgx(fcgx),
+ /* this object is the BuffererSink that txbuf flushes through. */
+ txbuf(*this) {
}
void init_env(CephContext* cct) override;
return write_data(buf, len);
}
- void flush();
+ void flush() override;
RGWEnv& get_env() noexcept override {
return env;
void RGWFCGXProcess::handle_request(RGWRequest* r)
{
- RGWFCGXRequest* req = static_cast<RGWFCGXRequest*>(r);
- FCGX_Request* fcgx = req->fcgx;
+ RGWFCGXRequest* const req = static_cast<RGWFCGXRequest*>(r);
+
+ RGWFCGX fcgxfe(req->fcgx);
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(
rgw::io::add_chunking(
- RGWFCGX(fcgx))));
+ &fcgxfe)));
RGWRestfulIO client_io(&real_client_io);
dout(20) << "process_request() returned " << ret << dendl;
}
- FCGX_Finish_r(fcgx);
+ FCGX_Finish_r(req->fcgx);
delete req;
} /* RGWFCGXProcess::handle_request */