From: Jason Dillaman Date: Wed, 4 Nov 2020 20:26:40 +0000 (-0500) Subject: librbd/migration: implement http stream interface X-Git-Tag: v16.1.0~527^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=30192d987ff601106007a49190da7bd1d971d560;p=ceph.git librbd/migration: implement http stream interface This HttpStream class wraps an HttpClient and proxies all IO requests to HTTP range get requests. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 17fbad4c739a5..a59b4939ebf06 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -127,6 +127,7 @@ set(librbd_internal_srcs managed_lock/Utils.cc migration/FileStream.cc migration/HttpClient.cc + migration/HttpStream.cc migration/ImageDispatch.cc migration/NativeFormat.cc migration/OpenSourceImageRequest.cc diff --git a/src/librbd/migration/HttpStream.cc b/src/librbd/migration/HttpStream.cc new file mode 100644 index 0000000000000..e035092dc7616 --- /dev/null +++ b/src/librbd/migration/HttpStream.cc @@ -0,0 +1,148 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/HttpStream.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/asio/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/HttpClient.h" +#include + +namespace librbd { +namespace migration { + +namespace { + +const std::string URL_KEY {"url"}; + +} // anonymous namespace + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpStream: " << this \ + << " " << __func__ << ": " + +template +HttpStream::HttpStream(I* image_ctx, const json_spirit::mObject& json_object) + : m_image_ctx(image_ctx), m_cct(image_ctx->cct), + m_asio_engine(image_ctx->asio_engine), m_json_object(json_object) { +} + +template +HttpStream::~HttpStream() { +} + +template +void HttpStream::open(Context* on_finish) { + auto& url_value = m_json_object[URL_KEY]; + if (url_value.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_url = url_value.get_str(); + ldout(m_cct, 10) << "url=" << m_url << dendl; + + m_http_client.reset(HttpClient::create(m_image_ctx, m_url)); + m_http_client->open(on_finish); +} + +template +void HttpStream::close(Context* on_finish) { + ldout(m_cct, 10) << dendl; + + if (!m_http_client) { + on_finish->complete(0); + } + + m_http_client->close(on_finish); +} + +template +void HttpStream::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + m_http_client->get_size(size, on_finish); +} + +template +void HttpStream::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + using HttpRequest = boost::beast::http::request< + boost::beast::http::empty_body>; + + ldout(m_cct, 20) << dendl; + + auto aio_comp = io::AioCompletion::create_and_start( + on_finish, util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ); + aio_comp->set_request_count(byte_extents.size()); + + // utilize ReadResult to assemble multiple byte extents into a single bl + // since boost::beast doesn't support multipart responses out-of-the-box + io::ReadResult read_result{data}; + aio_comp->read_result = std::move(read_result); + aio_comp->read_result.set_image_extents(byte_extents); + + // issue a range get request for each extent + uint64_t buffer_offset = 0; + for (auto [byte_offset, byte_length] : byte_extents) { + auto ctx = new io::ReadResult::C_ImageReadRequest( + aio_comp, buffer_offset, {{byte_offset, byte_length}}); + buffer_offset += byte_length; + + HttpRequest req; + req.method(boost::beast::http::verb::get); + + std::stringstream range; + ceph_assert(byte_length > 0); + range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1); + req.set(boost::beast::http::field::range, range.str()); + + m_http_client->issue(std::move(req), + [this, byte_offset, byte_length, ctx](int r, HttpResponse&& response) { + handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl, + ctx); + }); + } +} + +template +void HttpStream::handle_read(int r, HttpResponse&& response, + uint64_t byte_offset, uint64_t byte_length, + bufferlist* data, Context* on_finish) { + ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", " + << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to read requested byte range: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } else if (response.result() != boost::beast::http::status::partial_content) { + lderr(m_cct) << "failed to retrieve requested byte range: HTTP " + << response.result() << dendl; + on_finish->complete(-EIO); + return; + } else if (byte_length != response.body().size()) { + lderr(m_cct) << "unexpected short range read: " + << "wanted=" << byte_length << ", " + << "received=" << response.body().size() << dendl; + on_finish->complete(-EINVAL); + return; + } + + data->clear(); + data->append(response.body()); + on_finish->complete(data->length()); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::HttpStream; diff --git a/src/librbd/migration/HttpStream.h b/src/librbd/migration/HttpStream.h new file mode 100644 index 0000000000000..eba28864ee368 --- /dev/null +++ b/src/librbd/migration/HttpStream.h @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H +#define CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H + +#include "include/int_types.h" +#include "librbd/migration/StreamInterface.h" +#include +#include +#include +#include +#include + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template class HttpClient; + +template +class HttpStream : public StreamInterface { +public: + static HttpStream* create(ImageCtxT* image_ctx, + const json_spirit::mObject& json_object) { + return new HttpStream(image_ctx, json_object); + } + + HttpStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object); + ~HttpStream() override; + + HttpStream(const HttpStream&) = delete; + HttpStream& operator=(const HttpStream&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_size(uint64_t* size, Context* on_finish) override; + + void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) override; + +private: + using HttpResponse = boost::beast::http::response< + boost::beast::http::string_body>; + + ImageCtxT* m_image_ctx; + CephContext* m_cct; + std::shared_ptr m_asio_engine; + json_spirit::mObject m_json_object; + + std::string m_url; + + std::unique_ptr> m_http_client; + + void handle_read(int r, HttpResponse&& response, uint64_t byte_offset, + uint64_t byte_length, bufferlist* data, Context* on_finish); +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::HttpStream; + +#endif // CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H diff --git a/src/librbd/migration/SourceSpecBuilder.cc b/src/librbd/migration/SourceSpecBuilder.cc index 99c5e10c0cd27..45fdce0398096 100644 --- a/src/librbd/migration/SourceSpecBuilder.cc +++ b/src/librbd/migration/SourceSpecBuilder.cc @@ -5,6 +5,7 @@ #include "common/dout.h" #include "librbd/ImageCtx.h" #include "librbd/migration/FileStream.h" +#include "librbd/migration/HttpStream.h" #include "librbd/migration/NativeFormat.h" #include "librbd/migration/RawFormat.h" @@ -96,6 +97,8 @@ int SourceSpecBuilder::build_stream( auto& type = type_value_it->second.get_str(); if (type == "file") { stream->reset(FileStream::create(m_image_ctx, stream_obj)); + } else if (type == "http") { + stream->reset(HttpStream::create(m_image_ctx, stream_obj)); } else { lderr(cct) << "unknown or unsupported stream type '" << type << "'" << dendl; diff --git a/src/test/librbd/CMakeLists.txt b/src/test/librbd/CMakeLists.txt index 8ee29357d6b15..12d80ccecba30 100644 --- a/src/test/librbd/CMakeLists.txt +++ b/src/test/librbd/CMakeLists.txt @@ -91,6 +91,7 @@ set(unittest_librbd_srcs managed_lock/test_mock_ReleaseRequest.cc migration/test_mock_FileStream.cc migration/test_mock_HttpClient.cc + migration/test_mock_HttpStream.cc migration/test_mock_RawFormat.cc migration/test_mock_Utils.cc mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc diff --git a/src/test/librbd/migration/test_mock_HttpStream.cc b/src/test/librbd/migration/test_mock_HttpStream.cc new file mode 100644 index 0000000000000..84e32c3ac610a --- /dev/null +++ b/src/test/librbd/migration/test_mock_HttpStream.cc @@ -0,0 +1,236 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librbd/test_mock_fixture.h" +#include "test/librbd/test_support.h" +#include "include/rbd_types.h" +#include "common/ceph_mutex.h" +#include "librbd/migration/HttpClient.h" +#include "librbd/migration/HttpStream.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "json_spirit/json_spirit.h" +#include + +namespace librbd { +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +namespace migration { + +template <> +struct HttpClient { + static HttpClient* s_instance; + static HttpClient* create(MockTestImageCtx*, const std::string&) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + MOCK_METHOD1(open, void(Context*)); + MOCK_METHOD1(close, void(Context*)); + MOCK_METHOD2(get_size, void(uint64_t*, Context*)); + + MOCK_METHOD3(issue, void(const boost::beast::http::request< + boost::beast::http::empty_body>&, + boost::beast::http::response< + boost::beast::http::string_body>*, + Context*)); + + template + void issue(boost::beast::http::request&& request, + Completion&& completion) { + struct ContextImpl : public Context { + boost::beast::http::response res; + Completion completion; + + ContextImpl(Completion&& completion) : completion(std::move(completion)) { + } + + void finish(int r) override { + completion(r, std::move(res)); + } + }; + + auto ctx = new ContextImpl(std::move(completion)); + issue(request, &ctx->res, ctx); + } + + HttpClient() { + s_instance = this; + } +}; + +HttpClient* HttpClient::s_instance = nullptr; + +} // namespace migration + +namespace util { + +inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) { + return image_ctx->image_ctx; +} + +} // namespace util +} // namespace librbd + +#include "librbd/migration/HttpStream.cc" + +namespace librbd { +namespace migration { + +using ::testing::_; +using ::testing::Invoke; +using ::testing::InSequence; + +class TestMockMigrationHttpStream : public TestMockFixture { +public: + typedef HttpStream MockHttpStream; + typedef HttpClient MockHttpClient; + + librbd::ImageCtx *m_image_ctx; + + void SetUp() override { + TestMockFixture::SetUp(); + + ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx)); + json_object["url"] = "http://some.site/file"; + } + + void expect_open(MockHttpClient& mock_http_client, int r) { + EXPECT_CALL(mock_http_client, open(_)) + .WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); })); + } + + void expect_close(MockHttpClient& mock_http_client, int r) { + EXPECT_CALL(mock_http_client, close(_)) + .WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); })); + } + + void expect_get_size(MockHttpClient& mock_http_client, uint64_t size, int r) { + EXPECT_CALL(mock_http_client, get_size(_, _)) + .WillOnce(Invoke([size, r](uint64_t* out_size, Context* ctx) { + *out_size = size; + ctx->complete(r); + })); + } + + void expect_read(MockHttpClient& mock_http_client, io::Extent byte_extent, + const std::string& data, int r) { + std::stringstream byte_range; + byte_range << byte_extent.first << "-" + << (byte_extent.first + byte_extent.second - 1); + + EXPECT_CALL(mock_http_client, issue(_, _, _)) + .WillOnce(Invoke( + [data, r, byte_range=byte_range.str()] + (const boost::beast::http::request< + boost::beast::http::empty_body>& request, + boost::beast::http::response< + boost::beast::http::string_body>* response, Context* ctx) { + ASSERT_EQ("bytes=" + byte_range, + request[boost::beast::http::field::range]); + + response->result(boost::beast::http::status::partial_content); + response->set(boost::beast::http::field::content_range, + "bytes " + byte_range + "/*"); + response->body() = data; + response->prepare_payload(); + ctx->complete(r); + })); + } + + json_spirit::mObject json_object; +}; + +TEST_F(TestMockMigrationHttpStream, OpenClose) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + InSequence seq; + + auto mock_http_client = new MockHttpClient(); + expect_open(*mock_http_client, 0); + + expect_close(*mock_http_client, 0); + + MockHttpStream mock_http_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_http_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + mock_http_stream.close(&ctx2); + ASSERT_EQ(0, ctx2.wait()); +} + +TEST_F(TestMockMigrationHttpStream, GetSize) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + InSequence seq; + + auto mock_http_client = new MockHttpClient(); + expect_open(*mock_http_client, 0); + + expect_get_size(*mock_http_client, 128, 0); + + expect_close(*mock_http_client, 0); + + MockHttpStream mock_http_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_http_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + uint64_t size; + mock_http_stream.get_size(&size, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + ASSERT_EQ(128, size); + + C_SaferCond ctx3; + mock_http_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +TEST_F(TestMockMigrationHttpStream, Read) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + InSequence seq; + + auto mock_http_client = new MockHttpClient(); + expect_open(*mock_http_client, 0); + + expect_read(*mock_http_client, {0, 128}, std::string(128, '1'), 0); + expect_read(*mock_http_client, {256, 64}, std::string(64, '2'), 0); + + expect_close(*mock_http_client, 0); + + MockHttpStream mock_http_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_http_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + bufferlist bl; + mock_http_stream.read({{0, 128}, {256, 64}}, &bl, &ctx2); + ASSERT_EQ(192, ctx2.wait()); + + bufferlist expect_bl; + expect_bl.append(std::string(128, '1')); + expect_bl.append(std::string(64, '2')); + ASSERT_EQ(expect_bl, bl); + + C_SaferCond ctx3; + mock_http_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +} // namespace migration +} // namespace librbd