#include "cls/queue/cls_queue_types.h"
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue.h"
+#include "cls/queue/cls_queue_src.h"
CLS_VER(1,0)
CLS_NAME(queue)
+static int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_init_op op;
+ op.has_urgent_data = false;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_queue_init_op(): failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ return init_queue(hctx, op);
+}
+
+static int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_queue_get_size_ret op_ret;
+ auto ret = get_queue_size(hctx, op_ret);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto iter = in->cbegin();
+ cls_queue_enqueue_op op;
+ try {
+ decode(op, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = get_queue_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = enqueue(hctx, op, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ //Write back head
+ return write_queue_head(hctx, head);
+}
+
+static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_list_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = get_queue_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_queue_list_ret op_ret;
+ ret = queue_list_entries(hctx, op, op_ret, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_remove_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = get_queue_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = queue_remove_entries(hctx, op, head);
+ if (ret < 0) {
+ return ret;
+ }
+ return write_queue_head(hctx, head);
+}
+
CLS_INIT(queue)
{
CLS_LOG(1, "Loaded queue class!");
cls_handle_t h_class;
- cls_method_handle_t h_create_queue;
cls_method_handle_t h_init_queue;
cls_method_handle_t h_get_queue_size;
cls_method_handle_t h_enqueue;
cls_method_handle_t h_queue_list_entries;
cls_method_handle_t h_queue_remove_entries;
- cls_method_handle_t h_queue_update_last_entry;
- cls_method_handle_t h_queue_get_last_entry;
- cls_method_handle_t h_queue_read_urgent_data;
- cls_method_handle_t h_queue_write_urgent_data;
- cls_method_handle_t h_queue_can_urgent_data_fit;
cls_register(QUEUE_CLASS, &h_class);
/* queue*/
- cls_register_cxx_method(h_class, CREATE_QUEUE, CLS_METHOD_WR, cls_create_queue, &h_create_queue);
cls_register_cxx_method(h_class, INIT_QUEUE, CLS_METHOD_WR, cls_init_queue, &h_init_queue);
cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size);
cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue);
- cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_list_entries, &h_queue_list_entries);
+ cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_queue_list_entries, &h_queue_list_entries);
cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries);
- cls_register_cxx_method(h_class, QUEUE_UPDATE_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_update_last_entry, &h_queue_update_last_entry);
- cls_register_cxx_method(h_class, QUEUE_GET_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_get_last_entry, &h_queue_get_last_entry);
- cls_register_cxx_method(h_class, QUEUE_READ_URGENT_DATA, CLS_METHOD_RD, cls_queue_read_urgent_data, &h_queue_read_urgent_data);
- cls_register_cxx_method(h_class, QUEUE_WRITE_URGENT_DATA, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_write_urgent_data, &h_queue_write_urgent_data);
- cls_register_cxx_method(h_class, QUEUE_CAN_URGENT_DATA_FIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_can_urgent_data_fit, &h_queue_can_urgent_data_fit);
return;
}
+++ /dev/null
-#ifndef CEPH_CLS_QUEUE_H
-#define CEPH_CLS_QUEUE_H
-
-int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out);
-
-#endif /* CEPH_CLS_QUEUE_H */
\ No newline at end of file
#include "cls/rgw/cls_rgw_ops.h"
#include "cls/queue/cls_rgw_queue_ops.h"
+#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_client.h"
using namespace librados;
-void cls_rgw_gc_create_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries)
-{
- bufferlist in;
- cls_gc_create_queue_op call;
- call.name = queue_name;
- call.size = size;
- call.num_urgent_data_entries = num_urgent_data_entries;
- encode(call, in);
- op.exec(RGW_QUEUE_CLASS, GC_CREATE_QUEUE, in);
-}
-
void cls_rgw_gc_init_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries)
{
bufferlist in;
- cls_gc_create_queue_op call;
+ cls_gc_init_queue_op call;
call.name = queue_name;
call.size = size;
call.num_urgent_data_entries = num_urgent_data_entries;
if (r < 0)
return r;
+ cls_queue_get_size_ret op_ret;
auto iter = out.cbegin();
try {
- decode(size, iter);
+ decode(op_ret, iter);
} catch (buffer::error& err) {
return -EIO;
}
+ size = op_ret.queue_size;
+
return 0;
}
entries.swap(ret.entries);
- if (truncated)
- *truncated = ret.truncated;
+ *truncated = ret.truncated;
next_marker = std::move(ret.next_marker);
return 0;
}
-void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, string& marker, uint32_t num_entries)
+void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, uint32_t num_entries)
{
bufferlist in, out;
cls_rgw_gc_queue_remove_op rem_op;
- rem_op.marker = marker;
rem_op.num_entries = num_entries;
encode(rem_op, in);
op.exec(RGW_QUEUE_CLASS, GC_QUEUE_REMOVE_ENTRIES, in);
#include "common/ceph_time.h"
#include "common/Cond.h"
-void cls_rgw_gc_create_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries);
void cls_rgw_gc_init_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries);
int cls_rgw_gc_get_queue_size(librados::IoCtx& io_ctx, string& oid, uint64_t& size);
void cls_rgw_gc_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
-int cls_rgw_gc_dequeue(librados::IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info);
int cls_rgw_gc_list_queue(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker);
-void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, string& marker, uint32_t num_entries);
+void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, uint32_t num_entries);
void cls_rgw_gc_defer_entry_queue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
#endif
\ No newline at end of file
#define QUEUE_CLASS "queue"
#define RGW_QUEUE_CLASS "rgw_queue"
-#define CREATE_QUEUE "create_queue"
#define INIT_QUEUE "init_queue"
#define GET_QUEUE_SIZE "get_queue_size"
#define ENQUEUE "enqueue"
-#define DEQUEUE "dequeue"
#define QUEUE_LIST_ENTRIES "queue_list_entries"
#define QUEUE_REMOVE_ENTRIES "queue_remove_entries"
-#define QUEUE_UPDATE_LAST_ENTRY "queue_update_last_entry"
-#define QUEUE_GET_LAST_ENTRY "queue_get_last_entry"
#define QUEUE_READ_URGENT_DATA "queue_read_urgent_data"
#define QUEUE_WRITE_URGENT_DATA "queue_write_urgent_data"
#define QUEUE_CAN_URGENT_DATA_FIT "queue_can_urgent_data_fit"
-#define GC_CREATE_QUEUE "gc_create_queue"
#define GC_INIT_QUEUE "gc_init_queue"
#define GC_ENQUEUE "gc_enqueue"
#define GC_DEQUEUE "gc_dequeue"
#include "common/Formatter.h"
#include "common/ceph_json.h"
#include "include/utime.h"
-
-void cls_create_queue_op::dump(Formatter *f) const
-{
- head.dump(f);
-}
-
-void cls_create_queue_op::generate_test_instances(list<cls_create_queue_op*>& ls)
-{
- ls.push_back(new cls_create_queue_op);
- ls.push_back(new cls_create_queue_op);
-}
\ No newline at end of file
#include "cls/queue/cls_queue_types.h"
-struct cls_create_queue_op {
+struct cls_queue_init_op {
cls_queue_head head;
uint64_t head_size{0};
+ bool has_urgent_data{false};
- cls_create_queue_op() {}
+ cls_queue_init_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(head, bl);
encode(head_size, bl);
+ encode(has_urgent_data, bl);
ENCODE_FINISH(bl);
}
DECODE_START(1, bl);
decode(head, bl);
decode(head_size, bl);
+ decode(has_urgent_data, bl);
DECODE_FINISH(bl);
}
- void dump(Formatter *f) const;
- static void generate_test_instances(list<cls_create_queue_op*>& ls);
};
-WRITE_CLASS_ENCODER(cls_create_queue_op)
+WRITE_CLASS_ENCODER(cls_queue_init_op)
-struct cls_enqueue_op {
+struct cls_queue_enqueue_op {
vector<bufferlist> bl_data_vec;
- bufferlist bl_urgent_data;
- cls_enqueue_op() {}
+ cls_queue_enqueue_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(bl_data_vec, bl);
- encode(bl_urgent_data, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
decode(bl_data_vec, bl);
- decode(bl_urgent_data, bl);
DECODE_FINISH(bl);
}
};
-WRITE_CLASS_ENCODER(cls_enqueue_op)
-
-struct cls_dequeue_op {
- bufferlist bl;
-
- cls_dequeue_op() {}
-
- static void generate_test_instances(list<cls_dequeue_op*>& ls);
-};
+WRITE_CLASS_ENCODER(cls_queue_enqueue_op)
struct cls_queue_list_op {
uint64_t max;
- uint64_t start_offset;
+ string start_marker;
cls_queue_list_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(max, bl);
- encode(start_offset, bl);
+ encode(start_marker, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
decode(max, bl);
- decode(start_offset, bl);
+ decode(start_marker, bl);
DECODE_FINISH(bl);
}
};
struct cls_queue_list_ret {
bool is_truncated;
- uint64_t next_offset;
+ string next_marker;
vector<bufferlist> data;
- vector<uint64_t> offsets;
- bufferlist bl_urgent_data;
+ vector<string> markers;
cls_queue_list_ret() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(is_truncated, bl);
- encode(next_offset, bl);
+ encode(next_marker, bl);
encode(data, bl);
- encode(offsets, bl);
- encode(bl_urgent_data, bl);
+ encode(markers, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
decode(is_truncated, bl);
- decode(next_offset, bl);
+ decode(next_marker, bl);
decode(data, bl);
- decode(offsets, bl);
- decode(bl_urgent_data, bl);
+ decode(markers, bl);
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(cls_queue_list_ret)
struct cls_queue_remove_op {
- uint64_t start_offset;
- uint64_t end_offset;
- bufferlist bl_urgent_data;
+ string end_marker;
cls_queue_remove_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(start_offset, bl);
- encode(end_offset, bl);
- encode(bl_urgent_data, bl);
+ encode(end_marker, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(start_offset, bl);
- decode(end_offset, bl);
- decode(bl_urgent_data, bl);
+ decode(end_marker, bl);
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(cls_queue_remove_op)
-struct cls_queue_urgent_data_ret {
- bufferlist bl_urgent_data;
-
- cls_queue_urgent_data_ret() {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(bl_urgent_data, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(bl_urgent_data, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(cls_queue_urgent_data_ret)
-
-struct cls_queue_write_urgent_data_op {
- bufferlist bl_urgent_data;
+struct cls_queue_get_size_ret {
+ uint64_t queue_size;
- cls_queue_write_urgent_data_op() {}
+ cls_queue_get_size_ret() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(bl_urgent_data, bl);
+ encode(queue_size, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(bl_urgent_data, bl);
+ decode(queue_size, bl);
DECODE_FINISH(bl);
}
};
-WRITE_CLASS_ENCODER(cls_queue_write_urgent_data_op)
-
-struct cls_queue_update_last_entry_op {
- bufferlist bl_data;
- bufferlist bl_urgent_data;
-
- cls_queue_update_last_entry_op() {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(bl_data, bl);
- encode(bl_urgent_data, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(bl_data, bl);
- decode(bl_urgent_data, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(cls_queue_update_last_entry_op)
-
-struct cls_init_queue_op : cls_create_queue_op{
-
-};
-WRITE_CLASS_ENCODER(cls_init_queue_op)
+WRITE_CLASS_ENCODER(cls_queue_get_size_ret)
#endif /* CEPH_CLS_QUEUE_OPS_H */
\ No newline at end of file
#include "cls/queue/cls_queue_types.h"
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue.h"
+#include "cls/queue/cls_queue_src.h"
-static int get_queue_head_and_size(cls_method_context_t hctx, cls_queue_head& head, uint64_t& head_size)
+int write_queue_head(cls_method_context_t hctx, cls_queue_head& head)
{
- //read head size
- bufferlist bl_head_size;
- int ret = cls_cxx_read(hctx, 0, sizeof(uint64_t), &bl_head_size);
+ bufferlist bl;
+ uint16_t entry_start = QUEUE_HEAD_START;
+ encode(entry_start, bl);
+
+ bufferlist bl_head;
+ encode(head, bl_head);
+
+ uint64_t encoded_len = bl_head.length();
+ encode(encoded_len, bl);
+
+ bl.claim_append(bl_head);
+
+ int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (ret < 0) {
- CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to read head with error %d\n", ret);
+ CLS_LOG(5, "ERROR: write_queue_head: failed to write head \n");
return ret;
}
- //decode head size
- auto iter = bl_head_size.cbegin();
- try {
- decode(head_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to decode head size \n");
- return -EINVAL;
- }
+ return 0;
+}
- CLS_LOG(10, "INFO: get_queue_head_and_size: head size is %lu\n", head_size);
+int get_queue_head(cls_method_context_t hctx, cls_queue_head& head)
+{
+ uint64_t chunk_size = 1024, start_offset = 0;
- //read the head
bufferlist bl_head;
- ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ int ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
if (ret < 0) {
+ CLS_LOG(5, "ERROR: get_queue_head: failed to read head \n");
return ret;
}
- //decode head
- iter = bl_head.cbegin();
+ //Process the chunk of data read
+ auto it = bl_head.cbegin();
+ // Queue head start
+ uint16_t queue_head_start;
try {
- decode(head, iter);
+ decode(queue_head_start, it);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: get_queue_head_and_size: failed to decode head\n");
+ CLS_LOG(0, "ERROR: get_queue_head: failed to decode queue start \n");
+ return -EINVAL;
+ }
+ if (queue_head_start != QUEUE_HEAD_START) {
+ CLS_LOG(0, "ERROR: get_queue_head: invalid queue start \n");
return -EINVAL;
}
- return 0;
-}
-
-int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- auto in_iter = in->cbegin();
-
- cls_create_queue_op op;
+ uint64_t encoded_len;
try {
- decode(op, in_iter);
+ decode(encoded_len, it);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_create_queue(): failed to decode entry\n");
+ CLS_LOG(0, "ERROR: get_queue_head: failed to decode encoded head size \n");
return -EINVAL;
}
- // create the object
- int ret = cls_cxx_create(hctx, true);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_create returned %d", __func__, ret);
- return ret;
- }
-
- CLS_LOG(10, "INFO: cls_create_queue create queue of head size %lu", op.head_size);
- CLS_LOG(10, "INFO: cls_create_queue create queue of size %lu", op.head.queue_size);
-
- uint64_t head_size = QUEUE_HEAD_SIZE_1K;
-
- if (op.head.has_urgent_data) {
- if (op.head_size == 0) {
- head_size = QUEUE_HEAD_SIZE_4K;
- op.head.tail = op.head.front = QUEUE_START_OFFSET_4K;
- op.head.last_entry_offset = QUEUE_START_OFFSET_4K;
- } else {
- head_size = op.head_size;
- op.head.tail = op.head.front = head_size;
- op.head.last_entry_offset = head_size;
+ uint8_t decoded_head_size = sizeof(uint64_t) + sizeof(uint16_t);
+ if (encoded_len > (chunk_size - decoded_head_size)) {
+ chunk_size = (encoded_len - (chunk_size - decoded_head_size));
+ start_offset += decoded_head_size;
+ bufferlist bl_remaining_head;
+ int ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: get_queue_head: failed to read remaining part of head \n");
+ return ret;
}
- } else {
- head_size = QUEUE_HEAD_SIZE_1K;
- op.head.tail = op.head.front = QUEUE_START_OFFSET_1K;
- op.head.last_entry_offset = QUEUE_START_OFFSET_1K;
+ bl_head.claim_append(bl_remaining_head);
}
- op.head.queue_size += head_size;
-
- CLS_LOG(10, "INFO: cls_create_queue queue actual size %lu", op.head.queue_size);
- CLS_LOG(10, "INFO: cls_create_queue head size %lu", head_size);
- CLS_LOG(10, "INFO: cls_create_queue queue front offset %lu", op.head.front);
-
-
- //encode head size
- bufferlist bl;
- encode(head_size, bl);
- CLS_LOG(0, "INFO: cls_create_queue head size %u", bl.length());
- //encode head
- bufferlist bl_head;
- encode(op.head, bl_head);
-
- bl.claim_append(bl_head);
-
- CLS_LOG(0, "INFO: cls_create_queue writing head of size %u", bl.length());
- ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_write returned %d", __func__, ret);
- return ret;
+ try {
+ decode(head, it);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: get_queue_head: failed to decode head\n");
+ return -EINVAL;
}
+
return 0;
}
-int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op)
{
//get head and its size
- uint64_t head_size = 0;
cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
+ int ret = get_queue_head(hctx, head);
//head is already initialized
if (ret == 0) {
- CLS_LOG(1, "INFO: cls_init_queue_op(): head is already initialized\n");
- return 0;
+ return -EEXIST;
}
if (ret < 0 && ret != -EINVAL) {
return ret;
}
- auto in_iter = in->cbegin();
- cls_init_queue_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_init_queue_op(): failed to decode entry\n");
- return -EINVAL;
- }
+ head = std::move(op.head);
- CLS_LOG(10, "INFO: cls_init_queue_op init queue of head size %lu", op.head_size);
- CLS_LOG(10, "INFO: cls_init_queue_op init queue of size %lu", op.head.queue_size);
-
- head_size = QUEUE_HEAD_SIZE_1K;
-
- if (op.head.has_urgent_data) {
+ if (op.has_urgent_data) {
if (op.head_size == 0) {
- head_size = QUEUE_HEAD_SIZE_4K;
- op.head.tail = op.head.front = QUEUE_START_OFFSET_4K;
- op.head.last_entry_offset = QUEUE_START_OFFSET_4K;
+ head.max_head_size = QUEUE_HEAD_SIZE_4K;
+ head.tail.offset = head.front.offset = QUEUE_START_OFFSET_4K;
} else {
- head_size = op.head_size;
- op.head.tail = op.head.front = head_size;
- op.head.last_entry_offset = head_size;
+ head.max_head_size = op.head_size;
+ head.tail.offset = head.front.offset = head.max_head_size;
}
+ bufferlist bl_non_urgent;
+ uint16_t queue_start = QUEUE_HEAD_START;
+ uint64_t queue_encoded_len = 0;//dummy value for calculating max size of urgent data
+ encode(queue_start, bl_non_urgent);
+ encode(queue_encoded_len, bl_non_urgent);
+ encode(head, bl_non_urgent);
+ head.max_urgent_data_size = head.max_head_size - (bl_non_urgent.length() - head.bl_urgent_data.length());
} else {
- head_size = QUEUE_HEAD_SIZE_1K;
- op.head.tail = op.head.front = QUEUE_START_OFFSET_1K;
- op.head.last_entry_offset = QUEUE_START_OFFSET_1K;
+ head.max_head_size = QUEUE_HEAD_SIZE_1K;
+ head.tail.offset = head.front.offset = QUEUE_START_OFFSET_1K;
+ head.max_urgent_data_size = 0;
}
- op.head.queue_size += head_size;
-
- CLS_LOG(10, "INFO: cls_init_queue_op queue actual size %lu", op.head.queue_size);
- CLS_LOG(10, "INFO: cls_init_queue_op head size %lu", head_size);
- CLS_LOG(10, "INFO: cls_init_queue_op queue front offset %lu", op.head.front);
-
- //encode head size
- bufferlist bl;
- encode(head_size, bl);
- CLS_LOG(0, "INFO: cls_init_queue_op head size %u", bl.length());
-
- //encode head
- bufferlist bl_head;
- encode(op.head, bl_head);
-
- bl.claim_append(bl_head);
+ head.tail.gen = head.front.gen = 0;
+ head.queue_size += head.max_head_size;
+
+ CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
+ CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
+ CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
+ CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
- CLS_LOG(0, "INFO: cls_init_queue_op writing head of size %u", bl.length());
- ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): cls_init_queue_op returned %d", __func__, ret);
- return ret;
- }
- return 0;
+ return write_queue_head(hctx, head);
}
-int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret)
{
- //get head and its size
- uint64_t head_size = 0;
+ //get head
cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
+ int ret = get_queue_head(hctx, head);
if (ret < 0) {
return ret;
}
- head.queue_size -= head_size;
-
- CLS_LOG(10, "INFO: cls_get_queue_size: size of queue is %lu\n", head.queue_size);
+ op_ret.queue_size = head.queue_size - head.max_head_size;
- encode(head.queue_size, *out);
+ CLS_LOG(20, "INFO: get_queue_size: size of queue is %lu\n", op_ret.queue_size);
return 0;
}
-int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- if ((head.front == head.tail) && (! head.is_empty)) {
- CLS_LOG(1, "ERROR: No space left in queue\n");
+ if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
+ CLS_LOG(0, "ERROR: No space left in queue\n");
return -ENOSPC;
}
- auto iter = in->cbegin();
- cls_enqueue_op op;
- try {
- decode(op, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n");
- return -EINVAL;
- }
-
- for (auto bl_data : op.bl_data_vec) {
+ for (auto& bl_data : op.bl_data_vec) {
bufferlist bl;
+ uint16_t entry_start = QUEUE_ENTRY_START;
+ encode(entry_start, bl);
uint64_t data_size = bl_data.length();
encode(data_size, bl);
bl.claim_append(bl_data);
+
+ CLS_LOG(10, "INFO: enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length());
- CLS_LOG(1, "INFO: cls_enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length());
-
- if (head.tail >= head.front) {
+ if (head.tail.offset >= head.front.offset) {
// check if data can fit in the remaining space in queue
- if ((head.tail + bl.length()) <= head.queue_size) {
- CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n", head.tail, bl.length());
- head.last_entry_offset = head.tail;
+ if ((head.tail.offset + bl.length()) <= head.queue_size) {
+ CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
//write data size and data at tail offset
- ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
}
- head.tail += bl.length();
+ head.tail.offset += bl.length();
} else {
- CLS_LOG(1, "INFO: Wrapping around and checking for free space\n");
- uint64_t free_space_available = (head.queue_size - head.tail) + (head.front - head_size);
+ uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
//Split data if there is free space available
if (bl.length() <= free_space_available) {
- uint64_t size_before_wrap = head.queue_size - head.tail;
+ uint64_t size_before_wrap = head.queue_size - head.tail.offset;
bufferlist bl_data_before_wrap;
bl.splice(0, size_before_wrap, &bl_data_before_wrap);
- head.last_entry_offset = head.tail;
//write spliced (data size and data) at tail offset
- CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.tail, bl_data_before_wrap.length());
- ret = cls_cxx_write2(hctx, head.tail, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ CLS_LOG(5, "INFO: enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
+ auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
}
- head.tail = head_size;
+ head.tail.offset = head.max_head_size;
+ head.tail.gen += 1;
//write remaining data at tail offset after wrapping around
- CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head.tail, bl.length());
- ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ CLS_LOG(5, "INFO: enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
+ ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
}
- head.tail += bl.length();
+ head.tail.offset += bl.length();
} else {
- CLS_LOG(1, "ERROR: No space left in queue\n");
+ CLS_LOG(0, "ERROR: No space left in queue\n");
// return queue full error
return -ENOSPC;
}
}
- } else if (head.front > head.tail) {
- if ((head.tail + bl.length()) <= head.front) {
- CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n\n", head.tail, bl.length());
- head.last_entry_offset = head.tail;
+ } else if (head.front.offset > head.tail.offset) {
+ if ((head.tail.offset + bl.length()) <= head.front.offset) {
+ CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length());
//write data size and data at tail offset
- ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
}
- head.tail += bl.length();
+ head.tail.offset += bl.length();
} else {
- CLS_LOG(1, "ERROR: No space left in queue\n");
+ CLS_LOG(0, "ERROR: No space left in queue\n");
// return queue full error
return -ENOSPC;
}
}
- if (head.tail == head.queue_size) {
- head.tail = head_size;
+ if (head.tail.offset == head.queue_size) {
+ head.tail.offset = head.max_head_size;
+ head.tail.gen += 1;
}
- CLS_LOG(1, "INFO: cls_enqueue: New tail offset: %lu \n", head.tail);
+ CLS_LOG(20, "INFO: enqueue: New tail offset: %s \n", head.tail.to_str().c_str());
} //end - for
- head.is_empty = false;
-
- //Update urgent data if set
- if (op.bl_urgent_data.length() > 0) {
- head.bl_urgent_data = op.bl_urgent_data;
- }
-
- bufferlist bl_head;
- encode(head, bl_head);
- CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u \n", bl_head.length());
- ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret);
- return ret;
- }
-
return 0;
}
-int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
// If queue is empty, return from here
- if (head.is_empty) {
- return -ENOENT;
+ if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+ CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head.front.to_str().c_str());
+ op_ret.next_marker = head.front.to_str();
+ op_ret.is_truncated = false;
+ return 0;
}
- cls_queue_list_ret op_ret;
- CLS_LOG(1, "INFO: cls_queue_list_entries: Is urgent data present: %d\n", head.has_urgent_data);
- //Info related to urgent data
- op_ret.bl_urgent_data = head.bl_urgent_data;
-
- auto in_iter = in->cbegin();
-
- cls_queue_list_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
- return -EINVAL;
- }
+ cls_queue_marker start_marker;
+ start_marker.from_str(op.start_marker.c_str());
+ cls_queue_marker next_marker = {0, 0};
- uint64_t start_offset = 0;
- if (op.start_offset == 0) {
- start_offset = head.front;
+ uint64_t start_offset = 0, gen = 0;
+ if (start_marker.offset == 0) {
+ start_offset = head.front.offset;
+ gen = head.front.gen;
} else {
- start_offset = op.start_offset;
+ start_offset = start_marker.offset;
+ gen = start_marker.gen;
}
op_ret.is_truncated = true;
bool wrap_around = false;
//Calculate length of contiguous data to be read depending on front, tail and start offset
- if (head.tail > head.front) {
- contiguous_data_size = head.tail - start_offset;
- } else if (head.front >= head.tail) {
- if (start_offset >= head.front) {
+ if (head.tail.offset > head.front.offset) {
+ contiguous_data_size = head.tail.offset - start_offset;
+ } else if (head.front.offset >= head.tail.offset) {
+ if (start_offset >= head.front.offset) {
contiguous_data_size = head.queue_size - start_offset;
wrap_around = true;
- } else if (start_offset <= head.tail) {
- contiguous_data_size = head.tail - start_offset;
+ } else if (start_offset <= head.tail.offset) {
+ contiguous_data_size = head.tail.offset - start_offset;
}
}
- CLS_LOG(1, "INFO: cls_queue_list_entries(): front is: %lu, tail is %lu\n", head.front, head.tail);
+ CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());
- bool offset_populated = false;
- uint64_t num_ops = 0;
+ bool offset_populated = false, entry_start_processed = false;
+ uint64_t data_size = 0, num_ops = 0;
+ uint16_t entry_start = 0;
bufferlist bl;
do
{
- CLS_LOG(1, "INFO: cls_queue_list_entries(): start_offset is %lu\n", start_offset);
+ CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset);
bufferlist bl_chunk;
//Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
} else {
size_to_read = contiguous_data_size;
}
- CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is %lu\n", size_to_read);
+ CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read);
if (size_to_read == 0) {
+ next_marker = head.tail;
op_ret.is_truncated = false;
- CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is 0, hence breaking out!\n");
+ CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
break;
}
- ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
+ auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
if (ret < 0) {
return ret;
}
//If there is leftover data from previous iteration, append new data to leftover data
bl.claim_append(bl_chunk);
- bl_chunk = bl;
- bl.clear();
+ bl_chunk = std::move(bl);
- CLS_LOG(1, "INFO: cls_queue_list_entries(): size of chunk %u\n", bl_chunk.length());
+ CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk.length());
//Process the chunk of data read
unsigned index = 0;
auto it = bl_chunk.cbegin();
uint64_t size_to_process = bl_chunk.length();
do {
- CLS_LOG(1, "INFO: cls_queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
+ CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
it.seek(index);
- uint64_t data_size = 0;
- //Populate offset if not done, else don't if already done in previous iteration
+ //Populate offset if not done in previous iteration
if (! offset_populated) {
- uint64_t data_offset = start_offset + index;
- op_ret.offsets.emplace_back(data_offset);
- CLS_LOG(1, "INFO: cls_queue_list_entries(): offset: %lu\n", data_offset);
+ cls_queue_marker marker = {start_offset + index, gen};
+ CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
+ string opaque_marker = marker.to_str();
+ op_ret.markers.emplace_back(opaque_marker);
}
- if (size_to_process >= sizeof(uint64_t)) {
- try {
- decode(data_size, it);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode data size \n");
- return -EINVAL;
+ // Magic number + Data size - process if not done in previous iteration
+ if (! entry_start_processed ) {
+ if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) {
+ // Decode magic number at start
+ try {
+ decode(entry_start, it);
+ } catch (buffer::error& err) {
+ CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start \n");
+ return -EINVAL;
+ }
+ if (entry_start != QUEUE_ENTRY_START) {
+ CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start);
+ return -EINVAL;
+ }
+ index += sizeof(uint16_t);
+ size_to_process -= sizeof(uint16_t);
+ // Decode data size
+ try {
+ decode(data_size, it);
+ } catch (buffer::error& err) {
+ CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size \n");
+ return -EINVAL;
+ }
+ } else {
+ // Copy unprocessed data to bl
+ bl_chunk.splice(index, size_to_process, &bl);
+ offset_populated = true;
+ CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
+ break;
}
- } else {
- // Copy unprocessed data to bl
- bl_chunk.copy(index, size_to_process, bl);
- offset_populated = true;
- CLS_LOG(1, "INFO: cls_queue_list_entries: not enough data to read data size, breaking out!\n");
- break;
+ CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size);
+ index += sizeof(uint64_t);
+ size_to_process -= sizeof(uint64_t);
}
- CLS_LOG(1, "INFO: cls_queue_list_entries(): data size: %lu\n", data_size);
- index += sizeof(uint64_t);
- size_to_process -= sizeof(uint64_t);
- bufferlist bl_data;
+ // Data
if (data_size <= size_to_process) {
+ bufferlist bl_data;
bl_chunk.copy(index, data_size, bl_data);
//Return data here
op_ret.data.emplace_back(bl_data);
index += bl_data.length();
size_to_process -= bl_data.length();
} else {
- index -= sizeof(uint64_t);
- size_to_process += sizeof(uint64_t);
bl_chunk.copy(index, size_to_process, bl);
offset_populated = true;
- CLS_LOG(1, "INFO: cls_queue_list_entries(): not enough data to read data, breaking out!\n");
+ entry_start_processed = true;
+ CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
break;
}
+ // Resetting some values
offset_populated = false;
+ entry_start_processed = false;
+ data_size = 0;
+ entry_start = 0;
num_ops++;
if (num_ops == op.max) {
- CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
- break;
- }
- if (index == bl_chunk.length()) {
+ CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
break;
}
} while(index < bl_chunk.length());
- CLS_LOG(1, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
+ CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
if (num_ops == op.max) {
- op_ret.next_offset = start_offset + index;
- CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", op_ret.next_offset);
+ next_marker = cls_queue_marker{(start_offset + index), gen};
+ CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker.offset);
break;
}
contiguous_data_size -= size_to_read;
if (contiguous_data_size == 0) {
if (wrap_around) {
- start_offset = head_size;
- contiguous_data_size = head.tail - head_size;
+ start_offset = head.max_head_size;
+ contiguous_data_size = head.tail.offset - head.max_head_size;
+ gen += 1;
wrap_around = false;
} else {
- CLS_LOG(1, "INFO: cls_queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
- op_ret.next_offset = head.front;
+ CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
+ next_marker = head.tail;
op_ret.is_truncated = false;
break;
}
} while(num_ops < op.max);
//Wrap around next offset if it has reached end of queue
- if (op_ret.next_offset == head.queue_size) {
- op_ret.next_offset = head_size;
+ if (next_marker.offset == head.queue_size) {
+ next_marker.offset = head.max_head_size;
+ next_marker.gen += 1;
}
- if (op_ret.next_offset == head.tail) {
+ if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
op_ret.is_truncated = false;
}
- encode(op_ret, *out);
+ CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker.to_str().c_str());
+ op_ret.next_marker = next_marker.to_str();
return 0;
}
-int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- if (head.is_empty) {
- return -ENOENT;
- }
-
- auto in_iter = in->cbegin();
- cls_queue_remove_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
- return -EINVAL;
- }
-
- // If start offset is not set or set to zero, then we need to shift it to actual front of queue
- if (op.start_offset == 0) {
- op.start_offset = head.front;
- }
-
- if (op.start_offset != head.front) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: invalid start offset\n");
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: start offset = %lu\n", op.start_offset);
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: front = %lu\n", head.front);
- return -EINVAL;
+ //Queue is empty
+ if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+ return 0;
}
- uint64_t end_offset = 0;
- uint64_t data_size = 0;
-
- //Check for data size wrap around
- if ((head.queue_size - op.end_offset) >= sizeof(uint64_t)) {
- // Read the size from the end offset
- bufferlist bl_size;
- ret = cls_cxx_read(hctx, op.end_offset, sizeof(uint64_t), &bl_size);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read data size \n");
- return ret;
- }
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n");
- return -EINVAL;
- }
- //Check for data wrap around
- if ((head.queue_size - (op.end_offset + sizeof(uint64_t))) >= data_size) {
- end_offset = op.end_offset + sizeof(uint64_t) + data_size;
- } else {
- uint64_t rem_size = data_size - (head.queue_size - (op.end_offset + sizeof(uint64_t)));
- end_offset = head_size + rem_size;
- }
- } else {
- bufferlist bl_size;
- ret = cls_cxx_read(hctx, op.end_offset, (head.queue_size - op.end_offset), &bl_size);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n");
- return ret;
- }
- uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - op.end_offset);
- bufferlist bl;
- ret = cls_cxx_read(hctx, head_size, rem_size, &bl);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n");
- return ret;
- }
- bl_size.claim_append(bl);
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n");
- return -EINVAL;
- }
- end_offset = head_size + rem_size + data_size;
- }
+ cls_queue_marker end_marker;
+ end_marker.from_str(op.end_marker.c_str());
- CLS_LOG(1, "INFO: cls_queue_remove_entries: op.end_offset = %lu\n", op.end_offset);
- CLS_LOG(1, "INFO: cls_queue_remove_entries: data_size = %lu\n", data_size);
- CLS_LOG(1, "INFO: cls_queue_remove_entries: end_offset = %lu\n", end_offset);
+ CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker.to_str().c_str());
//Zero out the entries that have been removed, to reclaim storage space
- if (end_offset > op.start_offset) {
- ret = cls_cxx_write_zero(hctx, op.start_offset, (end_offset - op.start_offset));
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n");
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset);
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - op.start_offset));
- return ret;
- }
- } else { //start offset > end offset
- ret = cls_cxx_write_zero(hctx, op.start_offset, (head.queue_size - op.start_offset));
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n");
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset);
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (head.queue_size - op.start_offset));
- return ret;
- }
- ret = cls_cxx_write_zero(hctx, head_size, (end_offset - head_size));
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n");
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", head_size);
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - head_size));
- return ret;
- }
- }
-
- head.front = end_offset;
-
- // Check if it is the end, then wrap around
- if (head.front == head.queue_size) {
- head.front = head_size;
- }
-
- CLS_LOG(1, "INFO: cls_queue_remove_entries: front offset is: %lu and tail offset is %lu\n", head.front, head.tail);
-
- // We've reached the last element
- if (head.front == head.tail) {
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Queue is empty now!\n");
- head.is_empty = true;
- }
-
- //Update urgent data map
- if (op.bl_urgent_data.length() > 0) {
- head.bl_urgent_data = op.bl_urgent_data;
- }
-
- //Write head back
- bufferlist bl_head;
- encode(head, bl_head);
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front);
- ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head returned error: %d \n", ret);
- return ret;
- }
-
- return 0;
-}
-
-int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- uint64_t data_size = 0, last_entry_offset = head.last_entry_offset;
-
- //Check for data size wrap around
- if ((head.queue_size - last_entry_offset) >= sizeof(uint64_t)) {
- // Read the size from the end offset
- bufferlist bl_size;
- ret = cls_cxx_read(hctx, last_entry_offset, sizeof(uint64_t), &bl_size);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to read data size \n");
- return ret;
- }
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode data size \n");
- return -EINVAL;
- }
- last_entry_offset += sizeof(uint64_t);
- //Check for data wrap around
- if ((head.queue_size - (last_entry_offset + sizeof(uint64_t))) >= data_size) {
- CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset);
- ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- } else {
- CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset);
- ret = cls_cxx_read2(hctx, last_entry_offset, (head.queue_size - (last_entry_offset + sizeof(uint64_t))), out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
+ uint64_t len = end_marker.offset - head.front.offset;
+ if (len > 0) {
+ auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
if (ret < 0) {
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
return ret;
}
- uint64_t rem_size = data_size - (head.queue_size - (last_entry_offset + sizeof(uint64_t)));
- bufferlist bl;
- ret = cls_cxx_read(hctx, head_size, rem_size, &bl);
- if (ret < 0) {
- return ret;
- }
- out->claim_append(bl);
- }
- } else {
- bufferlist bl_size;
- ret = cls_cxx_read(hctx, last_entry_offset, (head.queue_size - last_entry_offset), &bl_size);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n");
- return ret;
- }
- uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - last_entry_offset);
- bufferlist bl;
- ret = cls_cxx_read(hctx, head_size, rem_size, &bl);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n");
- return ret;
}
- bl_size.claim_append(bl);
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n");
- return -EINVAL;
- }
- last_entry_offset = head_size + rem_size;
- CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset);
- ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- }
-
- return 0;
-}
-
-int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- auto in_iter = in->cbegin();
-
- cls_queue_update_last_entry_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode input data\n");
- return -EINVAL;
- }
-
- bufferlist bl;
- uint64_t data_size = op.bl_data.length();
- encode(data_size, bl);
- bl.claim_append(op.bl_data);
-
- CLS_LOG(1, "INFO: cls_queue_update_last_entry_op: Updating data at last offset: %lu and total data size is %u\n", head.last_entry_offset, bl.length());
-
- // check if data can fit in the remaining space in queue
- if ((head.last_entry_offset + bl.length()) <= head.queue_size) {
- //write data size and data at tail offset
- ret = cls_cxx_write2(hctx, head.last_entry_offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- } else {
- CLS_LOG(1, "INFO: Wrapping around and checking for free space\n");
- uint64_t free_space_available = (head.queue_size - head.last_entry_offset) + (head.front - head_size);
- //Split data if there is free space available
- if (bl.length() <= free_space_available) {
- uint64_t size_before_wrap = head.queue_size - head.last_entry_offset;
- bufferlist bl_data_before_wrap;
- bl.splice(0, size_before_wrap, &bl_data_before_wrap);
- //write spliced (data size and data) at last entry offset
- CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.last_entry_offset, bl_data_before_wrap.length());
- ret = cls_cxx_write2(hctx, head.last_entry_offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
+ uint64_t len = head.queue_size - head.front.offset;
+ if (len > 0) {
+ auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
if (ret < 0) {
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
return ret;
}
- //write remaining data after wrapping around
- CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head_size, bl.length());
- ret = cls_cxx_write2(hctx, head_size, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ }
+ len = end_marker.offset - head.max_head_size;
+ if (len > 0) {
+ auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
if (ret < 0) {
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head.max_head_size);
return ret;
}
}
- else {
- CLS_LOG(1, "ERROR: No space left in queue\n");
- // return queue full error
- return -ENOSPC;
- }
- }
-
- if (op.bl_urgent_data.length() > 0) {
- head.bl_urgent_data = op.bl_urgent_data;
- }
-
- bufferlist bl_head;
- encode(head, bl_head);
- CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head of size: %u \n", bl_head.length());
- ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head returned error: %d \n", ret);
- return ret;
- }
- return 0;
-}
-
-int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- CLS_LOG(1, "INFO: cls_queue_read_urgent_data: tail offset %lu\n", head.tail);
-
- cls_queue_urgent_data_ret op_ret;
-
- op_ret.bl_urgent_data = head.bl_urgent_data;
-
- encode(op_ret, *out);
-
- return 0;
-}
-
-int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- CLS_LOG(1, "INFO: cls_queue_write_urgent_data: tail offset %lu\n", head.tail);
-
- auto in_iter = in->cbegin();
-
- cls_queue_write_urgent_data_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode input data\n");
+ } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
+ //no-op
+ } else {
+ CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker.to_str().c_str(), end_marker.gen);
return -EINVAL;
}
- //Write urgent data
- head.bl_urgent_data = op.bl_urgent_data;
-
- //Write head back
- bufferlist bl_head;
- encode(head, bl_head);
- CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head of size: %u\n", bl_head.length());
- ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head returned error: %d \n", ret);
- return ret;
- }
-
- return 0;
-}
-
-int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- bool can_fit = true;
-
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
- head.bl_urgent_data = *in;
+ head.front = end_marker;
- bufferlist bl_head;
- encode(head, bl_head);
-
- if(bl_head.length() > head_size) {
- can_fit = false;
+ // Check if it is the end, then wrap around
+ if (head.front.offset == head.queue_size) {
+ head.front.offset = head.max_head_size;
+ head.front.gen += 1;
}
- encode(can_fit, *out);
+ CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());
return 0;
}
-
--- /dev/null
+#ifndef CEPH_CLS_QUEUE_SRC_H
+#define CEPH_CLS_QUEUE_SRC_H
+
+int write_queue_head(cls_method_context_t hctx, cls_queue_head& head);
+int get_queue_head(cls_method_context_t hctx, cls_queue_head& head);
+int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op);
+int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret);
+int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head);
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head);
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head);
+
+#endif /* CEPH_CLS_QUEUE_SRC_H */
\ No newline at end of file
#include "common/ceph_json.h"
#include "include/utime.h"
-void cls_queue_head::dump(Formatter *f) const
-{
- f->dump_bool("is_empty", is_empty);
- f->dump_unsigned("front", front);
- f->dump_unsigned("tail", tail);
- f->dump_unsigned("size", queue_size);
- f->dump_unsigned("has_urgent_data", front);
-}
-
-void cls_queue_head::generate_test_instances(list<cls_queue_head*>& ls)
-{
- ls.push_back(new cls_queue_head);
- ls.push_back(new cls_queue_head);
-}
#ifndef CEPH_CLS_QUEUE_TYPES_H
#define CEPH_CLS_QUEUE_TYPES_H
+#include <errno.h>
#include "include/types.h"
#include "common/ceph_time.h"
#include "common/Formatter.h"
//Actual start offset of queue data
#define QUEUE_START_OFFSET_4K QUEUE_HEAD_SIZE_4K
+constexpr unsigned int QUEUE_HEAD_START = 0xDEAD;
+constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF;
+
+struct cls_queue_marker
+{
+ uint64_t offset{0};
+ uint64_t gen{0};
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(gen, bl);
+ encode(offset, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(gen, bl);
+ decode(offset, bl);
+ DECODE_FINISH(bl);
+ }
+
+ string to_str() {
+ string marker = std::to_string(gen) + '/' + std::to_string(offset);
+ return marker;
+ }
+
+ int from_str(const char* str) {
+ errno = 0;
+ char* end = nullptr;
+ gen = ::strtoull(str, &end, 10);
+ if (errno) {
+ return errno;
+ }
+ if (str == end || *end != '/') { // expects delimiter
+ return -EINVAL;
+ }
+ str = end + 1;
+ offset = ::strtoull(str, &end, 10);
+ if (errno) {
+ return errno;
+ }
+ if (str == end || *end != 0) { // expects null terminator
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_queue_marker)
+
struct cls_queue_head
{
- uint64_t front = QUEUE_START_OFFSET_1K;
- uint64_t tail = QUEUE_START_OFFSET_1K;
+ uint64_t max_head_size = QUEUE_HEAD_SIZE_1K;
+ cls_queue_marker front{QUEUE_START_OFFSET_1K, 0};
+ cls_queue_marker tail{QUEUE_START_OFFSET_1K, 0};
uint64_t queue_size{0}; // size of queue requested by user, with head size added to it
- uint64_t last_entry_offset = QUEUE_START_OFFSET_1K;
- bool is_empty{true};
- bool has_urgent_data{false};
+ uint64_t max_urgent_data_size{0};
bufferlist bl_urgent_data; // special data known to application using queue
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
+ encode(max_head_size, bl);
encode(front, bl);
encode(tail, bl);
encode(queue_size, bl);
- encode(last_entry_offset, bl);
- encode(is_empty, bl);
- encode(has_urgent_data, bl);
+ encode(max_urgent_data_size, bl);
encode(bl_urgent_data, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
+ decode(max_head_size, bl);
decode(front, bl);
decode(tail, bl);
decode(queue_size, bl);
- decode(last_entry_offset, bl);
- decode(is_empty, bl);
- decode(has_urgent_data, bl);
+ decode(max_urgent_data_size, bl);
decode(bl_urgent_data, bl);
DECODE_FINISH(bl);
}
-
- void dump(Formatter *f) const;
- static void generate_test_instances(list<cls_queue_head*>& o);
};
WRITE_CLASS_ENCODER(cls_queue_head)
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_rgw_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue.h"
+#include "cls/queue/cls_queue_src.h"
#include <boost/lexical_cast.hpp>
#include <unordered_map>
+#include <sstream>
#include "common/ceph_context.h"
#include "global/global_context.h"
CLS_VER(1,0)
CLS_NAME(rgw_queue)
-static int cls_gc_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- auto in_iter = in->cbegin();
-
- cls_gc_create_queue_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_create_queue: failed to decode entry\n");
- return -EINVAL;
- }
-
- cls_gc_urgent_data urgent_data;
- urgent_data.num_urgent_data_entries = op.num_urgent_data_entries;
-
- cls_create_queue_op create_op;
-
- CLS_LOG(10, "INFO: cls_gc_create_queue: queue size is %lu\n", op.size);
- CLS_LOG(10, "INFO: cls_gc_create_queue: queue name is %s\n", op.name.c_str());
- create_op.head.queue_size = op.size;
- create_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size;
- create_op.head.has_urgent_data = true;
- encode(urgent_data, create_op.head.bl_urgent_data);
-
- in->clear();
- encode(create_op, *in);
-
- return cls_create_queue(hctx, in, out);
-}
-
static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto in_iter = in->cbegin();
cls_gc_urgent_data urgent_data;
urgent_data.num_urgent_data_entries = op.num_urgent_data_entries;
- cls_init_queue_op init_op;
+ cls_queue_init_op init_op;
CLS_LOG(10, "INFO: cls_gc_init_queue: queue size is %lu\n", op.size);
CLS_LOG(10, "INFO: cls_gc_init_queue: queue name is %s\n", op.name.c_str());
init_op.head.queue_size = op.size;
init_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size;
- init_op.head.has_urgent_data = true;
+ init_op.has_urgent_data = true;
encode(urgent_data, init_op.head.bl_urgent_data);
- in->clear();
- encode(init_op, *in);
-
- return cls_init_queue(hctx, in, out);
+ return init_queue(hctx, init_op);
}
static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto in_iter = in->cbegin();
-
cls_rgw_gc_set_entry_op op;
try {
decode(op, in_iter);
op.info.time = ceph::real_clock::now();
op.info.time += make_timespan(op.expiration_secs);
- cls_enqueue_op enqueue_op;
+ //get head
+ cls_queue_head head;
+ int ret = get_queue_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_queue_enqueue_op enqueue_op;
bufferlist bl_data;
encode(op.info, bl_data);
enqueue_op.bl_data_vec.emplace_back(bl_data);
CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length());
- in->clear();
- encode(enqueue_op, *in);
+ ret = enqueue(hctx, enqueue_op, head);
+ if (ret < 0) {
+ return ret;
+ }
- return cls_enqueue(hctx, in, out);
+ //Write back head
+ return write_queue_head(hctx, head);
}
static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n");
auto in_iter = in->cbegin();
-
cls_rgw_gc_list_op op;
try {
decode(op, in_iter);
return -EINVAL;
}
- cls_queue_list_op list_op;
- if (op.marker.empty()) {
- list_op.start_offset = 0;
- } else {
- list_op.start_offset = boost::lexical_cast<uint64_t>(op.marker.c_str());
+ cls_queue_head head;
+ auto ret = get_queue_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_gc_urgent_data urgent_data;
+ if (head.bl_urgent_data.length() > 0) {
+ auto iter_urgent_data = head.bl_urgent_data.cbegin();
+ try {
+ decode(urgent_data, iter_urgent_data);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
+ return -EINVAL;
+ }
}
+ cls_queue_list_op list_op;
if (! op.max) {
op.max = GC_LIST_DEFAULT_MAX;
}
list_op.max = op.max;
+ list_op.start_marker = op.marker;
- cls_queue_list_ret op_ret;
cls_rgw_gc_list_ret list_ret;
uint32_t num_entries = 0; //Entries excluding the deferred ones
- bool urgent_data_decoded = false;
- cls_gc_urgent_data urgent_data;
+ bool is_truncated = true;
+ string next_marker;
do {
- in->clear();
- encode(list_op, *in);
-
- CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering cls_queue_list_entries \n");
- int ret = cls_queue_list_entries(hctx, in, out);
+ CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering queue_list_entries \n");
+ cls_queue_list_ret op_ret;
+ int ret = queue_list_entries(hctx, list_op, op_ret, head);
if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret);
+ CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret);
return ret;
}
-
- auto iter = out->cbegin();
- try {
- decode(op_ret, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n");
- return -EINVAL;
- }
-
- //Each cls_queue_list_entries will fetch the same urgent data, decode it only once
- if (! urgent_data_decoded) {
- auto iter_urgent_data = op_ret.bl_urgent_data.cbegin();
- try {
- decode(urgent_data, iter_urgent_data);
- urgent_data_decoded = true;
- } catch (buffer::error& err) {
- CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
- return -EINVAL;
- }
- }
-
+ is_truncated = op_ret.is_truncated;
+ next_marker = op_ret.next_marker;
+
if (op_ret.data.size()) {
for (auto it : op_ret.data) {
cls_rgw_gc_obj_info info;
}
bool found = false;
//Check for info tag in urgent data map
- if (urgent_data.urgent_data_map.size() > 0) {
- auto it = urgent_data.urgent_data_map.find(info.tag);
- if (it != urgent_data.urgent_data_map.end()) {
- found = true;
- if (it->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str());
- continue;
- }
+ auto iter = urgent_data.urgent_data_map.find(info.tag);
+ if (iter != urgent_data.urgent_data_map.end()) {
+ found = true;
+ stringstream ss1, ss2;
+ ss1 << iter->second;
+ ss2 << info.time;
+ if (iter->second > info.time) {
+ CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str());
+ CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in urgent data: %s\n", ss1.str().c_str());
+ CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in queue data: %s\n", ss2.str().c_str());
+ continue;
}
}
//Search in xattrs
- if (! found) {
+ if (! found && urgent_data.num_xattr_urgent_entries > 0) {
bufferlist bl_xattrs;
int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n");
return -EINVAL;
} //end - catch
- if (xattr_urgent_data_map.size() > 0) {
- auto it = xattr_urgent_data_map.find(info.tag);
- if (it != xattr_urgent_data_map.end()) {
- if (it->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
- continue;
- }
+ auto xattr_iter = xattr_urgent_data_map.find(info.tag);
+ if (xattr_iter != xattr_urgent_data_map.end()) {
+ if (xattr_iter->second > info.time) {
+ CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
+ continue;
}
- } // end - if xattrs size ...
+ }
} // end - ret != ENOENT && ENODATA
} // end - if not found
if (op.expired_only) {
CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max);
if (num_entries < op.max) {
list_op.max = (op.max - num_entries);
- list_op.start_offset = op_ret.next_offset;
+ list_op.start_marker = op_ret.next_marker;
out->clear();
} else {
//We've reached the max number of entries needed
//We dont have data to process
break;
}
- } while(op_ret.is_truncated);
+ } while(is_truncated);
- list_ret.truncated = op_ret.is_truncated;
+ list_ret.truncated = is_truncated;
if (list_ret.truncated) {
- list_ret.next_marker = boost::lexical_cast<string>(op_ret.next_offset);
+ list_ret.next_marker = next_marker;
}
out->clear();
encode(list_ret, *out);
return -EINVAL;
}
- // List entries and calculate total number of entries (including invalid entries)
- cls_queue_list_op list_op;
- if (op.marker.empty()) {
- list_op.start_offset = 0;
- } else {
- list_op.start_offset = boost::lexical_cast<uint64_t>(op.marker.c_str());
+ cls_queue_head head;
+ auto ret = get_queue_head(hctx, head);
+ if (ret < 0) {
+ return ret;
}
+ cls_gc_urgent_data urgent_data;
+ if (head.bl_urgent_data.length() > 0) {
+ auto iter_urgent_data = head.bl_urgent_data.cbegin();
+ try {
+ decode(urgent_data, iter_urgent_data);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_gc_queue_remove(): failed to decode urgent data\n");
+ return -EINVAL;
+ }
+ }
+
+ // List entries and calculate total number of entries (including invalid entries)
if (! op.num_entries) {
op.num_entries = GC_LIST_DEFAULT_MAX;
}
-
- list_op.max = op.num_entries;
+ cls_queue_list_op list_op;
+ list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry
bool is_truncated = true;
uint32_t total_num_entries = 0, num_entries = 0;
- cls_gc_urgent_data urgent_data;
- bool urgent_data_decoded = false;
- uint64_t end_offset = 0;
+ string end_marker;
do {
- in->clear();
- encode(list_op, *in);
-
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_list_entries \n");
- int ret = cls_queue_list_entries(hctx, in, out);
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_list_entries \n");
+ cls_queue_list_ret op_ret;
+ int ret = queue_list_entries(hctx, list_op, op_ret, head);
if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret);
+ CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret);
return ret;
}
- cls_queue_list_ret op_ret;
- auto iter = out->cbegin();
- try {
- decode(op_ret, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n");
- return -EINVAL;
- }
is_truncated = op_ret.is_truncated;
unsigned int index = 0;
- if (! urgent_data_decoded) {
- auto iter_urgent_data = op_ret.bl_urgent_data.cbegin();
- try {
- decode(urgent_data, iter_urgent_data);
- urgent_data_decoded = true;
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
- return -EINVAL;
- }
- }
// If data is not empty
if (op_ret.data.size()) {
for (auto it : op_ret.data) {
index++;
bool found = false;
//Search for tag in urgent data map
- if (urgent_data.urgent_data_map.size() > 0) {
- auto it = urgent_data.urgent_data_map.find(info.tag);
- if (it != urgent_data.urgent_data_map.end()) {
- found = true;
- if (it->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str());
- continue;
- } else if (it->second == info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str());
- urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
- urgent_data.num_head_urgent_entries -= 1;
- }
- }//end-if map end
- }//end-if urgent data
- if (! found) {
+ auto iter = urgent_data.urgent_data_map.find(info.tag);
+ if (iter != urgent_data.urgent_data_map.end()) {
+ found = true;
+ if (iter->second > info.time) {
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str());
+ continue;
+ } else if (iter->second == info.time) {
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str());
+ urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
+ urgent_data.num_head_urgent_entries -= 1;
+ }
+ }//end-if map end
+ if (! found && urgent_data.num_xattr_urgent_entries > 0) {
//Search in xattrs
bufferlist bl_xattrs;
int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n");
return -EINVAL;
} //end - catch
- if (xattr_urgent_data_map.size() > 0) {
- auto found = xattr_urgent_data_map.find(info.tag);
- if (found != xattr_urgent_data_map.end()) {
- if (found->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
- continue;
- } else if (found->second == info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
- xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
- urgent_data.num_xattr_urgent_entries -= 1;
- }
+ auto xattr_iter = xattr_urgent_data_map.find(info.tag);
+ if (xattr_iter != xattr_urgent_data_map.end()) {
+ if (xattr_iter->second > info.time) {
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
+ continue;
+ } else if (xattr_iter->second == info.time) {
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
+ xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
+ urgent_data.num_xattr_urgent_entries -= 1;
}
- } // end - if xattrs size ...
+ }
} // end - ret != ENOENT && ENODATA
}// search in xattrs
num_entries++;
}//end-for
- if (num_entries < op.num_entries) {
- list_op.max = (op.num_entries - num_entries);
- list_op.start_offset = op_ret.next_offset;
+ if (! op_ret.is_truncated && num_entries < (op.num_entries + 1)) {
+ end_marker = op_ret.next_marker;
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): truncated and end offset is %s\n", end_marker.c_str());
+ break;
+ }
+ if (num_entries < (op.num_entries + 1)) {
+ list_op.max = ((op.num_entries + 1) - num_entries);
+ list_op.start_marker = op_ret.next_marker;
out->clear();
} else {
- end_offset = op_ret.offsets[index - 1];
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %lu\n", index, end_offset);
+ end_marker = op_ret.markers[index - 1];
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %s\n", index, end_marker.c_str());
break;
}
} //end-if
break;
}
} while(is_truncated);
+
CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries);
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): End offset is %s\n", end_marker.c_str());
- if (end_offset != 0) {
+ if (! end_marker.empty()) {
cls_queue_remove_op rem_op;
- if (op.marker.empty()) {
- rem_op.start_offset = 0;
- } else {
- rem_op.start_offset = boost::lexical_cast<uint64_t>(op.marker.c_str());
- }
-
- rem_op.end_offset = end_offset;
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): start offset: %lu and end offset: %lu\n", rem_op.start_offset, rem_op.end_offset);
+ rem_op.end_marker = end_marker;
- encode(urgent_data, rem_op.bl_urgent_data);
-
- in->clear();
- encode(rem_op, *in);
-
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_remove_entries \n");
- int ret = cls_queue_remove_entries(hctx, in, out);
+ CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_remove_entries \n");
+ int ret = queue_remove_entries(hctx, rem_op, head);
if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries(): returned error %d\n", ret);
+ CLS_LOG(1, "ERROR: queue_remove_entries(): returned error %d\n", ret);
return ret;
}
}
- return 0;
+ //Update urgent data map
+ encode(urgent_data, head.bl_urgent_data);
+
+ return write_queue_head(hctx, head);
}
static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
op.info.time = ceph::real_clock::now();
op.info.time += make_timespan(op.expiration_secs);
- //Read urgent data
- in->clear();
- out->clear();
-
- ret = cls_queue_read_urgent_data(hctx, in, out);
-
- auto out_iter = out->cbegin();
-
- cls_queue_urgent_data_ret op_ret;
- try {
- decode(op_ret, out_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode ouput\n");
- return -EINVAL;
- }
-
- auto bl_iter = op_ret.bl_urgent_data.cbegin();
- cls_gc_urgent_data urgent_data;
- try {
- decode(urgent_data, bl_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode urgent data\n");
- return -EINVAL;
- }
-
- bool is_last_entry = false;
- in->clear();
- out->clear();
- ret = cls_queue_get_last_entry(hctx, in, out);
+ // Read head
+ cls_queue_head head;
+ ret = get_queue_head(hctx, head);
if (ret < 0) {
return ret;
}
- cls_rgw_gc_obj_info info;
- auto iter = out->cbegin();
+ auto bl_iter = head.bl_urgent_data.cbegin();
+ cls_gc_urgent_data urgent_data;
try {
- decode(info, iter);
+ decode(urgent_data, bl_iter);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode entry\n");
+ CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode urgent data\n");
return -EINVAL;
}
- CLS_LOG(1, "INFO: tag of gc info is %s\n", info.tag.c_str());
- if (info.tag == op.info.tag) {
- is_last_entry = true;
- }
-
//has_urgent_data signifies whether urgent data in queue has changed
bool has_urgent_data = false, tag_found = false;
//search in unordered map in head
try {
decode(xattr_urgent_data_map, iter);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n");
+ CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
return -EINVAL;
} //end - catch
- if (xattr_urgent_data_map.size() > 0) {
- auto found = xattr_urgent_data_map.find(info.tag);
- if (found != xattr_urgent_data_map.end()) {
- it->second = op.info.time;
- tag_found = true;
- //write the updated map back
- bufferlist bl_map;
- encode(xattr_urgent_data_map, bl_map);
- ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
- CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
- return ret;
- }
+ auto xattr_iter = xattr_urgent_data_map.find(op.info.tag);
+ if (xattr_iter != xattr_urgent_data_map.end()) {
+ it->second = op.info.time;
+ tag_found = true;
+ //write the updated map back
+ bufferlist bl_map;
+ encode(xattr_urgent_data_map, bl_map);
+ ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
+ CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
+ return ret;
}
- } // end - if xattrs size ...
+ }
}// end ret != ENOENT ...
}
- bufferlist bl_urgent_data;
if (! tag_found) {
+ stringstream ss1;
+ ss1 << op.info.time;
+ CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in urgent data: %s\n", ss1.str().c_str());
//try inserting in queue head
urgent_data.urgent_data_map.insert({op.info.tag, op.info.time});
urgent_data.num_head_urgent_entries += 1;
has_urgent_data = true;
- //check if urgent data can fit in head
- out->clear();
- bool can_fit = false;
+ bufferlist bl_urgent_data;
encode(urgent_data, bl_urgent_data);
- ret = cls_queue_can_urgent_data_fit(hctx, &bl_urgent_data, out);
- if (ret < 0) {
- return ret;
- }
- iter = out->cbegin();
- decode(can_fit, iter);
- CLS_LOG(1, "INFO: Can urgent data fit: %d \n", can_fit);
-
//insert as xattrs
- if (! can_fit) {
+ if (bl_urgent_data.length() > head.max_urgent_data_size) {
//remove inserted entry from urgent data
urgent_data.urgent_data_map.erase(op.info.tag);
urgent_data.num_head_urgent_entries -= 1;
return -ENOSPC;
}
- bl_urgent_data.clear();
- encode(urgent_data, bl_urgent_data);
- in->clear();
- if (! is_last_entry) {
- cls_enqueue_op enqueue_op;
- bufferlist bl_data;
- encode(op.info, bl_data);
- enqueue_op.bl_data_vec.emplace_back(bl_data);
- CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
- if (has_urgent_data) {
- enqueue_op.bl_urgent_data = bl_urgent_data;
- }
- encode(enqueue_op, *in);
- ret = cls_enqueue(hctx, in, out);
- if (ret < 0) {
- return ret;
- }
- } else {
- cls_queue_update_last_entry_op update_op;
- encode(op.info, update_op.bl_data);
- CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", update_op.bl_data.length());
- if (has_urgent_data) {
- update_op.bl_urgent_data = bl_urgent_data;
- }
- encode(update_op, *in);
- ret = cls_queue_update_last_entry(hctx, in, out);
- if (ret < 0) {
- return ret;
- }
+ cls_queue_enqueue_op enqueue_op;
+ bufferlist bl_data;
+ stringstream ss1;
+ ss1 << op.info.time;
+ CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in queue: %s\n", ss1.str().c_str());
+ encode(op.info, bl_data);
+ enqueue_op.bl_data_vec.emplace_back(bl_data);
+ CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
+
+ ret = enqueue(hctx, enqueue_op, head);
+ if (ret < 0) {
+ return ret;
}
- return 0;
+ if (has_urgent_data) {
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+ }
+
+ return write_queue_head(hctx, head);
}
CLS_INIT(rgw_queue)
CLS_LOG(1, "Loaded rgw queue class!");
cls_handle_t h_class;
- cls_method_handle_t h_gc_create_queue;
cls_method_handle_t h_gc_init_queue;
cls_method_handle_t h_gc_enqueue;
cls_method_handle_t h_gc_queue_list_entries;
cls_register(RGW_QUEUE_CLASS, &h_class);
/* gc */
- cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue);
cls_register_cxx_method(h_class, GC_INIT_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_init_queue, &h_gc_init_queue);
cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue);
cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries);
#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_ops.h"
-struct cls_gc_create_queue_op {
+struct cls_gc_init_queue_op {
uint64_t size;
uint64_t num_urgent_data_entries{0};
string name; //for debugging, to be removed later
- cls_gc_create_queue_op() {}
+ cls_gc_init_queue_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
decode(name, bl);
DECODE_FINISH(bl);
}
-};
-WRITE_CLASS_ENCODER(cls_gc_create_queue_op)
-
-struct cls_gc_init_queue_op : cls_gc_create_queue_op {
};
WRITE_CLASS_ENCODER(cls_gc_init_queue_op)
struct cls_rgw_gc_queue_remove_op {
uint64_t num_entries;
- string marker;
cls_rgw_gc_queue_remove_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(num_entries, bl);
- encode(marker, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
decode(num_entries, bl);
- decode(marker, bl);
DECODE_FINISH(bl);
}
};
for (int i = 0; i < max_objs; i++) {
ldpp_dout(this, 20) << "RGWGC::initialize initing gc queue with name = " << obj_names[i] << dendl;
librados::ObjectWriteOperation op;
- op.assert_exists();
+ op.create(true);
uint64_t queue_size = 1048576, num_urgent_data_entries = 50;
cls_rgw_gc_init_queue(op, obj_names[i], queue_size, num_urgent_data_entries);
- int ret = store->gc_operate(obj_names[i], &op);
- if (ret == -ENOENT) {
- ldpp_dout(this, 20) << "RGWGC::initialize creating gc queue with name = " << obj_names[i] << dendl;
- cls_rgw_gc_create_queue(op, obj_names[i], queue_size, num_urgent_data_entries);
- store->gc_operate(obj_names[i], &op);
- }
+ store->gc_operate(obj_names[i], &op);
}
}
return store->gc_aio_operate(obj_names[index], &op, pc);
}
-int RGWGC::remove(int index, string& marker, int num_entries, librados::AioCompletion **pc)
+int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc)
{
ObjectWriteOperation op;
- cls_rgw_gc_remove_entries_queue(op, marker, num_entries);
+ cls_rgw_gc_remove_entries_queue(op, num_entries);
return store->gc_aio_operate(obj_names[index], &op, pc);
}
}
}
- int remove_queue_entries(int index, string& marker, int num_entries) {
+ int remove_queue_entries(int index, int num_entries) {
IO index_io;
index_io.type = IO::IndexIO;
index_io.index = index;
- int ret = gc->remove(index, marker, num_entries, &index_io.c);
+ int ret = gc->remove(index, num_entries, &index_io.c);
if (ret < 0) {
ldpp_dout(dpp, 0) << "WARNING: failed to remove queue entries on index=" <<
index << " ret=" << ret << dendl;
if (entries.size() > 0) {
//Remove the entries from the queue
ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
- ret = io_manager.remove_queue_entries(index, marker, entries.size());
+ ret = io_manager.remove_queue_entries(index, entries.size());
if (ret < 0) {
ldpp_dout(this, 0) <<
"WARNING: failed to remove queue entries" << dendl;
int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync);
int defer_chain(const string& tag, cls_rgw_obj_chain& info, bool sync);
int remove(int index, const std::vector<string>& tags, librados::AioCompletion **pc);
- int remove(int index, string& marker, int num_entries, librados::AioCompletion **pc);
+ int remove(int index, int num_entries, librados::AioCompletion **pc);
void initialize(CephContext *_cct, RGWRados *_store);
void finalize();
obj.loc.append(buf);
}
-static bool cmp_objs(cls_rgw_obj& obj1, cls_rgw_obj& obj2)
-{
- return (obj1.pool == obj2.pool) &&
- (obj1.key == obj2.key) &&
- (obj1.loc == obj2.loc);
-}
-
TEST(cls_queue, gc_queue_ops1)
{
//Testing queue ops when data size is NOT a multiple of queue size
string queue_name = "my-queue";
- uint64_t queue_size = 320, num_urgent_data_entries = 10;
+ uint64_t queue_size = 322, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
librados::ObjectWriteOperation remove_op;
string marker1;
uint64_t num_entries = 1;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Test enqueue again
{
//Testing list queue
string queue_name = "my-second-queue";
- uint64_t queue_size = 330, num_urgent_data_entries = 10;
+ uint64_t queue_size = 334, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
{
//Testing remove queue entries
string queue_name = "my-third-queue";
- uint64_t queue_size = 495, num_urgent_data_entries = 10;
+ uint64_t queue_size = 501, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
librados::ObjectWriteOperation remove_op;
string marker1;
uint64_t num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
- ASSERT_EQ(-ENOENT, ioctx.operate(queue_name, &remove_op));
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
cls_rgw_gc_obj_info defer_info;
//Test remove entries
num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Test list queue again
{
//Testing remove queue entries
string queue_name = "my-fourth-queue";
- uint64_t queue_size = 495, num_urgent_data_entries = 10;
+ uint64_t queue_size = 501, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
string marker1;
uint64_t num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
- ASSERT_EQ(-ENOENT, ioctx.operate(queue_name, &remove_op));
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
cls_rgw_gc_obj_info defer_info;
//Test remove entries
num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Test list queue again
{
//Testing remove queue entries
string queue_name = "my-fifth-queue";
- uint64_t queue_size = 495, num_urgent_data_entries = 10;
+ uint64_t queue_size = 501, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
//Test remove entries
librados::ObjectWriteOperation remove_op;
auto num_entries = list_info1.size();
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Test list queue again for all entries
{
//Testing list queue, when data size is split at the end of the queue
string queue_name = "my-sixth-queue";
- uint64_t queue_size = 337, num_urgent_data_entries = 10;
+ uint64_t queue_size = 341, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
string marker1;
uint64_t num_entries = 1;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Enqueue one more element
{
//Testing list queue, when data size is written at the end of queue and data is written after wrap around
string queue_name = "my-seventh-queue";
- uint64_t queue_size = 338, num_urgent_data_entries = 10;
+ uint64_t queue_size = 342, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
string marker1;
uint64_t num_entries = 1;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Enqueue one more element
{
//Testing list queue, when data is split at the end of the queue
string queue_name = "my-eighth-queue";
- uint64_t queue_size = 340, num_urgent_data_entries = 10;
+ uint64_t queue_size = 344, num_urgent_data_entries = 10;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;
string marker1;
uint64_t num_entries = 1;
- cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries);
+ cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
//Enqueue one more element
{
//Testing remove queue entries
string queue_name = "my-ninth-queue";
- uint64_t queue_size = 660, num_urgent_data_entries = 1;
+ uint64_t queue_size = 668, num_urgent_data_entries = 1;
librados::ObjectWriteOperation op;
- cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries);
+ op.create(true);
+ cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
uint64_t size = 0;