CLS_VER(1,0)
CLS_NAME(2pc_queue)
+constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data";
+
static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
auto in_iter = in->cbegin();
}
urgent_data.reserved_size += res_op.size + overhead;
+ // note that last id is incremented regadless of failures
+ // to avoid "old reservation" issues below
+ ++urgent_data.last_id;
bool result;
- std::tie(std::ignore, result) = urgent_data.reservations.emplace(std::piecewise_construct,
- std::forward_as_tuple(++urgent_data.last_id),
+ cls_2pc_reservations::iterator last_reservation;
+ std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
+ std::forward_as_tuple(urgent_data.last_id),
std::forward_as_tuple(res_op.size, ceph::real_clock::now()));
if (!result) {
// an old reservation that was never committed or aborted is in the map
encode(urgent_data, head.bl_urgent_data);
const uint64_t urgent_data_length = head.bl_urgent_data.length();
+ auto total_entries = urgent_data.reservations.size();
if (head.max_urgent_data_size < urgent_data_length) {
- // TODO: use objects xattr for spillover
- CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu", urgent_data_length, head.max_urgent_data_size);
- return -ENOSPC;
+ CLS_LOG(10, "INFO: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu using xattrs", urgent_data_length, head.max_urgent_data_size);
+ // add the last reservation to xattrs
+ bufferlist bl_xattrs;
+ auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ cls_2pc_reservations xattr_reservations;
+ if (ret >= 0) {
+ // xattrs exist
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ }
+ std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
+ std::forward_as_tuple(urgent_data.last_id),
+ std::forward_as_tuple(res_op.size, ceph::real_clock::now()));
+ if (!result) {
+ // an old reservation that was never committed or aborted is in the map
+ // caller should try again assuming other IDs are ok
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict inside xattrs after rollover: %u", urgent_data.last_id);
+ return -EAGAIN;
+ }
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ // remove the last reservation from the reservation list
+ // and indicate that spillover happened
+ urgent_data.has_xattrs = true;
+ urgent_data.reservations.erase(last_reservation);
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
}
ret = queue_write_head(hctx, head);
CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size);
CLS_LOG(20, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size);
CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length);
- CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+ CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation entries: %lu", total_entries);
cls_2pc_queue_reserve_ret op_ret;
op_ret.id = urgent_data.last_id;
}
auto it = urgent_data.reservations.find(commit_op.id);
+ cls_2pc_reservations xattr_reservations;
+ bufferlist bl_xattrs;
if (it == urgent_data.reservations.end()) {
- CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
- return -ENOENT;
+ if (!urgent_data.has_xattrs) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+ return -ENOENT;
+ }
+ // try to look for the reservation in xattrs
+ auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ if (ret == -ENOENT || ret == -ENODATA) {
+ // no xattrs, reservation does not exists
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+ return -ENOENT;
+ }
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ it = xattr_reservations.find(commit_op.id);
+ if (it == urgent_data.reservations.end()) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+ return -ENOENT;
+ }
}
auto& res = it->second;
return ret;
}
- // remove the reservation
urgent_data.reserved_size -= res.size;
- urgent_data.reservations.erase(it);
+
+ if (xattr_reservations.empty()) {
+ // remove the reservation from urgent data
+ urgent_data.reservations.erase(it);
+ } else {
+ // remove the reservation from xattrs
+ xattr_reservations.erase(it);
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ }
CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size);
- CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+ CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservation entries: %lu",
+ urgent_data.reservations.size() + xattr_reservations.size());
+
// write back head
head.bl_urgent_data.clear();
encode(urgent_data, head.bl_urgent_data);
CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what());
return -EINVAL;
}
-
+
+ auto total_entries = urgent_data.reservations.size();
auto it = urgent_data.reservations.find(abort_op.id);
if (it == urgent_data.reservations.end()) {
- CLS_LOG(10, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
- return 0;
+ if (!urgent_data.has_xattrs) {
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+ return 0;
+ }
+ // try to look for the reservation in xattrs
+ bufferlist bl_xattrs;
+ auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ if (ret == -ENOENT || ret == -ENODATA) {
+ // no xattrs, reservation does not exists
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+ return 0;
+ }
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ auto iter = bl_xattrs.cbegin();
+ cls_2pc_reservations xattr_reservations;
+ try {
+ decode(xattr_reservations, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ it = xattr_reservations.find(abort_op.id);
+ if (it == xattr_reservations.end()) {
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+ return 0;
+ }
+ total_entries += xattr_reservations.size();
+ xattr_reservations.erase(it);
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ } else {
+ urgent_data.reservations.erase(it);
}
// remove the reservation
urgent_data.reserved_size -= it->second.size;
- urgent_data.reservations.erase(it);
CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size);
- CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservation entries: %lu", total_entries-1);
// write back head
head.bl_urgent_data.clear();
static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
//get head
cls_queue_head head;
- int ret = queue_read_head(hctx, head);
+ auto ret = queue_read_head(hctx, head);
if (ret < 0) {
return ret;
}
return -EINVAL;
}
+ CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found", urgent_data.reservations.size());
cls_2pc_queue_reservations_ret op_ret;
op_ret.reservations = std::move(urgent_data.reservations);
-
+ if (urgent_data.has_xattrs) {
+ // try to look for the reservation in xattrs
+ cls_2pc_reservations xattr_reservations;
+ bufferlist bl_xattrs;
+ ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ if (ret >= 0) {
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found in xatts", xattr_reservations.size());
+ op_ret.reservations.merge(xattr_reservations);
+ }
+ }
encode(op_ret, *out);
return 0;
ASSERT_EQ(entries.size(), 0);
}
+TEST_F(TestCls2PCQueue, ReserveSpillover)
+{
+ const std::string queue_name = "my-queue";
+ const auto max_size = 1024U*1024U;
+ const auto number_of_ops = 1024U;
+ const auto number_of_elements = 8U;
+ const auto size_to_reserve = 64U;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_2pc_queue_init(op, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ cls_2pc_reservation::id_t res_id;
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+ }
+ cls_2pc_reservations reservations;
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ ASSERT_EQ(reservations.size(), number_of_ops);
+ for (const auto& r : reservations) {
+ ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+ ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
+ }
+}
+
+TEST_F(TestCls2PCQueue, CommitSpillover)
+{
+ const std::string queue_name = "my-queue";
+ const auto max_size = 1024U*1024U;
+ const auto number_of_ops = 1024U;
+ const auto number_of_elements = 4U;
+ const auto size_to_reserve = 128U;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_2pc_queue_init(op, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ cls_2pc_reservation::id_t res_id;
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+ }
+ cls_2pc_reservations reservations;
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ for (const auto& r : reservations) {
+ const std::string element_prefix("foo");
+ std::vector<bufferlist> data(number_of_elements);
+ auto total_size = 0UL;
+ // create vector of buffer lists
+ std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+ bufferlist bl;
+ bl.append(element_prefix + to_string(j++));
+ total_size += bl.length();
+ return bl;
+ });
+ ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+ cls_2pc_queue_commit(op, data, r.first);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ ASSERT_EQ(reservations.size(), 0);
+}
+
+TEST_F(TestCls2PCQueue, AbortSpillover)
+{
+ const std::string queue_name = "my-queue";
+ const auto max_size = 1024U*1024U;
+ const auto number_of_ops = 1024U;
+ const auto number_of_elements = 4U;
+ const auto size_to_reserve = 128U;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_2pc_queue_init(op, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ cls_2pc_reservation::id_t res_id;
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+ }
+ cls_2pc_reservations reservations;
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ for (const auto& r : reservations) {
+ ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+ cls_2pc_queue_abort(op, r.first);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+ ASSERT_EQ(reservations.size(), 0);
+}
+