]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/cls_2pc_queue: prevent list+remove race between consumers 58911/head
authorYuval Lifshitz <ylifshit@ibm.com>
Mon, 29 Jul 2024 16:51:33 +0000 (16:51 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 5 Aug 2024 08:34:56 +0000 (08:34 +0000)
* make sure that the queue full condition is covered
* add cls debugging to test

Fixes: https://tracker.ceph.com/issues/67229
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
qa/suites/rgw/verify/tasks/cls.yaml
src/cls/2pc_queue/cls_2pc_queue.cc
src/test/cls_2pc_queue/test_cls_2pc_queue.cc

index 8034715353f70b1c834f819deea3a80dcc085c77..26f948d42ecabd8a6d3599ca0cad2eb8da290668 100644 (file)
@@ -1,3 +1,8 @@
+overrides:
+  ceph:
+    conf:
+      osd:
+        debug objclass: 20
 tasks:
 - workunit:
     clients:
index 759d360b014846ec0736020b9a004290322bea94..6e6b6e02db5e5fefb5ba7e1def84ab82ffad703e 100644 (file)
@@ -616,10 +616,10 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
     list_op.end_marker = rem_2pc_op.end_marker;
     ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove);
     if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret);
+      CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: returned: %d", ret);
       return ret;
     }
-    CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove);
+    CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: counted: %u", rem_2pc_op.entries_to_remove);
   }
 
   cls_queue_remove_op rem_op;
index f0c71c7492e3239e58a4151ff945ddea4a975d0b..9d988a498087d39e77d26621eda51236a95a8880 100644 (file)
@@ -964,34 +964,54 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
   }
 
   const auto max_elements = 128;
-  std::vector<std::thread> consumers(max_workers/2);
-  for (auto& c : consumers) {
-    c = std::thread([this, &queue_name, &producer_count] {
+  std::vector<std::thread> readers(max_workers/2);
+  for (auto& c : readers) {
+    c = std::thread([this, &queue_name, &producer_count, &retry_happened] {
           librados::ObjectWriteOperation op;
           const std::string marker;
           bool truncated = true;
           std::string end_marker;
           std::vector<cls_queue_entry> entries;
           while (producer_count > 0 || truncated) {
+            if (!retry_happened) {
+              // queue was never full, let it fill
+              std::this_thread::sleep_for(std::chrono::milliseconds(100));
+              continue;
+            }
             const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
             if (entries.empty()) {
-              // queue is empty, let it fill
-              std::this_thread::sleep_for(std::chrono::milliseconds(100));
-            } else {
-              cls_2pc_queue_remove_entries(op, end_marker, max_elements);
-              ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+              // another consumer has emptied the queue
+              return; 
             }
           }
        });
   }
+  
+  auto deleter = std::thread([this, &queue_name, &producer_count, &retry_happened] {
+      librados::ObjectWriteOperation op;
+      const std::string marker;
+      bool truncated = true;
+      std::string end_marker;
+      std::vector<cls_queue_entry> entries;
+      while (producer_count > 0 || truncated) {
+        if (!retry_happened) {
+          // queue was never full, let it fill
+          std::this_thread::sleep_for(std::chrono::milliseconds(100));
+          continue;
+        }
+        const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+        ASSERT_EQ(0, ret);
+        ASSERT_FALSE(entries.empty());
+        cls_2pc_queue_remove_entries(op, end_marker, max_elements);
+        ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+      }
+  });
 
   std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
-  std::for_each(consumers.begin(), consumers.end(), [](auto& c) { c.join(); });
-  if (!retry_happened) {
-      std::cerr << "Queue was never full - all reservations were successful." <<
-          "Please decrease the amount of consumer threads" << std::endl;
-  }
+  std::for_each(readers.begin(), readers.end(), [](auto& c) { c.join(); });
+  deleter.join();
+  ASSERT_TRUE(retry_happened);
   // make sure that queue is empty and no reservations remain
   cls_2pc_reservations reservations;
   ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));