]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd/migration: implement http stream interface
authorJason Dillaman <dillaman@redhat.com>
Wed, 4 Nov 2020 20:26:40 +0000 (15:26 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 17 Nov 2020 01:25:16 +0000 (20:25 -0500)
This HttpStream class wraps an HttpClient and proxies all IO requests
to HTTP range get requests.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/CMakeLists.txt
src/librbd/migration/HttpStream.cc [new file with mode: 0644]
src/librbd/migration/HttpStream.h [new file with mode: 0644]
src/librbd/migration/SourceSpecBuilder.cc
src/test/librbd/CMakeLists.txt
src/test/librbd/migration/test_mock_HttpStream.cc [new file with mode: 0644]

index 17fbad4c739a51e16e5e9990215261215efff468..a59b4939ebf06b01473774d58452879424e74ad5 100644 (file)
@@ -127,6 +127,7 @@ set(librbd_internal_srcs
   managed_lock/Utils.cc
   migration/FileStream.cc
   migration/HttpClient.cc
+  migration/HttpStream.cc
   migration/ImageDispatch.cc
   migration/NativeFormat.cc
   migration/OpenSourceImageRequest.cc
diff --git a/src/librbd/migration/HttpStream.cc b/src/librbd/migration/HttpStream.cc
new file mode 100644 (file)
index 0000000..e035092
--- /dev/null
@@ -0,0 +1,148 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/HttpStream.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/HttpClient.h"
+#include <boost/beast/http.hpp>
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+const std::string URL_KEY {"url"};
+
+} // anonymous namespace
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpStream: " << this \
+                           << " " << __func__ << ": "
+
+template <typename I>
+HttpStream<I>::HttpStream(I* image_ctx, const json_spirit::mObject& json_object)
+  : m_image_ctx(image_ctx), m_cct(image_ctx->cct),
+    m_asio_engine(image_ctx->asio_engine), m_json_object(json_object) {
+}
+
+template <typename I>
+HttpStream<I>::~HttpStream() {
+}
+
+template <typename I>
+void HttpStream<I>::open(Context* on_finish) {
+  auto& url_value = m_json_object[URL_KEY];
+  if (url_value.type() != json_spirit::str_type) {
+    lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_url = url_value.get_str();
+  ldout(m_cct, 10) << "url=" << m_url << dendl;
+
+  m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url));
+  m_http_client->open(on_finish);
+}
+
+template <typename I>
+void HttpStream<I>::close(Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  if (!m_http_client) {
+    on_finish->complete(0);
+  }
+
+  m_http_client->close(on_finish);
+}
+
+template <typename I>
+void HttpStream<I>::get_size(uint64_t* size, Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  m_http_client->get_size(size, on_finish);
+}
+
+template <typename I>
+void HttpStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+                         Context* on_finish) {
+  using HttpRequest = boost::beast::http::request<
+    boost::beast::http::empty_body>;
+
+  ldout(m_cct, 20) << dendl;
+
+  auto aio_comp = io::AioCompletion::create_and_start(
+    on_finish, util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ);
+  aio_comp->set_request_count(byte_extents.size());
+
+  // utilize ReadResult to assemble multiple byte extents into a single bl
+  // since boost::beast doesn't support multipart responses out-of-the-box
+  io::ReadResult read_result{data};
+  aio_comp->read_result = std::move(read_result);
+  aio_comp->read_result.set_image_extents(byte_extents);
+
+  // issue a range get request for each extent
+  uint64_t buffer_offset = 0;
+  for (auto [byte_offset, byte_length] : byte_extents) {
+    auto ctx = new io::ReadResult::C_ImageReadRequest(
+      aio_comp, buffer_offset, {{byte_offset, byte_length}});
+    buffer_offset += byte_length;
+
+    HttpRequest req;
+    req.method(boost::beast::http::verb::get);
+
+    std::stringstream range;
+    ceph_assert(byte_length > 0);
+    range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1);
+    req.set(boost::beast::http::field::range, range.str());
+
+    m_http_client->issue(std::move(req),
+      [this, byte_offset, byte_length, ctx](int r, HttpResponse&& response) {
+        handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl,
+                    ctx);
+     });
+  }
+}
+
+template <typename I>
+void HttpStream<I>::handle_read(int r, HttpResponse&& response,
+                                uint64_t byte_offset, uint64_t byte_length,
+                                bufferlist* data, Context* on_finish) {
+  ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", "
+                   << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_cct) << "failed to read requested byte range: "
+                 << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  } else if (response.result() != boost::beast::http::status::partial_content) {
+    lderr(m_cct) << "failed to retrieve requested byte range: HTTP "
+                 << response.result() << dendl;
+    on_finish->complete(-EIO);
+    return;
+  } else if (byte_length != response.body().size()) {
+    lderr(m_cct) << "unexpected short range read: "
+                 << "wanted=" << byte_length << ", "
+                 << "received=" << response.body().size() << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  data->clear();
+  data->append(response.body());
+  on_finish->complete(data->length());
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::HttpStream<librbd::ImageCtx>;
diff --git a/src/librbd/migration/HttpStream.h b/src/librbd/migration/HttpStream.h
new file mode 100644 (file)
index 0000000..eba2886
--- /dev/null
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> class HttpClient;
+
+template <typename ImageCtxT>
+class HttpStream : public StreamInterface {
+public:
+  static HttpStream* create(ImageCtxT* image_ctx,
+                            const json_spirit::mObject& json_object) {
+    return new HttpStream(image_ctx, json_object);
+  }
+
+  HttpStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+  ~HttpStream() override;
+
+  HttpStream(const HttpStream&) = delete;
+  HttpStream& operator=(const HttpStream&) = delete;
+
+  void open(Context* on_finish) override;
+  void close(Context* on_finish) override;
+
+  void get_size(uint64_t* size, Context* on_finish) override;
+
+  void read(io::Extents&& byte_extents, bufferlist* data,
+            Context* on_finish) override;
+
+private:
+  using HttpResponse = boost::beast::http::response<
+    boost::beast::http::string_body>;
+
+  ImageCtxT* m_image_ctx;
+  CephContext* m_cct;
+  std::shared_ptr<AsioEngine> m_asio_engine;
+  json_spirit::mObject m_json_object;
+
+  std::string m_url;
+
+  std::unique_ptr<HttpClient<ImageCtxT>> m_http_client;
+
+  void handle_read(int r, HttpResponse&& response, uint64_t byte_offset,
+                   uint64_t byte_length, bufferlist* data, Context* on_finish);
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::HttpStream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
index 99c5e10c0cd27d4607cb08d8a99e127b5f1aa1ba..45fdce03980960fb6d68f0a740beaa4be832c268 100644 (file)
@@ -5,6 +5,7 @@
 #include "common/dout.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/migration/FileStream.h"
+#include "librbd/migration/HttpStream.h"
 #include "librbd/migration/NativeFormat.h"
 #include "librbd/migration/RawFormat.h"
 
@@ -96,6 +97,8 @@ int SourceSpecBuilder<I>::build_stream(
   auto& type = type_value_it->second.get_str();
   if (type == "file") {
     stream->reset(FileStream<I>::create(m_image_ctx, stream_obj));
+  } else if (type == "http") {
+    stream->reset(HttpStream<I>::create(m_image_ctx, stream_obj));
   } else {
     lderr(cct) << "unknown or unsupported stream type '" << type << "'"
                << dendl;
index 8ee29357d6b157495eb98390a314b88591673acb..12d80ccecba30fcaf8fa2df4108d71103c672f14 100644 (file)
@@ -91,6 +91,7 @@ set(unittest_librbd_srcs
   managed_lock/test_mock_ReleaseRequest.cc
   migration/test_mock_FileStream.cc
   migration/test_mock_HttpClient.cc
+  migration/test_mock_HttpStream.cc
   migration/test_mock_RawFormat.cc
   migration/test_mock_Utils.cc
   mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc
diff --git a/src/test/librbd/migration/test_mock_HttpStream.cc b/src/test/librbd/migration/test_mock_HttpStream.cc
new file mode 100644 (file)
index 0000000..84e32c3
--- /dev/null
@@ -0,0 +1,236 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "include/rbd_types.h"
+#include "common/ceph_mutex.h"
+#include "librbd/migration/HttpClient.h"
+#include "librbd/migration/HttpStream.h"
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "json_spirit/json_spirit.h"
+#include <boost/beast/http.hpp>
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+  MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+namespace migration {
+
+template <>
+struct HttpClient<MockTestImageCtx> {
+  static HttpClient* s_instance;
+  static HttpClient* create(MockTestImageCtx*, const std::string&) {
+    ceph_assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  MOCK_METHOD1(open, void(Context*));
+  MOCK_METHOD1(close, void(Context*));
+  MOCK_METHOD2(get_size, void(uint64_t*, Context*));
+
+  MOCK_METHOD3(issue, void(const boost::beast::http::request<
+                             boost::beast::http::empty_body>&,
+                           boost::beast::http::response<
+                             boost::beast::http::string_body>*,
+                           Context*));
+
+  template <class Body, typename Completion>
+  void issue(boost::beast::http::request<Body>&& request,
+             Completion&& completion) {
+    struct ContextImpl : public Context {
+      boost::beast::http::response<boost::beast::http::string_body> res;
+      Completion completion;
+
+      ContextImpl(Completion&& completion) : completion(std::move(completion)) {
+      }
+
+      void finish(int r) override {
+        completion(r, std::move(res));
+      }
+    };
+
+    auto ctx = new ContextImpl(std::move(completion));
+    issue(request, &ctx->res, ctx);
+  }
+
+  HttpClient() {
+    s_instance = this;
+  }
+};
+
+HttpClient<MockTestImageCtx>* HttpClient<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace migration
+
+namespace util {
+
+inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) {
+  return image_ctx->image_ctx;
+}
+
+} // namespace util
+} // namespace librbd
+
+#include "librbd/migration/HttpStream.cc"
+
+namespace librbd {
+namespace migration {
+
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::InSequence;
+
+class TestMockMigrationHttpStream : public TestMockFixture {
+public:
+  typedef HttpStream<MockTestImageCtx> MockHttpStream;
+  typedef HttpClient<MockTestImageCtx> MockHttpClient;
+
+  librbd::ImageCtx *m_image_ctx;
+
+  void SetUp() override {
+    TestMockFixture::SetUp();
+
+    ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
+    json_object["url"] = "http://some.site/file";
+  }
+
+  void expect_open(MockHttpClient& mock_http_client, int r) {
+    EXPECT_CALL(mock_http_client, open(_))
+      .WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); }));
+  }
+
+  void expect_close(MockHttpClient& mock_http_client, int r) {
+    EXPECT_CALL(mock_http_client, close(_))
+      .WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); }));
+  }
+
+  void expect_get_size(MockHttpClient& mock_http_client, uint64_t size, int r) {
+    EXPECT_CALL(mock_http_client, get_size(_, _))
+      .WillOnce(Invoke([size, r](uint64_t* out_size, Context* ctx) {
+        *out_size = size;
+        ctx->complete(r);
+      }));
+  }
+
+  void expect_read(MockHttpClient& mock_http_client, io::Extent byte_extent,
+                   const std::string& data, int r) {
+    std::stringstream byte_range;
+    byte_range << byte_extent.first << "-"
+               << (byte_extent.first + byte_extent.second - 1);
+
+    EXPECT_CALL(mock_http_client, issue(_, _, _))
+      .WillOnce(Invoke(
+        [data, r, byte_range=byte_range.str()]
+        (const boost::beast::http::request<
+           boost::beast::http::empty_body>& request,
+         boost::beast::http::response<
+           boost::beast::http::string_body>* response, Context* ctx) {
+        ASSERT_EQ("bytes=" + byte_range,
+                  request[boost::beast::http::field::range]);
+
+        response->result(boost::beast::http::status::partial_content);
+        response->set(boost::beast::http::field::content_range,
+                      "bytes " + byte_range + "/*");
+        response->body() = data;
+        response->prepare_payload();
+        ctx->complete(r);
+      }));
+  }
+
+  json_spirit::mObject json_object;
+};
+
+TEST_F(TestMockMigrationHttpStream, OpenClose) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  InSequence seq;
+
+  auto mock_http_client = new MockHttpClient();
+  expect_open(*mock_http_client, 0);
+
+  expect_close(*mock_http_client, 0);
+
+  MockHttpStream mock_http_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_http_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  mock_http_stream.close(&ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+}
+
+TEST_F(TestMockMigrationHttpStream, GetSize) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  InSequence seq;
+
+  auto mock_http_client = new MockHttpClient();
+  expect_open(*mock_http_client, 0);
+
+  expect_get_size(*mock_http_client, 128, 0);
+
+  expect_close(*mock_http_client, 0);
+
+  MockHttpStream mock_http_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_http_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  uint64_t size;
+  mock_http_stream.get_size(&size, &ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(128, size);
+
+  C_SaferCond ctx3;
+  mock_http_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationHttpStream, Read) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  InSequence seq;
+
+  auto mock_http_client = new MockHttpClient();
+  expect_open(*mock_http_client, 0);
+
+  expect_read(*mock_http_client, {0, 128}, std::string(128, '1'), 0);
+  expect_read(*mock_http_client, {256, 64}, std::string(64, '2'), 0);
+
+  expect_close(*mock_http_client, 0);
+
+  MockHttpStream mock_http_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_http_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  bufferlist bl;
+  mock_http_stream.read({{0, 128}, {256, 64}}, &bl, &ctx2);
+  ASSERT_EQ(192, ctx2.wait());
+
+  bufferlist expect_bl;
+  expect_bl.append(std::string(128, '1'));
+  expect_bl.append(std::string(64, '2'));
+  ASSERT_EQ(expect_bl, bl);
+
+  C_SaferCond ctx3;
+  mock_http_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+} // namespace migration
+} // namespace librbd