From: Yuval Lifshitz Date: Tue, 21 Apr 2020 16:15:42 +0000 (+0300) Subject: cls/2pc_queue: remove the dependency of cls_2pc_queue with cls_queue X-Git-Tag: v16.1.0~2539^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F34669%2Fhead;p=ceph.git cls/2pc_queue: remove the dependency of cls_2pc_queue with cls_queue both queues share the same code base, however, cls_2pc_queue should work even if cls_queue objectclass is not loaded into the osd Signed-off-by: Yuval Lifshitz --- diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 41e8166cecf1..7f7303ae0764 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -43,6 +43,18 @@ static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferl return queue_init(hctx, init_op); } +static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_queue_get_capacity_ret op_ret; + auto ret = queue_get_capacity(hctx, op_ret); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { cls_2pc_queue_reserve_op res_op; try { @@ -413,24 +425,80 @@ static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist return 0; } +static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_list_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_entries: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_queue_list_ret op_ret; + ret = queue_list_entries(hctx, op, op_ret, head); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_remove_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + ret = queue_remove_entries(hctx, op, head); + if (ret < 0) { + return ret; + } + return queue_write_head(hctx, head); +} + CLS_INIT(2pc_queue) { CLS_LOG(1, "Loaded 2pc queue class!"); cls_handle_t h_class; cls_method_handle_t h_2pc_queue_init; + cls_method_handle_t h_2pc_queue_get_capacity; cls_method_handle_t h_2pc_queue_reserve; cls_method_handle_t h_2pc_queue_commit; cls_method_handle_t h_2pc_queue_abort; cls_method_handle_t h_2pc_queue_list_reservations; + cls_method_handle_t h_2pc_queue_list_entries; + cls_method_handle_t h_2pc_queue_remove_entries; cls_register(TPC_QUEUE_CLASS, &h_class); cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init); + cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity); cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve); cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit); cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort); cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations); + cls_register_cxx_method(h_class, TPC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_2pc_queue_list_entries, &h_2pc_queue_list_entries); + cls_register_cxx_method(h_class, TPC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_remove_entries, &h_2pc_queue_remove_entries); return; } diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc index ebd7c4ada67f..45f31cf401a3 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.cc +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -19,7 +19,7 @@ void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) { bufferlist in, out; - const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out); + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out); if (r < 0 ) { return r; } @@ -91,7 +91,7 @@ int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const st op.max = max; encode(op, in); - const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out); + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out); if (r < 0) { return r; } @@ -138,6 +138,6 @@ void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& e cls_queue_remove_op rem_op; rem_op.end_marker = end_marker; encode(rem_op, in); - op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in); } diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h index 2a22ff1a62c0..8bbdcd57529d 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_const.h +++ b/src/cls/2pc_queue/cls_2pc_queue_const.h @@ -3,8 +3,11 @@ #define TPC_QUEUE_CLASS "2pc_queue" #define TPC_QUEUE_INIT "2pc_queue_init" +#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity" #define TPC_QUEUE_RESERVE "2pc_queue_reserve" #define TPC_QUEUE_COMMIT "2pc_queue_commit" #define TPC_QUEUE_ABORT "2pc_queue_abort" #define TPC_QUEUE_LIST_RESERVATIONS "2pc_queue_list_reservations" +#define TPC_QUEUE_LIST_ENTRIES "2pc_queue_list_entries" +#define TPC_QUEUE_REMOVE_ENTRIES "2pc_queue_remove_entries"