From: Jason Dillaman Date: Mon, 19 Oct 2020 22:28:21 +0000 (-0400) Subject: librbd: migration file stream source X-Git-Tag: v16.1.0~752^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=951571879bfaee0df9b82473d551273e0009ac03;p=ceph.git librbd: migration file stream source The file stream helper will translate byte-extent IO read requests to ASIO file read requests on the specified backing file. This can be layered with file format helpers to translate image IO extents down to the backing file. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 677b240a5fa1..424f6dd230b0 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -116,6 +116,7 @@ set(librbd_internal_srcs managed_lock/ReacquireRequest.cc managed_lock/ReleaseRequest.cc managed_lock/Utils.cc + migration/FileStream.cc migration/ImageDispatch.cc migration/NativeFormat.cc migration/OpenSourceImageRequest.cc diff --git a/src/librbd/migration/FileStream.cc b/src/librbd/migration/FileStream.cc new file mode 100644 index 000000000000..e2920e27a144 --- /dev/null +++ b/src/librbd/migration/FileStream.cc @@ -0,0 +1,221 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef _LARGEFILE64_SOURCE +#define _LARGEFILE64_SOURCE +#endif // _LARGEFILE64_SOURCE + +#include "librbd/migration/FileStream.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/asio/Utils.h" +#include +#include +#include +#include +#include + +namespace librbd { +namespace migration { + +static std::string FILE_PATH {"file_path"}; + +#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::FileStream::ReadRequest " \ + << this << " " << __func__ << ": " + +template +struct FileStream::ReadRequest { + FileStream* file_stream; + io::Extents byte_extents; + bufferlist* data; + Context* on_finish; + size_t index = 0; + + ReadRequest(FileStream* file_stream, io::Extents&& byte_extents, + bufferlist* data, Context* on_finish) + : file_stream(file_stream), byte_extents(std::move(byte_extents)), + data(data), on_finish(on_finish) { + auto cct = file_stream->m_cct; + ldout(cct, 20) << dendl; + } + + void send() { + data->clear(); + read(); + } + + void read() { + auto cct = file_stream->m_cct; + if (index >= byte_extents.size()) { + finish(0); + return; + } + + auto& byte_extent = byte_extents[index++]; + ldout(cct, 20) << "byte_extent=" << byte_extent << dendl; + + auto ptr = buffer::ptr_node::create(buffer::create_small_page_aligned( + byte_extent.second)); + auto buffer = boost::asio::mutable_buffer( + ptr->c_str(), byte_extent.second); + data->push_back(std::move(ptr)); + + int r; + auto offset = lseek64(file_stream->m_file_no, byte_extent.first, SEEK_SET); + if (offset == -1) { + r = -errno; + lderr(cct) << "failed to seek file stream: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + boost::system::error_code ec; + size_t bytes_read = boost::asio::read( + *file_stream->m_stream_descriptor, std::move(buffer), ec); + r = -ec.value(); + if (r < 0 && r != -ENOENT) { + lderr(cct) << "failed to read from file stream: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } else if (bytes_read < byte_extent.second) { + lderr(cct) << "failed to read " << byte_extent.second << " bytes from " + << "file stream" << dendl; + finish(-ERANGE); + return; + } + + // re-queue the remainder of the read requests + boost::asio::post(file_stream->m_strand, [this]() { read(); }); + } + + void finish(int r) { + auto cct = file_stream->m_cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + data->clear(); + } + + on_finish->complete(r); + delete this; + } +}; + +#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::FileStream: " << this \ + << " " << __func__ << ": " + +template +FileStream::FileStream(I* image_ctx, const json_spirit::mObject& json_object) + : m_cct(image_ctx->cct), m_asio_engine(image_ctx->asio_engine), + m_json_object(json_object), m_strand(*m_asio_engine) { +} + +template +FileStream::~FileStream() { + if (m_file_no != -1) { + ::close(m_file_no); + } +} + +#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +template +void FileStream::open(Context* on_finish) { + auto& file_path_value = m_json_object[FILE_PATH]; + if (file_path_value.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << FILE_PATH << "' key" << dendl; + on_finish->complete(-EINVAL); + } + + auto& file_path = file_path_value.get_str(); + ldout(m_cct, 10) << "file_path=" << file_path << dendl; + + m_file_no = ::open(file_path.c_str(), O_RDONLY); + if (m_file_no < 0) { + int r = -errno; + lderr(m_cct) << "failed to open file stream '" << file_path << "': " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + m_stream_descriptor = std::make_optional< + boost::asio::posix::stream_descriptor>(m_strand, m_file_no); + on_finish->complete(0); +} + +template +void FileStream::close(Context* on_finish) { + ldout(m_cct, 10) << dendl; + + m_stream_descriptor.reset(); + on_finish->complete(0); +} + +template +void FileStream::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + // execute IO operations in a single strand to prevent seek races + boost::asio::post( + m_strand, [this, size, on_finish]() { + auto offset = lseek64(m_file_no, 0, SEEK_END); + if (offset == -1) { + int r = -errno; + lderr(m_cct) << "failed to seek to file end: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + ldout(m_cct, 10) << "size=" << offset << dendl; + *size = offset; + on_finish->complete(0); + }); +} + +template +void FileStream::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + ldout(m_cct, 20) << byte_extents << dendl; + + auto ctx = new ReadRequest(this, std::move(byte_extents), data, on_finish); + + // execute IO operations in a single strand to prevent seek races + boost::asio::post(m_strand, [ctx]() { ctx->send(); }); +} + +#else // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +template +void FileStream::open(Context* on_finish) { + on_finish->complete(-EIO); +} + +template +void FileStream::close(Context* on_finish) { + on_finish->complete(-EIO); +} + +template +void FileStream::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + on_finish->complete(-EIO); +} + +#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +} // namespace migration +} // namespace librbd + +template class librbd::migration::FileStream; diff --git a/src/librbd/migration/FileStream.h b/src/librbd/migration/FileStream.h new file mode 100644 index 000000000000..61c6906555c3 --- /dev/null +++ b/src/librbd/migration/FileStream.h @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_FILE_STREAM_H +#define CEPH_LIBRBD_MIGRATION_FILE_STREAM_H + +#include "include/int_types.h" +#include "librbd/migration/StreamInterface.h" +#include +#include +#include +#include +#include + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template +class FileStream : public StreamInterface { +public: + static FileStream* create(ImageCtxT* image_ctx, + const json_spirit::mObject& json_object) { + return new FileStream(image_ctx, json_object); + } + + FileStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object); + ~FileStream() override; + + FileStream(const FileStream&) = delete; + FileStream& operator=(const FileStream&) = delete; + + void open(Context* on_finish); + void close(Context* on_finish); + + void get_size(uint64_t* size, Context* on_finish) override; + + void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish); + +private: + CephContext* m_cct; + std::shared_ptr m_asio_engine; + json_spirit::mObject m_json_object; + + boost::asio::io_context::strand m_strand; +#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + std::optional m_stream_descriptor; + + struct ReadRequest; + +#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + + int m_file_no = -1; +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::FileStream; + +#endif // CEPH_LIBRBD_MIGRATION_FILE_STREAM_H diff --git a/src/librbd/migration/StreamInterface.h b/src/librbd/migration/StreamInterface.h new file mode 100644 index 000000000000..782a9a5f8d59 --- /dev/null +++ b/src/librbd/migration/StreamInterface.h @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H +#define CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H + +#include "include/buffer_fwd.h" +#include "include/int_types.h" +#include "librbd/io/Types.h" + +struct Context; + +namespace librbd { +namespace migration { + +struct StreamInterface { + virtual ~StreamInterface() { + } + + virtual void open(Context* on_finish) = 0; + virtual void close(Context* on_finish) = 0; + + virtual void get_size(uint64_t* size, Context* on_finish) = 0; + + virtual void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) = 0; +}; + +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H diff --git a/src/test/librbd/CMakeLists.txt b/src/test/librbd/CMakeLists.txt index d2f2cad05da7..80a6266ea77f 100644 --- a/src/test/librbd/CMakeLists.txt +++ b/src/test/librbd/CMakeLists.txt @@ -89,6 +89,7 @@ set(unittest_librbd_srcs managed_lock/test_mock_GetLockerRequest.cc managed_lock/test_mock_ReacquireRequest.cc managed_lock/test_mock_ReleaseRequest.cc + migration/test_mock_FileStream.cc mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc mirror/snapshot/test_mock_CreatePrimaryRequest.cc mirror/snapshot/test_mock_ImageMeta.cc diff --git a/src/test/librbd/migration/test_mock_FileStream.cc b/src/test/librbd/migration/test_mock_FileStream.cc new file mode 100644 index 000000000000..3688b56dd556 --- /dev/null +++ b/src/test/librbd/migration/test_mock_FileStream.cc @@ -0,0 +1,217 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librbd/test_mock_fixture.h" +#include "test/librbd/test_support.h" +#include "include/rbd_types.h" +#include "common/ceph_mutex.h" +#include "librbd/migration/FileStream.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "json_spirit/json_spirit.h" +#if __has_include() +#include +namespace fs = std::filesystem; +#else +#include +namespace fs = std::experimental::filesystem; +#endif + +namespace librbd { +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace +} // namespace librbd + +#include "librbd/migration/FileStream.cc" + +namespace librbd { +namespace migration { + +using ::testing::Invoke; + +class TestMockMigrationFileStream : public TestMockFixture { +public: + typedef FileStream MockFileStream; + + librbd::ImageCtx *m_image_ctx; + + void SetUp() override { + TestMockFixture::SetUp(); + + ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx)); + file_name = fs::temp_directory_path() / "TestMockMigrationFileStream"; + file_name += stringify(getpid()); + json_object["file_path"] = file_name; + } + + void TearDown() override { + fs::remove(file_name); + TestMockFixture::TearDown(); + } + + std::string file_name; + json_spirit::mObject json_object; +}; + +TEST_F(TestMockMigrationFileStream, OpenClose) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + bufferlist bl; + ASSERT_EQ(0, bl.write_file(file_name.c_str())); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + mock_file_stream.close(&ctx2); + ASSERT_EQ(0, ctx2.wait()); +} + +TEST_F(TestMockMigrationFileStream, GetSize) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + bufferlist expect_bl; + expect_bl.append(std::string(128, '1')); + ASSERT_EQ(0, expect_bl.write_file(file_name.c_str())); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + uint64_t size; + mock_file_stream.get_size(&size, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + ASSERT_EQ(128, size); + + C_SaferCond ctx3; + mock_file_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +TEST_F(TestMockMigrationFileStream, Read) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + bufferlist expect_bl; + expect_bl.append(std::string(128, '1')); + ASSERT_EQ(0, expect_bl.write_file(file_name.c_str())); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + bufferlist bl; + mock_file_stream.read({{0, 128}}, &bl, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + ASSERT_EQ(expect_bl, bl); + + C_SaferCond ctx3; + mock_file_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +TEST_F(TestMockMigrationFileStream, SeekRead) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + bufferlist write_bl; + write_bl.append(std::string(32, '1')); + write_bl.append(std::string(64, '2')); + write_bl.append(std::string(16, '3')); + ASSERT_EQ(0, write_bl.write_file(file_name.c_str())); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + bufferlist bl; + mock_file_stream.read({{96, 16}, {32, 64}, {0, 32}}, &bl, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + + bufferlist expect_bl; + expect_bl.append(std::string(16, '3')); + expect_bl.append(std::string(64, '2')); + expect_bl.append(std::string(32, '1')); + ASSERT_EQ(expect_bl, bl); + + C_SaferCond ctx3; + mock_file_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +TEST_F(TestMockMigrationFileStream, DNE) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(-ENOENT, ctx1.wait()); + + C_SaferCond ctx2; + mock_file_stream.close(&ctx2); + ASSERT_EQ(0, ctx2.wait()); +} + +TEST_F(TestMockMigrationFileStream, SeekError) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + bufferlist bl; + ASSERT_EQ(0, bl.write_file(file_name.c_str())); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + mock_file_stream.read({{128, 128}}, &bl, &ctx2); + ASSERT_EQ(-ERANGE, ctx2.wait()); + + C_SaferCond ctx3; + mock_file_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +TEST_F(TestMockMigrationFileStream, ShortReadError) { + MockTestImageCtx mock_image_ctx(*m_image_ctx); + + bufferlist expect_bl; + expect_bl.append(std::string(128, '1')); + ASSERT_EQ(0, expect_bl.write_file(file_name.c_str())); + + MockFileStream mock_file_stream(&mock_image_ctx, json_object); + + C_SaferCond ctx1; + mock_file_stream.open(&ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + bufferlist bl; + mock_file_stream.read({{0, 256}}, &bl, &ctx2); + ASSERT_EQ(-ERANGE, ctx2.wait()); + + C_SaferCond ctx3; + mock_file_stream.close(&ctx3); + ASSERT_EQ(0, ctx3.wait()); +} + +} // namespace migration +} // namespace librbd