]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: factor the tcp::socket out of ClientIO
authorCasey Bodley <cbodley@redhat.com>
Tue, 23 Jan 2018 03:24:23 +0000 (22:24 -0500)
committerCasey Bodley <cbodley@redhat.com>
Thu, 5 Apr 2018 19:30:22 +0000 (15:30 -0400)
remove ClientIO's dependency on a concrete socket type by moving it into
a derived StreamIO class in rgw_asio_frontend.cc

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_client.cc
src/rgw/rgw_asio_client.h
src/rgw/rgw_asio_frontend.cc

index 0ad801667b2f8437c0479aa38af3ae0ee2597db0..29db397df56fc4c6035cbb1528a12c0f181f1bc4 100644 (file)
 
 using namespace rgw::asio;
 
-ClientIO::ClientIO(tcp::socket& socket,
-                   parser_type& parser,
-                   beast::flat_buffer& buffer)
-  : socket(socket), parser(parser), buffer(buffer), txbuf(*this)
+ClientIO::ClientIO(parser_type& parser,
+                   const endpoint_type& local_endpoint,
+                   const endpoint_type& remote_endpoint)
+  : parser(parser),
+    local_endpoint(local_endpoint),
+    remote_endpoint(remote_endpoint),
+    txbuf(*this)
 {
 }
 
@@ -77,53 +80,14 @@ int ClientIO::init_env(CephContext *cct)
   env.set("SCRIPT_URI", url.to_string()); /* FIXME */
 
   char port_buf[16];
-  snprintf(port_buf, sizeof(port_buf), "%d", socket.local_endpoint().port());
+  snprintf(port_buf, sizeof(port_buf), "%d", local_endpoint.port());
   env.set("SERVER_PORT", port_buf);
-  env.set("REMOTE_ADDR", socket.remote_endpoint().address().to_string());
+  env.set("REMOTE_ADDR", remote_endpoint.address().to_string());
   // TODO: set SERVER_PORT_SECURE if using ssl
   // TODO: set REMOTE_USER if authenticated
   return 0;
 }
 
-size_t ClientIO::write_data(const char* buf, size_t len)
-{
-  boost::system::error_code ec;
-  auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec);
-  if (ec) {
-    derr << "write_data failed: " << ec.message() << dendl;
-    throw rgw::io::Exception(ec.value(), std::system_category());
-  }
-  /* According to the documentation of boost::asio::write if there is
-   * no error (signalised by ec), then bytes == len. We don't need to
-   * take care of partial writes in such situation. */
-  return bytes;
-}
-
-size_t ClientIO::read_data(char* buf, size_t max)
-{
-  auto& message = parser.get();
-  auto& body_remaining = message.body();
-  body_remaining.data = buf;
-  body_remaining.size = max;
-
-  dout(30) << this << " read_data for " << max << " with "
-      << buffer.size() << " bytes buffered" << dendl;
-
-  while (body_remaining.size && !parser.is_done()) {
-    boost::system::error_code ec;
-    beast::http::read_some(socket, buffer, parser, ec);
-    if (ec == beast::http::error::partial_message ||
-        ec == beast::http::error::need_buffer) {
-      break;
-    }
-    if (ec) {
-      derr << "failed to read body: " << ec.message() << dendl;
-      throw rgw::io::Exception(ec.value(), std::system_category());
-    }
-  }
-  return max - body_remaining.size;
-}
-
 size_t ClientIO::complete_request()
 {
   perfcounter->inc(l_rgw_qlen, -1);
index eeed4ee8051e28fa9e5dac1a066ec37e2c4f0d50..2dac89b41775f2637b681b6a1d8ed463709dbc84 100644 (file)
@@ -18,22 +18,21 @@ using parser_type = beast::http::request_parser<beast::http::buffer_body>;
 
 class ClientIO : public io::RestfulClient,
                  public io::BuffererSink {
- private:
-  using tcp = boost::asio::ip::tcp;
-  tcp::socket& socket;
+ protected:
   parser_type& parser;
-  beast::flat_buffer& buffer; //< parse buffer
+ private:
+  using endpoint_type = boost::asio::ip::tcp::endpoint;
+  endpoint_type local_endpoint;
+  endpoint_type remote_endpoint;
 
   RGWEnv env;
 
   rgw::io::StaticOutputBufferer<> txbuf;
 
-  size_t write_data(const char *buf, size_t len) override;
-  size_t read_data(char *buf, size_t max);
-
  public:
-  ClientIO(tcp::socket& socket, parser_type& parser,
-           beast::flat_buffer& buffer);
+  ClientIO(parser_type& parser,
+           const endpoint_type& local_endpoint,
+           const endpoint_type& remote_endpoint);
   ~ClientIO() override;
 
   int init_env(CephContext *cct) override;
@@ -46,10 +45,6 @@ class ClientIO : public io::RestfulClient,
   size_t send_content_length(uint64_t len) override;
   size_t complete_header() override;
 
-  size_t recv_body(char* buf, size_t max) override {
-    return read_data(buf, max);
-  }
-
   size_t send_body(const char* buf, size_t len) override {
     return write_data(buf, len);
   }
index faf7806fccb64ccfdcdbacc99d6317dd53c63c45..73c6fe2b497ba6443eee22257bcb8beaeea81024 100644 (file)
@@ -63,6 +63,50 @@ void Pauser::wait()
 using tcp = boost::asio::ip::tcp;
 namespace beast = boost::beast;
 
+class StreamIO : public rgw::asio::ClientIO {
+  tcp::socket& stream;
+  beast::flat_buffer& buffer;
+ public:
+  StreamIO(tcp::socket& stream, rgw::asio::parser_type& parser,
+           beast::flat_buffer& buffer,
+           const tcp::endpoint& local_endpoint,
+           const tcp::endpoint& remote_endpoint)
+      : ClientIO(parser, local_endpoint, remote_endpoint),
+        stream(stream), buffer(buffer)
+  {}
+
+  size_t write_data(const char* buf, size_t len) override {
+    boost::system::error_code ec;
+    auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec);
+    if (ec) {
+      derr << "write_data failed: " << ec.message() << dendl;
+      throw rgw::io::Exception(ec.value(), std::system_category());
+    }
+    return bytes;
+  }
+
+  size_t recv_body(char* buf, size_t max) override {
+    auto& message = parser.get();
+    auto& body_remaining = message.body();
+    body_remaining.data = buf;
+    body_remaining.size = max;
+
+    while (body_remaining.size && !parser.is_done()) {
+      boost::system::error_code ec;
+      beast::http::read_some(stream, buffer, parser, ec);
+      if (ec == beast::http::error::partial_message ||
+          ec == beast::http::error::need_buffer) {
+        break;
+      }
+      if (ec) {
+        derr << "failed to read body: " << ec.message() << dendl;
+        throw rgw::io::Exception(ec.value(), std::system_category());
+      }
+    }
+    return max - body_remaining.size;
+  }
+};
+
 void handle_connection(RGWProcessEnv& env, tcp::socket& socket,
                        boost::asio::yield_context yield)
 {
@@ -106,7 +150,9 @@ void handle_connection(RGWProcessEnv& env, tcp::socket& socket,
     // process the request
     RGWRequest req{env.store->get_new_req_id()};
 
-    rgw::asio::ClientIO real_client{socket, parser, buffer};
+    StreamIO real_client{socket, parser, buffer,
+                         socket.local_endpoint(),
+                         socket.remote_endpoint()};
 
     auto real_client_io = rgw::io::add_reordering(
                             rgw::io::add_buffering(cct,