]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add rgw::putobj::StripeProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:11:51 +0000 (15:11 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 15 Oct 2018 21:05:06 +0000 (17:05 -0400)
a pipe that stripes data over multiple objects according to an
abstract StripeGenerator

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_putobj.cc
src/rgw/rgw_putobj.h
src/test/rgw/test_rgw_putobj.cc

index 845ef44ee9ced76a175a36aa227c01403ece0315..cd4d4bf2fcdc1c52bb84f376a29f9d7a6d3f9b45 100644 (file)
@@ -47,4 +47,52 @@ int ChunkProcessor::process(bufferlist&& data, uint64_t offset)
   return 0;
 }
 
+
+int StripeProcessor::process(bufferlist&& data, uint64_t offset)
+{
+  ceph_assert(offset >= bounds.first);
+
+  const bool flush = (data.length() == 0);
+  if (flush) {
+    return Pipe::process({}, offset - bounds.first);
+  }
+
+  auto max = bounds.second - offset;
+  while (data.length() > max) {
+    if (max > 0) {
+      bufferlist bl;
+      data.splice(0, max, &bl);
+
+      int r = Pipe::process(std::move(bl), offset - bounds.first);
+      if (r < 0) {
+        return r;
+      }
+      offset += max;
+    }
+
+    // flush the current chunk
+    int r = Pipe::process({}, offset - bounds.first);
+    if (r < 0) {
+      return r;
+    }
+    // generate the next stripe
+    uint64_t stripe_size;
+    r = gen->next(offset, &stripe_size);
+    if (r < 0) {
+      return r;
+    }
+    ceph_assert(stripe_size > 0);
+
+    bounds.first = offset;
+    bounds.second = offset + stripe_size;
+
+    max = stripe_size;
+  }
+
+  if (data.length() == 0) { // don't flush the chunk here
+    return 0;
+  }
+  return Pipe::process(std::move(data), offset - bounds.first);
+}
+
 } // namespace rgw::putobj
index eb885a8ec5d8b4c3d4748e16be8ccac9c10f65cb..1e4058eadf8b8cc597a658b4c9359854ad0a043f 100644 (file)
@@ -53,4 +53,26 @@ class ChunkProcessor : public Pipe {
   int process(bufferlist&& data, uint64_t offset) override;
 };
 
+
+// interface to generate the next stripe description
+class StripeGenerator {
+ public:
+  virtual ~StripeGenerator() {}
+
+  virtual int next(uint64_t offset, uint64_t *stripe_size) = 0;
+};
+
+// pipe that respects stripe boundaries and restarts each stripe at offset 0
+class StripeProcessor : public Pipe {
+  StripeGenerator *gen;
+  std::pair<uint64_t, uint64_t> bounds; // bounds of current stripe
+ public:
+  StripeProcessor(DataProcessor *next, StripeGenerator *gen,
+                  uint64_t stripe_size)
+    : Pipe(next), gen(gen), bounds(0, stripe_size)
+  {}
+
+  int process(bufferlist&& data, uint64_t data_offset) override;
+};
+
 } // namespace rgw::putobj
index 01e07571dba1f7940ba5a87fa7370c880e842b57..91661f45055ff0268251079f2e6e5d4073a3b3c1 100644 (file)
@@ -120,3 +120,77 @@ TEST(PutObj_Chunk, TwoAndFlushHalf)
   EXPECT_EQ(Op({"88", 8}), mock.ops[2]);
   EXPECT_EQ(Op({"", 10}), mock.ops[3]);
 }
+
+
+using StripeMap = std::map<uint64_t, uint64_t>; // offset, stripe_size
+
+class StripeMapGen : public rgw::putobj::StripeGenerator {
+  const StripeMap& stripes;
+ public:
+  StripeMapGen(const StripeMap& stripes) : stripes(stripes) {}
+
+  int next(uint64_t offset, uint64_t *stripe_size) override {
+    auto i = stripes.find(offset);
+    if (i == stripes.end()) {
+      return -ENOENT;
+    }
+    *stripe_size = i->second;
+    return 0;
+  }
+};
+
+TEST(PutObj_Stripe, DifferentStripeSize)
+{
+  MockProcessor mock;
+  StripeMap stripes{
+    { 0, 4},
+    { 4, 6},
+    {10, 2}
+  };
+  StripeMapGen gen(stripes);
+  rgw::putobj::StripeProcessor processor(&mock, &gen, stripes.begin()->second);
+
+  ASSERT_EQ(0, processor.process(string_buf("22"), 0));
+  ASSERT_EQ(1u, mock.ops.size());
+  EXPECT_EQ(Op({"22", 0}), mock.ops[0]);
+
+  ASSERT_EQ(0, processor.process(string_buf("4444"), 2));
+  ASSERT_EQ(4u, mock.ops.size());
+  EXPECT_EQ(Op({"44", 2}), mock.ops[1]);
+  EXPECT_EQ(Op({"", 4}), mock.ops[2]); // flush
+  EXPECT_EQ(Op({"44", 0}), mock.ops[3]);
+
+  ASSERT_EQ(0, processor.process(string_buf("666666"), 6));
+  ASSERT_EQ(7u, mock.ops.size());
+  EXPECT_EQ(Op({"6666", 2}), mock.ops[4]);
+  EXPECT_EQ(Op({"", 6}), mock.ops[5]); // flush
+  EXPECT_EQ(Op({"66", 0}), mock.ops[6]);
+
+  ASSERT_EQ(0, processor.process({}, 12));
+  ASSERT_EQ(8u, mock.ops.size());
+  EXPECT_EQ(Op({"", 2}), mock.ops[7]); // flush
+
+  // gen returns an error past this
+  ASSERT_EQ(-ENOENT, processor.process(string_buf("1"), 12));
+}
+
+TEST(PutObj_Stripe, SkipFirstChunk)
+{
+  MockProcessor mock;
+  StripeMap stripes{
+    {0, 4},
+    {4, 4},
+  };
+  StripeMapGen gen(stripes);
+  rgw::putobj::StripeProcessor processor(&mock, &gen, stripes.begin()->second);
+
+  ASSERT_EQ(0, processor.process(string_buf("666666"), 2));
+  ASSERT_EQ(3u, mock.ops.size());
+  EXPECT_EQ(Op({"66", 2}), mock.ops[0]);
+  EXPECT_EQ(Op({"", 4}), mock.ops[1]); // flush
+  EXPECT_EQ(Op({"6666", 0}), mock.ops[2]);
+
+  ASSERT_EQ(0, processor.process({}, 8));
+  ASSERT_EQ(4u, mock.ops.size());
+  EXPECT_EQ(Op({"", 4}), mock.ops[3]); // flush
+}