From: Krunal Chheda Date: Tue, 10 Feb 2026 21:01:03 +0000 (-0500) Subject: rgw/notification: fix reserved_size drift in 2pc_queue causing ENOSPC errors X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7f4eaee30cba6efd3e0acc5b3c315c182a3bc8d9;p=ceph.git rgw/notification: fix reserved_size drift in 2pc_queue causing ENOSPC errors The urgent_data.reserved_size field was accumulating incorrect values over time due to a mismatch between what was added during reserve() and what was subtracted during commit()/abort(). This caused the reserved_size to grow unbounded, eventually hitting the queue capacity limit and returning ENOSPC errors even when the queue had plenty of actual space. solution: Add a one time self healing capability, where the reservation value is re calculated during the reserve and counter is updated with correct value. Signed-off-by: Krunal Chheda --- diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 64c03cff5ad..f8727d156ba 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -21,6 +21,16 @@ using ceph::encode; constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data"; +// Compute reserved size from a reservations map +static uint64_t calc_reservations_size( + const cls_2pc_reservations& reservations) { + uint64_t total = 0; + for (const auto& [id, res] : reservations) { + total += res.size + (res.entries * QUEUE_ENTRY_OVERHEAD); + } + return total; +} + static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); @@ -122,6 +132,42 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff return -EINVAL; } + // For old queues (v1/v2), recalculate reserved_size from actual reservations + // to fix any historical drift. Once written back, queue becomes v3. + if (urgent_data.decoded_struct_v < 3) { + urgent_data.reserved_size = + calc_reservations_size(urgent_data.reservations); + + // Also check xattrs if they exist + cls_2pc_reservations xattr_reservations; + bufferlist bl_xattrs; + if (urgent_data.has_xattrs) { + ret = + cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(1, + "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d", + ret); + return ret; + } + if (ret >= 0) { + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, + "ERROR: cls_2pc_queue_reserve: failed to decode xattrs " + "urgent data map"); + return -EINVAL; + } + urgent_data.reserved_size += calc_reservations_size(xattr_reservations); + } + } + CLS_LOG( + 1, + "INFO: cls_2pc_queue_reserve: re-calculated urgent_data.reserved_size, reserved_size=%lu", + urgent_data.reserved_size); + } const auto overhead = res_op.entries*QUEUE_ENTRY_OVERHEAD; const auto remaining_size = (head.tail.offset >= head.front.offset) ? (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) : diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h index af4ef0ef17a..7556f69aa0d 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_types.h +++ b/src/cls/2pc_queue/cls_2pc_queue_types.h @@ -62,14 +62,22 @@ using cls_2pc_reservations = std::unordered_map= 3: this counter is accurate and can be used directly + // For version < 3: ignore this value and compute from reservations (fixes + // historical drift) + cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; + // last allocated id + cls_2pc_reservations reservations; // reservation list (keyed by id) bool has_xattrs{false}; uint32_t committed_entries{0}; // how many entries have been committed so far + // Transient field (not persisted) - stores the version from which this was + // decoded + uint8_t decoded_struct_v{3}; void encode(ceph::buffer::list& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(reserved_size, bl); encode(last_id, bl); encode(reservations, bl); @@ -79,7 +87,7 @@ struct cls_2pc_urgent_data } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(2, bl); + DECODE_START(3, bl); decode(reserved_size, bl); decode(last_id, bl); decode(reservations, bl); @@ -87,6 +95,7 @@ struct cls_2pc_urgent_data if (struct_v >= 2) { decode(committed_entries, bl); } + decoded_struct_v = struct_v; DECODE_FINISH(bl); }