]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: migration file stream source
authorJason Dillaman <dillaman@redhat.com>
Mon, 19 Oct 2020 22:28:21 +0000 (18:28 -0400)
committerJason Dillaman <dillaman@redhat.com>
Sun, 25 Oct 2020 23:37:55 +0000 (19:37 -0400)
The file stream helper will translate byte-extent IO read requests
to ASIO file read requests on the specified backing file. This can be
layered with file format helpers to translate image IO extents down
to the backing file.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/CMakeLists.txt
src/librbd/migration/FileStream.cc [new file with mode: 0644]
src/librbd/migration/FileStream.h [new file with mode: 0644]
src/librbd/migration/StreamInterface.h [new file with mode: 0644]
src/test/librbd/CMakeLists.txt
src/test/librbd/migration/test_mock_FileStream.cc [new file with mode: 0644]

index 677b240a5fa18bec74f4717f56a5633a390e4655..424f6dd230b06a9cf22b01659c6ca5bc56b13d83 100644 (file)
@@ -116,6 +116,7 @@ set(librbd_internal_srcs
   managed_lock/ReacquireRequest.cc
   managed_lock/ReleaseRequest.cc
   managed_lock/Utils.cc
+  migration/FileStream.cc
   migration/ImageDispatch.cc
   migration/NativeFormat.cc
   migration/OpenSourceImageRequest.cc
diff --git a/src/librbd/migration/FileStream.cc b/src/librbd/migration/FileStream.cc
new file mode 100644 (file)
index 0000000..e2920e2
--- /dev/null
@@ -0,0 +1,221 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _LARGEFILE64_SOURCE
+#define _LARGEFILE64_SOURCE
+#endif // _LARGEFILE64_SOURCE
+
+#include "librbd/migration/FileStream.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/Utils.h"
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/read.hpp>
+#include <fcntl.h>
+#include <unistd.h>
+
+namespace librbd {
+namespace migration {
+
+static std::string FILE_PATH {"file_path"};
+
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::FileStream::ReadRequest " \
+                           << this << " " << __func__ << ": "
+
+template <typename I>
+struct FileStream<I>::ReadRequest {
+  FileStream* file_stream;
+  io::Extents byte_extents;
+  bufferlist* data;
+  Context* on_finish;
+  size_t index = 0;
+
+  ReadRequest(FileStream* file_stream, io::Extents&& byte_extents,
+              bufferlist* data, Context* on_finish)
+    : file_stream(file_stream), byte_extents(std::move(byte_extents)),
+      data(data), on_finish(on_finish) {
+    auto cct = file_stream->m_cct;
+    ldout(cct, 20) << dendl;
+  }
+
+  void send() {
+    data->clear();
+    read();
+  }
+
+  void read() {
+    auto cct = file_stream->m_cct;
+    if (index >= byte_extents.size()) {
+      finish(0);
+      return;
+    }
+
+    auto& byte_extent = byte_extents[index++];
+    ldout(cct, 20) << "byte_extent=" << byte_extent << dendl;
+
+    auto ptr = buffer::ptr_node::create(buffer::create_small_page_aligned(
+      byte_extent.second));
+    auto buffer = boost::asio::mutable_buffer(
+      ptr->c_str(), byte_extent.second);
+    data->push_back(std::move(ptr));
+
+    int r;
+    auto offset = lseek64(file_stream->m_file_no, byte_extent.first, SEEK_SET);
+    if (offset == -1) {
+      r = -errno;
+      lderr(cct) << "failed to seek file stream: " << cpp_strerror(r) << dendl;
+      finish(r);
+      return;
+    }
+
+    boost::system::error_code ec;
+    size_t bytes_read = boost::asio::read(
+      *file_stream->m_stream_descriptor, std::move(buffer), ec);
+    r = -ec.value();
+    if (r < 0 && r != -ENOENT) {
+      lderr(cct) << "failed to read from file stream: " << cpp_strerror(r)
+                 << dendl;
+      finish(r);
+      return;
+    } else if (bytes_read < byte_extent.second) {
+      lderr(cct) << "failed to read " << byte_extent.second << " bytes from "
+                 << "file stream" << dendl;
+      finish(-ERANGE);
+      return;
+    }
+
+    // re-queue the remainder of the read requests
+    boost::asio::post(file_stream->m_strand, [this]() { read(); });
+  }
+
+  void finish(int r) {
+    auto cct = file_stream->m_cct;
+    ldout(cct, 20) << "r=" << r << dendl;
+
+    if (r < 0) {
+      data->clear();
+    }
+
+    on_finish->complete(r);
+    delete this;
+  }
+};
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::FileStream: " << this \
+                           << " " << __func__ << ": "
+
+template <typename I>
+FileStream<I>::FileStream(I* image_ctx, const json_spirit::mObject& json_object)
+  : m_cct(image_ctx->cct), m_asio_engine(image_ctx->asio_engine),
+    m_json_object(json_object), m_strand(*m_asio_engine) {
+}
+
+template <typename I>
+FileStream<I>::~FileStream() {
+  if (m_file_no != -1) {
+    ::close(m_file_no);
+  }
+}
+
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+template <typename I>
+void FileStream<I>::open(Context* on_finish) {
+  auto& file_path_value = m_json_object[FILE_PATH];
+  if (file_path_value.type() != json_spirit::str_type) {
+    lderr(m_cct) << "failed to locate '" << FILE_PATH << "' key" << dendl;
+    on_finish->complete(-EINVAL);
+  }
+
+  auto& file_path = file_path_value.get_str();
+  ldout(m_cct, 10) << "file_path=" << file_path << dendl;
+
+  m_file_no = ::open(file_path.c_str(), O_RDONLY);
+  if (m_file_no < 0) {
+    int r = -errno;
+    lderr(m_cct) << "failed to open file stream '" << file_path << "': "
+                 << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  m_stream_descriptor = std::make_optional<
+    boost::asio::posix::stream_descriptor>(m_strand, m_file_no);
+  on_finish->complete(0);
+}
+
+template <typename I>
+void FileStream<I>::close(Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  m_stream_descriptor.reset();
+  on_finish->complete(0);
+}
+
+template <typename I>
+void FileStream<I>::get_size(uint64_t* size, Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  // execute IO operations in a single strand to prevent seek races
+  boost::asio::post(
+    m_strand, [this, size, on_finish]() {
+      auto offset = lseek64(m_file_no, 0, SEEK_END);
+      if (offset == -1) {
+        int r = -errno;
+        lderr(m_cct) << "failed to seek to file end: " << cpp_strerror(r)
+                     << dendl;
+        on_finish->complete(r);
+        return;
+      }
+
+      ldout(m_cct, 10) << "size=" << offset << dendl;
+      *size = offset;
+      on_finish->complete(0);
+    });
+}
+
+template <typename I>
+void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+                         Context* on_finish) {
+  ldout(m_cct, 20) << byte_extents << dendl;
+
+  auto ctx = new ReadRequest(this, std::move(byte_extents), data, on_finish);
+
+  // execute IO operations in a single strand to prevent seek races
+  boost::asio::post(m_strand, [ctx]() { ctx->send(); });
+}
+
+#else  // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+template <typename I>
+void FileStream<I>::open(Context* on_finish) {
+  on_finish->complete(-EIO);
+}
+
+template <typename I>
+void FileStream<I>::close(Context* on_finish) {
+  on_finish->complete(-EIO);
+}
+
+template <typename I>
+void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+                         Context* on_finish) {
+  on_finish->complete(-EIO);
+}
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::FileStream<librbd::ImageCtx>;
diff --git a/src/librbd/migration/FileStream.h b/src/librbd/migration/FileStream.h
new file mode 100644 (file)
index 0000000..61c6906
--- /dev/null
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/asio/io_context_strand.hpp>
+#include <boost/asio/posix/basic_stream_descriptor.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename ImageCtxT>
+class FileStream : public StreamInterface {
+public:
+  static FileStream* create(ImageCtxT* image_ctx,
+                            const json_spirit::mObject& json_object) {
+    return new FileStream(image_ctx, json_object);
+  }
+
+  FileStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+  ~FileStream() override;
+
+  FileStream(const FileStream&) = delete;
+  FileStream& operator=(const FileStream&) = delete;
+
+  void open(Context* on_finish);
+  void close(Context* on_finish);
+
+  void get_size(uint64_t* size, Context* on_finish) override;
+
+  void read(io::Extents&& byte_extents, bufferlist* data,
+            Context* on_finish);
+
+private:
+  CephContext* m_cct;
+  std::shared_ptr<AsioEngine> m_asio_engine;
+  json_spirit::mObject m_json_object;
+
+  boost::asio::io_context::strand m_strand;
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+  std::optional<boost::asio::posix::stream_descriptor> m_stream_descriptor;
+
+  struct ReadRequest;
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+  int m_file_no = -1;
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::FileStream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
diff --git a/src/librbd/migration/StreamInterface.h b/src/librbd/migration/StreamInterface.h
new file mode 100644 (file)
index 0000000..782a9a5
--- /dev/null
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
+#define CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
+
+#include "include/buffer_fwd.h"
+#include "include/int_types.h"
+#include "librbd/io/Types.h"
+
+struct Context;
+
+namespace librbd {
+namespace migration {
+
+struct StreamInterface {
+  virtual ~StreamInterface() {
+  }
+
+  virtual void open(Context* on_finish) = 0;
+  virtual void close(Context* on_finish) = 0;
+
+  virtual void get_size(uint64_t* size, Context* on_finish) = 0;
+
+  virtual void read(io::Extents&& byte_extents, bufferlist* data,
+                    Context* on_finish) = 0;
+};
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
index d2f2cad05da716eebe9ce4e376620b0255761f12..80a6266ea77f88b1a96b0bc9bb9301b8e43d40c1 100644 (file)
@@ -89,6 +89,7 @@ set(unittest_librbd_srcs
   managed_lock/test_mock_GetLockerRequest.cc
   managed_lock/test_mock_ReacquireRequest.cc
   managed_lock/test_mock_ReleaseRequest.cc
+  migration/test_mock_FileStream.cc
   mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc
   mirror/snapshot/test_mock_CreatePrimaryRequest.cc
   mirror/snapshot/test_mock_ImageMeta.cc
diff --git a/src/test/librbd/migration/test_mock_FileStream.cc b/src/test/librbd/migration/test_mock_FileStream.cc
new file mode 100644 (file)
index 0000000..3688b56
--- /dev/null
@@ -0,0 +1,217 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "include/rbd_types.h"
+#include "common/ceph_mutex.h"
+#include "librbd/migration/FileStream.h"
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "json_spirit/json_spirit.h"
+#if __has_include(<filesystem>)
+#include <filesystem>
+namespace fs = std::filesystem;
+#else
+#include <experimental/filesystem>
+namespace fs = std::experimental::filesystem;
+#endif
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+  MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+} // namespace librbd
+
+#include "librbd/migration/FileStream.cc"
+
+namespace librbd {
+namespace migration {
+
+using ::testing::Invoke;
+
+class TestMockMigrationFileStream : public TestMockFixture {
+public:
+  typedef FileStream<MockTestImageCtx> MockFileStream;
+
+  librbd::ImageCtx *m_image_ctx;
+
+  void SetUp() override {
+    TestMockFixture::SetUp();
+
+    ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
+    file_name = fs::temp_directory_path() / "TestMockMigrationFileStream";
+    file_name += stringify(getpid());
+    json_object["file_path"] = file_name;
+  }
+
+  void TearDown() override {
+    fs::remove(file_name);
+    TestMockFixture::TearDown();
+  }
+
+  std::string file_name;
+  json_spirit::mObject json_object;
+};
+
+TEST_F(TestMockMigrationFileStream, OpenClose) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  bufferlist bl;
+  ASSERT_EQ(0, bl.write_file(file_name.c_str()));
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  mock_file_stream.close(&ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, GetSize) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  bufferlist expect_bl;
+  expect_bl.append(std::string(128, '1'));
+  ASSERT_EQ(0, expect_bl.write_file(file_name.c_str()));
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  uint64_t size;
+  mock_file_stream.get_size(&size, &ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(128, size);
+
+  C_SaferCond ctx3;
+  mock_file_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, Read) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  bufferlist expect_bl;
+  expect_bl.append(std::string(128, '1'));
+  ASSERT_EQ(0, expect_bl.write_file(file_name.c_str()));
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  bufferlist bl;
+  mock_file_stream.read({{0, 128}}, &bl, &ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(expect_bl, bl);
+
+  C_SaferCond ctx3;
+  mock_file_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, SeekRead) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  bufferlist write_bl;
+  write_bl.append(std::string(32, '1'));
+  write_bl.append(std::string(64, '2'));
+  write_bl.append(std::string(16, '3'));
+  ASSERT_EQ(0, write_bl.write_file(file_name.c_str()));
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  bufferlist bl;
+  mock_file_stream.read({{96, 16}, {32, 64}, {0, 32}}, &bl, &ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+
+  bufferlist expect_bl;
+  expect_bl.append(std::string(16, '3'));
+  expect_bl.append(std::string(64, '2'));
+  expect_bl.append(std::string(32, '1'));
+  ASSERT_EQ(expect_bl, bl);
+
+  C_SaferCond ctx3;
+  mock_file_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, DNE) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(-ENOENT, ctx1.wait());
+
+  C_SaferCond ctx2;
+  mock_file_stream.close(&ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, SeekError) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  bufferlist bl;
+  ASSERT_EQ(0, bl.write_file(file_name.c_str()));
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  mock_file_stream.read({{128, 128}}, &bl, &ctx2);
+  ASSERT_EQ(-ERANGE, ctx2.wait());
+
+  C_SaferCond ctx3;
+  mock_file_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, ShortReadError) {
+  MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+  bufferlist expect_bl;
+  expect_bl.append(std::string(128, '1'));
+  ASSERT_EQ(0, expect_bl.write_file(file_name.c_str()));
+
+  MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+  C_SaferCond ctx1;
+  mock_file_stream.open(&ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+
+  C_SaferCond ctx2;
+  bufferlist bl;
+  mock_file_stream.read({{0, 256}}, &bl, &ctx2);
+  ASSERT_EQ(-ERANGE, ctx2.wait());
+
+  C_SaferCond ctx3;
+  mock_file_stream.close(&ctx3);
+  ASSERT_EQ(0, ctx3.wait());
+}
+
+} // namespace migration
+} // namespace librbd