]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW: make new rados support old RGW 2pc remove 54459/head
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Tue, 31 Oct 2023 13:55:41 +0000 (15:55 +0200)
committerAli Masarwa <ali.saed.masarwa@gmail.com>
Sun, 26 Nov 2023 19:57:51 +0000 (21:57 +0200)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
src/cls/2pc_queue/cls_2pc_queue.cc
src/cls/2pc_queue/cls_2pc_queue_client.h
src/cls/2pc_queue/cls_2pc_queue_ops.h
src/cls/queue/cls_queue_client.cc
src/cls/queue/cls_queue_client.h
src/cls/queue/cls_queue_ops.h
src/cls/queue/cls_queue_src.cc
src/test/cls_2pc_queue/test_cls_2pc_queue.cc
src/test/cls_queue/test_cls_queue.cc

index 019f2c96deafe71df902060452a41f5bb6d23dd3..1e99503943a3512209f694e996acc5239618887a 100644 (file)
@@ -578,6 +578,19 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
+static int cls_2pc_queue_count_entries(cls_method_context_t hctx, cls_queue_list_op& op, cls_queue_head& head,
+                                       uint32_t& entries_to_remove)
+{
+  cls_queue_list_ret op_ret;
+  auto ret = queue_list_entries(hctx, op, op_ret, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  entries_to_remove = op_ret.entries.size();
+  return 0;
+}
+
 static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   auto in_iter = in->cbegin();
@@ -594,6 +607,21 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
   if (ret < 0) {
     return ret;
   }
+
+  // Old RGW is running, and it sent cls_queue_remove_op instead of cls_2pc_queue_remove_op
+  if (rem_2pc_op.entries_to_remove == 0) {
+    CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: incompatible RGW with rados, counting entries to remove...");
+    cls_queue_list_op list_op;
+    list_op.max = std::numeric_limits<uint64_t>::max(); // max length because endmarker is the stopping condition.
+    list_op.end_marker = rem_2pc_op.end_marker;
+    ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove);
+    if (ret < 0) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret);
+      return ret;
+    }
+    CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove);
+  }
+
   cls_queue_remove_op rem_op;
   rem_op.end_marker = std::move(rem_2pc_op.end_marker);
   ret = queue_remove_entries(hctx, rem_op, head);
index c806d30f59e0fd404ff94097b6af192df02ddf7b..0d55d68e7a05c1ef37d784ec2edd6d616172155c 100644 (file)
@@ -87,5 +87,8 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op,
         ceph::coarse_real_time stale_time);
 
 // remove all entries up to the given marker
-void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove);
+// if there is no race condition, providing the number of entries_to_remove is recommended, as it is more efficient.
+// if there is no guarantee against two clienst deleting entries at the same time, you can leave the entries_to_remove unprovided or input zero entries_to_remove
+// the function will count how many entries it needs to removed
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove=0);
 
index bb61ef341ac1e8523569beb3a815b46a12a4c999..194fdf6da9d56faacf431d6f0a3cd78cd28b7a82 100644 (file)
@@ -118,21 +118,23 @@ WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
 
 struct cls_2pc_queue_remove_op {
   std::string end_marker;
-  uint32_t entries_to_remove;
+  uint32_t entries_to_remove = 0;
 
   cls_2pc_queue_remove_op() {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(end_marker, bl);
     encode(entries_to_remove, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(end_marker, bl);
-    decode(entries_to_remove, bl);
+    if (struct_v > 1) {
+      decode(entries_to_remove, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
index 87d17bb9e3157df6ba9ae2cf8194415fb1838ee6..d3d38a9214e5030a99e6fbfcfc8c968da6643a8b 100644 (file)
@@ -48,16 +48,9 @@ void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vecto
   op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in);
 }
 
-int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
-                            vector<cls_queue_entry>& entries,
-                            bool *truncated, string& next_marker)
+int cls_queue_list_entries_inner(IoCtx& io_ctx, const string& oid, vector<cls_queue_entry>& entries,
+                                 bool *truncated, string& next_marker, bufferlist& in, bufferlist& out)
 {
-  bufferlist in, out;
-  cls_queue_list_op op;
-  op.start_marker = marker;
-  op.max = max;
-  encode(op, in);
-
   int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
   if (r < 0)
     return r;
@@ -78,6 +71,33 @@ int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marke
   return 0;
 }
 
+int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
+                            vector<cls_queue_entry>& entries,
+                            bool *truncated, string& next_marker)
+{
+  bufferlist in, out;
+  cls_queue_list_op op;
+  op.start_marker = marker;
+  op.max = max;
+  encode(op, in);
+
+  return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out);
+}
+
+int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, const string& end_marker,
+                           vector<cls_queue_entry>& entries,
+                           bool *truncated, string& next_marker)
+{
+  bufferlist in, out;
+  cls_queue_list_op op;
+  op.start_marker = marker;
+  op.max = std::numeric_limits<uint64_t>::max();
+  op.end_marker = end_marker;
+  encode(op, in);
+
+  return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out);
+}
+
 void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker)
 {
   bufferlist in, out;
index 895a51c117379c6fbb3af5916aee30cff336aa51..903448fd480bf2a6fc791a5699a36e3aa1da59d9 100644 (file)
@@ -11,6 +11,8 @@ int cls_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& oid, uint
 void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, std::vector<bufferlist> bl_data_vec);
 int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, uint32_t max,
                     std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
+int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, const std::string& end_marker,
+                           std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
 void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
 
 #endif
index 8209659bda907b7492f6a6366e4762698fa0eec1..f41572036c51474cc322616c8ea8782cf3904b32 100644 (file)
@@ -54,20 +54,25 @@ WRITE_CLASS_ENCODER(cls_queue_enqueue_op)
 struct cls_queue_list_op {
   uint64_t max;
   std::string start_marker;
+  std::string end_marker;
 
   cls_queue_list_op() {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(max, bl);
     encode(start_marker, bl);
+    encode(end_marker, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(max, bl);
     decode(start_marker, bl);
+    if (struct_v > 1) {
+      decode(end_marker, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
index b34d9929b93a136e6d7fa34db86d1e983b8c0b56..028b1cb123c913aea2158df0675b910b22469ef3 100644 (file)
@@ -400,6 +400,10 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
         CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
         break;
       }
+      if (!op.end_marker.empty() && entry.marker == op.end_marker) {
+        last_marker = entry.marker;
+        break;
+      }
       op_ret.entries.emplace_back(entry);
       // Resetting some values
       offset_populated = false;
@@ -414,11 +418,17 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
       }
     } while(index < bl_chunk.length());
 
-    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
+    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu, last_marker: %s and op.end_marker is %s\n",
+            num_ops, op.max, last_marker.c_str(), op.end_marker.c_str());
 
-    if (num_ops == op.max) {
-      next_marker = cls_queue_marker{(entry_start_offset + index), gen};
-      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
+    if (num_ops == op.max || (!op.end_marker.empty() && op.end_marker == last_marker)) {
+      if (!op.end_marker.empty()) {
+        next_marker.from_str(op.end_marker.c_str());
+      } else {
+        next_marker = cls_queue_marker{(entry_start_offset + index), gen};
+      }
+      CLS_LOG(10, "INFO: queue_list_entries(): either num_ops is same as op.max or last_marker is same as op.end_marker, "
+                  "hence breaking out from outer loop with next offset: %lu", next_marker.offset);
       break;
     }
 
index 14947244d41f4d2528dac04c096c5f53cc99ead8..8f392721f5b745f8d7788234c7706595e8e2c992 100644 (file)
@@ -11,6 +11,7 @@
 #include "gtest/gtest.h"
 #include "test/librados/test_cxx.h"
 #include "global/global_context.h"
+#include "cls/2pc_queue/cls_2pc_queue_const.h"
 
 #include <string>
 #include <vector>
@@ -173,6 +174,131 @@ TEST_F(TestCls2PCQueue, Commit)
   ASSERT_EQ(reservations.size(), 0);
 }
 
+TEST_F(TestCls2PCQueue, Stats)
+{
+  const std::string queue_name = __PRETTY_FUNCTION__;
+  const auto max_size = 1024*1024*128;
+  const auto number_of_ops = 200U;
+  const auto number_of_elements = 23U;
+  auto total_committed_elements = 0U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    const std::string element_prefix("op-" +to_string(i) + "-element-");
+    auto total_size = 0UL;
+    std::vector<bufferlist> data(number_of_elements);
+    // create vector of buffer lists
+    std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+      bufferlist bl;
+      bl.append(element_prefix + to_string(j++));
+      total_size += bl.length();
+      return bl;
+    });
+
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+    cls_2pc_queue_commit(op, data, res_id);
+    ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+    total_committed_elements += number_of_elements;
+    uint32_t committed_entries;
+    uint64_t size;
+
+    ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0);
+    ASSERT_EQ(committed_entries, total_committed_elements);
+  }
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+TEST_F(TestCls2PCQueue, UpgradeFromReef)
+{
+  const std::string queue_name = __PRETTY_FUNCTION__;
+  const auto max_size = 1024*1024*128;
+  const auto number_of_ops = 200U;
+  const auto number_of_elements = 23U;
+  auto total_committed_elements = 0U;
+  librados::ObjectWriteOperation wop;
+  wop.create(true);
+  cls_2pc_queue_init(wop, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    const std::string element_prefix("wop-" +to_string(i) + "-element-");
+    auto total_size = 0UL;
+    std::vector<bufferlist> data(number_of_elements);
+    // create vector of buffer lists
+    std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+      bufferlist bl;
+      bl.append(element_prefix + to_string(j++));
+      total_size += bl.length();
+      return bl;
+    });
+
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+    cls_2pc_queue_commit(wop, data, res_id);
+    ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+    total_committed_elements += number_of_elements;
+    uint32_t committed_entries;
+    uint64_t size;
+
+    ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0);
+    ASSERT_EQ(committed_entries, total_committed_elements);
+  }
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+
+  constexpr auto max_elements = 42U;
+  std::string marker;
+  std::string end_marker;
+  librados::ObjectReadOperation rop;
+  auto consume_count = 0U;
+  std::vector<cls_queue_entry> entries;
+  bool truncated = true;
+
+  auto simulate_reef_cls_2pc_queue_remove_entries = [](librados::ObjectWriteOperation& wop, const std::string& end_marker) {
+    bufferlist in;
+    cls_queue_remove_op rem_op;
+    rem_op.end_marker = end_marker;
+    encode(rem_op, in);
+    wop.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
+  };
+
+  while (truncated) {
+    bufferlist bl;
+    int rc;
+    cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
+    ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+    ASSERT_EQ(rc, 0);
+    ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
+
+    consume_count += entries.size();
+    // simulating reef cls_2pc_queue_remove_entries with cls_queue_remove_op
+    simulate_reef_cls_2pc_queue_remove_entries(wop, end_marker);
+    marker = end_marker;
+    total_committed_elements -= entries.size();
+  }
+
+  // execute all delete operations in a batch
+  ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+
+  uint32_t entries_number;
+  uint64_t size;
+  ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, entries_number, size), 0);
+  ASSERT_EQ(total_committed_elements, 0);
+  ASSERT_EQ(entries_number, 0);
+}
+
 TEST_F(TestCls2PCQueue, Abort)
 {
   const std::string queue_name = __PRETTY_FUNCTION__;
index cca615afb0a58e957a079a38f4de26f9e182a840..a1f68638c1f41a62c94126cca9d7ab2fd1c233d8 100644 (file)
@@ -137,6 +137,51 @@ TEST_F(TestClsQueue, List)
   ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
 }
 
+TEST_F(TestClsQueue, ListByEndMarker)
+{
+  const std::string queue_name = "my-queue";
+  const uint64_t queue_size = 1024*1024;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_queue_init(op, queue_name, queue_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+  const auto number_of_ops = 10;
+  const auto number_of_elements = 100;
+
+  // test multiple enqueues
+  test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
+
+  const auto max_elements = 42;
+  std::string marker, end_marker;
+  bool truncated = false;
+  std::string max_op_next_marker;
+  auto total_elements = 0;
+  do {
+    std::vector<cls_queue_entry> entries;
+    auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, max_op_next_marker);
+    ASSERT_EQ(0, ret);
+    end_marker = max_op_next_marker;
+
+    std::vector<cls_queue_entry> end_marker_entries;
+    std::string end_marker_next_marker;
+    bool end_marker_truncated = false;
+    ret = cls_queue_list_entries(ioctx, queue_name, marker, end_marker, end_marker_entries,
+                                 &end_marker_truncated, end_marker_next_marker);
+    ASSERT_EQ(0, ret);
+
+    ASSERT_EQ(end_marker_next_marker, end_marker);
+    ASSERT_EQ(end_marker_entries.size(), entries.size());
+    for (auto i = 0U; i < end_marker_entries.size() && i < entries.size(); ++i) {
+      ASSERT_EQ(end_marker_entries[i].marker, entries[i].marker);
+    }
+
+    marker = max_op_next_marker;
+    total_elements += entries.size();
+  } while (truncated);
+
+  ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
+}
+
 TEST_F(TestClsQueue, Dequeue)
 {
   const std::string queue_name = "my-queue";