managed_lock/Utils.cc
migration/FileStream.cc
migration/HttpClient.cc
+ migration/HttpStream.cc
migration/ImageDispatch.cc
migration/NativeFormat.cc
migration/OpenSourceImageRequest.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/HttpStream.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/HttpClient.h"
+#include <boost/beast/http.hpp>
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+const std::string URL_KEY {"url"};
+
+} // anonymous namespace
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpStream: " << this \
+ << " " << __func__ << ": "
+
+template <typename I>
+HttpStream<I>::HttpStream(I* image_ctx, const json_spirit::mObject& json_object)
+ : m_image_ctx(image_ctx), m_cct(image_ctx->cct),
+ m_asio_engine(image_ctx->asio_engine), m_json_object(json_object) {
+}
+
+template <typename I>
+HttpStream<I>::~HttpStream() {
+}
+
+template <typename I>
+void HttpStream<I>::open(Context* on_finish) {
+ auto& url_value = m_json_object[URL_KEY];
+ if (url_value.type() != json_spirit::str_type) {
+ lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+
+ m_url = url_value.get_str();
+ ldout(m_cct, 10) << "url=" << m_url << dendl;
+
+ m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url));
+ m_http_client->open(on_finish);
+}
+
+template <typename I>
+void HttpStream<I>::close(Context* on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ if (!m_http_client) {
+ on_finish->complete(0);
+ }
+
+ m_http_client->close(on_finish);
+}
+
+template <typename I>
+void HttpStream<I>::get_size(uint64_t* size, Context* on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ m_http_client->get_size(size, on_finish);
+}
+
+template <typename I>
+void HttpStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) {
+ using HttpRequest = boost::beast::http::request<
+ boost::beast::http::empty_body>;
+
+ ldout(m_cct, 20) << dendl;
+
+ auto aio_comp = io::AioCompletion::create_and_start(
+ on_finish, util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ);
+ aio_comp->set_request_count(byte_extents.size());
+
+ // utilize ReadResult to assemble multiple byte extents into a single bl
+ // since boost::beast doesn't support multipart responses out-of-the-box
+ io::ReadResult read_result{data};
+ aio_comp->read_result = std::move(read_result);
+ aio_comp->read_result.set_image_extents(byte_extents);
+
+ // issue a range get request for each extent
+ uint64_t buffer_offset = 0;
+ for (auto [byte_offset, byte_length] : byte_extents) {
+ auto ctx = new io::ReadResult::C_ImageReadRequest(
+ aio_comp, buffer_offset, {{byte_offset, byte_length}});
+ buffer_offset += byte_length;
+
+ HttpRequest req;
+ req.method(boost::beast::http::verb::get);
+
+ std::stringstream range;
+ ceph_assert(byte_length > 0);
+ range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1);
+ req.set(boost::beast::http::field::range, range.str());
+
+ m_http_client->issue(std::move(req),
+ [this, byte_offset, byte_length, ctx](int r, HttpResponse&& response) {
+ handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl,
+ ctx);
+ });
+ }
+}
+
+template <typename I>
+void HttpStream<I>::handle_read(int r, HttpResponse&& response,
+ uint64_t byte_offset, uint64_t byte_length,
+ bufferlist* data, Context* on_finish) {
+ ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", "
+ << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(m_cct) << "failed to read requested byte range: "
+ << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ } else if (response.result() != boost::beast::http::status::partial_content) {
+ lderr(m_cct) << "failed to retrieve requested byte range: HTTP "
+ << response.result() << dendl;
+ on_finish->complete(-EIO);
+ return;
+ } else if (byte_length != response.body().size()) {
+ lderr(m_cct) << "unexpected short range read: "
+ << "wanted=" << byte_length << ", "
+ << "received=" << response.body().size() << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+
+ data->clear();
+ data->append(response.body());
+ on_finish->complete(data->length());
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::HttpStream<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> class HttpClient;
+
+template <typename ImageCtxT>
+class HttpStream : public StreamInterface {
+public:
+ static HttpStream* create(ImageCtxT* image_ctx,
+ const json_spirit::mObject& json_object) {
+ return new HttpStream(image_ctx, json_object);
+ }
+
+ HttpStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+ ~HttpStream() override;
+
+ HttpStream(const HttpStream&) = delete;
+ HttpStream& operator=(const HttpStream&) = delete;
+
+ void open(Context* on_finish) override;
+ void close(Context* on_finish) override;
+
+ void get_size(uint64_t* size, Context* on_finish) override;
+
+ void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) override;
+
+private:
+ using HttpResponse = boost::beast::http::response<
+ boost::beast::http::string_body>;
+
+ ImageCtxT* m_image_ctx;
+ CephContext* m_cct;
+ std::shared_ptr<AsioEngine> m_asio_engine;
+ json_spirit::mObject m_json_object;
+
+ std::string m_url;
+
+ std::unique_ptr<HttpClient<ImageCtxT>> m_http_client;
+
+ void handle_read(int r, HttpResponse&& response, uint64_t byte_offset,
+ uint64_t byte_length, bufferlist* data, Context* on_finish);
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::HttpStream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
#include "common/dout.h"
#include "librbd/ImageCtx.h"
#include "librbd/migration/FileStream.h"
+#include "librbd/migration/HttpStream.h"
#include "librbd/migration/NativeFormat.h"
#include "librbd/migration/RawFormat.h"
auto& type = type_value_it->second.get_str();
if (type == "file") {
stream->reset(FileStream<I>::create(m_image_ctx, stream_obj));
+ } else if (type == "http") {
+ stream->reset(HttpStream<I>::create(m_image_ctx, stream_obj));
} else {
lderr(cct) << "unknown or unsupported stream type '" << type << "'"
<< dendl;
managed_lock/test_mock_ReleaseRequest.cc
migration/test_mock_FileStream.cc
migration/test_mock_HttpClient.cc
+ migration/test_mock_HttpStream.cc
migration/test_mock_RawFormat.cc
migration/test_mock_Utils.cc
mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "include/rbd_types.h"
+#include "common/ceph_mutex.h"
+#include "librbd/migration/HttpClient.h"
+#include "librbd/migration/HttpStream.h"
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "json_spirit/json_spirit.h"
+#include <boost/beast/http.hpp>
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+ MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+namespace migration {
+
+template <>
+struct HttpClient<MockTestImageCtx> {
+ static HttpClient* s_instance;
+ static HttpClient* create(MockTestImageCtx*, const std::string&) {
+ ceph_assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ MOCK_METHOD1(open, void(Context*));
+ MOCK_METHOD1(close, void(Context*));
+ MOCK_METHOD2(get_size, void(uint64_t*, Context*));
+
+ MOCK_METHOD3(issue, void(const boost::beast::http::request<
+ boost::beast::http::empty_body>&,
+ boost::beast::http::response<
+ boost::beast::http::string_body>*,
+ Context*));
+
+ template <class Body, typename Completion>
+ void issue(boost::beast::http::request<Body>&& request,
+ Completion&& completion) {
+ struct ContextImpl : public Context {
+ boost::beast::http::response<boost::beast::http::string_body> res;
+ Completion completion;
+
+ ContextImpl(Completion&& completion) : completion(std::move(completion)) {
+ }
+
+ void finish(int r) override {
+ completion(r, std::move(res));
+ }
+ };
+
+ auto ctx = new ContextImpl(std::move(completion));
+ issue(request, &ctx->res, ctx);
+ }
+
+ HttpClient() {
+ s_instance = this;
+ }
+};
+
+HttpClient<MockTestImageCtx>* HttpClient<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace migration
+
+namespace util {
+
+inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) {
+ return image_ctx->image_ctx;
+}
+
+} // namespace util
+} // namespace librbd
+
+#include "librbd/migration/HttpStream.cc"
+
+namespace librbd {
+namespace migration {
+
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::InSequence;
+
+class TestMockMigrationHttpStream : public TestMockFixture {
+public:
+ typedef HttpStream<MockTestImageCtx> MockHttpStream;
+ typedef HttpClient<MockTestImageCtx> MockHttpClient;
+
+ librbd::ImageCtx *m_image_ctx;
+
+ void SetUp() override {
+ TestMockFixture::SetUp();
+
+ ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
+ json_object["url"] = "http://some.site/file";
+ }
+
+ void expect_open(MockHttpClient& mock_http_client, int r) {
+ EXPECT_CALL(mock_http_client, open(_))
+ .WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); }));
+ }
+
+ void expect_close(MockHttpClient& mock_http_client, int r) {
+ EXPECT_CALL(mock_http_client, close(_))
+ .WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); }));
+ }
+
+ void expect_get_size(MockHttpClient& mock_http_client, uint64_t size, int r) {
+ EXPECT_CALL(mock_http_client, get_size(_, _))
+ .WillOnce(Invoke([size, r](uint64_t* out_size, Context* ctx) {
+ *out_size = size;
+ ctx->complete(r);
+ }));
+ }
+
+ void expect_read(MockHttpClient& mock_http_client, io::Extent byte_extent,
+ const std::string& data, int r) {
+ std::stringstream byte_range;
+ byte_range << byte_extent.first << "-"
+ << (byte_extent.first + byte_extent.second - 1);
+
+ EXPECT_CALL(mock_http_client, issue(_, _, _))
+ .WillOnce(Invoke(
+ [data, r, byte_range=byte_range.str()]
+ (const boost::beast::http::request<
+ boost::beast::http::empty_body>& request,
+ boost::beast::http::response<
+ boost::beast::http::string_body>* response, Context* ctx) {
+ ASSERT_EQ("bytes=" + byte_range,
+ request[boost::beast::http::field::range]);
+
+ response->result(boost::beast::http::status::partial_content);
+ response->set(boost::beast::http::field::content_range,
+ "bytes " + byte_range + "/*");
+ response->body() = data;
+ response->prepare_payload();
+ ctx->complete(r);
+ }));
+ }
+
+ json_spirit::mObject json_object;
+};
+
+TEST_F(TestMockMigrationHttpStream, OpenClose) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ InSequence seq;
+
+ auto mock_http_client = new MockHttpClient();
+ expect_open(*mock_http_client, 0);
+
+ expect_close(*mock_http_client, 0);
+
+ MockHttpStream mock_http_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_http_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ mock_http_stream.close(&ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+}
+
+TEST_F(TestMockMigrationHttpStream, GetSize) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ InSequence seq;
+
+ auto mock_http_client = new MockHttpClient();
+ expect_open(*mock_http_client, 0);
+
+ expect_get_size(*mock_http_client, 128, 0);
+
+ expect_close(*mock_http_client, 0);
+
+ MockHttpStream mock_http_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_http_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ uint64_t size;
+ mock_http_stream.get_size(&size, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(128, size);
+
+ C_SaferCond ctx3;
+ mock_http_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpStream, Read) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ InSequence seq;
+
+ auto mock_http_client = new MockHttpClient();
+ expect_open(*mock_http_client, 0);
+
+ expect_read(*mock_http_client, {0, 128}, std::string(128, '1'), 0);
+ expect_read(*mock_http_client, {256, 64}, std::string(64, '2'), 0);
+
+ expect_close(*mock_http_client, 0);
+
+ MockHttpStream mock_http_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_http_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ bufferlist bl;
+ mock_http_stream.read({{0, 128}, {256, 64}}, &bl, &ctx2);
+ ASSERT_EQ(192, ctx2.wait());
+
+ bufferlist expect_bl;
+ expect_bl.append(std::string(128, '1'));
+ expect_bl.append(std::string(64, '2'));
+ ASSERT_EQ(expect_bl, bl);
+
+ C_SaferCond ctx3;
+ mock_http_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+} // namespace migration
+} // namespace librbd