]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd/migration: pipelined http request / response implementation
authorJason Dillaman <dillaman@redhat.com>
Wed, 4 Nov 2020 01:30:39 +0000 (20:30 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 17 Nov 2020 01:25:15 +0000 (20:25 -0500)
The generic http/https client now supports the ability to send
pipelined requests and async callbacks with the responses. It will
automatically attempt to reconnect to the server upon any errors
as well.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/migration/HttpClient.cc
src/librbd/migration/HttpClient.h
src/test/librbd/migration/test_mock_HttpClient.cc

index d3f4338a2b9dbc95328af3608729a998d091cd9d..4739b75d1a6e4abbbcb34c2dd2fa80fd4ab412d2 100644 (file)
 #include <boost/asio/read.hpp>
 #include <boost/asio/ssl.hpp>
 #include <boost/beast/core.hpp>
-#include <boost/beast/ssl/ssl_stream.hpp>
+#include <boost/beast/http/read.hpp>
+#include <deque>
 
 namespace librbd {
 namespace migration {
 
-template <typename I>
-struct HttpClient<I>::HttpSessionInterface {
-  virtual ~HttpSessionInterface() {}
-
-  virtual void init(Context* on_finish) = 0;
-  virtual void shut_down(Context* on_finish) = 0;
-};
-
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::migration::HttpClient::" \
                            << "HttpSession " << this << " " << __func__ \
                            << ": "
 
+/**
+ * boost::beast utilizes non-inheriting template classes for handling plain vs
+ * encrypted TCP streams. Utilize a base-class for handling the majority of the
+ * logic for handling connecting, disconnecting, reseting, and sending requests.
+ */
+
 template <typename I>
 template <typename D>
 class HttpClient<I>::HttpSession : public HttpSessionInterface {
 public:
   void init(Context* on_finish) override {
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
+    ceph_assert(m_state == STATE_UNINITIALIZED);
+    m_state = STATE_CONNECTING;
+
     resolve_host(on_finish);
   }
 
   void shut_down(Context* on_finish) override {
-    disconnect(new LambdaContext([this, on_finish](int r) {
-      handle_shut_down(r, on_finish); }));
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
+    ceph_assert(on_finish != nullptr);
+    ceph_assert(m_on_shutdown == nullptr);
+    m_on_shutdown = on_finish;
+
+    auto current_state = m_state;
+    if (current_state == STATE_UNINITIALIZED) {
+      // never initialized or resolve/connect failed
+      on_finish->complete(0);
+      return;
+    }
+
+    m_state = STATE_SHUTTING_DOWN;
+    if (current_state != STATE_READY) {
+      // delay shutdown until current state transition completes
+      return;
+    }
+
+    disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
+  }
+
+  void issue(std::shared_ptr<Work>&& work) override {
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 20) << "work=" << work.get() << dendl;
+
+    if (is_shutdown()) {
+      lderr(cct) << "cannot issue HTTP request, client is shutdown"
+                 << dendl;
+      work->complete(-ESHUTDOWN, {});
+      return;
+    }
+
+    bool first_issue = m_issue_queue.empty();
+    m_issue_queue.emplace_back(work);
+    if (m_state == STATE_READY && first_issue) {
+      ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
+      finalize_issue(std::move(work));
+    } else if (m_state == STATE_UNINITIALIZED) {
+      ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl;
+      m_state = STATE_RESET_CONNECTING;
+      resolve_host(nullptr);
+    } else {
+      ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl;
+    }
+  }
+
+  void finalize_issue(std::shared_ptr<Work>&& work) {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 20) << "work=" << work.get() << dendl;
+
+    ++m_in_flight_requests;
+    (*work)(this, derived().stream());
+  }
+
+  void handle_issue(boost::system::error_code ec,
+                    std::shared_ptr<Work>&& work) override {
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
+
+    ceph_assert(m_in_flight_requests > 0);
+    --m_in_flight_requests;
+    if (maybe_finalize_reset()) {
+      // previous request is attempting reset to this request will be resent
+      return;
+    }
+
+    ceph_assert(!m_issue_queue.empty());
+    m_issue_queue.pop_front();
+
+    if (is_shutdown()) {
+      lderr(cct) << "client shutdown during in-flight request" << dendl;
+      work->complete(-ESHUTDOWN, {});
+
+      maybe_finalize_shutdown();
+      return;
+    }
+
+    if (ec) {
+      if (ec == boost::asio::error::bad_descriptor ||
+          ec == boost::asio::error::broken_pipe ||
+          ec == boost::asio::error::connection_reset ||
+          ec == boost::asio::error::operation_aborted ||
+          ec == boost::asio::ssl::error::stream_truncated ||
+          ec == boost::beast::http::error::end_of_stream ||
+          ec == boost::beast::http::error::partial_message) {
+        ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
+        m_issue_queue.push_front(work);
+      } else if (ec == boost::beast::error::timeout) {
+        lderr(cct) << "timed-out while issuing request" << dendl;
+        work->complete(-ETIMEDOUT, {});
+      } else {
+        lderr(cct) << "failed to issue request: " << ec.message() << dendl;
+        work->complete(-ec.value(), {});
+      }
+
+      // attempt to recover the connection
+      reset();
+      return;
+    }
+
+    bool first_receive = m_receive_queue.empty();
+    m_receive_queue.push_back(work);
+    if (first_receive) {
+      receive(std::move(work));
+    }
+
+    // TODO disable pipelining for non-idempotent requests
+
+    // pipeline the next request into the stream
+    if (!m_issue_queue.empty()) {
+      work = m_issue_queue.front();
+      ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
+      finalize_issue(std::move(work));
+    }
   }
 
 protected:
@@ -57,14 +184,40 @@ protected:
                        Context* on_finish) = 0;
   virtual void disconnect(Context* on_finish) = 0;
 
-  void close() {
+  void close_socket() {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
     boost::system::error_code ec;
     boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
   }
 
 private:
+  enum State {
+    STATE_UNINITIALIZED,
+    STATE_CONNECTING,
+    STATE_READY,
+    STATE_RESET_PENDING,
+    STATE_RESET_DISCONNECTING,
+    STATE_RESET_CONNECTING,
+    STATE_SHUTTING_DOWN,
+    STATE_SHUTDOWN,
+  };
+
+  State m_state = STATE_UNINITIALIZED;
   boost::asio::ip::tcp::resolver m_resolver;
 
+  Context* m_on_shutdown = nullptr;
+
+  uint64_t m_in_flight_requests = 0;
+  std::deque<std::shared_ptr<Work>> m_issue_queue;
+  std::deque<std::shared_ptr<Work>> m_receive_queue;
+
+  boost::beast::flat_buffer m_buffer;
+  std::optional<boost::beast::http::parser<
+    false, boost::beast::http::empty_body>> m_header_parser;
+  std::optional<boost::beast::http::parser<false, StringBody>> m_parser;
+
   D& derived() {
     return static_cast<D&>(*this);
   }
@@ -73,28 +226,33 @@ private:
     auto cct = m_http_client->m_cct;
     ldout(cct, 15) << dendl;
 
+    shutdown_socket();
     m_resolver.async_resolve(
       m_http_client->m_url_spec.host, m_http_client->m_url_spec.port,
-      asio::util::get_callback_adapter(
-        [this, on_finish](int r, auto results) {
-          handle_resolve_host(r, results, on_finish); }));
+      [this, on_finish](boost::system::error_code ec, auto results) {
+          handle_resolve_host(ec, results, on_finish); });
   }
 
   void handle_resolve_host(
-      int r, boost::asio::ip::tcp::resolver::results_type results,
+      boost::system::error_code ec,
+      boost::asio::ip::tcp::resolver::results_type results,
       Context* on_finish) {
     auto cct = m_http_client->m_cct;
+    int r = -ec.value();
     ldout(cct, 15) << "r=" << r << dendl;
 
-    if (r < 0) {
-      if (r == -boost::asio::error::host_not_found) {
+    if (ec) {
+      if (ec == boost::asio::error::host_not_found) {
         r = -ENOENT;
+      } else if (ec == boost::asio::error::host_not_found_try_again) {
+        // TODO: add retry throttle
+        r = -EAGAIN;
       }
 
       lderr(cct) << "failed to resolve host '"
                  << m_http_client->m_url_spec.host << "': "
                  << cpp_strerror(r) << dendl;
-      on_finish->complete(r);
+      advance_state(STATE_UNINITIALIZED, r, on_finish);
       return;
     }
 
@@ -110,24 +268,287 @@ private:
       lderr(cct) << "failed to connect to host '"
                  << m_http_client->m_url_spec.host << "': "
                  << cpp_strerror(r) << dendl;
-      on_finish->complete(r);
+      advance_state(STATE_UNINITIALIZED, r, on_finish);
       return;
     }
 
-    on_finish->complete(0);
+    advance_state(STATE_READY, 0, on_finish);
   }
 
-  void handle_shut_down(int r, Context* on_finish) {
+  void handle_shut_down(int r) {
     auto cct = m_http_client->m_cct;
     ldout(cct, 15) << "r=" << r << dendl;
 
     if (r < 0) {
-      lderr(cct) << "failed to close stream: '" << cpp_strerror(r) << dendl;
-      on_finish->complete(r);
+      lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r)
+                 << dendl;
+    }
+
+    // cancel all in-flight send/receives (if any)
+    shutdown_socket();
+
+    maybe_finalize_shutdown();
+  }
+
+  void maybe_finalize_shutdown() {
+    if (m_in_flight_requests > 0) {
       return;
     }
 
-    on_finish->complete(0);
+    // cancel any queued IOs
+    fail_queued_work(-ESHUTDOWN);
+
+    advance_state(STATE_SHUTDOWN, 0, nullptr);
+  }
+
+  bool is_shutdown() const {
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+    return (m_state == STATE_SHUTTING_DOWN || m_state == STATE_SHUTDOWN);
+  }
+
+  void reset() {
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+    ceph_assert(m_state == STATE_READY);
+
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
+    m_state = STATE_RESET_PENDING;
+    maybe_finalize_reset();
+  }
+
+  bool maybe_finalize_reset() {
+    if (m_state != STATE_RESET_PENDING) {
+      return false;
+    }
+
+    if (m_in_flight_requests > 0) {
+      return true;
+    }
+
+    ceph_assert(m_http_client->m_strand.running_in_this_thread());
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
+    m_buffer.clear();
+
+    // move in-flight request back to the front of the issue queue
+    m_issue_queue.insert(m_issue_queue.begin(),
+                         m_receive_queue.begin(), m_receive_queue.end());
+    m_receive_queue.clear();
+
+    m_state = STATE_RESET_DISCONNECTING;
+    disconnect(new LambdaContext([this](int r) { handle_reset(r); }));
+    return true;
+  }
+
+  void handle_reset(int r) {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << "r=" << r << dendl;
+
+    if (r < 0) {
+      lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r)
+                 << dendl;
+    }
+
+    advance_state(STATE_RESET_CONNECTING, r, nullptr);
+  }
+
+  int shutdown_socket() {
+    if (!boost::beast::get_lowest_layer(
+          derived().stream()).socket().is_open()) {
+      return 0;
+    }
+
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
+    boost::system::error_code ec;
+    boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(
+      boost::asio::ip::tcp::socket::shutdown_both, ec);
+
+    if (ec && ec != boost::beast::errc::not_connected) {
+      lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl;
+      return -ec.value();
+    }
+
+    close_socket();
+    return 0;
+  }
+
+  void receive(std::shared_ptr<Work>&& work) {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << "work=" << work.get() << dendl;
+
+    ceph_assert(!m_receive_queue.empty());
+    ++m_in_flight_requests;
+
+    // receive the response for this request
+    m_parser.emplace();
+    if (work->header_only()) {
+      // HEAD requests don't trasfer data but the parser still cares about max
+      // content-length
+      m_header_parser.emplace();
+      m_header_parser->body_limit(std::numeric_limits<uint64_t>::max());
+
+      boost::beast::http::async_read_header(
+        derived().stream(), m_buffer, *m_header_parser,
+        [this, work=std::move(work)]
+        (boost::beast::error_code ec, std::size_t) mutable {
+          handle_receive(ec, std::move(work));
+        });
+    } else {
+      m_parser->body_limit(1 << 25); // max RBD object size
+      boost::beast::http::async_read(
+        derived().stream(), m_buffer, *m_parser,
+        [this, work=std::move(work)]
+        (boost::beast::error_code ec, std::size_t) mutable {
+          handle_receive(ec, std::move(work));
+        });
+    }
+  }
+
+  void handle_receive(boost::system::error_code ec,
+                      std::shared_ptr<Work>&& work) {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
+
+    ceph_assert(m_in_flight_requests > 0);
+    --m_in_flight_requests;
+    if (maybe_finalize_reset()) {
+      // previous request is attempting reset to this request will be resent
+      return;
+    }
+
+    ceph_assert(!m_receive_queue.empty());
+    m_receive_queue.pop_front();
+
+    if (is_shutdown()) {
+      lderr(cct) << "client shutdown with in-flight request" << dendl;
+      work->complete(-ESHUTDOWN, {});
+
+      maybe_finalize_shutdown();
+      return;
+    }
+
+    if (ec) {
+      if (ec == boost::asio::error::bad_descriptor ||
+          ec == boost::asio::error::broken_pipe ||
+          ec == boost::asio::error::connection_reset ||
+          ec == boost::asio::error::operation_aborted ||
+          ec == boost::asio::ssl::error::stream_truncated ||
+          ec == boost::beast::http::error::end_of_stream ||
+          ec == boost::beast::http::error::partial_message) {
+        ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
+        m_receive_queue.push_front(work);
+      } else if (ec == boost::beast::error::timeout) {
+        lderr(cct) << "timed-out while issuing request" << dendl;
+        work->complete(-ETIMEDOUT, {});
+      } else {
+        lderr(cct) << "failed to issue request: " << ec.message() << dendl;
+        work->complete(-ec.value(), {});
+      }
+
+      reset();
+      return;
+    }
+
+    boost::beast::http::response<StringBody> response;
+    if (work->header_only()) {
+      m_parser.emplace(std::move(*m_header_parser));
+    }
+    response = std::move(m_parser->release());
+
+    bool need_eof = response.need_eof();
+    work->complete(0, std::move(response));
+
+    if (need_eof) {
+      ldout(cct, 20) << "reset required for non-pipelined response: "
+                     << "work=" << work.get() << dendl;
+      reset();
+    } else if (!m_receive_queue.empty()) {
+      auto work = m_receive_queue.front();
+      receive(std::move(work));
+    }
+  }
+
+  void advance_state(State next_state, int r, Context* on_finish) {
+    auto cct = m_http_client->m_cct;
+    auto current_state = m_state;
+    ldout(cct, 15) << "current_state=" << current_state << ", "
+                   << "next_state=" << next_state << ", "
+                   << "r=" << r << dendl;
+
+    m_state = next_state;
+    if (current_state == STATE_CONNECTING) {
+      if (next_state == STATE_UNINITIALIZED) {
+        shutdown_socket();
+        on_finish->complete(r);
+        return;
+      } else if (next_state == STATE_READY) {
+        on_finish->complete(r);
+        return;
+      }
+    } else if (current_state == STATE_SHUTTING_DOWN) {
+      if (next_state == STATE_READY) {
+        // shut down requested while connecting/resetting
+        disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
+        return;
+      } else if (next_state == STATE_UNINITIALIZED ||
+                 next_state == STATE_SHUTDOWN ||
+                 next_state == STATE_RESET_CONNECTING) {
+        ceph_assert(m_on_shutdown != nullptr);
+        m_on_shutdown->complete(r);
+        return;
+      }
+    } else if (current_state == STATE_RESET_DISCONNECTING) {
+      // disconnected from peer -- ignore errors and reconnect
+      ceph_assert(next_state == STATE_RESET_CONNECTING);
+      ceph_assert(on_finish == nullptr);
+      shutdown_socket();
+      resolve_host(nullptr);
+      return;
+    } else if (current_state == STATE_RESET_CONNECTING) {
+      ceph_assert(on_finish == nullptr);
+      if (next_state == STATE_READY) {
+        // restart queued IO
+        if (!m_issue_queue.empty()) {
+          auto& work = m_issue_queue.front();
+          finalize_issue(std::move(work));
+        }
+        return;
+      } else if (next_state == STATE_UNINITIALIZED) {
+        shutdown_socket();
+
+        // fail all queued IO
+        fail_queued_work(r);
+        return;
+      }
+    }
+
+    lderr(cct) << "unexpected state transition: "
+               << "current_state=" << current_state << ", "
+               << "next_state=" << next_state << dendl;
+    ceph_assert(false);
+  }
+
+  void complete_work(std::shared_ptr<Work> work, int r,
+                     boost::beast::http::response<StringBody>&& response) {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl;
+
+    work->complete(r, std::move(response));
+  }
+
+  void fail_queued_work(int r) {
+    auto cct = m_http_client->m_cct;
+    ldout(cct, 10) << "r=" << r << dendl;
+
+    for (auto& work : m_issue_queue) {
+      complete_work(work, r, {});
+    }
+    m_issue_queue.clear();
+    ceph_assert(m_receive_queue.empty());
   }
 };
 
@@ -144,10 +565,9 @@ public:
       m_stream(http_client->m_strand) {
   }
   ~PlainHttpSession() override {
-    this->close();
+    this->close_socket();
   }
 
-
   inline boost::beast::tcp_stream&
   stream() {
     return m_stream;
@@ -167,15 +587,7 @@ protected:
   }
 
   void disconnect(Context* on_finish) override {
-    auto http_client = this->m_http_client;
-    auto cct = http_client->m_cct;
-    ldout(cct, 15) << dendl;
-
-    boost::system::error_code ec;
-    m_stream.socket().shutdown(
-      boost::asio::ip::tcp::socket::shutdown_both, ec);
-
-    on_finish->complete(-ec.value());
+    on_finish->complete(0);
   }
 
 private:
@@ -196,7 +608,7 @@ public:
       m_stream(http_client->m_strand, http_client->m_ssl_context) {
   }
   ~SslHttpSession() override {
-    this->close();
+    this->close_socket();
   }
 
   inline boost::beast::ssl_stream<boost::beast::tcp_stream>&
@@ -223,13 +635,14 @@ protected:
     auto cct = http_client->m_cct;
     ldout(cct, 15) << dendl;
 
-    if (m_ssl_enabled) {
-      m_stream.async_shutdown(
-        asio::util::get_callback_adapter([this, on_finish](int r) {
-          shutdown(r, on_finish); }));
-    } else {
-      shutdown(0, on_finish);
+    if (!m_ssl_enabled) {
+      on_finish->complete(0);
+      return;
     }
+
+    m_stream.async_shutdown(
+      asio::util::get_callback_adapter([this, on_finish](int r) {
+        shutdown(r, on_finish); }));
   }
 
 private:
@@ -319,11 +732,7 @@ private:
     auto cct = http_client->m_cct;
     ldout(cct, 15) << "r=" << r << dendl;
 
-    boost::system::error_code ec;
-    boost::beast::get_lowest_layer(m_stream).socket().shutdown(
-      boost::asio::ip::tcp::socket::shutdown_both, ec);
-
-    on_finish->complete(-ec.value());
+    on_finish->complete(r);
   }
 };
 
@@ -351,22 +760,20 @@ void HttpClient<I>::open(Context* on_finish) {
     return;
   }
 
-  // initial bootstrap connection -- later IOs might rebuild the session
-  create_http_session(on_finish);
+  boost::asio::post(m_strand, [this, on_finish]() mutable {
+    create_http_session(on_finish); });
 }
 
 template <typename I>
 void HttpClient<I>::close(Context* on_finish) {
-  // execute within the strand to ensure all IO completes
-  boost::asio::post(
-    m_strand, [this, on_finish]() {
-      if (m_http_session == nullptr) {
-        on_finish->complete(0);
-        return;
-      }
+  boost::asio::post(m_strand, [this, on_finish]() mutable {
+    shut_down_http_session(on_finish); });
+}
 
-      m_http_session->shut_down(on_finish);
-    });
+template <typename I>
+void HttpClient<I>::issue(std::shared_ptr<Work>&& work) {
+  boost::asio::post(m_strand, [this, work=std::move(work)]() mutable {
+    m_http_session->issue(std::move(work)); });
 }
 
 template <typename I>
@@ -389,6 +796,18 @@ void HttpClient<I>::create_http_session(Context* on_finish) {
   m_http_session->init(on_finish);
 }
 
+template <typename I>
+void HttpClient<I>::shut_down_http_session(Context* on_finish) {
+  ldout(m_cct, 15) << dendl;
+
+  if (m_http_session == nullptr) {
+    on_finish->complete(0);
+    return;
+  }
+
+  m_http_session->shut_down(on_finish);
+}
+
 } // namespace migration
 } // namespace librbd
 
index bbfba449c31821998baaa41a44aa30887e69c7ae..a812c94c3cad51e87b8892f99108a58934e3904a 100644 (file)
 #include <boost/asio/io_context_strand.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl/context.hpp>
+#include <boost/beast/version.hpp>
+#include <boost/beast/core/tcp_stream.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/write.hpp>
+#include <boost/beast/ssl/ssl_stream.hpp>
 #include <memory>
 #include <string>
 #include <utility>
@@ -41,7 +47,90 @@ public:
     m_ignore_self_signed_cert = ignore;
   }
 
+  using StringBody = boost::beast::http::string_body;
+
+  template <class Body, typename Completion>
+  void issue(boost::beast::http::request<Body>&& request,
+             Completion&& completion) {
+    struct WorkImpl : Work {
+      boost::beast::http::request<Body> request;
+      Completion completion;
+
+      WorkImpl(boost::beast::http::request<Body>&& request,
+               Completion&& completion)
+        : request(std::move(request)), completion(std::move(completion)) {
+      }
+      WorkImpl(const WorkImpl&) = delete;
+      WorkImpl& operator=(const WorkImpl&) = delete;
+
+      bool need_eof() const override {
+        return request.need_eof();
+      }
+
+      bool header_only() const override {
+        return (request.method() == boost::beast::http::verb::head);
+      }
+
+      void complete(
+          int r, boost::beast::http::response<StringBody>&& response) override {
+        completion(r, std::move(response));
+      }
+
+      void operator()(
+          HttpSessionInterface* http_session,
+          boost::beast::tcp_stream& stream) override {
+        boost::beast::http::async_write(
+          stream, request,
+          [http_session, work=this->shared_from_this()]
+          (boost::beast::error_code ec, std::size_t) mutable {
+            http_session->handle_issue(ec, std::move(work));
+          });
+      }
+
+      void operator()(
+          HttpSessionInterface* http_session,
+          boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) override {
+        boost::beast::http::async_write(
+          stream, request,
+          [http_session, work=this->shared_from_this()]
+          (boost::beast::error_code ec, std::size_t) mutable {
+            http_session->handle_issue(ec, std::move(work));
+          });
+      }
+    };
+
+    initialize_default_fields(request);
+    issue(std::make_shared<WorkImpl>(std::move(request),
+                                     std::move(completion)));
+  }
+
 private:
+  struct Work;
+  struct HttpSessionInterface {
+    virtual ~HttpSessionInterface() {}
+
+    virtual void init(Context* on_finish) = 0;
+    virtual void shut_down(Context* on_finish) = 0;
+
+    virtual void issue(std::shared_ptr<Work>&& work) = 0;
+    virtual void handle_issue(boost::system::error_code ec,
+                              std::shared_ptr<Work>&& work) = 0;
+  };
+
+  struct Work : public std::enable_shared_from_this<Work> {
+    virtual ~Work() {}
+    virtual bool need_eof() const = 0;
+    virtual bool header_only() const = 0;
+    virtual void complete(
+        int r, boost::beast::http::response<StringBody>&&) = 0;
+    virtual void operator()(
+        HttpSessionInterface* http_session,
+        boost::beast::tcp_stream& stream) = 0;
+    virtual void operator()(
+        HttpSessionInterface* http_session,
+        boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) = 0;
+  };
+
   struct HttpSessionInterface;
   template <typename D> struct HttpSession;
   struct PlainHttpSession;
@@ -60,7 +149,18 @@ private:
   boost::asio::ssl::context m_ssl_context;
   std::unique_ptr<HttpSessionInterface> m_http_session;
 
+  template <typename Fields>
+  void initialize_default_fields(Fields& fields) const {
+    fields.target(m_url_spec.path);
+    fields.set(boost::beast::http::field::host, m_url_spec.host);
+    fields.set(boost::beast::http::field::user_agent,
+               BOOST_BEAST_VERSION_STRING);
+  }
+
+  void issue(std::shared_ptr<Work>&& work);
+
   void create_http_session(Context* on_finish);
+  void shut_down_http_session(Context* on_finish);
 };
 
 } // namespace migration
index 25a6de519c6d5f1879d3fb53425354a0eb144cc2..618718e9a8cffdeea85d2715c1b64a79780a3d23 100644 (file)
@@ -9,6 +9,8 @@
 #include "gtest/gtest.h"
 #include "gmock/gmock.h"
 #include <boost/asio/ip/tcp.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/http.hpp>
 
 namespace librbd {
 namespace {
@@ -23,6 +25,35 @@ struct MockTestImageCtx : public MockImageCtx {
 
 #include "librbd/migration/HttpClient.cc"
 
+using EmptyHttpRequest = boost::beast::http::request<
+  boost::beast::http::empty_body>;
+using HttpRequest = boost::beast::http::request<
+  boost::beast::http::string_body>;
+using HttpResponse = boost::beast::http::response<
+  boost::beast::http::string_body>;
+
+namespace boost {
+namespace beast {
+namespace http {
+
+template <typename Body>
+bool operator==(const boost::beast::http::request<Body>& lhs,
+                const boost::beast::http::request<Body>& rhs) {
+  return (lhs.method() == rhs.method() &&
+          lhs.target() == rhs.target());
+}
+
+template <typename Body>
+bool operator==(const boost::beast::http::response<Body>& lhs,
+                const boost::beast::http::response<Body>& rhs) {
+  return (lhs.result() == rhs.result() &&
+          lhs.body() == rhs.body());
+}
+
+} // namespace http
+} // namespace beast
+} // namespace boost
+
 namespace librbd {
 namespace migration {
 
@@ -37,10 +68,7 @@ public:
 
     ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
 
-    m_acceptor.emplace(*m_image_ctx->asio_engine,
-                       boost::asio::ip::tcp::endpoint(
-                         boost::asio::ip::tcp::v4(), 0));
-    m_server_port = m_acceptor->local_endpoint().port();
+    create_acceptor();
   }
 
   void TearDown() override {
@@ -49,6 +77,13 @@ public:
     TestMockFixture::TearDown();
   }
 
+  void create_acceptor() {
+    m_acceptor.emplace(*m_image_ctx->asio_engine,
+                       boost::asio::ip::tcp::endpoint(
+                         boost::asio::ip::tcp::v4(), m_server_port));
+    m_server_port = m_acceptor->local_endpoint().port();
+  }
+
   std::string get_local_url(UrlScheme url_scheme) {
     std::stringstream sstream;
     switch (url_scheme) {
@@ -63,7 +98,7 @@ public:
       break;
     }
 
-    sstream << ":" << m_server_port << "/";
+    sstream << ":" << m_server_port << "/target";
     return sstream.str();
   }
 
@@ -83,6 +118,32 @@ public:
       });
   }
 
+  template <typename Body>
+  void client_read_request(boost::asio::ip::tcp::socket& socket,
+                           boost::beast::http::request<Body>& expected_req) {
+    boost::beast::http::request<Body> req;
+    boost::beast::error_code ec;
+    boost::beast::http::read(socket, m_buffer, req, ec);
+    ASSERT_FALSE(ec);
+
+    expected_req.target("/target");
+    ASSERT_EQ(expected_req, req);
+  }
+
+  void client_write_response(boost::asio::ip::tcp::socket& socket,
+                             HttpResponse& expected_res) {
+    expected_res.result(boost::beast::http::status::ok);
+    expected_res.set(boost::beast::http::field::server,
+                     BOOST_BEAST_VERSION_STRING);
+    expected_res.set(boost::beast::http::field::content_type, "text/plain");
+    expected_res.content_length(expected_res.body().size());
+    expected_res.prepare_payload();
+
+    boost::beast::error_code ec;
+    boost::beast::http::write(socket, expected_res, ec);
+    ASSERT_FALSE(ec);
+  }
+
   template <typename Stream>
   void client_ssl_handshake(Stream& stream, Context* on_handshake) {
     stream.async_handshake(
@@ -179,6 +240,7 @@ public:
   librbd::ImageCtx *m_image_ctx;
 
   std::optional<boost::asio::ip::tcp::acceptor> m_acceptor;
+  boost::beast::flat_buffer m_buffer;
   uint64_t m_server_port = 0;
 };
 
@@ -290,5 +352,391 @@ TEST_F(TestMockMigrationHttpClient, OpenConnectFail) {
   ASSERT_EQ(-ECONNREFUSED, ctx1.wait());
 }
 
+TEST_F(TestMockMigrationHttpClient, IssueHead) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx;
+  client_accept(&socket, false, &on_connect_ctx);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  EmptyHttpRequest req;
+  req.method(boost::beast::http::verb::head);
+
+  C_SaferCond ctx2;
+  HttpResponse res;
+  http_client.issue(EmptyHttpRequest{req},
+    [&ctx2, &res](int r, HttpResponse&& response) mutable {
+      res = std::move(response);
+      ctx2.complete(r);
+    });
+
+  HttpResponse expected_res;
+  client_read_request(socket, req);
+  client_write_response(socket, expected_res);
+
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(expected_res, res);
+
+  C_SaferCond ctx3;
+  http_client.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueGet) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx;
+  client_accept(&socket, false, &on_connect_ctx);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  HttpRequest req;
+  req.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx2;
+  HttpResponse res;
+  http_client.issue(HttpRequest{req},
+    [&ctx2, &res](int r, HttpResponse&& response) mutable {
+      res = std::move(response);
+      ctx2.complete(r);
+    });
+
+  HttpResponse expected_res;
+  expected_res.body() = "test";
+  client_read_request(socket, req);
+  client_write_response(socket, expected_res);
+
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(expected_res, res);
+
+  C_SaferCond ctx3;
+  http_client.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueSendFailed) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx1;
+  client_accept(&socket, false, &on_connect_ctx1);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx1.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  // close connection to client
+  boost::system::error_code ec;
+  socket.close(ec);
+
+  C_SaferCond on_connect_ctx2;
+  client_accept(&socket, false, &on_connect_ctx2);
+
+  // send request via closed connection
+  HttpRequest req;
+  req.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx2;
+  http_client.issue(HttpRequest{req},
+    [&ctx2](int r, HttpResponse&&) mutable {
+      ctx2.complete(r);
+    });
+
+  // connection will be reset and request retried
+  ASSERT_EQ(0, on_connect_ctx2.wait());
+  HttpResponse expected_res;
+  expected_res.body() = "test";
+  client_read_request(socket, req);
+  client_write_response(socket, expected_res);
+  ASSERT_EQ(0, ctx2.wait());
+
+  C_SaferCond ctx3;
+  http_client.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueReceiveFailed) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx1;
+  client_accept(&socket, false, &on_connect_ctx1);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx1.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  // send request via closed connection
+  HttpRequest req;
+  req.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx2;
+  http_client.issue(HttpRequest{req},
+    [&ctx2](int r, HttpResponse&&) mutable {
+      ctx2.complete(r);
+    });
+
+  C_SaferCond on_connect_ctx2;
+  client_accept(&socket, false, &on_connect_ctx2);
+
+  // close connection to client after reading request
+  client_read_request(socket, req);
+  boost::system::error_code ec;
+  socket.close(ec);
+
+  // connection will be reset and request retried
+  ASSERT_EQ(0, on_connect_ctx2.wait());
+  HttpResponse expected_res;
+  expected_res.body() = "test";
+  client_read_request(socket, req);
+  client_write_response(socket, expected_res);
+  ASSERT_EQ(0, ctx2.wait());
+
+  C_SaferCond ctx3;
+  http_client.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueResetFailed) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx1;
+  client_accept(&socket, false, &on_connect_ctx1);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx1.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  // send requests then close connection
+  HttpRequest req;
+  req.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx2;
+  http_client.issue(HttpRequest{req},
+    [&ctx2](int r, HttpResponse&&) mutable {
+      ctx2.complete(r);
+    });
+
+  C_SaferCond ctx3;
+  http_client.issue(HttpRequest{req},
+    [&ctx3](int r, HttpResponse&&) mutable {
+      ctx3.complete(r);
+    });
+
+  client_read_request(socket, req);
+  client_read_request(socket, req);
+
+  // close connection to client and verify requests are failed
+  m_acceptor.reset();
+  boost::system::error_code ec;
+  socket.close(ec);
+
+  ASSERT_EQ(-ECONNREFUSED, ctx2.wait());
+  ASSERT_EQ(-ECONNREFUSED, ctx3.wait());
+
+  // additional request will retry the failed connection
+  create_acceptor();
+
+  C_SaferCond on_connect_ctx2;
+  client_accept(&socket, false, &on_connect_ctx2);
+
+  C_SaferCond ctx4;
+  http_client.issue(HttpRequest{req},
+    [&ctx4](int r, HttpResponse&&) mutable {
+      ctx4.complete(r);
+    });
+
+  ASSERT_EQ(0, on_connect_ctx2.wait());
+  client_read_request(socket, req);
+
+  HttpResponse expected_res;
+  expected_res.body() = "test";
+  client_write_response(socket, expected_res);
+  ASSERT_EQ(0, ctx4.wait());
+
+  C_SaferCond ctx5;
+  http_client.close(&ctx5);
+  ASSERT_EQ(0, ctx5.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssuePipelined) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx;
+  client_accept(&socket, false, &on_connect_ctx);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  // issue two pipelined (concurrent) get requests
+  HttpRequest req1;
+  req1.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx2;
+  HttpResponse res1;
+  http_client.issue(HttpRequest{req1},
+    [&ctx2, &res1](int r, HttpResponse&& response) mutable {
+      res1 = std::move(response);
+      ctx2.complete(r);
+    });
+
+  HttpRequest req2;
+  req2.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx3;
+  HttpResponse res2;
+  http_client.issue(HttpRequest{req2},
+    [&ctx3, &res2](int r, HttpResponse&& response) mutable {
+      res2 = std::move(response);
+      ctx3.complete(r);
+    });
+
+  client_read_request(socket, req1);
+  client_read_request(socket, req2);
+
+  // read the responses sequentially
+  HttpResponse expected_res1;
+  expected_res1.body() = "test";
+  client_write_response(socket, expected_res1);
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(expected_res1, res1);
+
+  HttpResponse expected_res2;
+  expected_res2.body() = "test";
+  client_write_response(socket, expected_res2);
+  ASSERT_EQ(0, ctx3.wait());
+  ASSERT_EQ(expected_res2, res2);
+
+  C_SaferCond ctx4;
+  http_client.close(&ctx4);
+  ASSERT_EQ(0, ctx4.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssuePipelinedRestart) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx1;
+  client_accept(&socket, false, &on_connect_ctx1);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx1.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  // issue two pipelined (concurrent) get requests
+  HttpRequest req1;
+  req1.keep_alive(false);
+  req1.method(boost::beast::http::verb::get);
+
+  C_SaferCond on_connect_ctx2;
+  client_accept(&socket, false, &on_connect_ctx2);
+
+  C_SaferCond ctx2;
+  HttpResponse res1;
+  http_client.issue(HttpRequest{req1},
+    [&ctx2, &res1](int r, HttpResponse&& response) mutable {
+      res1 = std::move(response);
+      ctx2.complete(r);
+    });
+
+  HttpRequest req2;
+  req2.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx3;
+  HttpResponse res2;
+  http_client.issue(HttpRequest{req2},
+    [&ctx3, &res2](int r, HttpResponse&& response) mutable {
+      res2 = std::move(response);
+      ctx3.complete(r);
+    });
+
+  client_read_request(socket, req1);
+  client_read_request(socket, req2);
+
+  // read the responses sequentially
+  HttpResponse expected_res1;
+  expected_res1.body() = "test";
+  expected_res1.keep_alive(false);
+  client_write_response(socket, expected_res1);
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(expected_res1, res1);
+
+  // second request will need to be re-sent due to 'need_eof' condition
+  ASSERT_EQ(0, on_connect_ctx2.wait());
+  client_read_request(socket, req2);
+
+  HttpResponse expected_res2;
+  expected_res2.body() = "test";
+  client_write_response(socket, expected_res2);
+  ASSERT_EQ(0, ctx3.wait());
+  ASSERT_EQ(expected_res2, res2);
+
+  C_SaferCond ctx4;
+  http_client.close(&ctx4);
+  ASSERT_EQ(0, ctx4.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, ShutdownInFlight) {
+  boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+  C_SaferCond on_connect_ctx;
+  client_accept(&socket, false, &on_connect_ctx);
+
+  MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+  MockHttpClient http_client(&mock_test_image_ctx,
+                             get_local_url(URL_SCHEME_HTTP));
+
+  C_SaferCond ctx1;
+  http_client.open(&ctx1);
+  ASSERT_EQ(0, on_connect_ctx.wait());
+  ASSERT_EQ(0, ctx1.wait());
+
+  HttpRequest req;
+  req.method(boost::beast::http::verb::get);
+
+  C_SaferCond ctx2;
+  http_client.issue(HttpRequest{req},
+    [&ctx2](int r, HttpResponse&&) mutable {
+      ctx2.complete(r);
+    });
+
+  client_read_request(socket, req);
+
+  C_SaferCond ctx3;
+  http_client.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+  ASSERT_EQ(-ESHUTDOWN, ctx2.wait());
+}
+
 } // namespace migration
 } // namespace librbd