]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/2pc_queue: remove the dependency of cls_2pc_queue with cls_queue 34669/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 21 Apr 2020 16:15:42 +0000 (19:15 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 21 Apr 2020 16:15:42 +0000 (19:15 +0300)
both queues share the same code base, however, cls_2pc_queue should work
even if cls_queue objectclass is not loaded into the osd

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/cls/2pc_queue/cls_2pc_queue.cc
src/cls/2pc_queue/cls_2pc_queue_client.cc
src/cls/2pc_queue/cls_2pc_queue_const.h

index 41e8166cecf10adcd254e11c2e65a427646b6c8d..7f7303ae07640d45294957165f0b990309237d01 100644 (file)
@@ -43,6 +43,18 @@ static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferl
   return queue_init(hctx, init_op);
 }
 
+static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  cls_queue_get_capacity_ret op_ret;
+  auto ret = queue_get_capacity(hctx, op_ret);
+  if (ret < 0) {
+    return ret;
+  }
+
+  encode(op_ret, *out);
+  return 0;
+}
+
 static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
   cls_2pc_queue_reserve_op res_op;
   try {
@@ -413,24 +425,80 @@ static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist
   return 0;
 }
 
+static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto in_iter = in->cbegin();
+  cls_queue_list_op op;
+  try {
+    decode(op, in_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_list_entries: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+
+  cls_queue_head head;
+  auto ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_queue_list_ret op_ret;
+  ret = queue_list_entries(hctx, op, op_ret, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  encode(op_ret, *out);
+  return 0;
+}
+
+static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto in_iter = in->cbegin();
+  cls_queue_remove_op op;
+  try {
+    decode(op, in_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+
+  cls_queue_head head;
+  auto ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+  ret = queue_remove_entries(hctx, op, head);
+  if (ret < 0) {
+    return ret;
+  }
+  return queue_write_head(hctx, head);
+}
+
 CLS_INIT(2pc_queue)
 {
   CLS_LOG(1, "Loaded 2pc queue class!");
 
   cls_handle_t h_class;
   cls_method_handle_t h_2pc_queue_init;
+  cls_method_handle_t h_2pc_queue_get_capacity;
   cls_method_handle_t h_2pc_queue_reserve;
   cls_method_handle_t h_2pc_queue_commit;
   cls_method_handle_t h_2pc_queue_abort;
   cls_method_handle_t h_2pc_queue_list_reservations;
+  cls_method_handle_t h_2pc_queue_list_entries;
+  cls_method_handle_t h_2pc_queue_remove_entries;
 
   cls_register(TPC_QUEUE_CLASS, &h_class);
 
   cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
+  cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
   cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
   cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
   cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
   cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations);
+  cls_register_cxx_method(h_class, TPC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_2pc_queue_list_entries, &h_2pc_queue_list_entries);
+  cls_register_cxx_method(h_class, TPC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_remove_entries, &h_2pc_queue_remove_entries);
 
   return;
 }
index ebd7c4ada67f586363f4ddc7223ce261ff0d6cc2..45f31cf401a309b5d7b7616bee1e8e2b394c11a1 100644 (file)
@@ -19,7 +19,7 @@ void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name,
 
 int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) {
   bufferlist in, out;
-  const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out);
+  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
   if (r < 0 ) {
     return r;
   }
@@ -91,7 +91,7 @@ int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const st
   op.max = max;
   encode(op, in);
 
-  const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
+  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
   if (r < 0) {
     return r;
   }
@@ -138,6 +138,6 @@ void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& e
   cls_queue_remove_op rem_op;
   rem_op.end_marker = end_marker;
   encode(rem_op, in);
-  op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in);
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
 }
 
index 2a22ff1a62c08eeff19c1341e6af9e5655ef8eb7..8bbdcd57529d50d4828280026146f988460a0e0d 100644 (file)
@@ -3,8 +3,11 @@
 #define TPC_QUEUE_CLASS "2pc_queue"
 
 #define TPC_QUEUE_INIT "2pc_queue_init"
+#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity"
 #define TPC_QUEUE_RESERVE "2pc_queue_reserve"
 #define TPC_QUEUE_COMMIT "2pc_queue_commit"
 #define TPC_QUEUE_ABORT "2pc_queue_abort"
 #define TPC_QUEUE_LIST_RESERVATIONS "2pc_queue_list_reservations"
+#define TPC_QUEUE_LIST_ENTRIES "2pc_queue_list_entries"
+#define TPC_QUEUE_REMOVE_ENTRIES "2pc_queue_remove_entries"