]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add rgw::putobj::ChunkProcessor and test
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:11:48 +0000 (15:11 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 15 Oct 2018 21:01:36 +0000 (17:01 -0400)
ChunkProcessor turns the input stream into a series of discrete chunks
before forwarding to the wrapped DataProcessor

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_putobj.cc [new file with mode: 0644]
src/rgw/rgw_putobj.h
src/test/rgw/CMakeLists.txt
src/test/rgw/test_rgw_putobj.cc [new file with mode: 0644]

index 31efc05b8b547ea3c86ec3cf471ad465a7f958d0..c830d8e79c49bfa38175b2b66d4c0f2d70c3fa4c 100644 (file)
@@ -84,6 +84,7 @@ set(librgw_common_srcs
   rgw_op.cc
   rgw_otp.cc
   rgw_policy_s3.cc
+  rgw_putobj.cc
   rgw_quota.cc
   rgw_rados.cc
   rgw_resolve.cc
diff --git a/src/rgw/rgw_putobj.cc b/src/rgw/rgw_putobj.cc
new file mode 100644 (file)
index 0000000..845ef44
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rgw_putobj.h"
+
+namespace rgw::putobj {
+
+int ChunkProcessor::process(bufferlist&& data, uint64_t offset)
+{
+  ceph_assert(offset >= chunk.length());
+  uint64_t position = offset - chunk.length();
+
+  const bool flush = (data.length() == 0);
+  if (flush) {
+    if (chunk.length() > 0) {
+      int r = Pipe::process(std::move(chunk), position);
+      if (r < 0) {
+        return r;
+      }
+    }
+    return Pipe::process({}, offset);
+  }
+  chunk.claim_append(data);
+
+  // write each full chunk
+  while (chunk.length() >= chunk_size) {
+    bufferlist bl;
+    chunk.splice(0, chunk_size, &bl);
+
+    int r = Pipe::process(std::move(bl), position);
+    if (r < 0) {
+      return r;
+    }
+    position += chunk_size;
+  }
+  return 0;
+}
+
+} // namespace rgw::putobj
index 0c8995443ed67e09640fb0ac2daad568e77d9d5e..eb885a8ec5d8b4c3d4748e16be8ccac9c10f65cb 100644 (file)
@@ -41,4 +41,16 @@ class Pipe : public DataProcessor {
   }
 };
 
+// pipe that writes to the next processor in discrete chunks
+class ChunkProcessor : public Pipe {
+  uint64_t chunk_size;
+  bufferlist chunk; // leftover bytes from the last call to process()
+ public:
+  ChunkProcessor(DataProcessor *next, uint64_t chunk_size)
+    : Pipe(next), chunk_size(chunk_size)
+  {}
+
+  int process(bufferlist&& data, uint64_t offset) override;
+};
+
 } // namespace rgw::putobj
index 6ba12e3177df02dd6f76abc27a3db46539d43f1e..e37c1723611ec81eebe44c4148cbfb1f759aaf2b 100644 (file)
@@ -92,6 +92,10 @@ target_link_libraries(unittest_rgw_crypto
   ${CRYPTO_LIBS}
   )
 
+add_executable(unittest_rgw_putobj test_rgw_putobj.cc)
+add_ceph_unittest(unittest_rgw_putobj)
+target_link_libraries(unittest_rgw_putobj rgw_a ${UNITTEST_LIBS})
+
 add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
 add_ceph_unittest(unittest_rgw_iam_policy)
 target_link_libraries(unittest_rgw_iam_policy
diff --git a/src/test/rgw/test_rgw_putobj.cc b/src/test/rgw/test_rgw_putobj.cc
new file mode 100644 (file)
index 0000000..01e0757
--- /dev/null
@@ -0,0 +1,122 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rgw/rgw_putobj.h"
+#include <gtest/gtest.h>
+
+inline bufferlist string_buf(const char* buf) {
+  bufferlist bl;
+  bl.append(buffer::create_static(strlen(buf), (char*)buf));
+  return bl;
+}
+
+struct Op {
+  std::string data;
+  uint64_t offset;
+};
+inline bool operator==(const Op& lhs, const Op& rhs) {
+  return lhs.data == rhs.data && lhs.offset == rhs.offset;
+}
+inline std::ostream& operator<<(std::ostream& out, const Op& op) {
+  return out << "{off=" << op.offset << " data='" << op.data << "'}";
+}
+
+struct MockProcessor : rgw::putobj::DataProcessor {
+  std::vector<Op> ops;
+
+  int process(bufferlist&& data, uint64_t offset) override {
+    ops.push_back({data.to_str(), offset});
+    return {};
+  }
+};
+
+TEST(PutObj_Chunk, FlushHalf)
+{
+  MockProcessor mock;
+  rgw::putobj::ChunkProcessor chunk(&mock, 4);
+
+  ASSERT_EQ(0, chunk.process(string_buf("22"), 0));
+  ASSERT_TRUE(mock.ops.empty()); // no writes
+
+  ASSERT_EQ(0, chunk.process({}, 2)); // flush
+  ASSERT_EQ(2u, mock.ops.size());
+  EXPECT_EQ(Op({"22", 0}), mock.ops[0]);
+  EXPECT_EQ(Op({"", 2}), mock.ops[1]);
+}
+
+TEST(PutObj_Chunk, One)
+{
+  MockProcessor mock;
+  rgw::putobj::ChunkProcessor chunk(&mock, 4);
+
+  ASSERT_EQ(0, chunk.process(string_buf("4444"), 0));
+  ASSERT_EQ(1u, mock.ops.size());
+  EXPECT_EQ(Op({"4444", 0}), mock.ops[0]);
+
+  ASSERT_EQ(0, chunk.process({}, 4)); // flush
+  ASSERT_EQ(2u, mock.ops.size());
+  EXPECT_EQ(Op({"", 4}), mock.ops[1]);
+}
+
+TEST(PutObj_Chunk, OneAndFlushHalf)
+{
+  MockProcessor mock;
+  rgw::putobj::ChunkProcessor chunk(&mock, 4);
+
+  ASSERT_EQ(0, chunk.process(string_buf("22"), 0));
+  ASSERT_TRUE(mock.ops.empty());
+
+  ASSERT_EQ(0, chunk.process(string_buf("4444"), 2));
+  ASSERT_EQ(1u, mock.ops.size());
+  EXPECT_EQ(Op({"2244", 0}), mock.ops[0]);
+
+  ASSERT_EQ(0, chunk.process({}, 6)); // flush
+  ASSERT_EQ(3u, mock.ops.size());
+  EXPECT_EQ(Op({"44", 4}), mock.ops[1]);
+  EXPECT_EQ(Op({"", 6}), mock.ops[2]);
+}
+
+TEST(PutObj_Chunk, Two)
+{
+  MockProcessor mock;
+  rgw::putobj::ChunkProcessor chunk(&mock, 4);
+
+  ASSERT_EQ(0, chunk.process(string_buf("88888888"), 0));
+  ASSERT_EQ(2u, mock.ops.size());
+  EXPECT_EQ(Op({"8888", 0}), mock.ops[0]);
+  EXPECT_EQ(Op({"8888", 4}), mock.ops[1]);
+
+  ASSERT_EQ(0, chunk.process({}, 8)); // flush
+  ASSERT_EQ(3u, mock.ops.size());
+  EXPECT_EQ(Op({"", 8}), mock.ops[2]);
+}
+
+TEST(PutObj_Chunk, TwoAndFlushHalf)
+{
+  MockProcessor mock;
+  rgw::putobj::ChunkProcessor chunk(&mock, 4);
+
+  ASSERT_EQ(0, chunk.process(string_buf("22"), 0));
+  ASSERT_TRUE(mock.ops.empty());
+
+  ASSERT_EQ(0, chunk.process(string_buf("88888888"), 2));
+  ASSERT_EQ(2u, mock.ops.size());
+  EXPECT_EQ(Op({"2288", 0}), mock.ops[0]);
+  EXPECT_EQ(Op({"8888", 4}), mock.ops[1]);
+
+  ASSERT_EQ(0, chunk.process({}, 10)); // flush
+  ASSERT_EQ(4u, mock.ops.size());
+  EXPECT_EQ(Op({"88", 8}), mock.ops[2]);
+  EXPECT_EQ(Op({"", 10}), mock.ops[3]);
+}