for (auto& p : producers) {
p = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
for (auto& p : producers) {
p = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
auto total_size = 0UL;
for (auto& p : producers) {
p = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
for (auto& p : producers) {
p = std::thread([this, &queue_name, &producer_count] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
auto total_size = 0UL;
librados::ObjectWriteOperation op;
const auto max_elements = 42;
const std::string marker;
- bool truncated = false;
+ bool truncated = true;
std::string end_marker;
std::vector<cls_queue_entry> entries;
while (producer_count > 0 || truncated) {
const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
- consume_count += entries.size();
- cls_2pc_queue_remove_entries(op, end_marker, max_elements);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ if (entries.empty()) {
+ // queue is empty, let it fill
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ } else {
+ consume_count += entries.size();
+ cls_2pc_queue_remove_entries(op, end_marker, max_elements);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
}
});
cls_2pc_queue_init(wop, queue_name, max_size);
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
-
for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
librados::ObjectReadOperation rop;
auto consume_count = 0U;
std::vector<cls_queue_entry> entries;
- bool truncated = true;
+ bool truncated = true;
while (truncated) {
- bufferlist bl;
- int rc;
+ bufferlist bl;
+ int rc;
cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
ASSERT_EQ(rc, 0);
ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
consume_count += entries.size();
cls_2pc_queue_remove_entries(wop, end_marker, max_elements);
- marker = end_marker;
+ marker = end_marker;
}
ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
- // execute all delete operations in a batch
+ // execute all delete operations in a batch
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
// make sure that queue is empty
ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0);
for (auto& p : producers) {
p = std::thread([this, &queue_name, &producer_count, &retry_happened] {
librados::ObjectWriteOperation op;
- for (auto i = 0U; i < number_of_ops; ++i) {
+ for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
auto total_size = 0UL;
c = std::thread([this, &queue_name, &producer_count] {
librados::ObjectWriteOperation op;
const std::string marker;
- bool truncated = false;
+ bool truncated = true;
std::string end_marker;
std::vector<cls_queue_entry> entries;
while (producer_count > 0 || truncated) {