]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/cls_2pc_queue: fix race condition with producers
authorYuval Lifshitz <ylifshit@redhat.com>
Sun, 3 Sep 2023 16:40:12 +0000 (16:40 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 5 Sep 2023 09:18:09 +0000 (09:18 +0000)
this is happening when all producers are finished before the consumer
starts.

Fixes: https://tracker.ceph.com/issues/62449
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/test/cls_2pc_queue/test_cls_2pc_queue.cc

index d3570bf9c557e523431ed8bd07d53430c3c78faf..14947244d41f4d2528dac04c096c5f53cc99ead8 100644 (file)
@@ -337,7 +337,7 @@ TEST_F(TestCls2PCQueue, MultiReserve)
   for (auto& p : producers) {
     p = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
         ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, 0);
@@ -373,7 +373,7 @@ TEST_F(TestCls2PCQueue, MultiCommit)
   for (auto& p : producers) {
     p = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         const std::string element_prefix("op-" +to_string(i) + "-element-");
         std::vector<bufferlist> data(number_of_elements);
         auto total_size = 0UL;
@@ -417,7 +417,7 @@ TEST_F(TestCls2PCQueue, MultiAbort)
   for (auto& p : producers) {
     p = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
         ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, 0);
@@ -451,7 +451,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit)
   for (auto& r : reservers) {
     r = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
         ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
@@ -506,7 +506,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort)
   for (auto& r : reservers) {
     r = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
         ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
@@ -556,7 +556,7 @@ TEST_F(TestCls2PCQueue, ManualCleanup)
   for (auto& r : reservers) {
     r = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
         ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
@@ -630,7 +630,7 @@ TEST_F(TestCls2PCQueue, Cleanup)
   for (auto& r : reservers) {
     r = std::thread([this, &queue_name] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
         ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
@@ -683,7 +683,7 @@ TEST_F(TestCls2PCQueue, MultiProducer)
   for (auto& p : producers) {
     p = std::thread([this, &queue_name, &producer_count] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         const std::string element_prefix("op-" +to_string(i) + "-element-");
         std::vector<bufferlist> data(number_of_elements);
         auto total_size = 0UL;
@@ -709,15 +709,20 @@ TEST_F(TestCls2PCQueue, MultiProducer)
           librados::ObjectWriteOperation op;
           const auto max_elements = 42;
           const std::string marker;
-          bool truncated = false;
+          bool truncated = true;
           std::string end_marker;
           std::vector<cls_queue_entry> entries;
           while (producer_count > 0 || truncated) {
             const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
-            consume_count += entries.size();
-            cls_2pc_queue_remove_entries(op, end_marker, max_elements);
-            ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+            if (entries.empty()) {
+              // queue is empty, let it fill
+              std::this_thread::sleep_for(std::chrono::milliseconds(100));
+            } else {
+              consume_count += entries.size();
+              cls_2pc_queue_remove_entries(op, end_marker, max_elements);
+              ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+            }
           }
        });
 
@@ -737,7 +742,6 @@ TEST_F(TestCls2PCQueue, AsyncConsumer)
   cls_2pc_queue_init(wop, queue_name, max_size);
   ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
 
-
   for (auto i = 0U; i < number_of_ops; ++i) {
     const std::string element_prefix("op-" +to_string(i) + "-element-");
     std::vector<bufferlist> data(number_of_elements);
@@ -762,21 +766,21 @@ TEST_F(TestCls2PCQueue, AsyncConsumer)
   librados::ObjectReadOperation rop;
   auto consume_count = 0U;
   std::vector<cls_queue_entry> entries;
-       bool truncated = true;
+  bool truncated = true;
   while (truncated) {
-               bufferlist bl;
-               int rc;
+    bufferlist bl;
+    int rc;
     cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
     ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
     ASSERT_EQ(rc, 0);
     ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
     consume_count += entries.size();
     cls_2pc_queue_remove_entries(wop, end_marker, max_elements);
-               marker = end_marker;
+    marker = end_marker;
   }
 
   ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
-       // execute all delete operations in a batch
+  // execute all delete operations in a batch
   ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
   // make sure that queue is empty
   ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0);
@@ -803,7 +807,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
   for (auto& p : producers) {
     p = std::thread([this, &queue_name, &producer_count, &retry_happened] {
       librados::ObjectWriteOperation op;
-         for (auto i = 0U; i < number_of_ops; ++i) {
+      for (auto i = 0U; i < number_of_ops; ++i) {
         const std::string element_prefix("op-" +to_string(i) + "-element-");
         std::vector<bufferlist> data(number_of_elements);
         auto total_size = 0UL;
@@ -839,7 +843,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
     c = std::thread([this, &queue_name, &producer_count] {
           librados::ObjectWriteOperation op;
           const std::string marker;
-          bool truncated = false;
+          bool truncated = true;
           std::string end_marker;
           std::vector<cls_queue_entry> entries;
           while (producer_count > 0 || truncated) {