From: Casey Bodley Date: Wed, 10 Oct 2018 19:11:48 +0000 (-0400) Subject: rgw: add rgw::putobj::ChunkProcessor and test X-Git-Tag: 3.2-0~167^2~29 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=594dc4cceb412bdc66244ee297ea3fa919bc0f06;p=ceph-ci.git rgw: add rgw::putobj::ChunkProcessor and test ChunkProcessor turns the input stream into a series of discrete chunks before forwarding to the wrapped DataProcessor Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 31efc05b8b5..c830d8e79c4 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -84,6 +84,7 @@ set(librgw_common_srcs rgw_op.cc rgw_otp.cc rgw_policy_s3.cc + rgw_putobj.cc rgw_quota.cc rgw_rados.cc rgw_resolve.cc diff --git a/src/rgw/rgw_putobj.cc b/src/rgw/rgw_putobj.cc new file mode 100644 index 00000000000..845ef44ee9c --- /dev/null +++ b/src/rgw/rgw_putobj.cc @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw_putobj.h" + +namespace rgw::putobj { + +int ChunkProcessor::process(bufferlist&& data, uint64_t offset) +{ + ceph_assert(offset >= chunk.length()); + uint64_t position = offset - chunk.length(); + + const bool flush = (data.length() == 0); + if (flush) { + if (chunk.length() > 0) { + int r = Pipe::process(std::move(chunk), position); + if (r < 0) { + return r; + } + } + return Pipe::process({}, offset); + } + chunk.claim_append(data); + + // write each full chunk + while (chunk.length() >= chunk_size) { + bufferlist bl; + chunk.splice(0, chunk_size, &bl); + + int r = Pipe::process(std::move(bl), position); + if (r < 0) { + return r; + } + position += chunk_size; + } + return 0; +} + +} // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj.h b/src/rgw/rgw_putobj.h index 0c8995443ed..eb885a8ec5d 100644 --- a/src/rgw/rgw_putobj.h +++ b/src/rgw/rgw_putobj.h @@ -41,4 +41,16 @@ class Pipe : public DataProcessor { } }; +// pipe that writes to the next processor in discrete chunks +class ChunkProcessor : public Pipe { + uint64_t chunk_size; + bufferlist chunk; // leftover bytes from the last call to process() + public: + ChunkProcessor(DataProcessor *next, uint64_t chunk_size) + : Pipe(next), chunk_size(chunk_size) + {} + + int process(bufferlist&& data, uint64_t offset) override; +}; + } // namespace rgw::putobj diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 6ba12e3177d..e37c1723611 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -92,6 +92,10 @@ target_link_libraries(unittest_rgw_crypto ${CRYPTO_LIBS} ) +add_executable(unittest_rgw_putobj test_rgw_putobj.cc) +add_ceph_unittest(unittest_rgw_putobj) +target_link_libraries(unittest_rgw_putobj rgw_a ${UNITTEST_LIBS}) + add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc) add_ceph_unittest(unittest_rgw_iam_policy) target_link_libraries(unittest_rgw_iam_policy diff --git a/src/test/rgw/test_rgw_putobj.cc b/src/test/rgw/test_rgw_putobj.cc new file mode 100644 index 00000000000..01e07571dba --- /dev/null +++ b/src/test/rgw/test_rgw_putobj.cc @@ -0,0 +1,122 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw/rgw_putobj.h" +#include + +inline bufferlist string_buf(const char* buf) { + bufferlist bl; + bl.append(buffer::create_static(strlen(buf), (char*)buf)); + return bl; +} + +struct Op { + std::string data; + uint64_t offset; +}; +inline bool operator==(const Op& lhs, const Op& rhs) { + return lhs.data == rhs.data && lhs.offset == rhs.offset; +} +inline std::ostream& operator<<(std::ostream& out, const Op& op) { + return out << "{off=" << op.offset << " data='" << op.data << "'}"; +} + +struct MockProcessor : rgw::putobj::DataProcessor { + std::vector ops; + + int process(bufferlist&& data, uint64_t offset) override { + ops.push_back({data.to_str(), offset}); + return {}; + } +}; + +TEST(PutObj_Chunk, FlushHalf) +{ + MockProcessor mock; + rgw::putobj::ChunkProcessor chunk(&mock, 4); + + ASSERT_EQ(0, chunk.process(string_buf("22"), 0)); + ASSERT_TRUE(mock.ops.empty()); // no writes + + ASSERT_EQ(0, chunk.process({}, 2)); // flush + ASSERT_EQ(2u, mock.ops.size()); + EXPECT_EQ(Op({"22", 0}), mock.ops[0]); + EXPECT_EQ(Op({"", 2}), mock.ops[1]); +} + +TEST(PutObj_Chunk, One) +{ + MockProcessor mock; + rgw::putobj::ChunkProcessor chunk(&mock, 4); + + ASSERT_EQ(0, chunk.process(string_buf("4444"), 0)); + ASSERT_EQ(1u, mock.ops.size()); + EXPECT_EQ(Op({"4444", 0}), mock.ops[0]); + + ASSERT_EQ(0, chunk.process({}, 4)); // flush + ASSERT_EQ(2u, mock.ops.size()); + EXPECT_EQ(Op({"", 4}), mock.ops[1]); +} + +TEST(PutObj_Chunk, OneAndFlushHalf) +{ + MockProcessor mock; + rgw::putobj::ChunkProcessor chunk(&mock, 4); + + ASSERT_EQ(0, chunk.process(string_buf("22"), 0)); + ASSERT_TRUE(mock.ops.empty()); + + ASSERT_EQ(0, chunk.process(string_buf("4444"), 2)); + ASSERT_EQ(1u, mock.ops.size()); + EXPECT_EQ(Op({"2244", 0}), mock.ops[0]); + + ASSERT_EQ(0, chunk.process({}, 6)); // flush + ASSERT_EQ(3u, mock.ops.size()); + EXPECT_EQ(Op({"44", 4}), mock.ops[1]); + EXPECT_EQ(Op({"", 6}), mock.ops[2]); +} + +TEST(PutObj_Chunk, Two) +{ + MockProcessor mock; + rgw::putobj::ChunkProcessor chunk(&mock, 4); + + ASSERT_EQ(0, chunk.process(string_buf("88888888"), 0)); + ASSERT_EQ(2u, mock.ops.size()); + EXPECT_EQ(Op({"8888", 0}), mock.ops[0]); + EXPECT_EQ(Op({"8888", 4}), mock.ops[1]); + + ASSERT_EQ(0, chunk.process({}, 8)); // flush + ASSERT_EQ(3u, mock.ops.size()); + EXPECT_EQ(Op({"", 8}), mock.ops[2]); +} + +TEST(PutObj_Chunk, TwoAndFlushHalf) +{ + MockProcessor mock; + rgw::putobj::ChunkProcessor chunk(&mock, 4); + + ASSERT_EQ(0, chunk.process(string_buf("22"), 0)); + ASSERT_TRUE(mock.ops.empty()); + + ASSERT_EQ(0, chunk.process(string_buf("88888888"), 2)); + ASSERT_EQ(2u, mock.ops.size()); + EXPECT_EQ(Op({"2288", 0}), mock.ops[0]); + EXPECT_EQ(Op({"8888", 4}), mock.ops[1]); + + ASSERT_EQ(0, chunk.process({}, 10)); // flush + ASSERT_EQ(4u, mock.ops.size()); + EXPECT_EQ(Op({"88", 8}), mock.ops[2]); + EXPECT_EQ(Op({"", 10}), mock.ops[3]); +}