return 0;
}
+static int cls_2pc_queue_count_entries(cls_method_context_t hctx, cls_queue_list_op& op, cls_queue_head& head,
+ uint32_t& entries_to_remove)
+{
+ cls_queue_list_ret op_ret;
+ auto ret = queue_list_entries(hctx, op, op_ret, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ entries_to_remove = op_ret.entries.size();
+ return 0;
+}
+
static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto in_iter = in->cbegin();
if (ret < 0) {
return ret;
}
+
+ // Old RGW is running, and it sent cls_queue_remove_op instead of cls_2pc_queue_remove_op
+ if (rem_2pc_op.entries_to_remove == 0) {
+ CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: incompatible RGW with rados, counting entries to remove...");
+ cls_queue_list_op list_op;
+ list_op.max = std::numeric_limits<uint64_t>::max(); // max length because endmarker is the stopping condition.
+ list_op.end_marker = rem_2pc_op.end_marker;
+ ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret);
+ return ret;
+ }
+ CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove);
+ }
+
cls_queue_remove_op rem_op;
rem_op.end_marker = std::move(rem_2pc_op.end_marker);
ret = queue_remove_entries(hctx, rem_op, head);
ceph::coarse_real_time stale_time);
// remove all entries up to the given marker
-void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove);
+// if there is no race condition, providing the number of entries_to_remove is recommended, as it is more efficient.
+// if there is no guarantee against two clienst deleting entries at the same time, you can leave the entries_to_remove unprovided or input zero entries_to_remove
+// the function will count how many entries it needs to removed
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove=0);
struct cls_2pc_queue_remove_op {
std::string end_marker;
- uint32_t entries_to_remove;
+ uint32_t entries_to_remove = 0;
cls_2pc_queue_remove_op() {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(end_marker, bl);
encode(entries_to_remove, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(end_marker, bl);
- decode(entries_to_remove, bl);
+ if (struct_v > 1) {
+ decode(entries_to_remove, bl);
+ }
DECODE_FINISH(bl);
}
};
op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in);
}
-int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
- vector<cls_queue_entry>& entries,
- bool *truncated, string& next_marker)
+int cls_queue_list_entries_inner(IoCtx& io_ctx, const string& oid, vector<cls_queue_entry>& entries,
+ bool *truncated, string& next_marker, bufferlist& in, bufferlist& out)
{
- bufferlist in, out;
- cls_queue_list_op op;
- op.start_marker = marker;
- op.max = max;
- encode(op, in);
-
int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
if (r < 0)
return r;
return 0;
}
+int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
+ vector<cls_queue_entry>& entries,
+ bool *truncated, string& next_marker)
+{
+ bufferlist in, out;
+ cls_queue_list_op op;
+ op.start_marker = marker;
+ op.max = max;
+ encode(op, in);
+
+ return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out);
+}
+
+int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, const string& end_marker,
+ vector<cls_queue_entry>& entries,
+ bool *truncated, string& next_marker)
+{
+ bufferlist in, out;
+ cls_queue_list_op op;
+ op.start_marker = marker;
+ op.max = std::numeric_limits<uint64_t>::max();
+ op.end_marker = end_marker;
+ encode(op, in);
+
+ return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out);
+}
+
void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker)
{
bufferlist in, out;
void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, std::vector<bufferlist> bl_data_vec);
int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, uint32_t max,
std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
+int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, const std::string& end_marker,
+ std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
#endif
struct cls_queue_list_op {
uint64_t max;
std::string start_marker;
+ std::string end_marker;
cls_queue_list_op() {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(max, bl);
encode(start_marker, bl);
+ encode(end_marker, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(max, bl);
decode(start_marker, bl);
+ if (struct_v > 1) {
+ decode(end_marker, bl);
+ }
DECODE_FINISH(bl);
}
};
CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
break;
}
+ if (!op.end_marker.empty() && entry.marker == op.end_marker) {
+ last_marker = entry.marker;
+ break;
+ }
op_ret.entries.emplace_back(entry);
// Resetting some values
offset_populated = false;
}
} while(index < bl_chunk.length());
- CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
+ CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu, last_marker: %s and op.end_marker is %s\n",
+ num_ops, op.max, last_marker.c_str(), op.end_marker.c_str());
- if (num_ops == op.max) {
- next_marker = cls_queue_marker{(entry_start_offset + index), gen};
- CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
+ if (num_ops == op.max || (!op.end_marker.empty() && op.end_marker == last_marker)) {
+ if (!op.end_marker.empty()) {
+ next_marker.from_str(op.end_marker.c_str());
+ } else {
+ next_marker = cls_queue_marker{(entry_start_offset + index), gen};
+ }
+ CLS_LOG(10, "INFO: queue_list_entries(): either num_ops is same as op.max or last_marker is same as op.end_marker, "
+ "hence breaking out from outer loop with next offset: %lu", next_marker.offset);
break;
}
#include "gtest/gtest.h"
#include "test/librados/test_cxx.h"
#include "global/global_context.h"
+#include "cls/2pc_queue/cls_2pc_queue_const.h"
#include <string>
#include <vector>
ASSERT_EQ(reservations.size(), 0);
}
+TEST_F(TestCls2PCQueue, Stats)
+{
+ const std::string queue_name = __PRETTY_FUNCTION__;
+ const auto max_size = 1024*1024*128;
+ const auto number_of_ops = 200U;
+ const auto number_of_elements = 23U;
+ auto total_committed_elements = 0U;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_2pc_queue_init(op, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ const std::string element_prefix("op-" +to_string(i) + "-element-");
+ auto total_size = 0UL;
+ std::vector<bufferlist> data(number_of_elements);
+ // create vector of buffer lists
+ std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+ bufferlist bl;
+ bl.append(element_prefix + to_string(j++));
+ total_size += bl.length();
+ return bl;
+ });
+
+ cls_2pc_reservation::id_t res_id;
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
+ ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+ cls_2pc_queue_commit(op, data, res_id);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ total_committed_elements += number_of_elements;
+ uint32_t committed_entries;
+ uint64_t size;
+
+ ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0);
+ ASSERT_EQ(committed_entries, total_committed_elements);
+ }
+ cls_2pc_reservations reservations;
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ ASSERT_EQ(reservations.size(), 0);
+}
+
+TEST_F(TestCls2PCQueue, UpgradeFromReef)
+{
+ const std::string queue_name = __PRETTY_FUNCTION__;
+ const auto max_size = 1024*1024*128;
+ const auto number_of_ops = 200U;
+ const auto number_of_elements = 23U;
+ auto total_committed_elements = 0U;
+ librados::ObjectWriteOperation wop;
+ wop.create(true);
+ cls_2pc_queue_init(wop, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ const std::string element_prefix("wop-" +to_string(i) + "-element-");
+ auto total_size = 0UL;
+ std::vector<bufferlist> data(number_of_elements);
+ // create vector of buffer lists
+ std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+ bufferlist bl;
+ bl.append(element_prefix + to_string(j++));
+ total_size += bl.length();
+ return bl;
+ });
+
+ cls_2pc_reservation::id_t res_id;
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
+ ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+ cls_2pc_queue_commit(wop, data, res_id);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+ total_committed_elements += number_of_elements;
+ uint32_t committed_entries;
+ uint64_t size;
+
+ ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0);
+ ASSERT_EQ(committed_entries, total_committed_elements);
+ }
+ cls_2pc_reservations reservations;
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ ASSERT_EQ(reservations.size(), 0);
+
+ constexpr auto max_elements = 42U;
+ std::string marker;
+ std::string end_marker;
+ librados::ObjectReadOperation rop;
+ auto consume_count = 0U;
+ std::vector<cls_queue_entry> entries;
+ bool truncated = true;
+
+ auto simulate_reef_cls_2pc_queue_remove_entries = [](librados::ObjectWriteOperation& wop, const std::string& end_marker) {
+ bufferlist in;
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = end_marker;
+ encode(rem_op, in);
+ wop.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
+ };
+
+ while (truncated) {
+ bufferlist bl;
+ int rc;
+ cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+ ASSERT_EQ(rc, 0);
+ ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
+
+ consume_count += entries.size();
+ // simulating reef cls_2pc_queue_remove_entries with cls_queue_remove_op
+ simulate_reef_cls_2pc_queue_remove_entries(wop, end_marker);
+ marker = end_marker;
+ total_committed_elements -= entries.size();
+ }
+
+ // execute all delete operations in a batch
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+ ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+
+ uint32_t entries_number;
+ uint64_t size;
+ ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, entries_number, size), 0);
+ ASSERT_EQ(total_committed_elements, 0);
+ ASSERT_EQ(entries_number, 0);
+}
+
TEST_F(TestCls2PCQueue, Abort)
{
const std::string queue_name = __PRETTY_FUNCTION__;
ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
}
+TEST_F(TestClsQueue, ListByEndMarker)
+{
+ const std::string queue_name = "my-queue";
+ const uint64_t queue_size = 1024*1024;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_queue_init(op, queue_name, queue_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ const auto number_of_ops = 10;
+ const auto number_of_elements = 100;
+
+ // test multiple enqueues
+ test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+
+ const auto max_elements = 42;
+ std::string marker, end_marker;
+ bool truncated = false;
+ std::string max_op_next_marker;
+ auto total_elements = 0;
+ do {
+ std::vector<cls_queue_entry> entries;
+ auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, max_op_next_marker);
+ ASSERT_EQ(0, ret);
+ end_marker = max_op_next_marker;
+
+ std::vector<cls_queue_entry> end_marker_entries;
+ std::string end_marker_next_marker;
+ bool end_marker_truncated = false;
+ ret = cls_queue_list_entries(ioctx, queue_name, marker, end_marker, end_marker_entries,
+ &end_marker_truncated, end_marker_next_marker);
+ ASSERT_EQ(0, ret);
+
+ ASSERT_EQ(end_marker_next_marker, end_marker);
+ ASSERT_EQ(end_marker_entries.size(), entries.size());
+ for (auto i = 0U; i < end_marker_entries.size() && i < entries.size(); ++i) {
+ ASSERT_EQ(end_marker_entries[i].marker, entries[i].marker);
+ }
+
+ marker = max_op_next_marker;
+ total_elements += entries.size();
+ } while (truncated);
+
+ ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
+}
+
TEST_F(TestClsQueue, Dequeue)
{
const std::string queue_name = "my-queue";