From: Casey Bodley Date: Sun, 19 Jun 2016 11:04:09 +0000 (-0400) Subject: rgw: implement ClientIO interface for asio + beast X-Git-Tag: v11.1.0~454^2~69 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b2cc1d9ad6810040fa57df9f4b1238c19ad05c3d;p=ceph.git rgw: implement ClientIO interface for asio + beast Signed-off-by: Casey Bodley Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index cd31fdefd36..4b1b323db89 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -94,7 +94,11 @@ set(rgw_a_srcs rgw_torrent.cc) add_library(rgw_a STATIC ${rgw_a_srcs}) -target_include_directories(rgw_a PUBLIC ${FCGI_INCLUDE_DIR}) + +target_include_directories(rgw_a PUBLIC + "../Beast/include" + ${FCGI_INCLUDE_DIR}) + target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client cls_log_client cls_statelog_client cls_timeindex_client cls_version_client cls_replica_log_client cls_user_client common common_utf8 global @@ -103,6 +107,7 @@ target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount ${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS}) set(radosgw_srcs + rgw_asio_client.cc rgw_asio_frontend.cc rgw_fcgi_process.cc rgw_loadgen_process.cc diff --git a/src/rgw/rgw_asio_client.cc b/src/rgw/rgw_asio_client.cc new file mode 100644 index 00000000000..653ade56598 --- /dev/null +++ b/src/rgw/rgw_asio_client.cc @@ -0,0 +1,154 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include + +#include +#include +#include +#include + +#include "rgw_asio_client.h" + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "asio: ") + + +RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint) + : socket(std::move(socket)), endpoint(std::move(endpoint)) +{} + +RGWAsioClientIO::~RGWAsioClientIO() = default; + +void RGWAsioClientIO::init_env(CephContext *cct) +{ + beast::basic_streambuf> buf; // XXX: not sure what this is for + beast::http::read(socket, buf, req); + body_iter = req.body.begin(); + + const auto& headers = req.headers; + for (auto header = headers.begin(); header != headers.end(); ++header) { + const auto& name = header->name(); + const auto& value = header->value(); + + if (boost::algorithm::iequals(name, "content-length")) { + env.set("CONTENT_LENGTH", value); + continue; + } + if (boost::algorithm::iequals(name, "content-type")) { + env.set("CONTENT_TYPE", value); + continue; + } + if (boost::algorithm::iequals(name, "connection")) { + conn_keepalive = boost::algorithm::iequals(value, "keep-alive"); + conn_close = boost::algorithm::iequals(value, "close"); + } + + static const boost::string_ref HTTP_{"HTTP_"}; + + char buf[name.size() + HTTP_.size()]; + auto dest = std::copy(std::begin(HTTP_), std::end(HTTP_), buf); + for (auto src = name.begin(); src != name.end(); ++src, ++dest) { + if (*src == '-') { + *dest = '_'; + } else { + *dest = std::toupper(*src); + } + } + *dest = '\0'; + + env.set(buf, value); + } + + env.set("REQUEST_METHOD", req.method); + + // split uri from query + auto url = boost::string_ref{req.url}; + auto pos = url.find('?'); + auto query = url.substr(pos + 1); + url = url.substr(0, pos); + + env.set("REQUEST_URI", url); + env.set("QUERY_STRING", query); + env.set("SCRIPT_URI", url); /* FIXME */ + + char port_buf[16]; + snprintf(port_buf, sizeof(port_buf), "%d", socket.local_endpoint().port()); + env.set("SERVER_PORT", port_buf); + // TODO: set SERVER_PORT_SECURE if using ssl + // TODO: set REMOTE_USER if authenticated +} + +int RGWAsioClientIO::write_data(const char *buf, int len) +{ + boost::system::error_code ec; + auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec); + if (ec) { + derr << "write_data failed with " << ec.message() << dendl; + return -ec.value(); + } + return bytes; +} + +int RGWAsioClientIO::read_data(char *buf, int max) +{ + // read data from the body's bufferlist + auto bytes = std::min(max, body_iter.get_remaining()); + body_iter.copy(bytes, buf); + return bytes; +} + +int RGWAsioClientIO::complete_request() +{ + return 0; +} + +void RGWAsioClientIO::flush() +{ +} + +int RGWAsioClientIO::send_status(int status, const char *status_name) +{ + return print("HTTP/1.1 %d %s\r\n", status, status_name); +} + +int RGWAsioClientIO::send_100_continue() +{ + return print("HTTP/1.1 100 CONTINUE\r\n\r\n"); +} + +static constexpr size_t TIME_BUF_SIZE = 128; +static int dump_date_header(char *timestr, size_t size) +{ + const time_t gtime = time(nullptr); + struct tm result; + struct tm const * const tmp = gmtime_r(>ime, &result); + if (tmp == nullptr) + return 0; + return strftime(timestr, size, "Date: %a, %d %b %Y %H:%M:%S %Z\r\n", tmp); +} + +int RGWAsioClientIO::complete_header() +{ + char timestr[TIME_BUF_SIZE]; + if (dump_date_header(timestr, sizeof(timestr))) { + print(timestr); + } + + if (conn_keepalive) { + print("Connection: Keep-Alive\r\n"); + } else if (conn_close) { + print("Connection: close\r\n"); + } + print("\r\n"); + return 0; +} + +int RGWAsioClientIO::send_content_length(uint64_t len) +{ + return print("Content-Length: %" PRIu64 "\r\n", len); +} diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h new file mode 100644 index 00000000000..43239716e68 --- /dev/null +++ b/src/rgw/rgw_asio_client.h @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef RGW_ASIO_CLIENT_H +#define RGW_ASIO_CLIENT_H + +#include +#include +#include "include/assert.h" + +#include "rgw_client_io.h" + +// bufferlist to represent the message body +class RGWBufferlistBody { + public: + using value_type = ceph::bufferlist; + + class reader; + class writer; + + template + using message_type = beast::http::message; +}; + +class RGWAsioClientIO : public RGWStreamIO { + using tcp = boost::asio::ip::tcp; + tcp::socket socket; + tcp::endpoint endpoint; + + using body_type = RGWBufferlistBody; + using request_type = beast::http::request_v1; + request_type req; + + bufferlist::const_iterator body_iter; + + bool conn_keepalive{false}; + bool conn_close{false}; + + void init_env(CephContext *cct) override; + int write_data(const char *buf, int len) override; + int read_data(char *buf, int max) override; + + public: + RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint); + ~RGWAsioClientIO(); + + int complete_request() override; + void flush() override; + int send_status(int status, const char *status_name) override; + int send_100_continue() override; + int complete_header() override; + int send_content_length(uint64_t len) override; +}; + +// used by beast::http::read() to read the body into a bufferlist +class RGWBufferlistBody::reader { + value_type& bl; + public: + template + explicit reader(message_type& m) : bl(m.body) {} + + void write(const char* data, size_t size, boost::system::error_code&) { + bl.append(data, size); + } +}; + +// used by beast::http::write() to write the buffered body +class RGWBufferlistBody::writer { + const value_type& bl; + public: + template + explicit writer(const message_type& msg) + : bl(msg.body) {} + + void init(boost::system::error_code& ec) {} + uint64_t content_length() const { return bl.length(); } + + template + boost::tribool operator()(beast::http::resume_context&&, + boost::system::error_code&, Write&& write) { + // translate from bufferlist to a ConstBufferSequence for beast + std::vector buffers; + buffers.reserve(bl.get_num_buffers()); + for (auto& ptr : bl.buffers()) { + buffers.emplace_back(ptr.c_str(), ptr.length()); + } + write(buffers); + return true; + } +}; +static_assert(beast::http::is_ReadableBody{}, + "RGWBufferlistBody does not satisfy ReadableBody"); +static_assert(beast::http::is_WritableBody{}, + "RGWBufferlistBody does not satisfy WritableBody"); + +#endif // RGW_ASIO_CLIENT_H