]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
Restructuring code.
authorPritha Srivastava <prsrivas@redhat.com>
Thu, 11 Jul 2019 15:24:11 +0000 (20:54 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 15 Oct 2019 17:33:18 +0000 (23:03 +0530)
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
16 files changed:
src/cls/queue/cls_queue.cc
src/cls/queue/cls_queue.h [deleted file]
src/cls/queue/cls_queue_client.cc
src/cls/queue/cls_queue_client.h
src/cls/queue/cls_queue_const.h
src/cls/queue/cls_queue_ops.cc
src/cls/queue/cls_queue_ops.h
src/cls/queue/cls_queue_src.cc
src/cls/queue/cls_queue_src.h [new file with mode: 0644]
src/cls/queue/cls_queue_types.cc
src/cls/queue/cls_queue_types.h
src/cls/queue/cls_rgw_queue.cc
src/cls/queue/cls_rgw_queue_ops.h
src/rgw/rgw_gc.cc
src/rgw/rgw_gc.h
src/test/cls_queue/test_cls_queue.cc

index fd4082ad80fd24c4630e1807aff91e92e82b4e44..f2ba7d754ad6637151269524002417897c201c03 100644 (file)
 #include "cls/queue/cls_queue_types.h"
 #include "cls/queue/cls_queue_ops.h"
 #include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue.h"
+#include "cls/queue/cls_queue_src.h"
 
 CLS_VER(1,0)
 CLS_NAME(queue)
 
+static int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto in_iter = in->cbegin();
+  cls_queue_init_op op;
+  op.has_urgent_data = false;
+  try {
+    decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_queue_init_op(): failed to decode entry\n");
+    return -EINVAL;
+  }
+
+  return init_queue(hctx, op);
+}
+
+static int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  cls_queue_get_size_ret op_ret;
+  auto ret = get_queue_size(hctx, op_ret);
+  if (ret < 0) {
+    return ret;
+  }
+
+  encode(op_ret, *out);
+  return 0;
+}
+
+static int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto iter = in->cbegin();
+  cls_queue_enqueue_op op;
+  try {
+    decode(op, iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n");
+    return -EINVAL;
+  }
+
+  cls_queue_head head;
+  auto ret = get_queue_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = enqueue(hctx, op, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  //Write back head
+  return write_queue_head(hctx, head);
+}
+
+static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto in_iter = in->cbegin();
+  cls_queue_list_op op;
+  try {
+    decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
+    return -EINVAL;
+  }
+
+  cls_queue_head head;
+  auto ret = get_queue_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_queue_list_ret op_ret;
+  ret = queue_list_entries(hctx, op, op_ret, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  encode(op_ret, *out);
+  return 0;
+}
+
+static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto in_iter = in->cbegin();
+  cls_queue_remove_op op;
+  try {
+    decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
+    return -EINVAL;
+  }
+
+  cls_queue_head head;
+  auto ret = get_queue_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+  ret = queue_remove_entries(hctx, op, head);
+  if (ret < 0) {
+    return ret;
+  }
+  return write_queue_head(hctx, head);
+}
+
 CLS_INIT(queue)
 {
   CLS_LOG(1, "Loaded queue class!");
 
   cls_handle_t h_class;
-  cls_method_handle_t h_create_queue;
   cls_method_handle_t h_init_queue;
   cls_method_handle_t h_get_queue_size;
   cls_method_handle_t h_enqueue;
   cls_method_handle_t h_queue_list_entries;
   cls_method_handle_t h_queue_remove_entries;
-  cls_method_handle_t h_queue_update_last_entry;
-  cls_method_handle_t h_queue_get_last_entry;
-  cls_method_handle_t h_queue_read_urgent_data;
-  cls_method_handle_t h_queue_write_urgent_data;
-  cls_method_handle_t h_queue_can_urgent_data_fit;
  
   cls_register(QUEUE_CLASS, &h_class);
 
   /* queue*/
-  cls_register_cxx_method(h_class, CREATE_QUEUE, CLS_METHOD_WR, cls_create_queue, &h_create_queue);
   cls_register_cxx_method(h_class, INIT_QUEUE, CLS_METHOD_WR, cls_init_queue, &h_init_queue);
   cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size);
   cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue);
-  cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_list_entries, &h_queue_list_entries);
+  cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_queue_list_entries, &h_queue_list_entries);
   cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries);
-  cls_register_cxx_method(h_class, QUEUE_UPDATE_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_update_last_entry, &h_queue_update_last_entry);
-  cls_register_cxx_method(h_class, QUEUE_GET_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_get_last_entry, &h_queue_get_last_entry);
-  cls_register_cxx_method(h_class, QUEUE_READ_URGENT_DATA, CLS_METHOD_RD, cls_queue_read_urgent_data, &h_queue_read_urgent_data);
-  cls_register_cxx_method(h_class, QUEUE_WRITE_URGENT_DATA, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_write_urgent_data, &h_queue_write_urgent_data);
-  cls_register_cxx_method(h_class, QUEUE_CAN_URGENT_DATA_FIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_can_urgent_data_fit, &h_queue_can_urgent_data_fit);
 
   return;
 }
diff --git a/src/cls/queue/cls_queue.h b/src/cls/queue/cls_queue.h
deleted file mode 100644 (file)
index 1f07057..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef CEPH_CLS_QUEUE_H
-#define CEPH_CLS_QUEUE_H
-
-int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-
-#endif /* CEPH_CLS_QUEUE_H */
\ No newline at end of file
index c610c71a912b6dcfbdb026ed071f973070a6748e..ea6116030fd67c079dd09f4169822d00324c8174 100644 (file)
@@ -4,6 +4,7 @@
 
 #include "cls/rgw/cls_rgw_ops.h"
 #include "cls/queue/cls_rgw_queue_ops.h"
+#include "cls/queue/cls_queue_ops.h"
 #include "cls/queue/cls_queue_const.h"
 #include "cls/queue/cls_queue_client.h"
 
 
 using namespace librados;
 
-void cls_rgw_gc_create_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries)
-{
-  bufferlist in;
-  cls_gc_create_queue_op call;
-  call.name = queue_name;
-  call.size = size;
-  call.num_urgent_data_entries = num_urgent_data_entries;
-  encode(call, in);
-  op.exec(RGW_QUEUE_CLASS, GC_CREATE_QUEUE, in);
-}
-
 void cls_rgw_gc_init_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries)
 {
   bufferlist in;
-  cls_gc_create_queue_op call;
+  cls_gc_init_queue_op call;
   call.name = queue_name;
   call.size = size;
   call.num_urgent_data_entries = num_urgent_data_entries;
@@ -40,13 +30,16 @@ int cls_rgw_gc_get_queue_size(IoCtx& io_ctx, string& oid, uint64_t& size)
   if (r < 0)
     return r;
 
+  cls_queue_get_size_ret op_ret;
   auto iter = out.cbegin();
   try {
-    decode(size, iter);
+    decode(op_ret, iter);
   } catch (buffer::error& err) {
     return -EIO;
   }
 
+  size = op_ret.queue_size;
+
   return 0;
 }
 
@@ -84,19 +77,17 @@ int cls_rgw_gc_list_queue(IoCtx& io_ctx, string& oid, string& marker, uint32_t m
 
   entries.swap(ret.entries);
 
-  if (truncated)
-    *truncated = ret.truncated;
+  *truncated = ret.truncated;
 
   next_marker = std::move(ret.next_marker);
 
   return 0;
 }
 
-void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, string& marker, uint32_t num_entries)
+void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, uint32_t num_entries)
 {
   bufferlist in, out;
   cls_rgw_gc_queue_remove_op rem_op;
-  rem_op.marker = marker;
   rem_op.num_entries = num_entries;
   encode(rem_op, in);
   op.exec(RGW_QUEUE_CLASS, GC_QUEUE_REMOVE_ENTRIES, in);
index 09809d3699c7df7ce0312e23eddc1ef39f627537..2650a0ad5c34159748737f024f95ceb9179e3d86 100644 (file)
 #include "common/ceph_time.h"
 #include "common/Cond.h"
 
-void cls_rgw_gc_create_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries);
 void cls_rgw_gc_init_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries);
 int cls_rgw_gc_get_queue_size(librados::IoCtx& io_ctx, string& oid, uint64_t& size);
 void cls_rgw_gc_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
-int cls_rgw_gc_dequeue(librados::IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info);
 int cls_rgw_gc_list_queue(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
                     list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker);
-void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, string& marker, uint32_t num_entries);
+void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, uint32_t num_entries);
 void cls_rgw_gc_defer_entry_queue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
 
 #endif
\ No newline at end of file
index 0e45739d1c0b47c8e644ee108cc01336160c4e4e..3b61f00e0baa475ca624940cbe9c75a81ea0f950 100644 (file)
@@ -4,20 +4,15 @@
 #define QUEUE_CLASS "queue"
 #define RGW_QUEUE_CLASS "rgw_queue"
 
-#define CREATE_QUEUE "create_queue"
 #define INIT_QUEUE "init_queue"
 #define GET_QUEUE_SIZE "get_queue_size"
 #define ENQUEUE "enqueue"
-#define DEQUEUE "dequeue"
 #define QUEUE_LIST_ENTRIES "queue_list_entries"
 #define QUEUE_REMOVE_ENTRIES "queue_remove_entries"
-#define QUEUE_UPDATE_LAST_ENTRY "queue_update_last_entry"
-#define QUEUE_GET_LAST_ENTRY "queue_get_last_entry"
 #define QUEUE_READ_URGENT_DATA "queue_read_urgent_data"
 #define QUEUE_WRITE_URGENT_DATA "queue_write_urgent_data"
 #define QUEUE_CAN_URGENT_DATA_FIT "queue_can_urgent_data_fit"
 
-#define GC_CREATE_QUEUE "gc_create_queue"
 #define GC_INIT_QUEUE "gc_init_queue"
 #define GC_ENQUEUE "gc_enqueue"
 #define GC_DEQUEUE "gc_dequeue"
index 52792c9b70d8127d889dd13ad1d010f41a1de7f9..4e35090f6e9ea24120e1b412ca848fc4f5e9069d 100644 (file)
@@ -5,14 +5,3 @@
 #include "common/Formatter.h"
 #include "common/ceph_json.h"
 #include "include/utime.h"
-
-void cls_create_queue_op::dump(Formatter *f) const
-{
-  head.dump(f);
-}
-
-void cls_create_queue_op::generate_test_instances(list<cls_create_queue_op*>& ls)
-{
-  ls.push_back(new cls_create_queue_op);
-  ls.push_back(new cls_create_queue_op);
-}
\ No newline at end of file
index d2c90a179eb3d5cb82b58474e5b5db05ef669265..c7a8e7096993bf0ff3b1c7fe56794e4d272b8b5c 100644 (file)
@@ -3,16 +3,18 @@
 
 #include "cls/queue/cls_queue_types.h"
 
-struct cls_create_queue_op {
+struct cls_queue_init_op {
   cls_queue_head head;
   uint64_t head_size{0};
+  bool has_urgent_data{false};
 
-  cls_create_queue_op() {}
+  cls_queue_init_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(head, bl);
     encode(head_size, bl);
+    encode(has_urgent_data, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -20,61 +22,49 @@ struct cls_create_queue_op {
     DECODE_START(1, bl);
     decode(head, bl);
     decode(head_size, bl);
+    decode(has_urgent_data, bl);
     DECODE_FINISH(bl);
   }
 
-  void dump(Formatter *f) const;
-  static void generate_test_instances(list<cls_create_queue_op*>& ls);
 };
-WRITE_CLASS_ENCODER(cls_create_queue_op)
+WRITE_CLASS_ENCODER(cls_queue_init_op)
 
-struct cls_enqueue_op {
+struct cls_queue_enqueue_op {
   vector<bufferlist> bl_data_vec;
-  bufferlist bl_urgent_data;
 
-  cls_enqueue_op() {}
+  cls_queue_enqueue_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(bl_data_vec, bl);
-    encode(bl_urgent_data, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(bl_data_vec, bl);
-    decode(bl_urgent_data, bl);
     DECODE_FINISH(bl);
   } 
 };
-WRITE_CLASS_ENCODER(cls_enqueue_op)
-
-struct cls_dequeue_op {
-  bufferlist bl;
-
-  cls_dequeue_op() {}
-
-  static void generate_test_instances(list<cls_dequeue_op*>& ls);
-};
+WRITE_CLASS_ENCODER(cls_queue_enqueue_op)
 
 struct cls_queue_list_op {
   uint64_t max;
-  uint64_t start_offset;
+  string start_marker;
 
   cls_queue_list_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(max, bl);
-    encode(start_offset, bl);
+    encode(start_marker, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(max, bl);
-    decode(start_offset, bl);
+    decode(start_marker, bl);
     DECODE_FINISH(bl);
   }
 };
@@ -82,123 +72,68 @@ WRITE_CLASS_ENCODER(cls_queue_list_op)
 
 struct cls_queue_list_ret {
   bool is_truncated;
-  uint64_t next_offset;
+  string next_marker;
   vector<bufferlist> data;
-  vector<uint64_t> offsets;
-  bufferlist bl_urgent_data;
+  vector<string> markers;
 
   cls_queue_list_ret() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(is_truncated, bl);
-    encode(next_offset, bl);
+    encode(next_marker, bl);
     encode(data, bl);
-    encode(offsets, bl);
-    encode(bl_urgent_data, bl);
+    encode(markers, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(is_truncated, bl);
-    decode(next_offset, bl);
+    decode(next_marker, bl);
     decode(data, bl);
-    decode(offsets, bl);
-    decode(bl_urgent_data, bl);
+    decode(markers, bl);
     DECODE_FINISH(bl);
   }
 };
 WRITE_CLASS_ENCODER(cls_queue_list_ret)
 
 struct cls_queue_remove_op {
-  uint64_t start_offset;
-  uint64_t end_offset;
-  bufferlist bl_urgent_data;
+  string end_marker;
 
   cls_queue_remove_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(start_offset, bl);
-    encode(end_offset, bl);
-    encode(bl_urgent_data, bl);
+    encode(end_marker, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(start_offset, bl);
-    decode(end_offset, bl);
-    decode(bl_urgent_data, bl);
+    decode(end_marker, bl);
     DECODE_FINISH(bl);
   }
 };
 WRITE_CLASS_ENCODER(cls_queue_remove_op)
 
-struct cls_queue_urgent_data_ret {
-  bufferlist bl_urgent_data;
-
-  cls_queue_urgent_data_ret() {}
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(bl_urgent_data, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(bl_urgent_data, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(cls_queue_urgent_data_ret)
-
-struct cls_queue_write_urgent_data_op {
-  bufferlist bl_urgent_data;
+struct cls_queue_get_size_ret {
+  uint64_t queue_size;
 
-  cls_queue_write_urgent_data_op() {}
+  cls_queue_get_size_ret() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(bl_urgent_data, bl);
+    encode(queue_size, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(bl_urgent_data, bl);
+    decode(queue_size, bl);
     DECODE_FINISH(bl);
   }
 };
-WRITE_CLASS_ENCODER(cls_queue_write_urgent_data_op)
-
-struct cls_queue_update_last_entry_op {
-  bufferlist bl_data;
-  bufferlist bl_urgent_data;
-
-  cls_queue_update_last_entry_op() {}
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(bl_data, bl);
-    encode(bl_urgent_data, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(bl_data, bl);
-    decode(bl_urgent_data, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(cls_queue_update_last_entry_op)
-
-struct cls_init_queue_op : cls_create_queue_op{
-
-};
-WRITE_CLASS_ENCODER(cls_init_queue_op)
+WRITE_CLASS_ENCODER(cls_queue_get_size_ret)
 
 #endif /* CEPH_CLS_QUEUE_OPS_H */
\ No newline at end of file
index bf396971bc95039c02304dee30fead83f3c2f881..52aae9b57f5818e78580a52b76d8fe43158f89ad 100644 (file)
 #include "cls/queue/cls_queue_types.h"
 #include "cls/queue/cls_queue_ops.h"
 #include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue.h"
+#include "cls/queue/cls_queue_src.h"
 
-static int get_queue_head_and_size(cls_method_context_t hctx, cls_queue_head& head, uint64_t& head_size)
+int write_queue_head(cls_method_context_t hctx, cls_queue_head& head)
 {
-  //read head size
-  bufferlist bl_head_size;
-  int ret = cls_cxx_read(hctx, 0, sizeof(uint64_t), &bl_head_size);
+  bufferlist bl;
+  uint16_t entry_start = QUEUE_HEAD_START;
+  encode(entry_start, bl);
+
+  bufferlist bl_head;
+  encode(head, bl_head);
+
+  uint64_t encoded_len = bl_head.length();
+  encode(encoded_len, bl);
+
+  bl.claim_append(bl_head);
+
+  int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
   if (ret < 0) {
-    CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to read head with error %d\n", ret);
+    CLS_LOG(5, "ERROR: write_queue_head: failed to write head \n");
     return ret;
   }
-  //decode head size
-  auto iter = bl_head_size.cbegin();
-  try {
-    decode(head_size, iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to decode head size \n");
-    return -EINVAL;
-  }
+  return 0;
+}
 
-  CLS_LOG(10, "INFO: get_queue_head_and_size: head size is %lu\n", head_size);
+int get_queue_head(cls_method_context_t hctx, cls_queue_head& head)
+{
+  uint64_t chunk_size = 1024, start_offset = 0;
 
-  //read the head
   bufferlist bl_head;
-  ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  int ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
   if (ret < 0) {
+    CLS_LOG(5, "ERROR: get_queue_head: failed to read head \n");
     return ret;
   }
 
-  //decode head
-  iter = bl_head.cbegin();
+  //Process the chunk of data read
+  auto it = bl_head.cbegin();
+  // Queue head start
+  uint16_t queue_head_start;
   try {
-    decode(head, iter);
+    decode(queue_head_start, it);
   } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: get_queue_head_and_size: failed to decode head\n");
+    CLS_LOG(0, "ERROR: get_queue_head: failed to decode queue start \n");
+    return -EINVAL;
+  }
+  if (queue_head_start != QUEUE_HEAD_START) {
+    CLS_LOG(0, "ERROR: get_queue_head: invalid queue start \n");
     return -EINVAL;
   }
 
-  return 0;
-}
-
-int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  auto in_iter = in->cbegin();
-
-  cls_create_queue_op op;
+  uint64_t encoded_len;
   try {
-    decode(op, in_iter);
+    decode(encoded_len, it);
   } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_create_queue(): failed to decode entry\n");
+    CLS_LOG(0, "ERROR: get_queue_head: failed to decode encoded head size \n");
     return -EINVAL;
   }
 
-  // create the object
-  int ret = cls_cxx_create(hctx, true);
-  if (ret < 0) {
-    CLS_LOG(0, "ERROR: %s(): cls_cxx_create returned %d", __func__, ret);
-    return ret;
-  }
-  
-  CLS_LOG(10, "INFO: cls_create_queue create queue of head size %lu", op.head_size);
-  CLS_LOG(10, "INFO: cls_create_queue create queue of size %lu", op.head.queue_size);
-
-  uint64_t head_size = QUEUE_HEAD_SIZE_1K;
-
-  if (op.head.has_urgent_data) {
-    if (op.head_size == 0) {
-      head_size = QUEUE_HEAD_SIZE_4K;
-      op.head.tail = op.head.front = QUEUE_START_OFFSET_4K;
-      op.head.last_entry_offset = QUEUE_START_OFFSET_4K;
-    } else {
-      head_size = op.head_size;
-      op.head.tail = op.head.front = head_size;
-      op.head.last_entry_offset = head_size;
+  uint8_t decoded_head_size = sizeof(uint64_t) + sizeof(uint16_t);
+  if (encoded_len > (chunk_size - decoded_head_size)) {
+    chunk_size = (encoded_len - (chunk_size - decoded_head_size));
+    start_offset += decoded_head_size;
+    bufferlist bl_remaining_head;
+    int ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+    if (ret < 0) {
+      CLS_LOG(5, "ERROR: get_queue_head: failed to read remaining part of head \n");
+      return ret;
     }
-  } else {
-    head_size = QUEUE_HEAD_SIZE_1K;
-    op.head.tail = op.head.front = QUEUE_START_OFFSET_1K;
-    op.head.last_entry_offset = QUEUE_START_OFFSET_1K;
+    bl_head.claim_append(bl_remaining_head);
   }
-  op.head.queue_size += head_size;
-
-  CLS_LOG(10, "INFO: cls_create_queue queue actual size %lu", op.head.queue_size);
-  CLS_LOG(10, "INFO: cls_create_queue head size %lu", head_size);
-  CLS_LOG(10, "INFO: cls_create_queue queue front offset %lu", op.head.front);
-
-
-  //encode head size
-  bufferlist bl;
-  encode(head_size, bl);
-  CLS_LOG(0, "INFO: cls_create_queue head size %u", bl.length());
 
-  //encode head
-  bufferlist bl_head;
-  encode(op.head, bl_head);
-
-  bl.claim_append(bl_head);
-
-  CLS_LOG(0, "INFO: cls_create_queue writing head of size %u", bl.length());
-  ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
-  if (ret < 0) {
-    CLS_LOG(0, "ERROR: %s(): cls_cxx_write returned %d", __func__, ret);
-    return ret;
+  try {
+    decode(head, it);
+  } catch (buffer::error& err) {
+    CLS_LOG(0, "ERROR: get_queue_head: failed to decode head\n");
+    return -EINVAL;
   }
+
   return 0;
 }
 
-int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op)
 {
   //get head and its size
-  uint64_t head_size = 0;
   cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
+  int ret = get_queue_head(hctx, head);
 
   //head is already initialized
   if (ret == 0) {
-    CLS_LOG(1, "INFO: cls_init_queue_op(): head is already initialized\n");
-    return 0;
+    return -EEXIST;
   }
 
   if (ret < 0 && ret != -EINVAL) {
     return ret;
   }
 
-  auto in_iter = in->cbegin();
-  cls_init_queue_op op;
-  try {
-    decode(op, in_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_init_queue_op(): failed to decode entry\n");
-    return -EINVAL;
-  }
+  head = std::move(op.head);
 
-  CLS_LOG(10, "INFO: cls_init_queue_op init queue of head size %lu", op.head_size);
-  CLS_LOG(10, "INFO: cls_init_queue_op init queue of size %lu", op.head.queue_size);
-
-  head_size = QUEUE_HEAD_SIZE_1K;
-
-  if (op.head.has_urgent_data) {
+  if (op.has_urgent_data) {
     if (op.head_size == 0) {
-      head_size = QUEUE_HEAD_SIZE_4K;
-      op.head.tail = op.head.front = QUEUE_START_OFFSET_4K;
-      op.head.last_entry_offset = QUEUE_START_OFFSET_4K;
+      head.max_head_size = QUEUE_HEAD_SIZE_4K;
+      head.tail.offset = head.front.offset = QUEUE_START_OFFSET_4K;
     } else {
-      head_size = op.head_size;
-      op.head.tail = op.head.front = head_size;
-      op.head.last_entry_offset = head_size;
+      head.max_head_size = op.head_size;
+      head.tail.offset = head.front.offset = head.max_head_size;
     }
+    bufferlist bl_non_urgent;
+    uint16_t queue_start = QUEUE_HEAD_START;
+    uint64_t queue_encoded_len = 0;//dummy value for calculating max size of urgent data
+    encode(queue_start, bl_non_urgent);
+    encode(queue_encoded_len, bl_non_urgent);
+    encode(head, bl_non_urgent);
+    head.max_urgent_data_size = head.max_head_size - (bl_non_urgent.length() - head.bl_urgent_data.length());
   } else {
-    head_size = QUEUE_HEAD_SIZE_1K;
-    op.head.tail = op.head.front = QUEUE_START_OFFSET_1K;
-    op.head.last_entry_offset = QUEUE_START_OFFSET_1K;
+    head.max_head_size = QUEUE_HEAD_SIZE_1K;
+    head.tail.offset = head.front.offset = QUEUE_START_OFFSET_1K;
+    head.max_urgent_data_size = 0;
   }
-  op.head.queue_size += head_size;
-
-  CLS_LOG(10, "INFO: cls_init_queue_op queue actual size %lu", op.head.queue_size);
-  CLS_LOG(10, "INFO: cls_init_queue_op head size %lu", head_size);
-  CLS_LOG(10, "INFO: cls_init_queue_op queue front offset %lu", op.head.front);
-
-  //encode head size
-  bufferlist bl;
-  encode(head_size, bl);
-  CLS_LOG(0, "INFO: cls_init_queue_op head size %u", bl.length());
-
-  //encode head
-  bufferlist bl_head;
-  encode(op.head, bl_head);
-
-  bl.claim_append(bl_head);
+  head.tail.gen = head.front.gen = 0;
+  head.queue_size += head.max_head_size;
+  
+  CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
+  CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
+  CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
+  CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
 
-  CLS_LOG(0, "INFO: cls_init_queue_op writing head of size %u", bl.length());
-  ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
-  if (ret < 0) {
-    CLS_LOG(0, "ERROR: %s(): cls_init_queue_op returned %d", __func__, ret);
-    return ret;
-  }
-  return 0;
+  return write_queue_head(hctx, head);
 }
 
-int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret)
 {
-  //get head and its size
-  uint64_t head_size = 0;
+  //get head
   cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
+  int ret = get_queue_head(hctx, head);
   if (ret < 0) {
     return ret;
   }
 
-  head.queue_size -= head_size;
-
-  CLS_LOG(10, "INFO: cls_get_queue_size: size of queue is %lu\n", head.queue_size);
+  op_ret.queue_size = head.queue_size - head.max_head_size;
 
-  encode(head.queue_size, *out);
+  CLS_LOG(20, "INFO: get_queue_size: size of queue is %lu\n", op_ret.queue_size);
 
   return 0;
 }
 
-int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
 {
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
-  if ((head.front == head.tail) && (! head.is_empty)) {
-    CLS_LOG(1, "ERROR: No space left in queue\n");
+  if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
+    CLS_LOG(0, "ERROR: No space left in queue\n");
     return -ENOSPC;
   }
 
-  auto iter = in->cbegin();
-  cls_enqueue_op op;
-  try {
-    decode(op, iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n");
-    return -EINVAL;
-  }
-
-  for (auto bl_data : op.bl_data_vec) {
+  for (auto& bl_data : op.bl_data_vec) {
     bufferlist bl;
+    uint16_t entry_start = QUEUE_ENTRY_START;
+    encode(entry_start, bl);
     uint64_t data_size = bl_data.length();
     encode(data_size, bl);
     bl.claim_append(bl_data);
+  
+    CLS_LOG(10, "INFO: enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length());
 
-    CLS_LOG(1, "INFO: cls_enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length());
-
-    if (head.tail >= head.front) {
+    if (head.tail.offset >= head.front.offset) {
       // check if data can fit in the remaining space in queue
-      if ((head.tail + bl.length()) <= head.queue_size) {
-        CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n", head.tail, bl.length());
-        head.last_entry_offset = head.tail;
+      if ((head.tail.offset + bl.length()) <= head.queue_size) {
+        CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
         //write data size and data at tail offset
-        ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
         if (ret < 0) {
           return ret;
         }
-        head.tail += bl.length();
+        head.tail.offset += bl.length();
       } else {
-        CLS_LOG(1, "INFO: Wrapping around and checking for free space\n");
-        uint64_t free_space_available = (head.queue_size - head.tail) + (head.front - head_size);
+        uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
         //Split data if there is free space available
         if (bl.length() <= free_space_available) {
-          uint64_t size_before_wrap = head.queue_size - head.tail;
+          uint64_t size_before_wrap = head.queue_size - head.tail.offset;
           bufferlist bl_data_before_wrap;
           bl.splice(0, size_before_wrap, &bl_data_before_wrap);
-          head.last_entry_offset = head.tail;
           //write spliced (data size and data) at tail offset
-          CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.tail, bl_data_before_wrap.length());
-          ret = cls_cxx_write2(hctx, head.tail, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+          CLS_LOG(5, "INFO: enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
+          auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
           if (ret < 0) {
             return ret;
           }
-          head.tail = head_size;
+          head.tail.offset = head.max_head_size;
+          head.tail.gen += 1;
           //write remaining data at tail offset after wrapping around
-          CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head.tail, bl.length());
-          ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+          CLS_LOG(5, "INFO: enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
+          ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
           if (ret < 0) {
             return ret;
           }
-          head.tail += bl.length();
+          head.tail.offset += bl.length();
         } else {
-          CLS_LOG(1, "ERROR: No space left in queue\n");
+          CLS_LOG(0, "ERROR: No space left in queue\n");
           // return queue full error
           return -ENOSPC;
         }
       }
-    } else if (head.front > head.tail) {
-      if ((head.tail + bl.length()) <= head.front) {
-        CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n\n", head.tail, bl.length());
-        head.last_entry_offset = head.tail;
+    } else if (head.front.offset > head.tail.offset) {
+      if ((head.tail.offset + bl.length()) <= head.front.offset) {
+        CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length());
         //write data size and data at tail offset
-        ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
         if (ret < 0) {
           return ret;
         }
-        head.tail += bl.length();
+        head.tail.offset += bl.length();
       } else {
-        CLS_LOG(1, "ERROR: No space left in queue\n");
+        CLS_LOG(0, "ERROR: No space left in queue\n");
         // return queue full error
         return -ENOSPC;
       }
     }
 
-    if (head.tail == head.queue_size) {
-      head.tail = head_size;
+    if (head.tail.offset == head.queue_size) {
+      head.tail.offset = head.max_head_size;
+      head.tail.gen += 1;
     }
-    CLS_LOG(1, "INFO: cls_enqueue: New tail offset: %lu \n", head.tail);
+    CLS_LOG(20, "INFO: enqueue: New tail offset: %s \n", head.tail.to_str().c_str());
   } //end - for
 
-  head.is_empty = false;
-
-  //Update urgent data if set
-  if (op.bl_urgent_data.length() > 0) {
-    head.bl_urgent_data = op.bl_urgent_data;
-  }
-
-  bufferlist bl_head;
-  encode(head, bl_head);
-  CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u \n", bl_head.length());
-  ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
-  if (ret < 0) {
-    CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret);
-    return ret;
-  }
-
   return 0;
 }
 
-int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
 {
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
   // If queue is empty, return from here
-  if (head.is_empty) {
-    return -ENOENT;
+  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+    CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head.front.to_str().c_str());
+    op_ret.next_marker = head.front.to_str();
+    op_ret.is_truncated = false;
+    return 0;
   }
 
-  cls_queue_list_ret op_ret;
-  CLS_LOG(1, "INFO: cls_queue_list_entries: Is urgent data present: %d\n", head.has_urgent_data);
-  //Info related to urgent data
-  op_ret.bl_urgent_data = head.bl_urgent_data;
-
-  auto in_iter = in->cbegin();
-
-  cls_queue_list_op op;
-  try {
-    decode(op, in_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
-    return -EINVAL;
-  }
+  cls_queue_marker start_marker;
+  start_marker.from_str(op.start_marker.c_str());
+  cls_queue_marker next_marker = {0, 0};
 
-  uint64_t start_offset = 0;
-  if (op.start_offset == 0) {
-    start_offset = head.front;
+  uint64_t start_offset = 0, gen = 0;
+  if (start_marker.offset == 0) {
+    start_offset = head.front.offset;
+    gen = head.front.gen;
   } else {
-    start_offset = op.start_offset;
+    start_offset = start_marker.offset;
+    gen = start_marker.gen;
   }
 
   op_ret.is_truncated = true;
@@ -364,25 +264,26 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist
   bool wrap_around = false;
 
   //Calculate length of contiguous data to be read depending on front, tail and start offset
-  if (head.tail > head.front) {
-    contiguous_data_size = head.tail - start_offset;
-  } else if (head.front >= head.tail) {
-    if (start_offset >= head.front) {
+  if (head.tail.offset > head.front.offset) {
+    contiguous_data_size = head.tail.offset - start_offset;
+  } else if (head.front.offset >= head.tail.offset) {
+    if (start_offset >= head.front.offset) {
       contiguous_data_size = head.queue_size - start_offset;
       wrap_around = true;
-    } else if (start_offset <= head.tail) {
-      contiguous_data_size = head.tail - start_offset;
+    } else if (start_offset <= head.tail.offset) {
+      contiguous_data_size = head.tail.offset - start_offset;
     }
   }
 
-  CLS_LOG(1, "INFO: cls_queue_list_entries(): front is: %lu, tail is %lu\n", head.front, head.tail);
+  CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());
 
-  bool offset_populated = false;
-  uint64_t num_ops = 0;
+  bool offset_populated = false, entry_start_processed = false;
+  uint64_t data_size = 0, num_ops = 0;
+  uint16_t entry_start = 0;
   bufferlist bl;
   do
   {
-    CLS_LOG(1, "INFO: cls_queue_list_entries(): start_offset is %lu\n", start_offset);
+    CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset);
   
     bufferlist bl_chunk;
     //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
@@ -391,87 +292,105 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist
     } else {
       size_to_read = contiguous_data_size;
     }
-    CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is %lu\n", size_to_read);
+    CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read);
     if (size_to_read == 0) {
+      next_marker = head.tail;
       op_ret.is_truncated = false;
-      CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is 0, hence breaking out!\n");
+      CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
       break;
     }
 
-    ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
+    auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
     if (ret < 0) {
       return ret;
     }
 
     //If there is leftover data from previous iteration, append new data to leftover data
     bl.claim_append(bl_chunk);
-    bl_chunk = bl;
-    bl.clear();
+    bl_chunk = std::move(bl);
 
-    CLS_LOG(1, "INFO: cls_queue_list_entries(): size of chunk %u\n", bl_chunk.length());
+    CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk.length());
 
     //Process the chunk of data read
     unsigned index = 0;
     auto it = bl_chunk.cbegin();
     uint64_t size_to_process = bl_chunk.length();
     do {
-      CLS_LOG(1, "INFO: cls_queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
+      CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
       it.seek(index);
-      uint64_t data_size = 0;
-      //Populate offset if not done, else don't if already done in previous iteration
+      //Populate offset if not done in previous iteration
       if (! offset_populated) {
-        uint64_t data_offset = start_offset + index;
-        op_ret.offsets.emplace_back(data_offset);
-        CLS_LOG(1, "INFO: cls_queue_list_entries(): offset: %lu\n", data_offset);
+        cls_queue_marker marker = {start_offset + index, gen};
+        CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
+        string opaque_marker = marker.to_str();
+        op_ret.markers.emplace_back(opaque_marker);
       }
-      if (size_to_process >= sizeof(uint64_t)) {
-        try {
-          decode(data_size, it);
-        } catch (buffer::error& err) {
-          CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode data size \n");
-          return -EINVAL;
+      // Magic number + Data size - process if not done in previous iteration
+      if (! entry_start_processed ) {
+        if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) {
+          // Decode magic number at start
+          try {
+            decode(entry_start, it);
+          } catch (buffer::error& err) {
+            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start \n");
+            return -EINVAL;
+          }
+          if (entry_start != QUEUE_ENTRY_START) {
+            CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start);
+            return -EINVAL;
+          }
+          index += sizeof(uint16_t);
+          size_to_process -= sizeof(uint16_t);
+          // Decode data size
+          try {
+            decode(data_size, it);
+          } catch (buffer::error& err) {
+            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size \n");
+            return -EINVAL;
+          }
+        } else {
+          // Copy unprocessed data to bl
+          bl_chunk.splice(index, size_to_process, &bl);
+          offset_populated = true;
+          CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
+          break;
         }
-      } else {
-        // Copy unprocessed data to bl
-        bl_chunk.copy(index, size_to_process, bl);
-        offset_populated = true;
-        CLS_LOG(1, "INFO: cls_queue_list_entries: not enough data to read data size, breaking out!\n");
-        break;
+        CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size);
+        index += sizeof(uint64_t);
+        size_to_process -= sizeof(uint64_t);
       }
-      CLS_LOG(1, "INFO: cls_queue_list_entries(): data size: %lu\n", data_size);
-      index += sizeof(uint64_t);
-      size_to_process -= sizeof(uint64_t);
-      bufferlist bl_data;
+      // Data
       if (data_size <= size_to_process) {
+        bufferlist bl_data;
         bl_chunk.copy(index, data_size, bl_data);
         //Return data here
         op_ret.data.emplace_back(bl_data);
         index += bl_data.length();
         size_to_process -= bl_data.length();
       } else {
-        index -= sizeof(uint64_t);
-        size_to_process += sizeof(uint64_t);
         bl_chunk.copy(index, size_to_process, bl);
         offset_populated = true;
-        CLS_LOG(1, "INFO: cls_queue_list_entries(): not enough data to read data, breaking out!\n");
+        entry_start_processed = true;
+        CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
         break;
       }
+      // Resetting some values
       offset_populated = false;
+      entry_start_processed = false;
+      data_size = 0;
+      entry_start = 0;
       num_ops++;
       if (num_ops == op.max) {
-        CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
-        break;
-      }
-      if (index == bl_chunk.length()) {
+        CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
         break;
       }
     } while(index < bl_chunk.length());
 
-    CLS_LOG(1, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
+    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
 
     if (num_ops == op.max) {
-      op_ret.next_offset = start_offset + index;
-      CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", op_ret.next_offset);
+      next_marker = cls_queue_marker{(start_offset + index), gen};
+      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker.offset);
       break;
     }
 
@@ -480,12 +399,13 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist
     contiguous_data_size -= size_to_read;
     if (contiguous_data_size == 0) {
       if (wrap_around) {
-        start_offset = head_size;
-        contiguous_data_size = head.tail - head_size;
+        start_offset = head.max_head_size;
+        contiguous_data_size = head.tail.offset - head.max_head_size;
+        gen += 1;
         wrap_around = false;
       } else {
-        CLS_LOG(1, "INFO: cls_queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
-        op_ret.next_offset = head.front;
+        CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
+        next_marker = head.tail;
         op_ret.is_truncated = false;
         break;
       }
@@ -494,408 +414,78 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist
   } while(num_ops < op.max);
 
   //Wrap around next offset if it has reached end of queue
-  if (op_ret.next_offset == head.queue_size) {
-    op_ret.next_offset = head_size;
+  if (next_marker.offset == head.queue_size) {
+    next_marker.offset = head.max_head_size;
+    next_marker.gen += 1;
   }
-  if (op_ret.next_offset == head.tail) {
+  if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
     op_ret.is_truncated = false;
   }
 
-  encode(op_ret, *out);
+  CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker.to_str().c_str());
+  op_ret.next_marker = next_marker.to_str();
 
   return 0;
 }
 
-int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
 {
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
-  if (head.is_empty) {
-    return -ENOENT;
-  }
-
-  auto in_iter = in->cbegin();
-  cls_queue_remove_op op;
-  try {
-    decode(op, in_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
-    return -EINVAL;
-  }
-
-  // If start offset is not set or set to zero, then we need to shift it to actual front of queue
-  if (op.start_offset == 0) {
-    op.start_offset = head.front;
-  }
-
-  if (op.start_offset != head.front) {
-    CLS_LOG(1, "ERROR: cls_queue_remove_entries: invalid start offset\n");
-    CLS_LOG(1, "ERROR: cls_queue_remove_entries: start offset = %lu\n", op.start_offset);
-    CLS_LOG(1, "ERROR: cls_queue_remove_entries: front = %lu\n", head.front);
-    return -EINVAL;
+  //Queue is empty
+  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+    return 0;
   }
 
-  uint64_t end_offset = 0;
-  uint64_t data_size = 0;
-
-  //Check for data size wrap around
-  if ((head.queue_size - op.end_offset) >= sizeof(uint64_t)) {
-    // Read the size from the end offset
-    bufferlist bl_size;
-    ret = cls_cxx_read(hctx, op.end_offset, sizeof(uint64_t), &bl_size);
-    if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read data size \n");
-      return ret;
-    }
-    auto iter = bl_size.cbegin();
-    try {
-      decode(data_size, iter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n");
-      return -EINVAL;
-    }
-    //Check for data wrap around
-    if ((head.queue_size - (op.end_offset + sizeof(uint64_t))) >= data_size) {
-      end_offset = op.end_offset + sizeof(uint64_t) + data_size;
-    } else {
-      uint64_t rem_size = data_size - (head.queue_size - (op.end_offset + sizeof(uint64_t)));
-      end_offset = head_size + rem_size;
-    }
-  } else {
-    bufferlist bl_size;
-    ret = cls_cxx_read(hctx, op.end_offset, (head.queue_size - op.end_offset), &bl_size);
-    if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n");
-      return ret;
-    }
-    uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - op.end_offset);
-    bufferlist bl;
-    ret = cls_cxx_read(hctx, head_size, rem_size, &bl);
-    if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n");
-      return ret;
-    }
-    bl_size.claim_append(bl);
-    auto iter = bl_size.cbegin();
-    try {
-      decode(data_size, iter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n");
-      return -EINVAL;
-    }
-    end_offset = head_size + rem_size + data_size;
-  }
+  cls_queue_marker end_marker;
+  end_marker.from_str(op.end_marker.c_str());
 
-  CLS_LOG(1, "INFO: cls_queue_remove_entries: op.end_offset = %lu\n", op.end_offset);
-  CLS_LOG(1, "INFO: cls_queue_remove_entries: data_size = %lu\n", data_size);
-  CLS_LOG(1, "INFO: cls_queue_remove_entries: end_offset = %lu\n", end_offset);
+  CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker.to_str().c_str());
 
   //Zero out the entries that have been removed, to reclaim storage space
-  if (end_offset > op.start_offset) {
-    ret = cls_cxx_write_zero(hctx, op.start_offset, (end_offset - op.start_offset));
-    if (ret < 0) {
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n");
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset);
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - op.start_offset));
-      return ret;
-    }
-  } else { //start offset > end offset
-    ret = cls_cxx_write_zero(hctx, op.start_offset, (head.queue_size - op.start_offset));
-    if (ret < 0) {
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n");
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset);
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (head.queue_size - op.start_offset));
-      return ret;
-    }
-    ret = cls_cxx_write_zero(hctx, head_size, (end_offset - head_size));
-    if (ret < 0) {
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n");
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", head_size);
-      CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - head_size));
-      return ret;
-    }
-  }
-
-  head.front = end_offset;
-
-  // Check if it is the end, then wrap around
-  if (head.front == head.queue_size) {
-    head.front = head_size;
-  }
-
-  CLS_LOG(1, "INFO: cls_queue_remove_entries: front offset is: %lu and tail offset is %lu\n", head.front, head.tail);
-
-  // We've reached the last element
-  if (head.front == head.tail) {
-    CLS_LOG(1, "INFO: cls_queue_remove_entries: Queue is empty now!\n");
-    head.is_empty = true;
-  }
-
-  //Update urgent data map
-  if (op.bl_urgent_data.length() > 0) {
-    head.bl_urgent_data = op.bl_urgent_data;
-  }
-
-  //Write head back
-  bufferlist bl_head;
-  encode(head, bl_head);
-  CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front);
-  ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
-  if (ret < 0) {
-    CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head returned error: %d \n", ret);
-    return ret;
-  }
-
-  return 0;
-}
-
-int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
-  uint64_t data_size = 0, last_entry_offset = head.last_entry_offset;
-
-  //Check for data size wrap around
-  if ((head.queue_size - last_entry_offset) >= sizeof(uint64_t)) {
-    // Read the size from the end offset
-    bufferlist bl_size;
-    ret = cls_cxx_read(hctx, last_entry_offset, sizeof(uint64_t), &bl_size);
-    if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to read data size \n");
-      return ret;
-    }
-    auto iter = bl_size.cbegin();
-    try {
-      decode(data_size, iter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode data size \n");
-      return -EINVAL;
-    }
-    last_entry_offset += sizeof(uint64_t);
-    //Check for data wrap around
-    if ((head.queue_size - (last_entry_offset + sizeof(uint64_t))) >= data_size) {
-      CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset);
-      ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
-      if (ret < 0) {
-        return ret;
-      }
-    } else {
-      CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset);
-      ret = cls_cxx_read2(hctx, last_entry_offset, (head.queue_size - (last_entry_offset + sizeof(uint64_t))), out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+  if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
+    uint64_t len = end_marker.offset - head.front.offset;
+    if (len > 0) {
+      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
       if (ret < 0) {
+        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
+        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
         return ret;
       }
-      uint64_t rem_size = data_size - (head.queue_size - (last_entry_offset + sizeof(uint64_t)));
-      bufferlist bl;
-      ret = cls_cxx_read(hctx, head_size, rem_size, &bl);
-      if (ret < 0) {
-        return ret;
-      }
-      out->claim_append(bl);
-    }
-  } else {
-    bufferlist bl_size;
-    ret = cls_cxx_read(hctx, last_entry_offset, (head.queue_size - last_entry_offset), &bl_size);
-    if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n");
-      return ret;
-    }
-    uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - last_entry_offset);
-    bufferlist bl;
-    ret = cls_cxx_read(hctx, head_size, rem_size, &bl);
-    if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n");
-      return ret;
     }
-    bl_size.claim_append(bl);
-    auto iter = bl_size.cbegin();
-    try {
-      decode(data_size, iter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n");
-      return -EINVAL;
-    }
-    last_entry_offset = head_size + rem_size;
-    CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset);
-    ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
-    if (ret < 0) {
-      return ret;
-    }
-  }
-
-  return 0;
-}
-
-int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
-  auto in_iter = in->cbegin();
-
-  cls_queue_update_last_entry_op op;
-  try {
-    decode(op, in_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode input data\n");
-    return -EINVAL;
-  }
-
-  bufferlist bl;
-  uint64_t data_size = op.bl_data.length();
-  encode(data_size, bl);
-  bl.claim_append(op.bl_data);
-
-  CLS_LOG(1, "INFO: cls_queue_update_last_entry_op: Updating data at last offset: %lu and total data size is %u\n", head.last_entry_offset, bl.length());
-
-  // check if data can fit in the remaining space in queue
-  if ((head.last_entry_offset + bl.length()) <= head.queue_size) {
-    //write data size and data at tail offset
-    ret = cls_cxx_write2(hctx, head.last_entry_offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
-    if (ret < 0) {
-      return ret;
-    }
-  } else {
-    CLS_LOG(1, "INFO: Wrapping around and checking for free space\n");
-    uint64_t free_space_available = (head.queue_size - head.last_entry_offset) + (head.front - head_size);
-    //Split data if there is free space available
-    if (bl.length() <= free_space_available) {
-      uint64_t size_before_wrap = head.queue_size - head.last_entry_offset;
-      bufferlist bl_data_before_wrap;
-      bl.splice(0, size_before_wrap, &bl_data_before_wrap);
-      //write spliced (data size and data) at last entry offset
-      CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.last_entry_offset, bl_data_before_wrap.length());
-      ret = cls_cxx_write2(hctx, head.last_entry_offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+  } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
+    uint64_t len = head.queue_size - head.front.offset;
+    if (len > 0) {
+      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
       if (ret < 0) {
+        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
+        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
         return ret;
       }
-      //write remaining data after wrapping around
-      CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head_size, bl.length());
-      ret = cls_cxx_write2(hctx, head_size, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+    }
+    len = end_marker.offset - head.max_head_size;
+    if (len > 0) {
+      auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
       if (ret < 0) {
+        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
+        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head.max_head_size);
         return ret;
       }
     }
-    else {
-      CLS_LOG(1, "ERROR: No space left in queue\n");
-      // return queue full error
-      return -ENOSPC;
-    }
-  }
-
-  if (op.bl_urgent_data.length() > 0) {
-    head.bl_urgent_data = op.bl_urgent_data;
-  }
-
-  bufferlist bl_head;
-  encode(head, bl_head);
-  CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head of size: %u \n", bl_head.length());
-  ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
-  if (ret < 0) {
-    CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head returned error: %d \n", ret);
-    return ret;
-  }
-  return 0;
-}
-
-int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
-  CLS_LOG(1, "INFO: cls_queue_read_urgent_data: tail offset %lu\n", head.tail);
-
-  cls_queue_urgent_data_ret op_ret;
-  
-  op_ret.bl_urgent_data = head.bl_urgent_data;
-  
-  encode(op_ret, *out);
-
-  return 0;
-}
-
-int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
-
-  CLS_LOG(1, "INFO: cls_queue_write_urgent_data: tail offset %lu\n", head.tail);
-
-  auto in_iter = in->cbegin();
-
-  cls_queue_write_urgent_data_op op;
-  try {
-    decode(op, in_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode input data\n");
+  } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
+    //no-op
+  } else {
+    CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker.to_str().c_str(), end_marker.gen);
     return -EINVAL;
   }
-  //Write urgent data
-  head.bl_urgent_data = op.bl_urgent_data;
-
-  //Write head back
-  bufferlist bl_head;
-  encode(head, bl_head);
-  CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head of size: %u\n", bl_head.length());
-  ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
-  if (ret < 0) {
-    CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head returned error: %d \n", ret);
-    return ret;
-  }
-
-  return 0;
-}
-
-int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  bool can_fit = true;
-
-  //get head and its size
-  uint64_t head_size = 0;
-  cls_queue_head head;
-  int ret = get_queue_head_and_size(hctx, head, head_size);
-  if (ret < 0) {
-    return ret;
-  }
 
-  head.bl_urgent_data = *in;
+  head.front = end_marker;
 
-  bufferlist bl_head;
-  encode(head, bl_head);
-
-  if(bl_head.length() > head_size) {
-    can_fit = false;
+  // Check if it is the end, then wrap around
+  if (head.front.offset == head.queue_size) {
+    head.front.offset = head.max_head_size;
+    head.front.gen += 1;
   }
 
-  encode(can_fit, *out);
+  CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());
 
   return 0;
 }
-
diff --git a/src/cls/queue/cls_queue_src.h b/src/cls/queue/cls_queue_src.h
new file mode 100644 (file)
index 0000000..2b225f6
--- /dev/null
@@ -0,0 +1,12 @@
+#ifndef CEPH_CLS_QUEUE_SRC_H
+#define CEPH_CLS_QUEUE_SRC_H
+
+int write_queue_head(cls_method_context_t hctx, cls_queue_head& head);
+int get_queue_head(cls_method_context_t hctx, cls_queue_head& head);
+int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op);
+int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret);
+int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head);
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head);
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head);
+
+#endif /* CEPH_CLS_QUEUE_SRC_H */
\ No newline at end of file
index b5374b28c1942c60d648594e9cb095b69207b211..33319d9a34cb233ef2940209047765b52c459602 100644 (file)
@@ -5,17 +5,3 @@
 #include "common/ceph_json.h"
 #include "include/utime.h"
 
-void cls_queue_head::dump(Formatter *f) const
-{
-  f->dump_bool("is_empty", is_empty);
-  f->dump_unsigned("front", front);
-  f->dump_unsigned("tail", tail);
-  f->dump_unsigned("size", queue_size);
-  f->dump_unsigned("has_urgent_data", front);
-}
-
-void cls_queue_head::generate_test_instances(list<cls_queue_head*>& ls)
-{
-  ls.push_back(new cls_queue_head);
-  ls.push_back(new cls_queue_head);
-}
index 96630ea65f9d588acc426bfcc1bd31b223a8e91d..0caa269ed252426bbbfcfe57fd7f60e635c8d37e 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef CEPH_CLS_QUEUE_TYPES_H
 #define CEPH_CLS_QUEUE_TYPES_H
 
+#include <errno.h>
 #include "include/types.h"
 #include "common/ceph_time.h"
 #include "common/Formatter.h"
 //Actual start offset of queue data
 #define QUEUE_START_OFFSET_4K QUEUE_HEAD_SIZE_4K
 
+constexpr unsigned int QUEUE_HEAD_START = 0xDEAD;
+constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF;
+
+struct cls_queue_marker
+{
+  uint64_t offset{0};
+  uint64_t gen{0};
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(gen, bl);
+    encode(offset, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(gen, bl);
+    decode(offset, bl);
+    DECODE_FINISH(bl);
+  }
+
+  string to_str() {
+    string marker = std::to_string(gen) + '/' + std::to_string(offset);
+    return marker;
+  }
+
+  int from_str(const char* str) {
+    errno = 0;
+    char* end = nullptr;
+    gen = ::strtoull(str, &end, 10);
+    if (errno) {
+      return errno;
+    }
+    if (str == end || *end != '/') { // expects delimiter
+      return -EINVAL;
+    }
+    str = end + 1;
+    offset = ::strtoull(str, &end, 10);
+    if (errno) {
+      return errno;
+    }
+    if (str == end || *end != 0) { // expects null terminator
+      return -EINVAL;
+    }
+    return 0;
+  }
+
+};
+WRITE_CLASS_ENCODER(cls_queue_marker)
+
 struct cls_queue_head
 {
-  uint64_t front = QUEUE_START_OFFSET_1K;
-  uint64_t tail = QUEUE_START_OFFSET_1K;
+  uint64_t max_head_size = QUEUE_HEAD_SIZE_1K;
+  cls_queue_marker front{QUEUE_START_OFFSET_1K, 0};
+  cls_queue_marker tail{QUEUE_START_OFFSET_1K, 0};
   uint64_t queue_size{0}; // size of queue requested by user, with head size added to it
-  uint64_t last_entry_offset = QUEUE_START_OFFSET_1K;
-  bool is_empty{true};
-  bool has_urgent_data{false};
+  uint64_t max_urgent_data_size{0};
   bufferlist bl_urgent_data;  // special data known to application using queue
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
+    encode(max_head_size, bl);
     encode(front, bl);
     encode(tail, bl);
     encode(queue_size, bl);
-    encode(last_entry_offset, bl);
-    encode(is_empty, bl);
-    encode(has_urgent_data, bl);
+    encode(max_urgent_data_size, bl);
     encode(bl_urgent_data, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
+    decode(max_head_size, bl);
     decode(front, bl);
     decode(tail, bl);
     decode(queue_size, bl);
-    decode(last_entry_offset, bl);
-    decode(is_empty, bl);
-    decode(has_urgent_data, bl);
+    decode(max_urgent_data_size, bl);
     decode(bl_urgent_data, bl);
     DECODE_FINISH(bl);
   }
-
-  void dump(Formatter *f) const;
-  static void generate_test_instances(list<cls_queue_head*>& o);
 };
 WRITE_CLASS_ENCODER(cls_queue_head)
 
index 563d8213d4d077eaa94525534445dc793adf201c..bea69fa6617185078b0ca181c2393b153b8d3c9d 100644 (file)
 #include "cls/queue/cls_queue_ops.h"
 #include "cls/queue/cls_rgw_queue_ops.h"
 #include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue.h"
+#include "cls/queue/cls_queue_src.h"
 
 #include <boost/lexical_cast.hpp>
 #include <unordered_map>
+#include <sstream>
 
 #include "common/ceph_context.h"
 #include "global/global_context.h"
 CLS_VER(1,0)
 CLS_NAME(rgw_queue)
 
-static int cls_gc_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  auto in_iter = in->cbegin();
-
-  cls_gc_create_queue_op op;
-  try {
-    decode(op, in_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_gc_create_queue: failed to decode entry\n");
-    return -EINVAL;
-  }
-
-  cls_gc_urgent_data urgent_data;
-  urgent_data.num_urgent_data_entries = op.num_urgent_data_entries;
-
-  cls_create_queue_op create_op;
-
-  CLS_LOG(10, "INFO: cls_gc_create_queue: queue size is %lu\n", op.size);
-  CLS_LOG(10, "INFO: cls_gc_create_queue: queue name is %s\n", op.name.c_str());
-  create_op.head.queue_size = op.size;
-  create_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size;
-  create_op.head.has_urgent_data = true;
-  encode(urgent_data, create_op.head.bl_urgent_data);
-
-  in->clear();
-  encode(create_op, *in);
-
-  return cls_create_queue(hctx, in, out);
-}
-
 static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   auto in_iter = in->cbegin();
@@ -74,25 +45,21 @@ static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferli
   cls_gc_urgent_data urgent_data;
   urgent_data.num_urgent_data_entries = op.num_urgent_data_entries;
 
-  cls_init_queue_op init_op;
+  cls_queue_init_op init_op;
 
   CLS_LOG(10, "INFO: cls_gc_init_queue: queue size is %lu\n", op.size);
   CLS_LOG(10, "INFO: cls_gc_init_queue: queue name is %s\n", op.name.c_str());
   init_op.head.queue_size = op.size;
   init_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size;
-  init_op.head.has_urgent_data = true;
+  init_op.has_urgent_data = true;
   encode(urgent_data, init_op.head.bl_urgent_data);
 
-  in->clear();
-  encode(init_op, *in);
-
-  return cls_init_queue(hctx, in, out);
+  return init_queue(hctx, init_op);
 }
 
 static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   auto in_iter = in->cbegin();
-
   cls_rgw_gc_set_entry_op op;
   try {
     decode(op, in_iter);
@@ -104,24 +71,33 @@ static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist
   op.info.time = ceph::real_clock::now();
   op.info.time += make_timespan(op.expiration_secs);
 
-  cls_enqueue_op enqueue_op;
+  //get head
+  cls_queue_head head;
+  int ret = get_queue_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_queue_enqueue_op enqueue_op;
   bufferlist bl_data;
   encode(op.info, bl_data);
   enqueue_op.bl_data_vec.emplace_back(bl_data);
 
   CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length());
 
-  in->clear();
-  encode(enqueue_op, *in);
+  ret = enqueue(hctx, enqueue_op, head);
+  if (ret < 0) {
+    return ret;
+  }
 
-  return cls_enqueue(hctx, in, out);
+  //Write back head
+  return write_queue_head(hctx, head);
 }
 
 static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n");
   auto in_iter = in->cbegin();
-
   cls_rgw_gc_list_op op;
   try {
     decode(op, in_iter);
@@ -130,55 +106,46 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli
     return -EINVAL;
   }
 
-  cls_queue_list_op list_op;
-  if (op.marker.empty()) {
-    list_op.start_offset = 0;
-  } else {
-    list_op.start_offset = boost::lexical_cast<uint64_t>(op.marker.c_str());
+  cls_queue_head head;
+  auto ret = get_queue_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_gc_urgent_data urgent_data;
+  if (head.bl_urgent_data.length() > 0) {
+    auto iter_urgent_data = head.bl_urgent_data.cbegin();
+    try {
+      decode(urgent_data, iter_urgent_data);
+    } catch (buffer::error& err) {
+      CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
+      return -EINVAL;
+    }
   }
 
+  cls_queue_list_op list_op;
   if (! op.max) {
     op.max = GC_LIST_DEFAULT_MAX;
   }
   
   list_op.max = op.max;
+  list_op.start_marker = op.marker;
 
-  cls_queue_list_ret op_ret;
   cls_rgw_gc_list_ret list_ret;
   uint32_t num_entries = 0; //Entries excluding the deferred ones
-  bool urgent_data_decoded = false;
-  cls_gc_urgent_data urgent_data;
+  bool is_truncated = true;
+  string next_marker;
   do {
-    in->clear();
-    encode(list_op, *in);
-
-    CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering cls_queue_list_entries \n");
-    int ret = cls_queue_list_entries(hctx, in, out);
+    CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering queue_list_entries \n");
+    cls_queue_list_ret op_ret;
+    int ret = queue_list_entries(hctx, list_op, op_ret, head);
     if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret);
+      CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret);
       return ret;
     }
-
-    auto iter = out->cbegin();
-    try {
-      decode(op_ret, iter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n");
-      return -EINVAL;
-    }
-
-    //Each cls_queue_list_entries will fetch the same urgent data, decode it only once
-    if (! urgent_data_decoded) {
-        auto iter_urgent_data = op_ret.bl_urgent_data.cbegin();
-        try {
-          decode(urgent_data, iter_urgent_data);
-          urgent_data_decoded = true;
-        } catch (buffer::error& err) {
-          CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
-          return -EINVAL;
-        }
-    }
-
+    is_truncated = op_ret.is_truncated;
+    next_marker = op_ret.next_marker;
+  
     if (op_ret.data.size()) {
       for (auto it : op_ret.data) {
         cls_rgw_gc_obj_info info;
@@ -190,18 +157,21 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli
         }
         bool found = false;
         //Check for info tag in urgent data map
-        if (urgent_data.urgent_data_map.size() > 0) {
-          auto it = urgent_data.urgent_data_map.find(info.tag);
-          if (it != urgent_data.urgent_data_map.end()) {
-            found = true;
-            if (it->second > info.time) {
-              CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str());
-              continue;
-            }
+        auto iter = urgent_data.urgent_data_map.find(info.tag);
+        if (iter != urgent_data.urgent_data_map.end()) {
+          found = true;
+          stringstream ss1, ss2;
+          ss1 << iter->second;
+          ss2 << info.time;
+          if (iter->second > info.time) {
+            CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str());
+            CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in urgent data: %s\n", ss1.str().c_str());
+            CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in queue data: %s\n", ss2.str().c_str());
+            continue;
           }
         }
         //Search in xattrs
-        if (! found) {
+        if (! found && urgent_data.num_xattr_urgent_entries > 0) {
           bufferlist bl_xattrs;
           int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
           if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
@@ -217,15 +187,13 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli
               CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n");
               return -EINVAL;
             } //end - catch
-            if (xattr_urgent_data_map.size() > 0) {
-              auto it = xattr_urgent_data_map.find(info.tag);
-              if (it != xattr_urgent_data_map.end()) {
-                if (it->second > info.time) {
-                  CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
-                  continue;
-                }
+            auto xattr_iter = xattr_urgent_data_map.find(info.tag);
+            if (xattr_iter != xattr_urgent_data_map.end()) {
+              if (xattr_iter->second > info.time) {
+                CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
+                continue;
               }
-            } // end - if xattrs size ...
+            }
           } // end - ret != ENOENT && ENODATA
         } // end - if not found
         if (op.expired_only) {
@@ -242,7 +210,7 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli
       CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max);
       if (num_entries < op.max) {
         list_op.max = (op.max - num_entries);
-        list_op.start_offset = op_ret.next_offset;
+        list_op.start_marker = op_ret.next_marker;
         out->clear();
       } else {
         //We've reached the max number of entries needed
@@ -252,11 +220,11 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli
       //We dont have data to process
       break;
     }
-  } while(op_ret.is_truncated);
+  } while(is_truncated);
 
-  list_ret.truncated = op_ret.is_truncated;
+  list_ret.truncated = is_truncated;
   if (list_ret.truncated) {
-    list_ret.next_marker = boost::lexical_cast<string>(op_ret.next_offset);
+    list_ret.next_marker = next_marker;
   }
   out->clear();
   encode(list_ret, *out);
@@ -277,55 +245,43 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer
     return -EINVAL;
   }
 
-  // List entries and calculate total number of entries (including invalid entries)
-  cls_queue_list_op list_op;
-  if (op.marker.empty()) {
-    list_op.start_offset = 0;
-  } else {
-    list_op.start_offset = boost::lexical_cast<uint64_t>(op.marker.c_str());
+  cls_queue_head head;
+  auto ret = get_queue_head(hctx, head);
+  if (ret < 0) {
+    return ret;
   }
 
+  cls_gc_urgent_data urgent_data;
+  if (head.bl_urgent_data.length() > 0) {
+    auto iter_urgent_data = head.bl_urgent_data.cbegin();
+    try {
+      decode(urgent_data, iter_urgent_data);
+    } catch (buffer::error& err) {
+      CLS_LOG(5, "ERROR: cls_gc_queue_remove(): failed to decode urgent data\n");
+      return -EINVAL;
+    }
+  }
+
+  // List entries and calculate total number of entries (including invalid entries)
   if (! op.num_entries) {
     op.num_entries = GC_LIST_DEFAULT_MAX;
   }
-  
-  list_op.max = op.num_entries;
+  cls_queue_list_op list_op;
+  list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry
   bool is_truncated = true;
   uint32_t total_num_entries = 0, num_entries = 0;
-  cls_gc_urgent_data urgent_data;
-  bool urgent_data_decoded = false;
-  uint64_t end_offset = 0;
+  string end_marker;
   do {
-    in->clear();
-    encode(list_op, *in);
-
-    CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_list_entries \n");
-    int ret = cls_queue_list_entries(hctx, in, out);
+    CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_list_entries \n");
+    cls_queue_list_ret op_ret;
+    int ret = queue_list_entries(hctx, list_op, op_ret, head);
     if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret);
+      CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret);
       return ret;
     }
 
-    cls_queue_list_ret op_ret;
-    auto iter = out->cbegin();
-    try {
-      decode(op_ret, iter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n");
-      return -EINVAL;
-    }
     is_truncated = op_ret.is_truncated;
     unsigned int index = 0;
-    if (! urgent_data_decoded) {
-      auto iter_urgent_data = op_ret.bl_urgent_data.cbegin();
-      try {
-        decode(urgent_data, iter_urgent_data);
-        urgent_data_decoded = true;
-      } catch (buffer::error& err) {
-        CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
-        return -EINVAL;
-      }
-    }
     // If data is not empty
     if (op_ret.data.size()) {
       for (auto it : op_ret.data) {
@@ -341,21 +297,19 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer
         index++;
         bool found = false;
         //Search for tag in urgent data map
-        if (urgent_data.urgent_data_map.size() > 0) {
-          auto it = urgent_data.urgent_data_map.find(info.tag);
-          if (it != urgent_data.urgent_data_map.end()) {
-            found = true;
-            if (it->second > info.time) {
-              CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str());
-              continue;
-            } else if (it->second == info.time) {
-              CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str());
-              urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
-              urgent_data.num_head_urgent_entries -= 1;
-            }
-          }//end-if map end
-        }//end-if urgent data
-        if (! found) {
+        auto iter = urgent_data.urgent_data_map.find(info.tag);
+        if (iter != urgent_data.urgent_data_map.end()) {
+          found = true;
+          if (iter->second > info.time) {
+            CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str());
+            continue;
+          } else if (iter->second == info.time) {
+            CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str());
+            urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
+            urgent_data.num_head_urgent_entries -= 1;
+          }
+        }//end-if map end
+        if (! found && urgent_data.num_xattr_urgent_entries > 0) {
           //Search in xattrs
           bufferlist bl_xattrs;
           int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
@@ -372,31 +326,34 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer
               CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n");
               return -EINVAL;
             } //end - catch
-            if (xattr_urgent_data_map.size() > 0) {
-              auto found = xattr_urgent_data_map.find(info.tag);
-              if (found != xattr_urgent_data_map.end()) {
-                if (found->second > info.time) {
-                  CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
-                  continue;
-                } else if (found->second == info.time) {
-                  CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
-                  xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
-                  urgent_data.num_xattr_urgent_entries -= 1;
-                }
+            auto xattr_iter = xattr_urgent_data_map.find(info.tag);
+            if (xattr_iter != xattr_urgent_data_map.end()) {
+              if (xattr_iter->second > info.time) {
+                CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
+                continue;
+              } else if (xattr_iter->second == info.time) {
+                CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
+                xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
+                urgent_data.num_xattr_urgent_entries -= 1;
               }
-            } // end - if xattrs size ...
+            }
           } // end - ret != ENOENT && ENODATA
         }// search in xattrs
         num_entries++;
       }//end-for
       
-      if (num_entries < op.num_entries) {
-        list_op.max = (op.num_entries - num_entries);
-        list_op.start_offset = op_ret.next_offset;
+      if (! op_ret.is_truncated && num_entries < (op.num_entries + 1)) {
+        end_marker = op_ret.next_marker;
+        CLS_LOG(1, "INFO: cls_gc_queue_remove(): truncated and end offset is %s\n", end_marker.c_str());
+        break;
+      }
+      if (num_entries < (op.num_entries + 1)) {
+        list_op.max = ((op.num_entries + 1) - num_entries);
+        list_op.start_marker = op_ret.next_marker;
         out->clear();
       } else {
-        end_offset = op_ret.offsets[index - 1];
-        CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %lu\n", index, end_offset);
+        end_marker = op_ret.markers[index - 1];
+        CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %s\n", index, end_marker.c_str());
         break;
       }
     } //end-if
@@ -404,33 +361,26 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer
       break;
     }
   } while(is_truncated);
+
   CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries);
+  CLS_LOG(1, "INFO: cls_gc_queue_remove(): End offset is %s\n", end_marker.c_str());
 
-  if (end_offset != 0) {
+  if (! end_marker.empty()) {
     cls_queue_remove_op rem_op;
-    if (op.marker.empty()) {
-      rem_op.start_offset = 0;
-    } else {
-      rem_op.start_offset = boost::lexical_cast<uint64_t>(op.marker.c_str());
-    }
-
-    rem_op.end_offset = end_offset;
-    CLS_LOG(1, "INFO: cls_gc_queue_remove(): start offset: %lu and end offset: %lu\n", rem_op.start_offset, rem_op.end_offset);
+    rem_op.end_marker = end_marker;
 
-    encode(urgent_data, rem_op.bl_urgent_data);
-
-    in->clear();
-    encode(rem_op, *in);
-
-    CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_remove_entries \n");
-    int ret = cls_queue_remove_entries(hctx, in, out);
+    CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_remove_entries \n");
+    int ret = queue_remove_entries(hctx, rem_op, head);
     if (ret < 0) {
-      CLS_LOG(1, "ERROR: cls_queue_remove_entries(): returned error %d\n", ret);
+      CLS_LOG(1, "ERROR: queue_remove_entries(): returned error %d\n", ret);
       return ret;
     }
   }
 
-  return 0;
+  //Update urgent data map
+  encode(urgent_data, head.bl_urgent_data);
+
+  return write_queue_head(hctx, head);
 }
 
 static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
@@ -449,53 +399,22 @@ static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in,
   op.info.time = ceph::real_clock::now();
   op.info.time += make_timespan(op.expiration_secs);
 
-  //Read urgent data
-  in->clear();
-  out->clear();
-  
-  ret = cls_queue_read_urgent_data(hctx, in, out);
-  
-  auto out_iter = out->cbegin();
-
-  cls_queue_urgent_data_ret op_ret;
-  try {
-    decode(op_ret, out_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode ouput\n");
-    return -EINVAL;
-  }
-
-  auto bl_iter = op_ret.bl_urgent_data.cbegin();
-  cls_gc_urgent_data urgent_data;
-  try {
-    decode(urgent_data, bl_iter);
-  } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode urgent data\n");
-    return -EINVAL;
-  }
-
-  bool is_last_entry = false;
-  in->clear();
-  out->clear();
-  ret = cls_queue_get_last_entry(hctx, in, out);
+  // Read head
+  cls_queue_head head;
+  ret = get_queue_head(hctx, head);
   if (ret < 0) {
     return ret;
   }
 
-  cls_rgw_gc_obj_info info;
-  auto iter = out->cbegin();
+  auto bl_iter = head.bl_urgent_data.cbegin();
+  cls_gc_urgent_data urgent_data;
   try {
-    decode(info, iter);
+    decode(urgent_data, bl_iter);
   } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode entry\n");
+    CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode urgent data\n");
     return -EINVAL;
   }
 
-  CLS_LOG(1, "INFO: tag of gc info is %s\n", info.tag.c_str());
-  if (info.tag == op.info.tag) {
-    is_last_entry = true;
-  }
-
   //has_urgent_data signifies whether urgent data in queue has changed
   bool has_urgent_data = false, tag_found = false;
   //search in unordered map in head
@@ -517,49 +436,39 @@ static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in,
       try {
         decode(xattr_urgent_data_map, iter);
       } catch (buffer::error& err) {
-        CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n");
+        CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
         return -EINVAL;
       } //end - catch
-      if (xattr_urgent_data_map.size() > 0) {
-        auto found = xattr_urgent_data_map.find(info.tag);
-        if (found != xattr_urgent_data_map.end()) {
-          it->second = op.info.time;
-          tag_found = true;
-          //write the updated map back
-          bufferlist bl_map;
-          encode(xattr_urgent_data_map, bl_map);
-          ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
-          CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
-          if (ret < 0) {
-            CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
-            return ret;
-          }
+      auto xattr_iter = xattr_urgent_data_map.find(op.info.tag);
+      if (xattr_iter != xattr_urgent_data_map.end()) {
+        it->second = op.info.time;
+        tag_found = true;
+        //write the updated map back
+        bufferlist bl_map;
+        encode(xattr_urgent_data_map, bl_map);
+        ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
+        CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
+        if (ret < 0) {
+          CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
+          return ret;
         }
-      } // end - if xattrs size ...
+      }
     }// end ret != ENOENT ...
   }
 
-  bufferlist bl_urgent_data;
   if (! tag_found) {
+    stringstream ss1;
+    ss1 << op.info.time;
+    CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in urgent data: %s\n", ss1.str().c_str());
     //try inserting in queue head
     urgent_data.urgent_data_map.insert({op.info.tag, op.info.time});
     urgent_data.num_head_urgent_entries += 1;
     has_urgent_data = true;
 
-    //check if urgent data can fit in head
-    out->clear();
-    bool can_fit = false;
+    bufferlist bl_urgent_data;
     encode(urgent_data, bl_urgent_data);
-    ret = cls_queue_can_urgent_data_fit(hctx, &bl_urgent_data, out);
-    if (ret < 0) {
-      return ret;
-    }
-    iter = out->cbegin();
-    decode(can_fit, iter);
-    CLS_LOG(1, "INFO: Can urgent data fit: %d \n", can_fit);
-
     //insert as xattrs
-    if (! can_fit) {
+    if (bl_urgent_data.length() > head.max_urgent_data_size) {
       //remove inserted entry from urgent data
       urgent_data.urgent_data_map.erase(op.info.tag);
       urgent_data.num_head_urgent_entries -= 1;
@@ -603,38 +512,26 @@ static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in,
     return -ENOSPC;
   }
 
-  bl_urgent_data.clear();
-  encode(urgent_data, bl_urgent_data);
-  in->clear();
-  if (! is_last_entry) {
-    cls_enqueue_op enqueue_op;
-    bufferlist bl_data;
-    encode(op.info, bl_data);
-    enqueue_op.bl_data_vec.emplace_back(bl_data);
-    CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
-    if (has_urgent_data) {
-      enqueue_op.bl_urgent_data = bl_urgent_data;
-    }
-    encode(enqueue_op, *in);
-    ret = cls_enqueue(hctx, in, out);
-    if (ret < 0) {
-      return ret;
-    }
-  } else {
-    cls_queue_update_last_entry_op update_op;
-    encode(op.info, update_op.bl_data);
-    CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", update_op.bl_data.length());
-    if (has_urgent_data) {
-      update_op.bl_urgent_data = bl_urgent_data;
-    }
-    encode(update_op, *in);
-    ret = cls_queue_update_last_entry(hctx, in, out);
-    if (ret < 0) {
-      return ret;
-    }
+  cls_queue_enqueue_op enqueue_op;
+  bufferlist bl_data;
+  stringstream ss1;
+  ss1 << op.info.time;
+  CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in queue: %s\n", ss1.str().c_str());
+  encode(op.info, bl_data);
+  enqueue_op.bl_data_vec.emplace_back(bl_data);
+  CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
+  
+  ret = enqueue(hctx, enqueue_op, head);
+  if (ret < 0) {
+    return ret;
   }
 
-  return 0;
+  if (has_urgent_data) {
+    head.bl_urgent_data.clear();
+    encode(urgent_data, head.bl_urgent_data);
+  }
+
+  return write_queue_head(hctx, head);
 }
 
 CLS_INIT(rgw_queue)
@@ -642,7 +539,6 @@ CLS_INIT(rgw_queue)
   CLS_LOG(1, "Loaded rgw queue class!");
 
   cls_handle_t h_class;
-  cls_method_handle_t h_gc_create_queue;
   cls_method_handle_t h_gc_init_queue;
   cls_method_handle_t h_gc_enqueue;
   cls_method_handle_t h_gc_queue_list_entries;
@@ -652,7 +548,6 @@ CLS_INIT(rgw_queue)
   cls_register(RGW_QUEUE_CLASS, &h_class);
 
   /* gc */
-  cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue);
   cls_register_cxx_method(h_class, GC_INIT_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_init_queue, &h_gc_init_queue);
   cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue);
   cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries);
index e2b3ce3405ee79dae4ec2e09de091e0fb3c19074..78ea1af21b0fbe6c72a578fdd364d4a947d0c705 100644 (file)
@@ -4,12 +4,12 @@
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/rgw/cls_rgw_ops.h"
 
-struct cls_gc_create_queue_op {
+struct cls_gc_init_queue_op {
   uint64_t size;
   uint64_t num_urgent_data_entries{0};
   string name; //for debugging, to be removed later
 
-  cls_gc_create_queue_op() {}
+  cls_gc_init_queue_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -26,31 +26,24 @@ struct cls_gc_create_queue_op {
     decode(name, bl);
     DECODE_FINISH(bl);
   }
-};
-WRITE_CLASS_ENCODER(cls_gc_create_queue_op)
-
-struct cls_gc_init_queue_op : cls_gc_create_queue_op {
 
 };
 WRITE_CLASS_ENCODER(cls_gc_init_queue_op)
 
 struct cls_rgw_gc_queue_remove_op {
   uint64_t num_entries;
-  string marker;
 
   cls_rgw_gc_queue_remove_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(num_entries, bl);
-    encode(marker, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(num_entries, bl);
-    decode(marker, bl);
     DECODE_FINISH(bl);
   }
 };
index aca2f29143a87897f4d316be3af2055ecf2c3049..fbc7b4589eed642038aa82f2ef657978da761da4 100644 (file)
@@ -44,15 +44,10 @@ void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
   for (int i = 0; i < max_objs; i++) {
     ldpp_dout(this, 20) << "RGWGC::initialize initing gc queue with name = " << obj_names[i] << dendl;
     librados::ObjectWriteOperation op;
-    op.assert_exists();
+    op.create(true);
     uint64_t queue_size = 1048576, num_urgent_data_entries = 50;
     cls_rgw_gc_init_queue(op, obj_names[i], queue_size, num_urgent_data_entries);
-    int ret = store->gc_operate(obj_names[i], &op);
-    if (ret == -ENOENT) {
-      ldpp_dout(this, 20) << "RGWGC::initialize creating gc queue with name = " << obj_names[i] << dendl;
-      cls_rgw_gc_create_queue(op, obj_names[i], queue_size, num_urgent_data_entries);
-      store->gc_operate(obj_names[i], &op);
-    }
+    store->gc_operate(obj_names[i], &op);
   }
 }
 
@@ -115,10 +110,10 @@ int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc
   return store->gc_aio_operate(obj_names[index], &op, pc);
 }
 
-int RGWGC::remove(int index, string& marker, int num_entries, librados::AioCompletion **pc)
+int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc)
 {
   ObjectWriteOperation op;
-  cls_rgw_gc_remove_entries_queue(op, marker, num_entries);
+  cls_rgw_gc_remove_entries_queue(op, num_entries);
   return store->gc_aio_operate(obj_names[index], &op, pc);
 }
 
@@ -343,11 +338,11 @@ public:
     }
   }
 
-  int remove_queue_entries(int index, string& marker, int num_entries) {
+  int remove_queue_entries(int index, int num_entries) {
     IO index_io;
     index_io.type = IO::IndexIO;
     index_io.index = index;
-    int ret = gc->remove(index, marker, num_entries, &index_io.c);
+    int ret = gc->remove(index, num_entries, &index_io.c);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "WARNING: failed to remove queue entries on index=" <<
            index << " ret=" << ret << dendl;
@@ -478,7 +473,7 @@ int RGWGC::process(int index, int max_secs, bool expired_only,
     if (entries.size() > 0) {
       //Remove the entries from the queue
       ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
-      ret = io_manager.remove_queue_entries(index, marker, entries.size());
+      ret = io_manager.remove_queue_entries(index, entries.size());
       if (ret < 0) {
         ldpp_dout(this, 0) <<
           "WARNING: failed to remove queue entries" << dendl;
index 3624b3d0a465cdba2eab03a95516d5849caa9f20..6311fa98a97206151c9a8fba87e0082dd1d2f5ea 100644 (file)
@@ -52,7 +52,7 @@ public:
   int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync);
   int defer_chain(const string& tag, cls_rgw_obj_chain& info, bool sync);
   int remove(int index, const std::vector<string>& tags, librados::AioCompletion **pc);
-  int remove(int index, string& marker, int num_entries, librados::AioCompletion **pc);
+  int remove(int index, int num_entries, librados::AioCompletion **pc);
 
   void initialize(CephContext *_cct, RGWRados *_store);
   void finalize();
index 521a18b997c8676cad86485e63eb669d5609e8dc..b8d4bae1f17a26d364bcba5ca1934e93e535ab5a 100644 (file)
@@ -56,20 +56,14 @@ static void create_obj(cls_rgw_obj& obj, int i, int j)
   obj.loc.append(buf);
 }
 
-static bool cmp_objs(cls_rgw_obj& obj1, cls_rgw_obj& obj2)
-{
-  return (obj1.pool == obj2.pool) &&
-         (obj1.key == obj2.key) &&
-         (obj1.loc == obj2.loc);
-}
-
 TEST(cls_queue, gc_queue_ops1)
 {
   //Testing queue ops when data size is NOT a multiple of queue size
   string queue_name = "my-queue";
-  uint64_t queue_size = 320, num_urgent_data_entries = 10;
+  uint64_t queue_size = 322, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -102,7 +96,7 @@ TEST(cls_queue, gc_queue_ops1)
   librados::ObjectWriteOperation remove_op;
   string marker1;
   uint64_t num_entries = 1;
-  cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Test enqueue again
@@ -140,9 +134,10 @@ TEST(cls_queue, gc_queue_ops2)
 {
   //Testing list queue
   string queue_name = "my-second-queue";
-  uint64_t queue_size = 330, num_urgent_data_entries = 10;
+  uint64_t queue_size = 334, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -215,9 +210,10 @@ TEST(cls_queue, gc_queue_ops3)
 {
   //Testing remove queue entries
   string queue_name = "my-third-queue";
-  uint64_t queue_size = 495, num_urgent_data_entries = 10;
+  uint64_t queue_size = 501, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -229,8 +225,8 @@ TEST(cls_queue, gc_queue_ops3)
   librados::ObjectWriteOperation remove_op;
   string marker1;
   uint64_t num_entries = 2;
-  cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
-  ASSERT_EQ(-ENOENT, ioctx.operate(queue_name, &remove_op));
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   cls_rgw_gc_obj_info defer_info;
 
@@ -280,7 +276,7 @@ TEST(cls_queue, gc_queue_ops3)
 
   //Test remove entries
   num_entries = 2;
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Test list queue again
@@ -293,9 +289,10 @@ TEST(cls_queue, gc_queue_ops4)
 {
   //Testing remove queue entries
   string queue_name = "my-fourth-queue";
-  uint64_t queue_size = 495, num_urgent_data_entries = 10;
+  uint64_t queue_size = 501, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -308,8 +305,8 @@ TEST(cls_queue, gc_queue_ops4)
   string marker1;
   uint64_t num_entries = 2;
 
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
-  ASSERT_EQ(-ENOENT, ioctx.operate(queue_name, &remove_op));
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   cls_rgw_gc_obj_info defer_info;
 
@@ -353,7 +350,7 @@ TEST(cls_queue, gc_queue_ops4)
 
   //Test remove entries
   num_entries = 2;
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Test list queue again
@@ -366,9 +363,10 @@ TEST(cls_queue, gc_queue_ops5)
 {
   //Testing remove queue entries
   string queue_name = "my-fifth-queue";
-  uint64_t queue_size = 495, num_urgent_data_entries = 10;
+  uint64_t queue_size = 501, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -414,7 +412,7 @@ TEST(cls_queue, gc_queue_ops5)
   //Test remove entries
   librados::ObjectWriteOperation remove_op;
   auto num_entries = list_info1.size();
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Test list queue again for all entries
@@ -428,9 +426,10 @@ TEST(cls_queue, gc_queue_ops6)
 {
   //Testing list queue, when data size is split at the end of the queue
   string queue_name = "my-sixth-queue";
-  uint64_t queue_size = 337, num_urgent_data_entries = 10;
+  uint64_t queue_size = 341, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -461,7 +460,7 @@ TEST(cls_queue, gc_queue_ops6)
   string marker1;
   uint64_t num_entries = 1;
 
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Enqueue one more element
@@ -499,9 +498,10 @@ TEST(cls_queue, gc_queue_ops7)
 {
   //Testing list queue, when data size is written at the end of queue and data is written after wrap around
   string queue_name = "my-seventh-queue";
-  uint64_t queue_size = 338, num_urgent_data_entries = 10;
+  uint64_t queue_size = 342, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -532,7 +532,7 @@ TEST(cls_queue, gc_queue_ops7)
   string marker1;
   uint64_t num_entries = 1;
 
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Enqueue one more element
@@ -570,9 +570,10 @@ TEST(cls_queue, gc_queue_ops8)
 {
   //Testing list queue, when data is split at the end of the queue
   string queue_name = "my-eighth-queue";
-  uint64_t queue_size = 340, num_urgent_data_entries = 10;
+  uint64_t queue_size = 344, num_urgent_data_entries = 10;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;
@@ -603,7 +604,7 @@ TEST(cls_queue, gc_queue_ops8)
   string marker1;
   uint64_t num_entries = 1;
 
-  cls_rgw_gc_remove_entries_queue(remove_op,  marker1, num_entries);
+  cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
 
   //Enqueue one more element
@@ -641,9 +642,10 @@ TEST(cls_queue, gc_queue_ops9)
 {
   //Testing remove queue entries
   string queue_name = "my-ninth-queue";
-  uint64_t queue_size = 660, num_urgent_data_entries = 1;
+  uint64_t queue_size = 668, num_urgent_data_entries = 1;
   librados::ObjectWriteOperation op;
-  cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+  op.create(true);
+  cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 
   uint64_t size = 0;