managed_lock/ReacquireRequest.cc
managed_lock/ReleaseRequest.cc
managed_lock/Utils.cc
+ migration/FileStream.cc
migration/ImageDispatch.cc
migration/NativeFormat.cc
migration/OpenSourceImageRequest.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _LARGEFILE64_SOURCE
+#define _LARGEFILE64_SOURCE
+#endif // _LARGEFILE64_SOURCE
+
+#include "librbd/migration/FileStream.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/Utils.h"
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/read.hpp>
+#include <fcntl.h>
+#include <unistd.h>
+
+namespace librbd {
+namespace migration {
+
+static std::string FILE_PATH {"file_path"};
+
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::FileStream::ReadRequest " \
+ << this << " " << __func__ << ": "
+
+template <typename I>
+struct FileStream<I>::ReadRequest {
+ FileStream* file_stream;
+ io::Extents byte_extents;
+ bufferlist* data;
+ Context* on_finish;
+ size_t index = 0;
+
+ ReadRequest(FileStream* file_stream, io::Extents&& byte_extents,
+ bufferlist* data, Context* on_finish)
+ : file_stream(file_stream), byte_extents(std::move(byte_extents)),
+ data(data), on_finish(on_finish) {
+ auto cct = file_stream->m_cct;
+ ldout(cct, 20) << dendl;
+ }
+
+ void send() {
+ data->clear();
+ read();
+ }
+
+ void read() {
+ auto cct = file_stream->m_cct;
+ if (index >= byte_extents.size()) {
+ finish(0);
+ return;
+ }
+
+ auto& byte_extent = byte_extents[index++];
+ ldout(cct, 20) << "byte_extent=" << byte_extent << dendl;
+
+ auto ptr = buffer::ptr_node::create(buffer::create_small_page_aligned(
+ byte_extent.second));
+ auto buffer = boost::asio::mutable_buffer(
+ ptr->c_str(), byte_extent.second);
+ data->push_back(std::move(ptr));
+
+ int r;
+ auto offset = lseek64(file_stream->m_file_no, byte_extent.first, SEEK_SET);
+ if (offset == -1) {
+ r = -errno;
+ lderr(cct) << "failed to seek file stream: " << cpp_strerror(r) << dendl;
+ finish(r);
+ return;
+ }
+
+ boost::system::error_code ec;
+ size_t bytes_read = boost::asio::read(
+ *file_stream->m_stream_descriptor, std::move(buffer), ec);
+ r = -ec.value();
+ if (r < 0 && r != -ENOENT) {
+ lderr(cct) << "failed to read from file stream: " << cpp_strerror(r)
+ << dendl;
+ finish(r);
+ return;
+ } else if (bytes_read < byte_extent.second) {
+ lderr(cct) << "failed to read " << byte_extent.second << " bytes from "
+ << "file stream" << dendl;
+ finish(-ERANGE);
+ return;
+ }
+
+ // re-queue the remainder of the read requests
+ boost::asio::post(file_stream->m_strand, [this]() { read(); });
+ }
+
+ void finish(int r) {
+ auto cct = file_stream->m_cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ data->clear();
+ }
+
+ on_finish->complete(r);
+ delete this;
+ }
+};
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::FileStream: " << this \
+ << " " << __func__ << ": "
+
+template <typename I>
+FileStream<I>::FileStream(I* image_ctx, const json_spirit::mObject& json_object)
+ : m_cct(image_ctx->cct), m_asio_engine(image_ctx->asio_engine),
+ m_json_object(json_object), m_strand(*m_asio_engine) {
+}
+
+template <typename I>
+FileStream<I>::~FileStream() {
+ if (m_file_no != -1) {
+ ::close(m_file_no);
+ }
+}
+
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+template <typename I>
+void FileStream<I>::open(Context* on_finish) {
+ auto& file_path_value = m_json_object[FILE_PATH];
+ if (file_path_value.type() != json_spirit::str_type) {
+ lderr(m_cct) << "failed to locate '" << FILE_PATH << "' key" << dendl;
+ on_finish->complete(-EINVAL);
+ }
+
+ auto& file_path = file_path_value.get_str();
+ ldout(m_cct, 10) << "file_path=" << file_path << dendl;
+
+ m_file_no = ::open(file_path.c_str(), O_RDONLY);
+ if (m_file_no < 0) {
+ int r = -errno;
+ lderr(m_cct) << "failed to open file stream '" << file_path << "': "
+ << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ m_stream_descriptor = std::make_optional<
+ boost::asio::posix::stream_descriptor>(m_strand, m_file_no);
+ on_finish->complete(0);
+}
+
+template <typename I>
+void FileStream<I>::close(Context* on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ m_stream_descriptor.reset();
+ on_finish->complete(0);
+}
+
+template <typename I>
+void FileStream<I>::get_size(uint64_t* size, Context* on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ // execute IO operations in a single strand to prevent seek races
+ boost::asio::post(
+ m_strand, [this, size, on_finish]() {
+ auto offset = lseek64(m_file_no, 0, SEEK_END);
+ if (offset == -1) {
+ int r = -errno;
+ lderr(m_cct) << "failed to seek to file end: " << cpp_strerror(r)
+ << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ ldout(m_cct, 10) << "size=" << offset << dendl;
+ *size = offset;
+ on_finish->complete(0);
+ });
+}
+
+template <typename I>
+void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) {
+ ldout(m_cct, 20) << byte_extents << dendl;
+
+ auto ctx = new ReadRequest(this, std::move(byte_extents), data, on_finish);
+
+ // execute IO operations in a single strand to prevent seek races
+ boost::asio::post(m_strand, [ctx]() { ctx->send(); });
+}
+
+#else // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+template <typename I>
+void FileStream<I>::open(Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+template <typename I>
+void FileStream<I>::close(Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+template <typename I>
+void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::FileStream<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/asio/io_context_strand.hpp>
+#include <boost/asio/posix/basic_stream_descriptor.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename ImageCtxT>
+class FileStream : public StreamInterface {
+public:
+ static FileStream* create(ImageCtxT* image_ctx,
+ const json_spirit::mObject& json_object) {
+ return new FileStream(image_ctx, json_object);
+ }
+
+ FileStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+ ~FileStream() override;
+
+ FileStream(const FileStream&) = delete;
+ FileStream& operator=(const FileStream&) = delete;
+
+ void open(Context* on_finish);
+ void close(Context* on_finish);
+
+ void get_size(uint64_t* size, Context* on_finish) override;
+
+ void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish);
+
+private:
+ CephContext* m_cct;
+ std::shared_ptr<AsioEngine> m_asio_engine;
+ json_spirit::mObject m_json_object;
+
+ boost::asio::io_context::strand m_strand;
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+ std::optional<boost::asio::posix::stream_descriptor> m_stream_descriptor;
+
+ struct ReadRequest;
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+ int m_file_no = -1;
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::FileStream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
+#define CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
+
+#include "include/buffer_fwd.h"
+#include "include/int_types.h"
+#include "librbd/io/Types.h"
+
+struct Context;
+
+namespace librbd {
+namespace migration {
+
+struct StreamInterface {
+ virtual ~StreamInterface() {
+ }
+
+ virtual void open(Context* on_finish) = 0;
+ virtual void close(Context* on_finish) = 0;
+
+ virtual void get_size(uint64_t* size, Context* on_finish) = 0;
+
+ virtual void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) = 0;
+};
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
managed_lock/test_mock_GetLockerRequest.cc
managed_lock/test_mock_ReacquireRequest.cc
managed_lock/test_mock_ReleaseRequest.cc
+ migration/test_mock_FileStream.cc
mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc
mirror/snapshot/test_mock_CreatePrimaryRequest.cc
mirror/snapshot/test_mock_ImageMeta.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "include/rbd_types.h"
+#include "common/ceph_mutex.h"
+#include "librbd/migration/FileStream.h"
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "json_spirit/json_spirit.h"
+#if __has_include(<filesystem>)
+#include <filesystem>
+namespace fs = std::filesystem;
+#else
+#include <experimental/filesystem>
+namespace fs = std::experimental::filesystem;
+#endif
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+ MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+} // namespace librbd
+
+#include "librbd/migration/FileStream.cc"
+
+namespace librbd {
+namespace migration {
+
+using ::testing::Invoke;
+
+class TestMockMigrationFileStream : public TestMockFixture {
+public:
+ typedef FileStream<MockTestImageCtx> MockFileStream;
+
+ librbd::ImageCtx *m_image_ctx;
+
+ void SetUp() override {
+ TestMockFixture::SetUp();
+
+ ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
+ file_name = fs::temp_directory_path() / "TestMockMigrationFileStream";
+ file_name += stringify(getpid());
+ json_object["file_path"] = file_name;
+ }
+
+ void TearDown() override {
+ fs::remove(file_name);
+ TestMockFixture::TearDown();
+ }
+
+ std::string file_name;
+ json_spirit::mObject json_object;
+};
+
+TEST_F(TestMockMigrationFileStream, OpenClose) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ bufferlist bl;
+ ASSERT_EQ(0, bl.write_file(file_name.c_str()));
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ mock_file_stream.close(&ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, GetSize) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ bufferlist expect_bl;
+ expect_bl.append(std::string(128, '1'));
+ ASSERT_EQ(0, expect_bl.write_file(file_name.c_str()));
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ uint64_t size;
+ mock_file_stream.get_size(&size, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(128, size);
+
+ C_SaferCond ctx3;
+ mock_file_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, Read) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ bufferlist expect_bl;
+ expect_bl.append(std::string(128, '1'));
+ ASSERT_EQ(0, expect_bl.write_file(file_name.c_str()));
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ bufferlist bl;
+ mock_file_stream.read({{0, 128}}, &bl, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(expect_bl, bl);
+
+ C_SaferCond ctx3;
+ mock_file_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, SeekRead) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ bufferlist write_bl;
+ write_bl.append(std::string(32, '1'));
+ write_bl.append(std::string(64, '2'));
+ write_bl.append(std::string(16, '3'));
+ ASSERT_EQ(0, write_bl.write_file(file_name.c_str()));
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ bufferlist bl;
+ mock_file_stream.read({{96, 16}, {32, 64}, {0, 32}}, &bl, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+
+ bufferlist expect_bl;
+ expect_bl.append(std::string(16, '3'));
+ expect_bl.append(std::string(64, '2'));
+ expect_bl.append(std::string(32, '1'));
+ ASSERT_EQ(expect_bl, bl);
+
+ C_SaferCond ctx3;
+ mock_file_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, DNE) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(-ENOENT, ctx1.wait());
+
+ C_SaferCond ctx2;
+ mock_file_stream.close(&ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, SeekError) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ bufferlist bl;
+ ASSERT_EQ(0, bl.write_file(file_name.c_str()));
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ mock_file_stream.read({{128, 128}}, &bl, &ctx2);
+ ASSERT_EQ(-ERANGE, ctx2.wait());
+
+ C_SaferCond ctx3;
+ mock_file_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+TEST_F(TestMockMigrationFileStream, ShortReadError) {
+ MockTestImageCtx mock_image_ctx(*m_image_ctx);
+
+ bufferlist expect_bl;
+ expect_bl.append(std::string(128, '1'));
+ ASSERT_EQ(0, expect_bl.write_file(file_name.c_str()));
+
+ MockFileStream mock_file_stream(&mock_image_ctx, json_object);
+
+ C_SaferCond ctx1;
+ mock_file_stream.open(&ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ bufferlist bl;
+ mock_file_stream.read({{0, 256}}, &bl, &ctx2);
+ ASSERT_EQ(-ERANGE, ctx2.wait());
+
+ C_SaferCond ctx3;
+ mock_file_stream.close(&ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+}
+
+} // namespace migration
+} // namespace librbd