#include "include/common_fwd.h"
#include "include/int_types.h"
#include "librbd/io/Types.h"
+#include "librbd/migration/HttpProcessorInterface.h"
#include "librbd/migration/Types.h"
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
+#include <functional>
#include <memory>
#include <string>
#include <utility>
using Request = boost::beast::http::request<EmptyBody>;
using Response = boost::beast::http::response<StringBody>;
+ using RequestPreprocessor = std::function<void(Request&)>;
+
static HttpClient* create(ImageCtxT* image_ctx, const std::string& url) {
return new HttpClient(image_ctx, url);
}
m_ignore_self_signed_cert = ignore;
}
+ void set_http_processor(HttpProcessorInterface* http_processor) {
+ m_http_processor = http_processor;
+ }
+
template <class Body, typename Completion>
void issue(boost::beast::http::request<Body>&& request,
Completion&& completion) {
struct WorkImpl : Work {
+ HttpClient* http_client;
boost::beast::http::request<Body> request;
Completion completion;
- WorkImpl(boost::beast::http::request<Body>&& request,
+ WorkImpl(HttpClient* http_client,
+ boost::beast::http::request<Body>&& request,
Completion&& completion)
- : request(std::move(request)), completion(std::move(completion)) {
+ : http_client(http_client), request(std::move(request)),
+ completion(std::move(completion)) {
}
WorkImpl(const WorkImpl&) = delete;
WorkImpl& operator=(const WorkImpl&) = delete;
completion(r, std::move(response));
}
- void operator()(
- HttpSessionInterface* http_session,
- boost::beast::tcp_stream& stream) override {
+ void operator()(boost::beast::tcp_stream& stream) override {
+ preprocess_request();
+
boost::beast::http::async_write(
stream, request,
- [http_session, work=this->shared_from_this()]
+ [http_session=http_client->m_http_session.get(),
+ work=this->shared_from_this()]
(boost::beast::error_code ec, std::size_t) mutable {
http_session->handle_issue(ec, std::move(work));
});
}
void operator()(
- HttpSessionInterface* http_session,
boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) override {
+ preprocess_request();
+
boost::beast::http::async_write(
stream, request,
- [http_session, work=this->shared_from_this()]
+ [http_session=http_client->m_http_session.get(),
+ work=this->shared_from_this()]
(boost::beast::error_code ec, std::size_t) mutable {
http_session->handle_issue(ec, std::move(work));
});
}
+
+ void preprocess_request() {
+ if (http_client->m_http_processor) {
+ http_client->m_http_processor->process_request(request);
+ }
+ }
};
initialize_default_fields(request);
- issue(std::make_shared<WorkImpl>(std::move(request),
+ issue(std::make_shared<WorkImpl>(this, std::move(request),
std::move(completion)));
}
virtual bool need_eof() const = 0;
virtual bool header_only() const = 0;
virtual void complete(int r, Response&&) = 0;
+ virtual void operator()(boost::beast::tcp_stream& stream) = 0;
virtual void operator()(
- HttpSessionInterface* http_session,
- boost::beast::tcp_stream& stream) = 0;
- virtual void operator()(
- HttpSessionInterface* http_session,
boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) = 0;
};
bool m_ignore_self_signed_cert = false;
+ HttpProcessorInterface* m_http_processor = nullptr;
+
boost::asio::io_context::strand m_strand;
boost::asio::ssl::context m_ssl_context;
using EmptyHttpRequest = boost::beast::http::request<
boost::beast::http::empty_body>;
-using HttpRequest = boost::beast::http::request<
- boost::beast::http::string_body>;
using HttpResponse = boost::beast::http::response<
boost::beast::http::string_body>;
client_accept(&socket, false, &on_connect_ctx2);
// send request via closed connection
- HttpRequest req;
+ EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
- http_client.issue(HttpRequest{req},
+ http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
ASSERT_EQ(0, ctx1.wait());
// send request via closed connection
- HttpRequest req;
+ EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
- http_client.issue(HttpRequest{req},
+ http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
ASSERT_EQ(0, ctx1.wait());
// send requests then close connection
- HttpRequest req;
+ EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
- http_client.issue(HttpRequest{req},
+ http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
C_SaferCond ctx3;
- http_client.issue(HttpRequest{req},
+ http_client.issue(EmptyHttpRequest{req},
[&ctx3](int r, HttpResponse&&) mutable {
ctx3.complete(r);
});
client_accept(&socket, false, &on_connect_ctx2);
C_SaferCond ctx4;
- http_client.issue(HttpRequest{req},
+ http_client.issue(EmptyHttpRequest{req},
[&ctx4](int r, HttpResponse&&) mutable {
ctx4.complete(r);
});
ASSERT_EQ(0, ctx1.wait());
// issue two pipelined (concurrent) get requests
- HttpRequest req1;
+ EmptyHttpRequest req1;
req1.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
HttpResponse res1;
- http_client.issue(HttpRequest{req1},
+ http_client.issue(EmptyHttpRequest{req1},
[&ctx2, &res1](int r, HttpResponse&& response) mutable {
res1 = std::move(response);
ctx2.complete(r);
});
- HttpRequest req2;
+ EmptyHttpRequest req2;
req2.method(boost::beast::http::verb::get);
C_SaferCond ctx3;
HttpResponse res2;
- http_client.issue(HttpRequest{req2},
+ http_client.issue(EmptyHttpRequest{req2},
[&ctx3, &res2](int r, HttpResponse&& response) mutable {
res2 = std::move(response);
ctx3.complete(r);
ASSERT_EQ(0, ctx1.wait());
// issue two pipelined (concurrent) get requests
- HttpRequest req1;
+ EmptyHttpRequest req1;
req1.keep_alive(false);
req1.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
HttpResponse res1;
- http_client.issue(HttpRequest{req1},
+ http_client.issue(EmptyHttpRequest{req1},
[&ctx2, &res1](int r, HttpResponse&& response) mutable {
res1 = std::move(response);
ctx2.complete(r);
});
- HttpRequest req2;
+ EmptyHttpRequest req2;
req2.method(boost::beast::http::verb::get);
C_SaferCond ctx3;
HttpResponse res2;
- http_client.issue(HttpRequest{req2},
+ http_client.issue(EmptyHttpRequest{req2},
[&ctx3, &res2](int r, HttpResponse&& response) mutable {
res2 = std::move(response);
ctx3.complete(r);
ASSERT_EQ(0, on_connect_ctx.wait());
ASSERT_EQ(0, ctx1.wait());
- HttpRequest req;
+ EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
- http_client.issue(HttpRequest{req},
+ http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
C_SaferCond ctx2;
http_client.get_size(&size, &ctx2);
- HttpRequest expected_req;
+ EmptyHttpRequest expected_req;
expected_req.method(boost::beast::http::verb::head);
client_read_request(socket, expected_req);
C_SaferCond ctx2;
http_client.get_size(&size, &ctx2);
- HttpRequest expected_req;
+ EmptyHttpRequest expected_req;
expected_req.method(boost::beast::http::verb::head);
client_read_request(socket, expected_req);
C_SaferCond ctx2;
http_client.read({{0, 128}, {256, 64}}, &bl, &ctx2);
- HttpRequest expected_req1;
+ EmptyHttpRequest expected_req1;
expected_req1.method(boost::beast::http::verb::get);
expected_req1.set(boost::beast::http::field::range, "bytes=0-127");
client_read_request(socket, expected_req1);
- HttpRequest expected_req2;
+ EmptyHttpRequest expected_req2;
expected_req2.method(boost::beast::http::verb::get);
expected_req2.set(boost::beast::http::field::range, "bytes=256-319");
client_read_request(socket, expected_req2);