return 0;
}
+
+int StripeProcessor::process(bufferlist&& data, uint64_t offset)
+{
+ ceph_assert(offset >= bounds.first);
+
+ const bool flush = (data.length() == 0);
+ if (flush) {
+ return Pipe::process({}, offset - bounds.first);
+ }
+
+ auto max = bounds.second - offset;
+ while (data.length() > max) {
+ if (max > 0) {
+ bufferlist bl;
+ data.splice(0, max, &bl);
+
+ int r = Pipe::process(std::move(bl), offset - bounds.first);
+ if (r < 0) {
+ return r;
+ }
+ offset += max;
+ }
+
+ // flush the current chunk
+ int r = Pipe::process({}, offset - bounds.first);
+ if (r < 0) {
+ return r;
+ }
+ // generate the next stripe
+ uint64_t stripe_size;
+ r = gen->next(offset, &stripe_size);
+ if (r < 0) {
+ return r;
+ }
+ ceph_assert(stripe_size > 0);
+
+ bounds.first = offset;
+ bounds.second = offset + stripe_size;
+
+ max = stripe_size;
+ }
+
+ if (data.length() == 0) { // don't flush the chunk here
+ return 0;
+ }
+ return Pipe::process(std::move(data), offset - bounds.first);
+}
+
} // namespace rgw::putobj
int process(bufferlist&& data, uint64_t offset) override;
};
+
+// interface to generate the next stripe description
+class StripeGenerator {
+ public:
+ virtual ~StripeGenerator() {}
+
+ virtual int next(uint64_t offset, uint64_t *stripe_size) = 0;
+};
+
+// pipe that respects stripe boundaries and restarts each stripe at offset 0
+class StripeProcessor : public Pipe {
+ StripeGenerator *gen;
+ std::pair<uint64_t, uint64_t> bounds; // bounds of current stripe
+ public:
+ StripeProcessor(DataProcessor *next, StripeGenerator *gen,
+ uint64_t stripe_size)
+ : Pipe(next), gen(gen), bounds(0, stripe_size)
+ {}
+
+ int process(bufferlist&& data, uint64_t data_offset) override;
+};
+
} // namespace rgw::putobj
EXPECT_EQ(Op({"88", 8}), mock.ops[2]);
EXPECT_EQ(Op({"", 10}), mock.ops[3]);
}
+
+
+using StripeMap = std::map<uint64_t, uint64_t>; // offset, stripe_size
+
+class StripeMapGen : public rgw::putobj::StripeGenerator {
+ const StripeMap& stripes;
+ public:
+ StripeMapGen(const StripeMap& stripes) : stripes(stripes) {}
+
+ int next(uint64_t offset, uint64_t *stripe_size) override {
+ auto i = stripes.find(offset);
+ if (i == stripes.end()) {
+ return -ENOENT;
+ }
+ *stripe_size = i->second;
+ return 0;
+ }
+};
+
+TEST(PutObj_Stripe, DifferentStripeSize)
+{
+ MockProcessor mock;
+ StripeMap stripes{
+ { 0, 4},
+ { 4, 6},
+ {10, 2}
+ };
+ StripeMapGen gen(stripes);
+ rgw::putobj::StripeProcessor processor(&mock, &gen, stripes.begin()->second);
+
+ ASSERT_EQ(0, processor.process(string_buf("22"), 0));
+ ASSERT_EQ(1u, mock.ops.size());
+ EXPECT_EQ(Op({"22", 0}), mock.ops[0]);
+
+ ASSERT_EQ(0, processor.process(string_buf("4444"), 2));
+ ASSERT_EQ(4u, mock.ops.size());
+ EXPECT_EQ(Op({"44", 2}), mock.ops[1]);
+ EXPECT_EQ(Op({"", 4}), mock.ops[2]); // flush
+ EXPECT_EQ(Op({"44", 0}), mock.ops[3]);
+
+ ASSERT_EQ(0, processor.process(string_buf("666666"), 6));
+ ASSERT_EQ(7u, mock.ops.size());
+ EXPECT_EQ(Op({"6666", 2}), mock.ops[4]);
+ EXPECT_EQ(Op({"", 6}), mock.ops[5]); // flush
+ EXPECT_EQ(Op({"66", 0}), mock.ops[6]);
+
+ ASSERT_EQ(0, processor.process({}, 12));
+ ASSERT_EQ(8u, mock.ops.size());
+ EXPECT_EQ(Op({"", 2}), mock.ops[7]); // flush
+
+ // gen returns an error past this
+ ASSERT_EQ(-ENOENT, processor.process(string_buf("1"), 12));
+}
+
+TEST(PutObj_Stripe, SkipFirstChunk)
+{
+ MockProcessor mock;
+ StripeMap stripes{
+ {0, 4},
+ {4, 4},
+ };
+ StripeMapGen gen(stripes);
+ rgw::putobj::StripeProcessor processor(&mock, &gen, stripes.begin()->second);
+
+ ASSERT_EQ(0, processor.process(string_buf("666666"), 2));
+ ASSERT_EQ(3u, mock.ops.size());
+ EXPECT_EQ(Op({"66", 2}), mock.ops[0]);
+ EXPECT_EQ(Op({"", 4}), mock.ops[1]); // flush
+ EXPECT_EQ(Op({"6666", 0}), mock.ops[2]);
+
+ ASSERT_EQ(0, processor.process({}, 8));
+ ASSERT_EQ(4u, mock.ops.size());
+ EXPECT_EQ(Op({"", 4}), mock.ops[3]); // flush
+}