]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: fix reserved_size drift in 2pc_queue causing ENOSPC errors 67169/head
authorKrunal Chheda <kchheda3@bloomberg.net>
Tue, 10 Feb 2026 21:01:03 +0000 (16:01 -0500)
committerKrunal Chheda <kchheda3@bloomberg.net>
Tue, 10 Feb 2026 21:24:23 +0000 (16:24 -0500)
The urgent_data.reserved_size field was accumulating incorrect values over time due to a mismatch between what was added during reserve() and what was subtracted during commit()/abort(). This caused the reserved_size to grow unbounded, eventually hitting the queue capacity limit and returning ENOSPC errors even when the queue had plenty of actual space.

solution:
Add a one time self healing capability, where the reservation value is re calculated during the reserve and counter is updated with correct value.

Signed-off-by: Krunal Chheda <kchheda3@bloomberg.net>
src/cls/2pc_queue/cls_2pc_queue.cc
src/cls/2pc_queue/cls_2pc_queue_types.h

index 64c03cff5ad166b7305224c0f830b3f9a11159a6..f8727d156ba21cbe48985b27e11ec3002764d511 100644 (file)
@@ -21,6 +21,16 @@ using ceph::encode;
 
 constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data";
 
+// Compute reserved size from a reservations map
+static uint64_t calc_reservations_size(
+    const cls_2pc_reservations& reservations) {
+  uint64_t total = 0;
+  for (const auto& [id, res] : reservations) {
+    total += res.size + (res.entries * QUEUE_ENTRY_OVERHEAD);
+  }
+  return total;
+}
+
 static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
   auto in_iter = in->cbegin();
 
@@ -122,6 +132,42 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
     return -EINVAL;
   }
 
+  // For old queues (v1/v2), recalculate reserved_size from actual reservations
+  // to fix any historical drift. Once written back, queue becomes v3.
+  if (urgent_data.decoded_struct_v < 3) {
+    urgent_data.reserved_size =
+        calc_reservations_size(urgent_data.reservations);
+
+    // Also check xattrs if they exist
+    cls_2pc_reservations xattr_reservations;
+    bufferlist bl_xattrs;
+    if (urgent_data.has_xattrs) {
+      ret =
+          cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+      if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+        CLS_LOG(1,
+                "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d",
+                ret);
+        return ret;
+      }
+      if (ret >= 0) {
+        auto iter = bl_xattrs.cbegin();
+        try {
+          decode(xattr_reservations, iter);
+        } catch (ceph::buffer::error& err) {
+          CLS_LOG(1,
+                  "ERROR: cls_2pc_queue_reserve: failed to decode xattrs "
+                  "urgent data map");
+          return -EINVAL;
+        }
+        urgent_data.reserved_size += calc_reservations_size(xattr_reservations);
+      }
+    }
+    CLS_LOG(
+        1,
+        "INFO: cls_2pc_queue_reserve: re-calculated urgent_data.reserved_size, reserved_size=%lu",
+        urgent_data.reserved_size);
+  }
   const auto overhead = res_op.entries*QUEUE_ENTRY_OVERHEAD;
   const auto remaining_size = (head.tail.offset >= head.front.offset) ?
     (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
index af4ef0ef17a4e6ab652f63e039668a86d006d92f..7556f69aa0d64d6257e5f5e913cff455cf917341 100644 (file)
@@ -62,14 +62,22 @@ using cls_2pc_reservations = std::unordered_map<cls_2pc_reservation::id_t, cls_2
 
 struct cls_2pc_urgent_data
 {
-  uint64_t reserved_size{0};   // pending reservations size in bytes
-  cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
-  cls_2pc_reservations reservations; // reservation list (keyed by id)
+  uint64_t reserved_size{0};
+  // pending reservations size in bytes
+  // For version >= 3: this counter is accurate and can be used directly
+  // For version < 3: ignore this value and compute from reservations (fixes
+  // historical drift)
+  cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID};
+  // last allocated id
+  cls_2pc_reservations reservations;  // reservation list (keyed by id)
   bool has_xattrs{false};
   uint32_t committed_entries{0}; // how many entries have been committed so far
+  // Transient field (not persisted) - stores the version from which this was
+  // decoded
+  uint8_t decoded_struct_v{3};
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(reserved_size, bl);
     encode(last_id, bl);
     encode(reservations, bl);
@@ -79,7 +87,7 @@ struct cls_2pc_urgent_data
   }
 
   void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(reserved_size, bl);
     decode(last_id, bl);
     decode(reservations, bl);
@@ -87,6 +95,7 @@ struct cls_2pc_urgent_data
     if (struct_v >= 2) {
       decode(committed_entries, bl);
     }
+    decoded_struct_v = struct_v;
     DECODE_FINISH(bl);
   }