]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/queue: add multithreaded unit tests 33218/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Thu, 13 Feb 2020 17:42:40 +0000 (19:42 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Wed, 26 Feb 2020 14:44:41 +0000 (16:44 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/test/cls_queue/test_cls_queue.cc

index 05914bce9fe3c3f965bc6e148fe0fb191ff5b943..97028f15803b368e26cc8edf7735f9f5865a0614 100644 (file)
@@ -14,6 +14,9 @@
 #include <string>
 #include <vector>
 #include <algorithm>
+#include <thread>
+#include <chrono>
+#include <atomic>
 
 class TestClsQueue : public ::testing::Test {
 protected:
@@ -33,10 +36,10 @@ protected:
   }
 
   void test_enqueue(const std::string& queue_name, 
-          librados::ObjectWriteOperation& op, 
           int number_of_ops, 
           int number_of_elements,
           int expected_rc) {
+    librados::ObjectWriteOperation op;
     // test multiple enqueues
     for (auto i = 0; i < number_of_ops; ++i) {
       const std::string element_prefix("op-" +to_string(i) + "-element-");
@@ -51,7 +54,7 @@ protected:
       // enqueue vector
       cls_queue_enqueue(op, 0, data);
       ASSERT_EQ(expected_rc, ioctx.operate(queue_name, &op));
-      }
+    }
   }
 };
 
@@ -82,7 +85,7 @@ TEST_F(TestClsQueue, Enqueue)
   // test multiple enqueues
   // 10 iterations, 100 elelemts each
   // expect 0 (OK)
-  test_enqueue(queue_name, op, 10, 100, 0);
+  test_enqueue(queue_name, 10, 100, 0);
 }
 
 TEST_F(TestClsQueue, QueueFull)
@@ -96,10 +99,10 @@ TEST_F(TestClsQueue, QueueFull)
 
   // 8 iterations, 5 elelemts each
   // expect 0 (OK)
-  test_enqueue(queue_name, op, 8, 5, 0);
+  test_enqueue(queue_name, 8, 5, 0);
   // 2 iterations, 5 elelemts each
   // expect -28 (Q FULL)
-  test_enqueue(queue_name, op, 2, 5, -28);
+  test_enqueue(queue_name, 2, 5, -28);
 }
 
 TEST_F(TestClsQueue, List)
@@ -110,25 +113,26 @@ TEST_F(TestClsQueue, List)
   op.create(true);
   cls_queue_init(op, queue_name, queue_size);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+  const auto number_of_ops = 10;
+  const auto number_of_elements = 100;
 
   // test multiple enqueues
-  test_enqueue(queue_name, op, 10, 100, 0);
+  test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
 
-  const auto max_elements = 50;
+  const auto max_elements = 42;
   std::string marker;
   bool truncated = false;
   std::string next_marker;
+  auto total_elements = 0;
   do {
     std::vector<cls_queue_entry> entries;
     const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, next_marker);
     ASSERT_EQ(0, ret);
-    if (truncated) {
-         ASSERT_NE(marker, next_marker);
-    } else {
-         ASSERT_EQ(marker, next_marker);
-    }
     marker = next_marker;
+    total_elements += entries.size();
   } while (truncated);
+
+  ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
 }
 
 TEST_F(TestClsQueue, Dequeue)
@@ -141,7 +145,7 @@ TEST_F(TestClsQueue, Dequeue)
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   // test multiple enqueues
-  test_enqueue(queue_name, op, 10, 100, 0);
+  test_enqueue(queue_name, 10, 100, 0);
 
   // get the end marker for 42 elements
   const auto remove_elements = 42;
@@ -207,7 +211,7 @@ TEST_F(TestClsQueue, ListAll)
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   // test multiple enqueues
-  test_enqueue(queue_name, op, 10, 100, 0);
+  test_enqueue(queue_name, 10, 100, 0);
 
   const auto total_elements = 10*100;
   std::string marker;
@@ -230,7 +234,7 @@ TEST_F(TestClsQueue, DeleteAll)
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   // test multiple enqueues
-  test_enqueue(queue_name, op, 10, 100, 0);
+  test_enqueue(queue_name, 10, 100, 0);
 
   const auto total_elements = 10*100;
   const std::string marker;
@@ -248,3 +252,249 @@ TEST_F(TestClsQueue, DeleteAll)
   ASSERT_EQ(entries.size(), 0);
 }
 
+TEST_F(TestClsQueue, EnqueueDequeue)
+{
+  const std::string queue_name = "my-queue";
+  const uint64_t queue_size = 1024*1024;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_queue_init(op, queue_name, queue_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  bool done = false;
+  const int number_of_ops = 10;
+  const int number_of_elements = 100;
+
+  std::thread producer([this, &queue_name, &done] {
+          test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+          done = true;
+          });
+
+  auto consume_count = 0U;
+  std::thread consumer([this, &queue_name, &consume_count, &done] {
+          librados::ObjectWriteOperation op;
+          const auto max_elements = 42;
+          const std::string marker;
+          bool truncated = false;
+          std::string end_marker;
+          std::vector<cls_queue_entry> entries;
+          while (!done || truncated) {
+            const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+            ASSERT_EQ(0, ret);
+            consume_count += entries.size();
+            cls_queue_remove_entries(op, end_marker); 
+            ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          }
+       });
+
+  producer.join();
+  consumer.join();
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+}
+
+TEST_F(TestClsQueue, QueueFullDequeue)
+{
+  const std::string queue_name = "my-queue";
+  const uint64_t queue_size = 4096;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_queue_init(op, queue_name, queue_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  bool done = false;
+  const auto number_of_ops = 100;
+  const auto number_of_elements = 50;
+
+  std::thread producer([this, &queue_name, &done] {
+          librados::ObjectWriteOperation op;
+          // test multiple enqueues
+          for (auto i = 0; i < number_of_ops; ++i) {
+            const std::string element_prefix("op-" +to_string(i) + "-element-");
+            std::vector<bufferlist> data(number_of_elements);
+            // create vector of buffer lists
+            std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
+                    bufferlist bl;
+                    bl.append(element_prefix + to_string(j++));
+                    return bl;
+            });
+
+            // enqueue vector
+            cls_queue_enqueue(op, 0, data);
+            if (ioctx.operate(queue_name, &op) == -28) {
+              // queue is full - wait and retry
+              --i;
+              std::this_thread::sleep_for(std::chrono::milliseconds(100));
+            }
+          }
+          done = true;
+        });
+
+  auto consume_count = 0;
+  std::thread consumer([this, &queue_name, &consume_count, &done] {
+          librados::ObjectWriteOperation op;
+          const auto max_elements = 42;
+          std::string marker;
+          bool truncated = false;
+          std::string end_marker;
+          std::vector<cls_queue_entry> entries;
+          while (!done || truncated) {
+            auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+            ASSERT_EQ(0, ret);
+            consume_count += entries.size();
+            cls_queue_remove_entries(op, end_marker); 
+            ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          }
+       });
+
+  producer.join();
+  consumer.join();
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+}
+
+TEST_F(TestClsQueue, MultiProducer)
+{
+  const std::string queue_name = "my-queue";
+  const uint64_t queue_size = 1024*1024;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_queue_init(op, queue_name, queue_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  const int max_producer_count = 10;
+  int producer_count = max_producer_count;
+  const int number_of_ops = 10;
+  const int number_of_elements = 100;
+
+  std::vector<std::thread> producers(max_producer_count);
+  for (auto& p : producers) {
+    p = std::move(std::thread([this, &queue_name, &producer_count] {
+                test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+                --producer_count;
+    }));
+  }
+
+  auto consume_count = 0U;
+  std::thread consumer([this, &queue_name, &consume_count, &producer_count] {
+          librados::ObjectWriteOperation op;
+          const auto max_elements = 42;
+          const std::string marker;
+          bool truncated = false;
+          std::string end_marker;
+          std::vector<cls_queue_entry> entries;
+          while (producer_count > 0 || truncated) {
+            const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+            ASSERT_EQ(0, ret);
+            consume_count += entries.size();
+            cls_queue_remove_entries(op, end_marker); 
+            ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          }
+       });
+
+  for (auto& p : producers) {
+      p.join();
+  }
+  consumer.join();
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
+}
+
+TEST_F(TestClsQueue, MultiConsumer)
+{
+  const std::string queue_name = "my-queue";
+  const uint64_t queue_size = 1024*1024;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_queue_init(op, queue_name, queue_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  bool done = false;
+  const int number_of_ops = 10;
+  const int number_of_elements = 100;
+
+  std::thread producer([this, &queue_name, &done] {
+          test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+          done = true;
+          });
+
+  int consume_count = 0;
+  std::mutex list_and_remove_lock;
+
+  std::vector<std::thread> consumers(10);
+  for (auto& c : consumers) {
+    c = std::thread([this, &queue_name, &consume_count, &done, &list_and_remove_lock] {
+          librados::ObjectWriteOperation op;
+          const auto max_elements = 42;
+          const std::string marker;
+          bool truncated = false;
+          std::string end_marker;
+          std::vector<cls_queue_entry> entries;
+          while (!done || truncated) {
+            std::lock_guard lock(list_and_remove_lock);
+            const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+            ASSERT_EQ(0, ret);
+            consume_count += entries.size();
+            cls_queue_remove_entries(op, end_marker); 
+            ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          }
+    });
+  }
+
+  producer.join();
+  for (auto& c : consumers) {
+      c.join();
+  }
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+}
+
+TEST_F(TestClsQueue, NoLockMultiConsumer)
+{
+  const std::string queue_name = "my-queue";
+  const uint64_t queue_size = 1024*1024;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_queue_init(op, queue_name, queue_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  bool done = false;
+  const int number_of_ops = 10;
+  const int number_of_elements = 100;
+
+  std::thread producer([this, &queue_name, &done] {
+          test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+          done = true;
+          });
+
+  std::vector<std::thread> consumers(5);
+  for (auto& c : consumers) {
+    c = std::thread([this, &queue_name, &done] {
+          librados::ObjectWriteOperation op;
+          const auto max_elements = 42;
+          const std::string marker;
+          bool truncated = false;
+          std::string end_marker;
+          std::vector<cls_queue_entry> entries;
+          while (!done || truncated) {
+            const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+            ASSERT_EQ(0, ret);
+            cls_queue_remove_entries(op, end_marker); 
+            ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          }
+    });
+  }
+
+  producer.join();
+  for (auto& c : consumers) {
+      c.join();
+  }
+
+  // make sure queue is empty
+  const auto max_elements = 1000;
+  const std::string marker;
+  bool truncated = false;
+  std::string end_marker;
+  std::vector<cls_queue_entry> entries;
+  const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(entries.size(), 0);
+  ASSERT_EQ(truncated, false);
+}
+