]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cls/queue: spillover of urgent data to xattrs
authorYuval Lifshitz <yuvalif@yahoo.com>
Wed, 11 Mar 2020 18:04:04 +0000 (20:04 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Wed, 11 Mar 2020 20:28:52 +0000 (22:28 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/cls/2pc_queue/cls_2pc_queue.cc
src/cls/2pc_queue/cls_2pc_queue_types.h
src/test/cls_2pc_queue/test_cls_2pc_queue.cc

index 539105f9a2882f8cb810464e7f1318818d0b0979..a28fb648edae1cd6f8bc14046d38336c9d41facc 100644 (file)
@@ -13,6 +13,8 @@
 CLS_VER(1,0)
 CLS_NAME(2pc_queue)
 
+constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data";
+
 static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
   auto in_iter = in->cbegin();
 
@@ -87,9 +89,13 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
   }
 
   urgent_data.reserved_size += res_op.size + overhead;
+  // note that last id is incremented regadless of failures
+  // to avoid "old reservation" issues below
+  ++urgent_data.last_id;
   bool result;
-  std::tie(std::ignore, result) = urgent_data.reservations.emplace(std::piecewise_construct,
-          std::forward_as_tuple(++urgent_data.last_id),
+  cls_2pc_reservations::iterator last_reservation;
+  std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
+          std::forward_as_tuple(urgent_data.last_id),
           std::forward_as_tuple(res_op.size, ceph::real_clock::now()));
   if (!result) {
     // an old reservation that was never committed or aborted is in the map
@@ -103,11 +109,50 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
   encode(urgent_data, head.bl_urgent_data);
 
   const uint64_t urgent_data_length = head.bl_urgent_data.length();
+  auto total_entries = urgent_data.reservations.size();
 
   if (head.max_urgent_data_size < urgent_data_length) {
-    // TODO: use objects xattr for spillover
-    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu", urgent_data_length, head.max_urgent_data_size);
-    return -ENOSPC;
+    CLS_LOG(10, "INFO: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu using xattrs", urgent_data_length, head.max_urgent_data_size);
+    // add the last reservation to xattrs 
+    bufferlist bl_xattrs;
+    auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d", ret);
+      return ret;
+    }
+    cls_2pc_reservations xattr_reservations;
+    if (ret >= 0) {
+      // xattrs exist
+      auto iter = bl_xattrs.cbegin();
+      try {
+        decode(xattr_reservations, iter);
+      } catch (buffer::error& err) {
+        CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode xattrs urgent data map");
+        return -EINVAL;
+      } //end - catch
+    }
+    std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
+          std::forward_as_tuple(urgent_data.last_id),
+          std::forward_as_tuple(res_op.size, ceph::real_clock::now()));
+    if (!result) {
+      // an old reservation that was never committed or aborted is in the map
+      // caller should try again assuming other IDs are ok
+      CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict inside xattrs after rollover: %u", urgent_data.last_id);
+      return -EAGAIN;
+    }
+    bl_xattrs.clear();
+    encode(xattr_reservations, bl_xattrs);
+    ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to write xattrs with: %d", ret);
+      return ret;
+    }
+    // remove the last reservation from the reservation list
+    // and indicate that spillover happened
+    urgent_data.has_xattrs = true;
+    urgent_data.reservations.erase(last_reservation);
+    head.bl_urgent_data.clear();
+    encode(urgent_data, head.bl_urgent_data);
   }
 
   ret = queue_write_head(hctx, head);
@@ -119,7 +164,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
   CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size);
   CLS_LOG(20, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size);
   CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length);
-  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation entries: %lu", total_entries);
 
   cls_2pc_queue_reserve_ret op_ret;
   op_ret.id = urgent_data.last_id;
@@ -155,9 +200,36 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe
   }
   
   auto it = urgent_data.reservations.find(commit_op.id);
+  cls_2pc_reservations xattr_reservations;
+  bufferlist bl_xattrs;
   if (it == urgent_data.reservations.end()) {
-    CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
-    return -ENOENT;
+    if (!urgent_data.has_xattrs) {
+        CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+        return -ENOENT;
+    }
+    // try to look for the reservation in xattrs
+    auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0) {
+      if (ret == -ENOENT || ret == -ENODATA) {
+        // no xattrs, reservation does not exists
+        CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+        return -ENOENT;
+      }
+      CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to read xattrs with: %d", ret);
+      return ret;
+    }
+    auto iter = bl_xattrs.cbegin();
+    try {
+      decode(xattr_reservations, iter);
+    } catch (buffer::error& err) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode xattrs urgent data map");
+      return -EINVAL;
+    } //end - catch
+    it = xattr_reservations.find(commit_op.id);
+    if (it == urgent_data.reservations.end()) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+      return -ENOENT;
+    }
   }
 
   auto& res = it->second;
@@ -181,12 +253,27 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe
     return ret;
   }
 
-  // remove the reservation
   urgent_data.reserved_size -= res.size;
-  urgent_data.reservations.erase(it);
+
+  if (xattr_reservations.empty()) {
+    // remove the reservation from urgent data
+    urgent_data.reservations.erase(it);
+  } else {
+    // remove the reservation from xattrs
+    xattr_reservations.erase(it);
+    bl_xattrs.clear();
+    encode(xattr_reservations, bl_xattrs);
+    ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to write xattrs with: %d", ret);
+      return ret;
+    }
+  }
   
   CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size);
-  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+  CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservation entries: %lu", 
+          urgent_data.reservations.size() + xattr_reservations.size());
+
   // write back head
   head.bl_urgent_data.clear();
   encode(urgent_data, head.bl_urgent_data);
@@ -218,19 +305,57 @@ static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, buffer
     CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what());
     return -EINVAL;
   }
-  
+  auto total_entries = urgent_data.reservations.size();
   auto it = urgent_data.reservations.find(abort_op.id);
   if (it == urgent_data.reservations.end()) {
-    CLS_LOG(10, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
-    return 0;
+    if (!urgent_data.has_xattrs) {
+      CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+      return 0;
+    }
+    // try to look for the reservation in xattrs
+    bufferlist bl_xattrs;
+    auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0) {
+      if (ret == -ENOENT || ret == -ENODATA) {
+        // no xattrs, reservation does not exists
+        CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+        return 0;
+      }
+      CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to read xattrs with: %d", ret);
+      return ret;
+    }
+    auto iter = bl_xattrs.cbegin();
+    cls_2pc_reservations xattr_reservations;
+    try {
+      decode(xattr_reservations, iter);
+    } catch (buffer::error& err) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode xattrs urgent data map");
+      return -EINVAL;
+    } //end - catch
+    it = xattr_reservations.find(abort_op.id);
+    if (it == xattr_reservations.end()) {
+      CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+      return 0;
+    }
+    total_entries += xattr_reservations.size();
+    xattr_reservations.erase(it);
+    bl_xattrs.clear();
+    encode(xattr_reservations, bl_xattrs);
+    ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to write xattrs with: %d", ret);
+      return ret;
+    }
+  } else {
+    urgent_data.reservations.erase(it);
   }
 
   // remove the reservation
   urgent_data.reserved_size -= it->second.size;
-  urgent_data.reservations.erase(it);
 
   CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size);
-  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+  CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservation entries: %lu", total_entries-1); 
 
   // write back head
   head.bl_urgent_data.clear();
@@ -241,7 +366,7 @@ static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, buffer
 static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
   //get head
   cls_queue_head head;
-  int ret = queue_read_head(hctx, head);
+  auto ret = queue_read_head(hctx, head);
   if (ret < 0) {
     return ret;
   }
@@ -255,9 +380,30 @@ static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist
     return -EINVAL;
   }
   
+  CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found", urgent_data.reservations.size());
   cls_2pc_queue_reservations_ret op_ret;
   op_ret.reservations = std::move(urgent_data.reservations);
-
+  if (urgent_data.has_xattrs) {
+    // try to look for the reservation in xattrs
+    cls_2pc_reservations xattr_reservations;
+    bufferlist bl_xattrs;
+    ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+    if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+      CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to read xattrs with: %d", ret);
+      return ret;
+    }
+    if (ret >= 0) {
+      auto iter = bl_xattrs.cbegin();
+      try {
+        decode(xattr_reservations, iter);
+      } catch (buffer::error& err) {
+        CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode xattrs urgent data map");
+        return -EINVAL;
+      } //end - catch
+      CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found in xatts", xattr_reservations.size());
+      op_ret.reservations.merge(xattr_reservations);
+    }
+  }
   encode(op_ret, *out);
 
   return 0;
index acf6534d3d8a57607568c7774a751362551f7266..b0a175cc47210f9201c3bc57b3a952444c7d4430 100644 (file)
@@ -37,12 +37,14 @@ struct cls_2pc_urgent_data
   uint64_t reserved_size{0};   // pending reservations size in bytes
   cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
   cls_2pc_reservations reservations; // reservation list (keyed by id)
+  bool has_xattrs{false};
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(reserved_size, bl);
     encode(last_id, bl);
     encode(reservations, bl);
+    encode(has_xattrs, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -51,6 +53,7 @@ struct cls_2pc_urgent_data
     decode(reserved_size, bl);
     decode(last_id, bl);
     decode(reservations, bl);
+    decode(has_xattrs, bl);
     DECODE_FINISH(bl);
   }
 };
index 6ca8817e5635270f91f2d683ae1ec393f0b55ba8..9b325b258d079b9324529d985d533d7456add7ea 100644 (file)
@@ -702,3 +702,95 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
   ASSERT_EQ(entries.size(), 0);
 }
 
+TEST_F(TestCls2PCQueue, ReserveSpillover)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024U*1024U;
+  const auto number_of_ops = 1024U;
+  const auto number_of_elements = 8U;
+  const auto size_to_reserve = 64U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+  }
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), number_of_ops);
+  for (const auto& r : reservations) {
+      ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+      ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
+  }
+}
+
+TEST_F(TestCls2PCQueue, CommitSpillover)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024U*1024U;
+  const auto number_of_ops = 1024U;
+  const auto number_of_elements = 4U;
+  const auto size_to_reserve = 128U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+  }
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  for (const auto& r : reservations) {
+    const std::string element_prefix("foo");
+        std::vector<bufferlist> data(number_of_elements);
+        auto total_size = 0UL;
+        // create vector of buffer lists
+        std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+            bufferlist bl;
+            bl.append(element_prefix + to_string(j++));
+            total_size += bl.length();
+            return bl;
+          });
+      ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+      cls_2pc_queue_commit(op, data, r.first);
+      ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+  }
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+TEST_F(TestCls2PCQueue, AbortSpillover)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024U*1024U;
+  const auto number_of_ops = 1024U;
+  const auto number_of_elements = 4U;
+  const auto size_to_reserve = 128U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+  }
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  for (const auto& r : reservations) {
+      ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+      cls_2pc_queue_abort(op, r.first);
+      ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+  }
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+