constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data";
+// Compute reserved size from a reservations map
+static uint64_t calc_reservations_size(
+ const cls_2pc_reservations& reservations) {
+ uint64_t total = 0;
+ for (const auto& [id, res] : reservations) {
+ total += res.size + (res.entries * QUEUE_ENTRY_OVERHEAD);
+ }
+ return total;
+}
+
static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
auto in_iter = in->cbegin();
return -EINVAL;
}
+ // For old queues (v1/v2), recalculate reserved_size from actual reservations
+ // to fix any historical drift. Once written back, queue becomes v3.
+ if (urgent_data.decoded_struct_v < 3) {
+ urgent_data.reserved_size =
+ calc_reservations_size(urgent_data.reservations);
+
+ // Also check xattrs if they exist
+ cls_2pc_reservations xattr_reservations;
+ bufferlist bl_xattrs;
+ if (urgent_data.has_xattrs) {
+ ret =
+ cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(1,
+ "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d",
+ ret);
+ return ret;
+ }
+ if (ret >= 0) {
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1,
+ "ERROR: cls_2pc_queue_reserve: failed to decode xattrs "
+ "urgent data map");
+ return -EINVAL;
+ }
+ urgent_data.reserved_size += calc_reservations_size(xattr_reservations);
+ }
+ }
+ CLS_LOG(
+ 1,
+ "INFO: cls_2pc_queue_reserve: re-calculated urgent_data.reserved_size, reserved_size=%lu",
+ urgent_data.reserved_size);
+ }
const auto overhead = res_op.entries*QUEUE_ENTRY_OVERHEAD;
const auto remaining_size = (head.tail.offset >= head.front.offset) ?
(head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
struct cls_2pc_urgent_data
{
- uint64_t reserved_size{0}; // pending reservations size in bytes
- cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
- cls_2pc_reservations reservations; // reservation list (keyed by id)
+ uint64_t reserved_size{0};
+ // pending reservations size in bytes
+ // For version >= 3: this counter is accurate and can be used directly
+ // For version < 3: ignore this value and compute from reservations (fixes
+ // historical drift)
+ cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID};
+ // last allocated id
+ cls_2pc_reservations reservations; // reservation list (keyed by id)
bool has_xattrs{false};
uint32_t committed_entries{0}; // how many entries have been committed so far
+ // Transient field (not persisted) - stores the version from which this was
+ // decoded
+ uint8_t decoded_struct_v{3};
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(reserved_size, bl);
encode(last_id, bl);
encode(reservations, bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
decode(reserved_size, bl);
decode(last_id, bl);
decode(reservations, bl);
if (struct_v >= 2) {
decode(committed_entries, bl);
}
+ decoded_struct_v = struct_v;
DECODE_FINISH(bl);
}