#include <boost/asio/read.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast/core.hpp>
-#include <boost/beast/ssl/ssl_stream.hpp>
+#include <boost/beast/http/read.hpp>
+#include <deque>
namespace librbd {
namespace migration {
-template <typename I>
-struct HttpClient<I>::HttpSessionInterface {
- virtual ~HttpSessionInterface() {}
-
- virtual void init(Context* on_finish) = 0;
- virtual void shut_down(Context* on_finish) = 0;
-};
-
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
<< "HttpSession " << this << " " << __func__ \
<< ": "
+/**
+ * boost::beast utilizes non-inheriting template classes for handling plain vs
+ * encrypted TCP streams. Utilize a base-class for handling the majority of the
+ * logic for handling connecting, disconnecting, reseting, and sending requests.
+ */
+
template <typename I>
template <typename D>
class HttpClient<I>::HttpSession : public HttpSessionInterface {
public:
void init(Context* on_finish) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ ceph_assert(m_state == STATE_UNINITIALIZED);
+ m_state = STATE_CONNECTING;
+
resolve_host(on_finish);
}
void shut_down(Context* on_finish) override {
- disconnect(new LambdaContext([this, on_finish](int r) {
- handle_shut_down(r, on_finish); }));
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ ceph_assert(on_finish != nullptr);
+ ceph_assert(m_on_shutdown == nullptr);
+ m_on_shutdown = on_finish;
+
+ auto current_state = m_state;
+ if (current_state == STATE_UNINITIALIZED) {
+ // never initialized or resolve/connect failed
+ on_finish->complete(0);
+ return;
+ }
+
+ m_state = STATE_SHUTTING_DOWN;
+ if (current_state != STATE_READY) {
+ // delay shutdown until current state transition completes
+ return;
+ }
+
+ disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
+ }
+
+ void issue(std::shared_ptr<Work>&& work) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << dendl;
+
+ if (is_shutdown()) {
+ lderr(cct) << "cannot issue HTTP request, client is shutdown"
+ << dendl;
+ work->complete(-ESHUTDOWN, {});
+ return;
+ }
+
+ bool first_issue = m_issue_queue.empty();
+ m_issue_queue.emplace_back(work);
+ if (m_state == STATE_READY && first_issue) {
+ ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
+ finalize_issue(std::move(work));
+ } else if (m_state == STATE_UNINITIALIZED) {
+ ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl;
+ m_state = STATE_RESET_CONNECTING;
+ resolve_host(nullptr);
+ } else {
+ ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl;
+ }
+ }
+
+ void finalize_issue(std::shared_ptr<Work>&& work) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << dendl;
+
+ ++m_in_flight_requests;
+ (*work)(this, derived().stream());
+ }
+
+ void handle_issue(boost::system::error_code ec,
+ std::shared_ptr<Work>&& work) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
+
+ ceph_assert(m_in_flight_requests > 0);
+ --m_in_flight_requests;
+ if (maybe_finalize_reset()) {
+ // previous request is attempting reset to this request will be resent
+ return;
+ }
+
+ ceph_assert(!m_issue_queue.empty());
+ m_issue_queue.pop_front();
+
+ if (is_shutdown()) {
+ lderr(cct) << "client shutdown during in-flight request" << dendl;
+ work->complete(-ESHUTDOWN, {});
+
+ maybe_finalize_shutdown();
+ return;
+ }
+
+ if (ec) {
+ if (ec == boost::asio::error::bad_descriptor ||
+ ec == boost::asio::error::broken_pipe ||
+ ec == boost::asio::error::connection_reset ||
+ ec == boost::asio::error::operation_aborted ||
+ ec == boost::asio::ssl::error::stream_truncated ||
+ ec == boost::beast::http::error::end_of_stream ||
+ ec == boost::beast::http::error::partial_message) {
+ ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
+ m_issue_queue.push_front(work);
+ } else if (ec == boost::beast::error::timeout) {
+ lderr(cct) << "timed-out while issuing request" << dendl;
+ work->complete(-ETIMEDOUT, {});
+ } else {
+ lderr(cct) << "failed to issue request: " << ec.message() << dendl;
+ work->complete(-ec.value(), {});
+ }
+
+ // attempt to recover the connection
+ reset();
+ return;
+ }
+
+ bool first_receive = m_receive_queue.empty();
+ m_receive_queue.push_back(work);
+ if (first_receive) {
+ receive(std::move(work));
+ }
+
+ // TODO disable pipelining for non-idempotent requests
+
+ // pipeline the next request into the stream
+ if (!m_issue_queue.empty()) {
+ work = m_issue_queue.front();
+ ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
+ finalize_issue(std::move(work));
+ }
}
protected:
Context* on_finish) = 0;
virtual void disconnect(Context* on_finish) = 0;
- void close() {
+ void close_socket() {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
boost::system::error_code ec;
boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
}
private:
+ enum State {
+ STATE_UNINITIALIZED,
+ STATE_CONNECTING,
+ STATE_READY,
+ STATE_RESET_PENDING,
+ STATE_RESET_DISCONNECTING,
+ STATE_RESET_CONNECTING,
+ STATE_SHUTTING_DOWN,
+ STATE_SHUTDOWN,
+ };
+
+ State m_state = STATE_UNINITIALIZED;
boost::asio::ip::tcp::resolver m_resolver;
+ Context* m_on_shutdown = nullptr;
+
+ uint64_t m_in_flight_requests = 0;
+ std::deque<std::shared_ptr<Work>> m_issue_queue;
+ std::deque<std::shared_ptr<Work>> m_receive_queue;
+
+ boost::beast::flat_buffer m_buffer;
+ std::optional<boost::beast::http::parser<
+ false, boost::beast::http::empty_body>> m_header_parser;
+ std::optional<boost::beast::http::parser<false, StringBody>> m_parser;
+
D& derived() {
return static_cast<D&>(*this);
}
auto cct = m_http_client->m_cct;
ldout(cct, 15) << dendl;
+ shutdown_socket();
m_resolver.async_resolve(
m_http_client->m_url_spec.host, m_http_client->m_url_spec.port,
- asio::util::get_callback_adapter(
- [this, on_finish](int r, auto results) {
- handle_resolve_host(r, results, on_finish); }));
+ [this, on_finish](boost::system::error_code ec, auto results) {
+ handle_resolve_host(ec, results, on_finish); });
}
void handle_resolve_host(
- int r, boost::asio::ip::tcp::resolver::results_type results,
+ boost::system::error_code ec,
+ boost::asio::ip::tcp::resolver::results_type results,
Context* on_finish) {
auto cct = m_http_client->m_cct;
+ int r = -ec.value();
ldout(cct, 15) << "r=" << r << dendl;
- if (r < 0) {
- if (r == -boost::asio::error::host_not_found) {
+ if (ec) {
+ if (ec == boost::asio::error::host_not_found) {
r = -ENOENT;
+ } else if (ec == boost::asio::error::host_not_found_try_again) {
+ // TODO: add retry throttle
+ r = -EAGAIN;
}
lderr(cct) << "failed to resolve host '"
<< m_http_client->m_url_spec.host << "': "
<< cpp_strerror(r) << dendl;
- on_finish->complete(r);
+ advance_state(STATE_UNINITIALIZED, r, on_finish);
return;
}
lderr(cct) << "failed to connect to host '"
<< m_http_client->m_url_spec.host << "': "
<< cpp_strerror(r) << dendl;
- on_finish->complete(r);
+ advance_state(STATE_UNINITIALIZED, r, on_finish);
return;
}
- on_finish->complete(0);
+ advance_state(STATE_READY, 0, on_finish);
}
- void handle_shut_down(int r, Context* on_finish) {
+ void handle_shut_down(int r) {
auto cct = m_http_client->m_cct;
ldout(cct, 15) << "r=" << r << dendl;
if (r < 0) {
- lderr(cct) << "failed to close stream: '" << cpp_strerror(r) << dendl;
- on_finish->complete(r);
+ lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r)
+ << dendl;
+ }
+
+ // cancel all in-flight send/receives (if any)
+ shutdown_socket();
+
+ maybe_finalize_shutdown();
+ }
+
+ void maybe_finalize_shutdown() {
+ if (m_in_flight_requests > 0) {
return;
}
- on_finish->complete(0);
+ // cancel any queued IOs
+ fail_queued_work(-ESHUTDOWN);
+
+ advance_state(STATE_SHUTDOWN, 0, nullptr);
+ }
+
+ bool is_shutdown() const {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+ return (m_state == STATE_SHUTTING_DOWN || m_state == STATE_SHUTDOWN);
+ }
+
+ void reset() {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+ ceph_assert(m_state == STATE_READY);
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ m_state = STATE_RESET_PENDING;
+ maybe_finalize_reset();
+ }
+
+ bool maybe_finalize_reset() {
+ if (m_state != STATE_RESET_PENDING) {
+ return false;
+ }
+
+ if (m_in_flight_requests > 0) {
+ return true;
+ }
+
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ m_buffer.clear();
+
+ // move in-flight request back to the front of the issue queue
+ m_issue_queue.insert(m_issue_queue.begin(),
+ m_receive_queue.begin(), m_receive_queue.end());
+ m_receive_queue.clear();
+
+ m_state = STATE_RESET_DISCONNECTING;
+ disconnect(new LambdaContext([this](int r) { handle_reset(r); }));
+ return true;
+ }
+
+ void handle_reset(int r) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r)
+ << dendl;
+ }
+
+ advance_state(STATE_RESET_CONNECTING, r, nullptr);
+ }
+
+ int shutdown_socket() {
+ if (!boost::beast::get_lowest_layer(
+ derived().stream()).socket().is_open()) {
+ return 0;
+ }
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ boost::system::error_code ec;
+ boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(
+ boost::asio::ip::tcp::socket::shutdown_both, ec);
+
+ if (ec && ec != boost::beast::errc::not_connected) {
+ lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl;
+ return -ec.value();
+ }
+
+ close_socket();
+ return 0;
+ }
+
+ void receive(std::shared_ptr<Work>&& work) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << "work=" << work.get() << dendl;
+
+ ceph_assert(!m_receive_queue.empty());
+ ++m_in_flight_requests;
+
+ // receive the response for this request
+ m_parser.emplace();
+ if (work->header_only()) {
+ // HEAD requests don't trasfer data but the parser still cares about max
+ // content-length
+ m_header_parser.emplace();
+ m_header_parser->body_limit(std::numeric_limits<uint64_t>::max());
+
+ boost::beast::http::async_read_header(
+ derived().stream(), m_buffer, *m_header_parser,
+ [this, work=std::move(work)]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ handle_receive(ec, std::move(work));
+ });
+ } else {
+ m_parser->body_limit(1 << 25); // max RBD object size
+ boost::beast::http::async_read(
+ derived().stream(), m_buffer, *m_parser,
+ [this, work=std::move(work)]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ handle_receive(ec, std::move(work));
+ });
+ }
+ }
+
+ void handle_receive(boost::system::error_code ec,
+ std::shared_ptr<Work>&& work) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
+
+ ceph_assert(m_in_flight_requests > 0);
+ --m_in_flight_requests;
+ if (maybe_finalize_reset()) {
+ // previous request is attempting reset to this request will be resent
+ return;
+ }
+
+ ceph_assert(!m_receive_queue.empty());
+ m_receive_queue.pop_front();
+
+ if (is_shutdown()) {
+ lderr(cct) << "client shutdown with in-flight request" << dendl;
+ work->complete(-ESHUTDOWN, {});
+
+ maybe_finalize_shutdown();
+ return;
+ }
+
+ if (ec) {
+ if (ec == boost::asio::error::bad_descriptor ||
+ ec == boost::asio::error::broken_pipe ||
+ ec == boost::asio::error::connection_reset ||
+ ec == boost::asio::error::operation_aborted ||
+ ec == boost::asio::ssl::error::stream_truncated ||
+ ec == boost::beast::http::error::end_of_stream ||
+ ec == boost::beast::http::error::partial_message) {
+ ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
+ m_receive_queue.push_front(work);
+ } else if (ec == boost::beast::error::timeout) {
+ lderr(cct) << "timed-out while issuing request" << dendl;
+ work->complete(-ETIMEDOUT, {});
+ } else {
+ lderr(cct) << "failed to issue request: " << ec.message() << dendl;
+ work->complete(-ec.value(), {});
+ }
+
+ reset();
+ return;
+ }
+
+ boost::beast::http::response<StringBody> response;
+ if (work->header_only()) {
+ m_parser.emplace(std::move(*m_header_parser));
+ }
+ response = std::move(m_parser->release());
+
+ bool need_eof = response.need_eof();
+ work->complete(0, std::move(response));
+
+ if (need_eof) {
+ ldout(cct, 20) << "reset required for non-pipelined response: "
+ << "work=" << work.get() << dendl;
+ reset();
+ } else if (!m_receive_queue.empty()) {
+ auto work = m_receive_queue.front();
+ receive(std::move(work));
+ }
+ }
+
+ void advance_state(State next_state, int r, Context* on_finish) {
+ auto cct = m_http_client->m_cct;
+ auto current_state = m_state;
+ ldout(cct, 15) << "current_state=" << current_state << ", "
+ << "next_state=" << next_state << ", "
+ << "r=" << r << dendl;
+
+ m_state = next_state;
+ if (current_state == STATE_CONNECTING) {
+ if (next_state == STATE_UNINITIALIZED) {
+ shutdown_socket();
+ on_finish->complete(r);
+ return;
+ } else if (next_state == STATE_READY) {
+ on_finish->complete(r);
+ return;
+ }
+ } else if (current_state == STATE_SHUTTING_DOWN) {
+ if (next_state == STATE_READY) {
+ // shut down requested while connecting/resetting
+ disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
+ return;
+ } else if (next_state == STATE_UNINITIALIZED ||
+ next_state == STATE_SHUTDOWN ||
+ next_state == STATE_RESET_CONNECTING) {
+ ceph_assert(m_on_shutdown != nullptr);
+ m_on_shutdown->complete(r);
+ return;
+ }
+ } else if (current_state == STATE_RESET_DISCONNECTING) {
+ // disconnected from peer -- ignore errors and reconnect
+ ceph_assert(next_state == STATE_RESET_CONNECTING);
+ ceph_assert(on_finish == nullptr);
+ shutdown_socket();
+ resolve_host(nullptr);
+ return;
+ } else if (current_state == STATE_RESET_CONNECTING) {
+ ceph_assert(on_finish == nullptr);
+ if (next_state == STATE_READY) {
+ // restart queued IO
+ if (!m_issue_queue.empty()) {
+ auto& work = m_issue_queue.front();
+ finalize_issue(std::move(work));
+ }
+ return;
+ } else if (next_state == STATE_UNINITIALIZED) {
+ shutdown_socket();
+
+ // fail all queued IO
+ fail_queued_work(r);
+ return;
+ }
+ }
+
+ lderr(cct) << "unexpected state transition: "
+ << "current_state=" << current_state << ", "
+ << "next_state=" << next_state << dendl;
+ ceph_assert(false);
+ }
+
+ void complete_work(std::shared_ptr<Work> work, int r,
+ boost::beast::http::response<StringBody>&& response) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl;
+
+ work->complete(r, std::move(response));
+ }
+
+ void fail_queued_work(int r) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ for (auto& work : m_issue_queue) {
+ complete_work(work, r, {});
+ }
+ m_issue_queue.clear();
+ ceph_assert(m_receive_queue.empty());
}
};
m_stream(http_client->m_strand) {
}
~PlainHttpSession() override {
- this->close();
+ this->close_socket();
}
-
inline boost::beast::tcp_stream&
stream() {
return m_stream;
}
void disconnect(Context* on_finish) override {
- auto http_client = this->m_http_client;
- auto cct = http_client->m_cct;
- ldout(cct, 15) << dendl;
-
- boost::system::error_code ec;
- m_stream.socket().shutdown(
- boost::asio::ip::tcp::socket::shutdown_both, ec);
-
- on_finish->complete(-ec.value());
+ on_finish->complete(0);
}
private:
m_stream(http_client->m_strand, http_client->m_ssl_context) {
}
~SslHttpSession() override {
- this->close();
+ this->close_socket();
}
inline boost::beast::ssl_stream<boost::beast::tcp_stream>&
auto cct = http_client->m_cct;
ldout(cct, 15) << dendl;
- if (m_ssl_enabled) {
- m_stream.async_shutdown(
- asio::util::get_callback_adapter([this, on_finish](int r) {
- shutdown(r, on_finish); }));
- } else {
- shutdown(0, on_finish);
+ if (!m_ssl_enabled) {
+ on_finish->complete(0);
+ return;
}
+
+ m_stream.async_shutdown(
+ asio::util::get_callback_adapter([this, on_finish](int r) {
+ shutdown(r, on_finish); }));
}
private:
auto cct = http_client->m_cct;
ldout(cct, 15) << "r=" << r << dendl;
- boost::system::error_code ec;
- boost::beast::get_lowest_layer(m_stream).socket().shutdown(
- boost::asio::ip::tcp::socket::shutdown_both, ec);
-
- on_finish->complete(-ec.value());
+ on_finish->complete(r);
}
};
return;
}
- // initial bootstrap connection -- later IOs might rebuild the session
- create_http_session(on_finish);
+ boost::asio::post(m_strand, [this, on_finish]() mutable {
+ create_http_session(on_finish); });
}
template <typename I>
void HttpClient<I>::close(Context* on_finish) {
- // execute within the strand to ensure all IO completes
- boost::asio::post(
- m_strand, [this, on_finish]() {
- if (m_http_session == nullptr) {
- on_finish->complete(0);
- return;
- }
+ boost::asio::post(m_strand, [this, on_finish]() mutable {
+ shut_down_http_session(on_finish); });
+}
- m_http_session->shut_down(on_finish);
- });
+template <typename I>
+void HttpClient<I>::issue(std::shared_ptr<Work>&& work) {
+ boost::asio::post(m_strand, [this, work=std::move(work)]() mutable {
+ m_http_session->issue(std::move(work)); });
}
template <typename I>
m_http_session->init(on_finish);
}
+template <typename I>
+void HttpClient<I>::shut_down_http_session(Context* on_finish) {
+ ldout(m_cct, 15) << dendl;
+
+ if (m_http_session == nullptr) {
+ on_finish->complete(0);
+ return;
+ }
+
+ m_http_session->shut_down(on_finish);
+}
+
} // namespace migration
} // namespace librbd
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/version.hpp>
+#include <boost/beast/core/tcp_stream.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/write.hpp>
+#include <boost/beast/ssl/ssl_stream.hpp>
#include <memory>
#include <string>
#include <utility>
m_ignore_self_signed_cert = ignore;
}
+ using StringBody = boost::beast::http::string_body;
+
+ template <class Body, typename Completion>
+ void issue(boost::beast::http::request<Body>&& request,
+ Completion&& completion) {
+ struct WorkImpl : Work {
+ boost::beast::http::request<Body> request;
+ Completion completion;
+
+ WorkImpl(boost::beast::http::request<Body>&& request,
+ Completion&& completion)
+ : request(std::move(request)), completion(std::move(completion)) {
+ }
+ WorkImpl(const WorkImpl&) = delete;
+ WorkImpl& operator=(const WorkImpl&) = delete;
+
+ bool need_eof() const override {
+ return request.need_eof();
+ }
+
+ bool header_only() const override {
+ return (request.method() == boost::beast::http::verb::head);
+ }
+
+ void complete(
+ int r, boost::beast::http::response<StringBody>&& response) override {
+ completion(r, std::move(response));
+ }
+
+ void operator()(
+ HttpSessionInterface* http_session,
+ boost::beast::tcp_stream& stream) override {
+ boost::beast::http::async_write(
+ stream, request,
+ [http_session, work=this->shared_from_this()]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ http_session->handle_issue(ec, std::move(work));
+ });
+ }
+
+ void operator()(
+ HttpSessionInterface* http_session,
+ boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) override {
+ boost::beast::http::async_write(
+ stream, request,
+ [http_session, work=this->shared_from_this()]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ http_session->handle_issue(ec, std::move(work));
+ });
+ }
+ };
+
+ initialize_default_fields(request);
+ issue(std::make_shared<WorkImpl>(std::move(request),
+ std::move(completion)));
+ }
+
private:
+ struct Work;
+ struct HttpSessionInterface {
+ virtual ~HttpSessionInterface() {}
+
+ virtual void init(Context* on_finish) = 0;
+ virtual void shut_down(Context* on_finish) = 0;
+
+ virtual void issue(std::shared_ptr<Work>&& work) = 0;
+ virtual void handle_issue(boost::system::error_code ec,
+ std::shared_ptr<Work>&& work) = 0;
+ };
+
+ struct Work : public std::enable_shared_from_this<Work> {
+ virtual ~Work() {}
+ virtual bool need_eof() const = 0;
+ virtual bool header_only() const = 0;
+ virtual void complete(
+ int r, boost::beast::http::response<StringBody>&&) = 0;
+ virtual void operator()(
+ HttpSessionInterface* http_session,
+ boost::beast::tcp_stream& stream) = 0;
+ virtual void operator()(
+ HttpSessionInterface* http_session,
+ boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) = 0;
+ };
+
struct HttpSessionInterface;
template <typename D> struct HttpSession;
struct PlainHttpSession;
boost::asio::ssl::context m_ssl_context;
std::unique_ptr<HttpSessionInterface> m_http_session;
+ template <typename Fields>
+ void initialize_default_fields(Fields& fields) const {
+ fields.target(m_url_spec.path);
+ fields.set(boost::beast::http::field::host, m_url_spec.host);
+ fields.set(boost::beast::http::field::user_agent,
+ BOOST_BEAST_VERSION_STRING);
+ }
+
+ void issue(std::shared_ptr<Work>&& work);
+
void create_http_session(Context* on_finish);
+ void shut_down_http_session(Context* on_finish);
};
} // namespace migration
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include <boost/asio/ip/tcp.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/http.hpp>
namespace librbd {
namespace {
#include "librbd/migration/HttpClient.cc"
+using EmptyHttpRequest = boost::beast::http::request<
+ boost::beast::http::empty_body>;
+using HttpRequest = boost::beast::http::request<
+ boost::beast::http::string_body>;
+using HttpResponse = boost::beast::http::response<
+ boost::beast::http::string_body>;
+
+namespace boost {
+namespace beast {
+namespace http {
+
+template <typename Body>
+bool operator==(const boost::beast::http::request<Body>& lhs,
+ const boost::beast::http::request<Body>& rhs) {
+ return (lhs.method() == rhs.method() &&
+ lhs.target() == rhs.target());
+}
+
+template <typename Body>
+bool operator==(const boost::beast::http::response<Body>& lhs,
+ const boost::beast::http::response<Body>& rhs) {
+ return (lhs.result() == rhs.result() &&
+ lhs.body() == rhs.body());
+}
+
+} // namespace http
+} // namespace beast
+} // namespace boost
+
namespace librbd {
namespace migration {
ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
- m_acceptor.emplace(*m_image_ctx->asio_engine,
- boost::asio::ip::tcp::endpoint(
- boost::asio::ip::tcp::v4(), 0));
- m_server_port = m_acceptor->local_endpoint().port();
+ create_acceptor();
}
void TearDown() override {
TestMockFixture::TearDown();
}
+ void create_acceptor() {
+ m_acceptor.emplace(*m_image_ctx->asio_engine,
+ boost::asio::ip::tcp::endpoint(
+ boost::asio::ip::tcp::v4(), m_server_port));
+ m_server_port = m_acceptor->local_endpoint().port();
+ }
+
std::string get_local_url(UrlScheme url_scheme) {
std::stringstream sstream;
switch (url_scheme) {
break;
}
- sstream << ":" << m_server_port << "/";
+ sstream << ":" << m_server_port << "/target";
return sstream.str();
}
});
}
+ template <typename Body>
+ void client_read_request(boost::asio::ip::tcp::socket& socket,
+ boost::beast::http::request<Body>& expected_req) {
+ boost::beast::http::request<Body> req;
+ boost::beast::error_code ec;
+ boost::beast::http::read(socket, m_buffer, req, ec);
+ ASSERT_FALSE(ec);
+
+ expected_req.target("/target");
+ ASSERT_EQ(expected_req, req);
+ }
+
+ void client_write_response(boost::asio::ip::tcp::socket& socket,
+ HttpResponse& expected_res) {
+ expected_res.result(boost::beast::http::status::ok);
+ expected_res.set(boost::beast::http::field::server,
+ BOOST_BEAST_VERSION_STRING);
+ expected_res.set(boost::beast::http::field::content_type, "text/plain");
+ expected_res.content_length(expected_res.body().size());
+ expected_res.prepare_payload();
+
+ boost::beast::error_code ec;
+ boost::beast::http::write(socket, expected_res, ec);
+ ASSERT_FALSE(ec);
+ }
+
template <typename Stream>
void client_ssl_handshake(Stream& stream, Context* on_handshake) {
stream.async_handshake(
librbd::ImageCtx *m_image_ctx;
std::optional<boost::asio::ip::tcp::acceptor> m_acceptor;
+ boost::beast::flat_buffer m_buffer;
uint64_t m_server_port = 0;
};
ASSERT_EQ(-ECONNREFUSED, ctx1.wait());
}
+TEST_F(TestMockMigrationHttpClient, IssueHead) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx;
+ client_accept(&socket, false, &on_connect_ctx);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ EmptyHttpRequest req;
+ req.method(boost::beast::http::verb::head);
+
+ C_SaferCond ctx2;
+ HttpResponse res;
+ http_client.issue(EmptyHttpRequest{req},
+ [&ctx2, &res](int r, HttpResponse&& response) mutable {
+ res = std::move(response);
+ ctx2.complete(r);
+ });
+
+ HttpResponse expected_res;
+ client_read_request(socket, req);
+ client_write_response(socket, expected_res);
+
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(expected_res, res);
+
+ C_SaferCond ctx3;
+ http_client.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueGet) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx;
+ client_accept(&socket, false, &on_connect_ctx);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ HttpRequest req;
+ req.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx2;
+ HttpResponse res;
+ http_client.issue(HttpRequest{req},
+ [&ctx2, &res](int r, HttpResponse&& response) mutable {
+ res = std::move(response);
+ ctx2.complete(r);
+ });
+
+ HttpResponse expected_res;
+ expected_res.body() = "test";
+ client_read_request(socket, req);
+ client_write_response(socket, expected_res);
+
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(expected_res, res);
+
+ C_SaferCond ctx3;
+ http_client.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueSendFailed) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx1;
+ client_accept(&socket, false, &on_connect_ctx1);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx1.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ // close connection to client
+ boost::system::error_code ec;
+ socket.close(ec);
+
+ C_SaferCond on_connect_ctx2;
+ client_accept(&socket, false, &on_connect_ctx2);
+
+ // send request via closed connection
+ HttpRequest req;
+ req.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx2;
+ http_client.issue(HttpRequest{req},
+ [&ctx2](int r, HttpResponse&&) mutable {
+ ctx2.complete(r);
+ });
+
+ // connection will be reset and request retried
+ ASSERT_EQ(0, on_connect_ctx2.wait());
+ HttpResponse expected_res;
+ expected_res.body() = "test";
+ client_read_request(socket, req);
+ client_write_response(socket, expected_res);
+ ASSERT_EQ(0, ctx2.wait());
+
+ C_SaferCond ctx3;
+ http_client.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueReceiveFailed) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx1;
+ client_accept(&socket, false, &on_connect_ctx1);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx1.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ // send request via closed connection
+ HttpRequest req;
+ req.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx2;
+ http_client.issue(HttpRequest{req},
+ [&ctx2](int r, HttpResponse&&) mutable {
+ ctx2.complete(r);
+ });
+
+ C_SaferCond on_connect_ctx2;
+ client_accept(&socket, false, &on_connect_ctx2);
+
+ // close connection to client after reading request
+ client_read_request(socket, req);
+ boost::system::error_code ec;
+ socket.close(ec);
+
+ // connection will be reset and request retried
+ ASSERT_EQ(0, on_connect_ctx2.wait());
+ HttpResponse expected_res;
+ expected_res.body() = "test";
+ client_read_request(socket, req);
+ client_write_response(socket, expected_res);
+ ASSERT_EQ(0, ctx2.wait());
+
+ C_SaferCond ctx3;
+ http_client.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssueResetFailed) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx1;
+ client_accept(&socket, false, &on_connect_ctx1);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx1.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ // send requests then close connection
+ HttpRequest req;
+ req.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx2;
+ http_client.issue(HttpRequest{req},
+ [&ctx2](int r, HttpResponse&&) mutable {
+ ctx2.complete(r);
+ });
+
+ C_SaferCond ctx3;
+ http_client.issue(HttpRequest{req},
+ [&ctx3](int r, HttpResponse&&) mutable {
+ ctx3.complete(r);
+ });
+
+ client_read_request(socket, req);
+ client_read_request(socket, req);
+
+ // close connection to client and verify requests are failed
+ m_acceptor.reset();
+ boost::system::error_code ec;
+ socket.close(ec);
+
+ ASSERT_EQ(-ECONNREFUSED, ctx2.wait());
+ ASSERT_EQ(-ECONNREFUSED, ctx3.wait());
+
+ // additional request will retry the failed connection
+ create_acceptor();
+
+ C_SaferCond on_connect_ctx2;
+ client_accept(&socket, false, &on_connect_ctx2);
+
+ C_SaferCond ctx4;
+ http_client.issue(HttpRequest{req},
+ [&ctx4](int r, HttpResponse&&) mutable {
+ ctx4.complete(r);
+ });
+
+ ASSERT_EQ(0, on_connect_ctx2.wait());
+ client_read_request(socket, req);
+
+ HttpResponse expected_res;
+ expected_res.body() = "test";
+ client_write_response(socket, expected_res);
+ ASSERT_EQ(0, ctx4.wait());
+
+ C_SaferCond ctx5;
+ http_client.close(&ctx5);
+ ASSERT_EQ(0, ctx5.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssuePipelined) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx;
+ client_accept(&socket, false, &on_connect_ctx);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ // issue two pipelined (concurrent) get requests
+ HttpRequest req1;
+ req1.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx2;
+ HttpResponse res1;
+ http_client.issue(HttpRequest{req1},
+ [&ctx2, &res1](int r, HttpResponse&& response) mutable {
+ res1 = std::move(response);
+ ctx2.complete(r);
+ });
+
+ HttpRequest req2;
+ req2.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx3;
+ HttpResponse res2;
+ http_client.issue(HttpRequest{req2},
+ [&ctx3, &res2](int r, HttpResponse&& response) mutable {
+ res2 = std::move(response);
+ ctx3.complete(r);
+ });
+
+ client_read_request(socket, req1);
+ client_read_request(socket, req2);
+
+ // read the responses sequentially
+ HttpResponse expected_res1;
+ expected_res1.body() = "test";
+ client_write_response(socket, expected_res1);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(expected_res1, res1);
+
+ HttpResponse expected_res2;
+ expected_res2.body() = "test";
+ client_write_response(socket, expected_res2);
+ ASSERT_EQ(0, ctx3.wait());
+ ASSERT_EQ(expected_res2, res2);
+
+ C_SaferCond ctx4;
+ http_client.close(&ctx4);
+ ASSERT_EQ(0, ctx4.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, IssuePipelinedRestart) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx1;
+ client_accept(&socket, false, &on_connect_ctx1);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx1.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ // issue two pipelined (concurrent) get requests
+ HttpRequest req1;
+ req1.keep_alive(false);
+ req1.method(boost::beast::http::verb::get);
+
+ C_SaferCond on_connect_ctx2;
+ client_accept(&socket, false, &on_connect_ctx2);
+
+ C_SaferCond ctx2;
+ HttpResponse res1;
+ http_client.issue(HttpRequest{req1},
+ [&ctx2, &res1](int r, HttpResponse&& response) mutable {
+ res1 = std::move(response);
+ ctx2.complete(r);
+ });
+
+ HttpRequest req2;
+ req2.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx3;
+ HttpResponse res2;
+ http_client.issue(HttpRequest{req2},
+ [&ctx3, &res2](int r, HttpResponse&& response) mutable {
+ res2 = std::move(response);
+ ctx3.complete(r);
+ });
+
+ client_read_request(socket, req1);
+ client_read_request(socket, req2);
+
+ // read the responses sequentially
+ HttpResponse expected_res1;
+ expected_res1.body() = "test";
+ expected_res1.keep_alive(false);
+ client_write_response(socket, expected_res1);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(expected_res1, res1);
+
+ // second request will need to be re-sent due to 'need_eof' condition
+ ASSERT_EQ(0, on_connect_ctx2.wait());
+ client_read_request(socket, req2);
+
+ HttpResponse expected_res2;
+ expected_res2.body() = "test";
+ client_write_response(socket, expected_res2);
+ ASSERT_EQ(0, ctx3.wait());
+ ASSERT_EQ(expected_res2, res2);
+
+ C_SaferCond ctx4;
+ http_client.close(&ctx4);
+ ASSERT_EQ(0, ctx4.wait());
+}
+
+TEST_F(TestMockMigrationHttpClient, ShutdownInFlight) {
+ boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
+ C_SaferCond on_connect_ctx;
+ client_accept(&socket, false, &on_connect_ctx);
+
+ MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
+ MockHttpClient http_client(&mock_test_image_ctx,
+ get_local_url(URL_SCHEME_HTTP));
+
+ C_SaferCond ctx1;
+ http_client.open(&ctx1);
+ ASSERT_EQ(0, on_connect_ctx.wait());
+ ASSERT_EQ(0, ctx1.wait());
+
+ HttpRequest req;
+ req.method(boost::beast::http::verb::get);
+
+ C_SaferCond ctx2;
+ http_client.issue(HttpRequest{req},
+ [&ctx2](int r, HttpResponse&&) mutable {
+ ctx2.complete(r);
+ });
+
+ client_read_request(socket, req);
+
+ C_SaferCond ctx3;
+ http_client.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+ ASSERT_EQ(-ESHUTDOWN, ctx2.wait());
+}
+
} // namespace migration
} // namespace librbd