From 4ba0348611ff52da12e9cf454aa49faf3cd28a62 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Oct 2018 15:11:51 -0400 Subject: [PATCH] rgw: add rgw::putobj::StripeProcessor a pipe that stripes data over multiple objects according to an abstract StripeGenerator Signed-off-by: Casey Bodley --- src/rgw/rgw_putobj.cc | 48 +++++++++++++++++++++ src/rgw/rgw_putobj.h | 22 ++++++++++ src/test/rgw/test_rgw_putobj.cc | 74 +++++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+) diff --git a/src/rgw/rgw_putobj.cc b/src/rgw/rgw_putobj.cc index 845ef44ee9ced..cd4d4bf2fcdc1 100644 --- a/src/rgw/rgw_putobj.cc +++ b/src/rgw/rgw_putobj.cc @@ -47,4 +47,52 @@ int ChunkProcessor::process(bufferlist&& data, uint64_t offset) return 0; } + +int StripeProcessor::process(bufferlist&& data, uint64_t offset) +{ + ceph_assert(offset >= bounds.first); + + const bool flush = (data.length() == 0); + if (flush) { + return Pipe::process({}, offset - bounds.first); + } + + auto max = bounds.second - offset; + while (data.length() > max) { + if (max > 0) { + bufferlist bl; + data.splice(0, max, &bl); + + int r = Pipe::process(std::move(bl), offset - bounds.first); + if (r < 0) { + return r; + } + offset += max; + } + + // flush the current chunk + int r = Pipe::process({}, offset - bounds.first); + if (r < 0) { + return r; + } + // generate the next stripe + uint64_t stripe_size; + r = gen->next(offset, &stripe_size); + if (r < 0) { + return r; + } + ceph_assert(stripe_size > 0); + + bounds.first = offset; + bounds.second = offset + stripe_size; + + max = stripe_size; + } + + if (data.length() == 0) { // don't flush the chunk here + return 0; + } + return Pipe::process(std::move(data), offset - bounds.first); +} + } // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj.h b/src/rgw/rgw_putobj.h index eb885a8ec5d8b..1e4058eadf8b8 100644 --- a/src/rgw/rgw_putobj.h +++ b/src/rgw/rgw_putobj.h @@ -53,4 +53,26 @@ class ChunkProcessor : public Pipe { int process(bufferlist&& data, uint64_t offset) override; }; + +// interface to generate the next stripe description +class StripeGenerator { + public: + virtual ~StripeGenerator() {} + + virtual int next(uint64_t offset, uint64_t *stripe_size) = 0; +}; + +// pipe that respects stripe boundaries and restarts each stripe at offset 0 +class StripeProcessor : public Pipe { + StripeGenerator *gen; + std::pair bounds; // bounds of current stripe + public: + StripeProcessor(DataProcessor *next, StripeGenerator *gen, + uint64_t stripe_size) + : Pipe(next), gen(gen), bounds(0, stripe_size) + {} + + int process(bufferlist&& data, uint64_t data_offset) override; +}; + } // namespace rgw::putobj diff --git a/src/test/rgw/test_rgw_putobj.cc b/src/test/rgw/test_rgw_putobj.cc index 01e07571dba1f..91661f45055ff 100644 --- a/src/test/rgw/test_rgw_putobj.cc +++ b/src/test/rgw/test_rgw_putobj.cc @@ -120,3 +120,77 @@ TEST(PutObj_Chunk, TwoAndFlushHalf) EXPECT_EQ(Op({"88", 8}), mock.ops[2]); EXPECT_EQ(Op({"", 10}), mock.ops[3]); } + + +using StripeMap = std::map; // offset, stripe_size + +class StripeMapGen : public rgw::putobj::StripeGenerator { + const StripeMap& stripes; + public: + StripeMapGen(const StripeMap& stripes) : stripes(stripes) {} + + int next(uint64_t offset, uint64_t *stripe_size) override { + auto i = stripes.find(offset); + if (i == stripes.end()) { + return -ENOENT; + } + *stripe_size = i->second; + return 0; + } +}; + +TEST(PutObj_Stripe, DifferentStripeSize) +{ + MockProcessor mock; + StripeMap stripes{ + { 0, 4}, + { 4, 6}, + {10, 2} + }; + StripeMapGen gen(stripes); + rgw::putobj::StripeProcessor processor(&mock, &gen, stripes.begin()->second); + + ASSERT_EQ(0, processor.process(string_buf("22"), 0)); + ASSERT_EQ(1u, mock.ops.size()); + EXPECT_EQ(Op({"22", 0}), mock.ops[0]); + + ASSERT_EQ(0, processor.process(string_buf("4444"), 2)); + ASSERT_EQ(4u, mock.ops.size()); + EXPECT_EQ(Op({"44", 2}), mock.ops[1]); + EXPECT_EQ(Op({"", 4}), mock.ops[2]); // flush + EXPECT_EQ(Op({"44", 0}), mock.ops[3]); + + ASSERT_EQ(0, processor.process(string_buf("666666"), 6)); + ASSERT_EQ(7u, mock.ops.size()); + EXPECT_EQ(Op({"6666", 2}), mock.ops[4]); + EXPECT_EQ(Op({"", 6}), mock.ops[5]); // flush + EXPECT_EQ(Op({"66", 0}), mock.ops[6]); + + ASSERT_EQ(0, processor.process({}, 12)); + ASSERT_EQ(8u, mock.ops.size()); + EXPECT_EQ(Op({"", 2}), mock.ops[7]); // flush + + // gen returns an error past this + ASSERT_EQ(-ENOENT, processor.process(string_buf("1"), 12)); +} + +TEST(PutObj_Stripe, SkipFirstChunk) +{ + MockProcessor mock; + StripeMap stripes{ + {0, 4}, + {4, 4}, + }; + StripeMapGen gen(stripes); + rgw::putobj::StripeProcessor processor(&mock, &gen, stripes.begin()->second); + + ASSERT_EQ(0, processor.process(string_buf("666666"), 2)); + ASSERT_EQ(3u, mock.ops.size()); + EXPECT_EQ(Op({"66", 2}), mock.ops[0]); + EXPECT_EQ(Op({"", 4}), mock.ops[1]); // flush + EXPECT_EQ(Op({"6666", 0}), mock.ops[2]); + + ASSERT_EQ(0, processor.process({}, 8)); + ASSERT_EQ(4u, mock.ops.size()); + EXPECT_EQ(Op({"", 4}), mock.ops[3]); // flush +} -- 2.39.5