From: Yuval Lifshitz Date: Sun, 3 Sep 2023 16:40:12 +0000 (+0000) Subject: test/cls_2pc_queue: fix race condition with producers X-Git-Tag: v19.0.0~540^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2b21513269fa0aa509545314e276be5fa3322265;p=ceph-ci.git test/cls_2pc_queue: fix race condition with producers this is happening when all producers are finished before the consumer starts. Fixes: https://tracker.ceph.com/issues/62449 Signed-off-by: Yuval Lifshitz --- diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc index d3570bf9c55..14947244d41 100644 --- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -337,7 +337,7 @@ TEST_F(TestCls2PCQueue, MultiReserve) for (auto& p : producers) { p = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, 0); @@ -373,7 +373,7 @@ TEST_F(TestCls2PCQueue, MultiCommit) for (auto& p : producers) { p = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { const std::string element_prefix("op-" +to_string(i) + "-element-"); std::vector data(number_of_elements); auto total_size = 0UL; @@ -417,7 +417,7 @@ TEST_F(TestCls2PCQueue, MultiAbort) for (auto& p : producers) { p = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, 0); @@ -451,7 +451,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit) for (auto& r : reservers) { r = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); @@ -506,7 +506,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort) for (auto& r : reservers) { r = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); @@ -556,7 +556,7 @@ TEST_F(TestCls2PCQueue, ManualCleanup) for (auto& r : reservers) { r = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); @@ -630,7 +630,7 @@ TEST_F(TestCls2PCQueue, Cleanup) for (auto& r : reservers) { r = std::thread([this, &queue_name] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); @@ -683,7 +683,7 @@ TEST_F(TestCls2PCQueue, MultiProducer) for (auto& p : producers) { p = std::thread([this, &queue_name, &producer_count] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { const std::string element_prefix("op-" +to_string(i) + "-element-"); std::vector data(number_of_elements); auto total_size = 0UL; @@ -709,15 +709,20 @@ TEST_F(TestCls2PCQueue, MultiProducer) librados::ObjectWriteOperation op; const auto max_elements = 42; const std::string marker; - bool truncated = false; + bool truncated = true; std::string end_marker; std::vector entries; while (producer_count > 0 || truncated) { const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); - consume_count += entries.size(); - cls_2pc_queue_remove_entries(op, end_marker, max_elements); - ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + if (entries.empty()) { + // queue is empty, let it fill + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } else { + consume_count += entries.size(); + cls_2pc_queue_remove_entries(op, end_marker, max_elements); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } } }); @@ -737,7 +742,6 @@ TEST_F(TestCls2PCQueue, AsyncConsumer) cls_2pc_queue_init(wop, queue_name, max_size); ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); - for (auto i = 0U; i < number_of_ops; ++i) { const std::string element_prefix("op-" +to_string(i) + "-element-"); std::vector data(number_of_elements); @@ -762,21 +766,21 @@ TEST_F(TestCls2PCQueue, AsyncConsumer) librados::ObjectReadOperation rop; auto consume_count = 0U; std::vector entries; - bool truncated = true; + bool truncated = true; while (truncated) { - bufferlist bl; - int rc; + bufferlist bl; + int rc; cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc); ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr)); ASSERT_EQ(rc, 0); ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0); consume_count += entries.size(); cls_2pc_queue_remove_entries(wop, end_marker, max_elements); - marker = end_marker; + marker = end_marker; } ASSERT_EQ(consume_count, number_of_ops*number_of_elements); - // execute all delete operations in a batch + // execute all delete operations in a batch ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); // make sure that queue is empty ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0); @@ -803,7 +807,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) for (auto& p : producers) { p = std::thread([this, &queue_name, &producer_count, &retry_happened] { librados::ObjectWriteOperation op; - for (auto i = 0U; i < number_of_ops; ++i) { + for (auto i = 0U; i < number_of_ops; ++i) { const std::string element_prefix("op-" +to_string(i) + "-element-"); std::vector data(number_of_elements); auto total_size = 0UL; @@ -839,7 +843,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) c = std::thread([this, &queue_name, &producer_count] { librados::ObjectWriteOperation op; const std::string marker; - bool truncated = false; + bool truncated = true; std::string end_marker; std::vector entries; while (producer_count > 0 || truncated) {