return queue_init(hctx, init_op);
}
+static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_queue_get_capacity_ret op_ret;
+ auto ret = queue_get_capacity(hctx, op_ret);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
cls_2pc_queue_reserve_op res_op;
try {
return 0;
}
+static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_list_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_entries: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_queue_list_ret op_ret;
+ ret = queue_list_entries(hctx, op, op_ret, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_remove_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = queue_remove_entries(hctx, op, head);
+ if (ret < 0) {
+ return ret;
+ }
+ return queue_write_head(hctx, head);
+}
+
CLS_INIT(2pc_queue)
{
CLS_LOG(1, "Loaded 2pc queue class!");
cls_handle_t h_class;
cls_method_handle_t h_2pc_queue_init;
+ cls_method_handle_t h_2pc_queue_get_capacity;
cls_method_handle_t h_2pc_queue_reserve;
cls_method_handle_t h_2pc_queue_commit;
cls_method_handle_t h_2pc_queue_abort;
cls_method_handle_t h_2pc_queue_list_reservations;
+ cls_method_handle_t h_2pc_queue_list_entries;
+ cls_method_handle_t h_2pc_queue_remove_entries;
cls_register(TPC_QUEUE_CLASS, &h_class);
cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
+ cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations);
+ cls_register_cxx_method(h_class, TPC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_2pc_queue_list_entries, &h_2pc_queue_list_entries);
+ cls_register_cxx_method(h_class, TPC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_remove_entries, &h_2pc_queue_remove_entries);
return;
}
int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) {
bufferlist in, out;
- const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out);
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
if (r < 0 ) {
return r;
}
op.max = max;
encode(op, in);
- const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
if (r < 0) {
return r;
}
cls_queue_remove_op rem_op;
rem_op.end_marker = end_marker;
encode(rem_op, in);
- op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
}