From 491be9ab38dc3a428755010d733fb207aa4be613 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Thu, 11 Jul 2019 20:54:11 +0530 Subject: [PATCH] Restructuring code. Signed-off-by: Pritha Srivastava --- src/cls/queue/cls_queue.cc | 119 +++- src/cls/queue/cls_queue.h | 17 - src/cls/queue/cls_queue_client.cc | 25 +- src/cls/queue/cls_queue_client.h | 4 +- src/cls/queue/cls_queue_const.h | 5 - src/cls/queue/cls_queue_ops.cc | 11 - src/cls/queue/cls_queue_ops.h | 119 +--- src/cls/queue/cls_queue_src.cc | 900 ++++++++------------------- src/cls/queue/cls_queue_src.h | 12 + src/cls/queue/cls_queue_types.cc | 14 - src/cls/queue/cls_queue_types.h | 74 ++- src/cls/queue/cls_rgw_queue.cc | 461 ++++++-------- src/cls/queue/cls_rgw_queue_ops.h | 11 +- src/rgw/rgw_gc.cc | 19 +- src/rgw/rgw_gc.h | 2 +- src/test/cls_queue/test_cls_queue.cc | 74 +-- 16 files changed, 684 insertions(+), 1183 deletions(-) delete mode 100644 src/cls/queue/cls_queue.h create mode 100644 src/cls/queue/cls_queue_src.h diff --git a/src/cls/queue/cls_queue.cc b/src/cls/queue/cls_queue.cc index fd4082ad80f..f2ba7d754ad 100644 --- a/src/cls/queue/cls_queue.cc +++ b/src/cls/queue/cls_queue.cc @@ -9,42 +9,133 @@ #include "cls/queue/cls_queue_types.h" #include "cls/queue/cls_queue_ops.h" #include "cls/queue/cls_queue_const.h" -#include "cls/queue/cls_queue.h" +#include "cls/queue/cls_queue_src.h" CLS_VER(1,0) CLS_NAME(queue) +static int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_init_op op; + op.has_urgent_data = false; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_init_op(): failed to decode entry\n"); + return -EINVAL; + } + + return init_queue(hctx, op); +} + +static int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_queue_get_size_ret op_ret; + auto ret = get_queue_size(hctx, op_ret); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto iter = in->cbegin(); + cls_queue_enqueue_op op; + try { + decode(op, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n"); + return -EINVAL; + } + + cls_queue_head head; + auto ret = get_queue_head(hctx, head); + if (ret < 0) { + return ret; + } + + ret = enqueue(hctx, op, head); + if (ret < 0) { + return ret; + } + + //Write back head + return write_queue_head(hctx, head); +} + +static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_list_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n"); + return -EINVAL; + } + + cls_queue_head head; + auto ret = get_queue_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_queue_list_ret op_ret; + ret = queue_list_entries(hctx, op, op_ret, head); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_remove_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n"); + return -EINVAL; + } + + cls_queue_head head; + auto ret = get_queue_head(hctx, head); + if (ret < 0) { + return ret; + } + ret = queue_remove_entries(hctx, op, head); + if (ret < 0) { + return ret; + } + return write_queue_head(hctx, head); +} + CLS_INIT(queue) { CLS_LOG(1, "Loaded queue class!"); cls_handle_t h_class; - cls_method_handle_t h_create_queue; cls_method_handle_t h_init_queue; cls_method_handle_t h_get_queue_size; cls_method_handle_t h_enqueue; cls_method_handle_t h_queue_list_entries; cls_method_handle_t h_queue_remove_entries; - cls_method_handle_t h_queue_update_last_entry; - cls_method_handle_t h_queue_get_last_entry; - cls_method_handle_t h_queue_read_urgent_data; - cls_method_handle_t h_queue_write_urgent_data; - cls_method_handle_t h_queue_can_urgent_data_fit; cls_register(QUEUE_CLASS, &h_class); /* queue*/ - cls_register_cxx_method(h_class, CREATE_QUEUE, CLS_METHOD_WR, cls_create_queue, &h_create_queue); cls_register_cxx_method(h_class, INIT_QUEUE, CLS_METHOD_WR, cls_init_queue, &h_init_queue); cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size); cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue); - cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_list_entries, &h_queue_list_entries); + cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_queue_list_entries, &h_queue_list_entries); cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries); - cls_register_cxx_method(h_class, QUEUE_UPDATE_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_update_last_entry, &h_queue_update_last_entry); - cls_register_cxx_method(h_class, QUEUE_GET_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_get_last_entry, &h_queue_get_last_entry); - cls_register_cxx_method(h_class, QUEUE_READ_URGENT_DATA, CLS_METHOD_RD, cls_queue_read_urgent_data, &h_queue_read_urgent_data); - cls_register_cxx_method(h_class, QUEUE_WRITE_URGENT_DATA, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_write_urgent_data, &h_queue_write_urgent_data); - cls_register_cxx_method(h_class, QUEUE_CAN_URGENT_DATA_FIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_can_urgent_data_fit, &h_queue_can_urgent_data_fit); return; } diff --git a/src/cls/queue/cls_queue.h b/src/cls/queue/cls_queue.h deleted file mode 100644 index 1f070570b40..00000000000 --- a/src/cls/queue/cls_queue.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef CEPH_CLS_QUEUE_H -#define CEPH_CLS_QUEUE_H - -int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out); -int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out); - -#endif /* CEPH_CLS_QUEUE_H */ \ No newline at end of file diff --git a/src/cls/queue/cls_queue_client.cc b/src/cls/queue/cls_queue_client.cc index c610c71a912..ea6116030fd 100644 --- a/src/cls/queue/cls_queue_client.cc +++ b/src/cls/queue/cls_queue_client.cc @@ -4,6 +4,7 @@ #include "cls/rgw/cls_rgw_ops.h" #include "cls/queue/cls_rgw_queue_ops.h" +#include "cls/queue/cls_queue_ops.h" #include "cls/queue/cls_queue_const.h" #include "cls/queue/cls_queue_client.h" @@ -11,21 +12,10 @@ using namespace librados; -void cls_rgw_gc_create_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries) -{ - bufferlist in; - cls_gc_create_queue_op call; - call.name = queue_name; - call.size = size; - call.num_urgent_data_entries = num_urgent_data_entries; - encode(call, in); - op.exec(RGW_QUEUE_CLASS, GC_CREATE_QUEUE, in); -} - void cls_rgw_gc_init_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries) { bufferlist in; - cls_gc_create_queue_op call; + cls_gc_init_queue_op call; call.name = queue_name; call.size = size; call.num_urgent_data_entries = num_urgent_data_entries; @@ -40,13 +30,16 @@ int cls_rgw_gc_get_queue_size(IoCtx& io_ctx, string& oid, uint64_t& size) if (r < 0) return r; + cls_queue_get_size_ret op_ret; auto iter = out.cbegin(); try { - decode(size, iter); + decode(op_ret, iter); } catch (buffer::error& err) { return -EIO; } + size = op_ret.queue_size; + return 0; } @@ -84,19 +77,17 @@ int cls_rgw_gc_list_queue(IoCtx& io_ctx, string& oid, string& marker, uint32_t m entries.swap(ret.entries); - if (truncated) - *truncated = ret.truncated; + *truncated = ret.truncated; next_marker = std::move(ret.next_marker); return 0; } -void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, string& marker, uint32_t num_entries) +void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, uint32_t num_entries) { bufferlist in, out; cls_rgw_gc_queue_remove_op rem_op; - rem_op.marker = marker; rem_op.num_entries = num_entries; encode(rem_op, in); op.exec(RGW_QUEUE_CLASS, GC_QUEUE_REMOVE_ENTRIES, in); diff --git a/src/cls/queue/cls_queue_client.h b/src/cls/queue/cls_queue_client.h index 09809d3699c..2650a0ad5c3 100644 --- a/src/cls/queue/cls_queue_client.h +++ b/src/cls/queue/cls_queue_client.h @@ -10,14 +10,12 @@ #include "common/ceph_time.h" #include "common/Cond.h" -void cls_rgw_gc_create_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries); void cls_rgw_gc_init_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries); int cls_rgw_gc_get_queue_size(librados::IoCtx& io_ctx, string& oid, uint64_t& size); void cls_rgw_gc_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); -int cls_rgw_gc_dequeue(librados::IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info); int cls_rgw_gc_list_queue(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only, list& entries, bool *truncated, string& next_marker); -void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, string& marker, uint32_t num_entries); +void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, uint32_t num_entries); void cls_rgw_gc_defer_entry_queue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); #endif \ No newline at end of file diff --git a/src/cls/queue/cls_queue_const.h b/src/cls/queue/cls_queue_const.h index 0e45739d1c0..3b61f00e0ba 100644 --- a/src/cls/queue/cls_queue_const.h +++ b/src/cls/queue/cls_queue_const.h @@ -4,20 +4,15 @@ #define QUEUE_CLASS "queue" #define RGW_QUEUE_CLASS "rgw_queue" -#define CREATE_QUEUE "create_queue" #define INIT_QUEUE "init_queue" #define GET_QUEUE_SIZE "get_queue_size" #define ENQUEUE "enqueue" -#define DEQUEUE "dequeue" #define QUEUE_LIST_ENTRIES "queue_list_entries" #define QUEUE_REMOVE_ENTRIES "queue_remove_entries" -#define QUEUE_UPDATE_LAST_ENTRY "queue_update_last_entry" -#define QUEUE_GET_LAST_ENTRY "queue_get_last_entry" #define QUEUE_READ_URGENT_DATA "queue_read_urgent_data" #define QUEUE_WRITE_URGENT_DATA "queue_write_urgent_data" #define QUEUE_CAN_URGENT_DATA_FIT "queue_can_urgent_data_fit" -#define GC_CREATE_QUEUE "gc_create_queue" #define GC_INIT_QUEUE "gc_init_queue" #define GC_ENQUEUE "gc_enqueue" #define GC_DEQUEUE "gc_dequeue" diff --git a/src/cls/queue/cls_queue_ops.cc b/src/cls/queue/cls_queue_ops.cc index 52792c9b70d..4e35090f6e9 100644 --- a/src/cls/queue/cls_queue_ops.cc +++ b/src/cls/queue/cls_queue_ops.cc @@ -5,14 +5,3 @@ #include "common/Formatter.h" #include "common/ceph_json.h" #include "include/utime.h" - -void cls_create_queue_op::dump(Formatter *f) const -{ - head.dump(f); -} - -void cls_create_queue_op::generate_test_instances(list& ls) -{ - ls.push_back(new cls_create_queue_op); - ls.push_back(new cls_create_queue_op); -} \ No newline at end of file diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h index d2c90a179eb..c7a8e709699 100644 --- a/src/cls/queue/cls_queue_ops.h +++ b/src/cls/queue/cls_queue_ops.h @@ -3,16 +3,18 @@ #include "cls/queue/cls_queue_types.h" -struct cls_create_queue_op { +struct cls_queue_init_op { cls_queue_head head; uint64_t head_size{0}; + bool has_urgent_data{false}; - cls_create_queue_op() {} + cls_queue_init_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(head, bl); encode(head_size, bl); + encode(has_urgent_data, bl); ENCODE_FINISH(bl); } @@ -20,61 +22,49 @@ struct cls_create_queue_op { DECODE_START(1, bl); decode(head, bl); decode(head_size, bl); + decode(has_urgent_data, bl); DECODE_FINISH(bl); } - void dump(Formatter *f) const; - static void generate_test_instances(list& ls); }; -WRITE_CLASS_ENCODER(cls_create_queue_op) +WRITE_CLASS_ENCODER(cls_queue_init_op) -struct cls_enqueue_op { +struct cls_queue_enqueue_op { vector bl_data_vec; - bufferlist bl_urgent_data; - cls_enqueue_op() {} + cls_queue_enqueue_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(bl_data_vec, bl); - encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(bl_data_vec, bl); - decode(bl_urgent_data, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_enqueue_op) - -struct cls_dequeue_op { - bufferlist bl; - - cls_dequeue_op() {} - - static void generate_test_instances(list& ls); -}; +WRITE_CLASS_ENCODER(cls_queue_enqueue_op) struct cls_queue_list_op { uint64_t max; - uint64_t start_offset; + string start_marker; cls_queue_list_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(max, bl); - encode(start_offset, bl); + encode(start_marker, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(max, bl); - decode(start_offset, bl); + decode(start_marker, bl); DECODE_FINISH(bl); } }; @@ -82,123 +72,68 @@ WRITE_CLASS_ENCODER(cls_queue_list_op) struct cls_queue_list_ret { bool is_truncated; - uint64_t next_offset; + string next_marker; vector data; - vector offsets; - bufferlist bl_urgent_data; + vector markers; cls_queue_list_ret() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(is_truncated, bl); - encode(next_offset, bl); + encode(next_marker, bl); encode(data, bl); - encode(offsets, bl); - encode(bl_urgent_data, bl); + encode(markers, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(is_truncated, bl); - decode(next_offset, bl); + decode(next_marker, bl); decode(data, bl); - decode(offsets, bl); - decode(bl_urgent_data, bl); + decode(markers, bl); DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(cls_queue_list_ret) struct cls_queue_remove_op { - uint64_t start_offset; - uint64_t end_offset; - bufferlist bl_urgent_data; + string end_marker; cls_queue_remove_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(start_offset, bl); - encode(end_offset, bl); - encode(bl_urgent_data, bl); + encode(end_marker, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(start_offset, bl); - decode(end_offset, bl); - decode(bl_urgent_data, bl); + decode(end_marker, bl); DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(cls_queue_remove_op) -struct cls_queue_urgent_data_ret { - bufferlist bl_urgent_data; - - cls_queue_urgent_data_ret() {} - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(bl_urgent_data, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(bl_urgent_data, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(cls_queue_urgent_data_ret) - -struct cls_queue_write_urgent_data_op { - bufferlist bl_urgent_data; +struct cls_queue_get_size_ret { + uint64_t queue_size; - cls_queue_write_urgent_data_op() {} + cls_queue_get_size_ret() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(bl_urgent_data, bl); + encode(queue_size, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(bl_urgent_data, bl); + decode(queue_size, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_queue_write_urgent_data_op) - -struct cls_queue_update_last_entry_op { - bufferlist bl_data; - bufferlist bl_urgent_data; - - cls_queue_update_last_entry_op() {} - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(bl_data, bl); - encode(bl_urgent_data, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(bl_data, bl); - decode(bl_urgent_data, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(cls_queue_update_last_entry_op) - -struct cls_init_queue_op : cls_create_queue_op{ - -}; -WRITE_CLASS_ENCODER(cls_init_queue_op) +WRITE_CLASS_ENCODER(cls_queue_get_size_ret) #endif /* CEPH_CLS_QUEUE_OPS_H */ \ No newline at end of file diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc index bf396971bc9..52aae9b57f5 100644 --- a/src/cls/queue/cls_queue_src.cc +++ b/src/cls/queue/cls_queue_src.cc @@ -9,353 +9,253 @@ #include "cls/queue/cls_queue_types.h" #include "cls/queue/cls_queue_ops.h" #include "cls/queue/cls_queue_const.h" -#include "cls/queue/cls_queue.h" +#include "cls/queue/cls_queue_src.h" -static int get_queue_head_and_size(cls_method_context_t hctx, cls_queue_head& head, uint64_t& head_size) +int write_queue_head(cls_method_context_t hctx, cls_queue_head& head) { - //read head size - bufferlist bl_head_size; - int ret = cls_cxx_read(hctx, 0, sizeof(uint64_t), &bl_head_size); + bufferlist bl; + uint16_t entry_start = QUEUE_HEAD_START; + encode(entry_start, bl); + + bufferlist bl_head; + encode(head, bl_head); + + uint64_t encoded_len = bl_head.length(); + encode(encoded_len, bl); + + bl.claim_append(bl_head); + + int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); if (ret < 0) { - CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to read head with error %d\n", ret); + CLS_LOG(5, "ERROR: write_queue_head: failed to write head \n"); return ret; } - //decode head size - auto iter = bl_head_size.cbegin(); - try { - decode(head_size, iter); - } catch (buffer::error& err) { - CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to decode head size \n"); - return -EINVAL; - } + return 0; +} - CLS_LOG(10, "INFO: get_queue_head_and_size: head size is %lu\n", head_size); +int get_queue_head(cls_method_context_t hctx, cls_queue_head& head) +{ + uint64_t chunk_size = 1024, start_offset = 0; - //read the head bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + int ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head); if (ret < 0) { + CLS_LOG(5, "ERROR: get_queue_head: failed to read head \n"); return ret; } - //decode head - iter = bl_head.cbegin(); + //Process the chunk of data read + auto it = bl_head.cbegin(); + // Queue head start + uint16_t queue_head_start; try { - decode(head, iter); + decode(queue_head_start, it); } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: get_queue_head_and_size: failed to decode head\n"); + CLS_LOG(0, "ERROR: get_queue_head: failed to decode queue start \n"); + return -EINVAL; + } + if (queue_head_start != QUEUE_HEAD_START) { + CLS_LOG(0, "ERROR: get_queue_head: invalid queue start \n"); return -EINVAL; } - return 0; -} - -int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - auto in_iter = in->cbegin(); - - cls_create_queue_op op; + uint64_t encoded_len; try { - decode(op, in_iter); + decode(encoded_len, it); } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_create_queue(): failed to decode entry\n"); + CLS_LOG(0, "ERROR: get_queue_head: failed to decode encoded head size \n"); return -EINVAL; } - // create the object - int ret = cls_cxx_create(hctx, true); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_create returned %d", __func__, ret); - return ret; - } - - CLS_LOG(10, "INFO: cls_create_queue create queue of head size %lu", op.head_size); - CLS_LOG(10, "INFO: cls_create_queue create queue of size %lu", op.head.queue_size); - - uint64_t head_size = QUEUE_HEAD_SIZE_1K; - - if (op.head.has_urgent_data) { - if (op.head_size == 0) { - head_size = QUEUE_HEAD_SIZE_4K; - op.head.tail = op.head.front = QUEUE_START_OFFSET_4K; - op.head.last_entry_offset = QUEUE_START_OFFSET_4K; - } else { - head_size = op.head_size; - op.head.tail = op.head.front = head_size; - op.head.last_entry_offset = head_size; + uint8_t decoded_head_size = sizeof(uint64_t) + sizeof(uint16_t); + if (encoded_len > (chunk_size - decoded_head_size)) { + chunk_size = (encoded_len - (chunk_size - decoded_head_size)); + start_offset += decoded_head_size; + bufferlist bl_remaining_head; + int ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + CLS_LOG(5, "ERROR: get_queue_head: failed to read remaining part of head \n"); + return ret; } - } else { - head_size = QUEUE_HEAD_SIZE_1K; - op.head.tail = op.head.front = QUEUE_START_OFFSET_1K; - op.head.last_entry_offset = QUEUE_START_OFFSET_1K; + bl_head.claim_append(bl_remaining_head); } - op.head.queue_size += head_size; - - CLS_LOG(10, "INFO: cls_create_queue queue actual size %lu", op.head.queue_size); - CLS_LOG(10, "INFO: cls_create_queue head size %lu", head_size); - CLS_LOG(10, "INFO: cls_create_queue queue front offset %lu", op.head.front); - - - //encode head size - bufferlist bl; - encode(head_size, bl); - CLS_LOG(0, "INFO: cls_create_queue head size %u", bl.length()); - //encode head - bufferlist bl_head; - encode(op.head, bl_head); - - bl.claim_append(bl_head); - - CLS_LOG(0, "INFO: cls_create_queue writing head of size %u", bl.length()); - ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_write returned %d", __func__, ret); - return ret; + try { + decode(head, it); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: get_queue_head: failed to decode head\n"); + return -EINVAL; } + return 0; } -int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op) { //get head and its size - uint64_t head_size = 0; cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); + int ret = get_queue_head(hctx, head); //head is already initialized if (ret == 0) { - CLS_LOG(1, "INFO: cls_init_queue_op(): head is already initialized\n"); - return 0; + return -EEXIST; } if (ret < 0 && ret != -EINVAL) { return ret; } - auto in_iter = in->cbegin(); - cls_init_queue_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_init_queue_op(): failed to decode entry\n"); - return -EINVAL; - } + head = std::move(op.head); - CLS_LOG(10, "INFO: cls_init_queue_op init queue of head size %lu", op.head_size); - CLS_LOG(10, "INFO: cls_init_queue_op init queue of size %lu", op.head.queue_size); - - head_size = QUEUE_HEAD_SIZE_1K; - - if (op.head.has_urgent_data) { + if (op.has_urgent_data) { if (op.head_size == 0) { - head_size = QUEUE_HEAD_SIZE_4K; - op.head.tail = op.head.front = QUEUE_START_OFFSET_4K; - op.head.last_entry_offset = QUEUE_START_OFFSET_4K; + head.max_head_size = QUEUE_HEAD_SIZE_4K; + head.tail.offset = head.front.offset = QUEUE_START_OFFSET_4K; } else { - head_size = op.head_size; - op.head.tail = op.head.front = head_size; - op.head.last_entry_offset = head_size; + head.max_head_size = op.head_size; + head.tail.offset = head.front.offset = head.max_head_size; } + bufferlist bl_non_urgent; + uint16_t queue_start = QUEUE_HEAD_START; + uint64_t queue_encoded_len = 0;//dummy value for calculating max size of urgent data + encode(queue_start, bl_non_urgent); + encode(queue_encoded_len, bl_non_urgent); + encode(head, bl_non_urgent); + head.max_urgent_data_size = head.max_head_size - (bl_non_urgent.length() - head.bl_urgent_data.length()); } else { - head_size = QUEUE_HEAD_SIZE_1K; - op.head.tail = op.head.front = QUEUE_START_OFFSET_1K; - op.head.last_entry_offset = QUEUE_START_OFFSET_1K; + head.max_head_size = QUEUE_HEAD_SIZE_1K; + head.tail.offset = head.front.offset = QUEUE_START_OFFSET_1K; + head.max_urgent_data_size = 0; } - op.head.queue_size += head_size; - - CLS_LOG(10, "INFO: cls_init_queue_op queue actual size %lu", op.head.queue_size); - CLS_LOG(10, "INFO: cls_init_queue_op head size %lu", head_size); - CLS_LOG(10, "INFO: cls_init_queue_op queue front offset %lu", op.head.front); - - //encode head size - bufferlist bl; - encode(head_size, bl); - CLS_LOG(0, "INFO: cls_init_queue_op head size %u", bl.length()); - - //encode head - bufferlist bl_head; - encode(op.head, bl_head); - - bl.claim_append(bl_head); + head.tail.gen = head.front.gen = 0; + head.queue_size += head.max_head_size; + + CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size); + CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size); + CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str()); + CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size); - CLS_LOG(0, "INFO: cls_init_queue_op writing head of size %u", bl.length()); - ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_init_queue_op returned %d", __func__, ret); - return ret; - } - return 0; + return write_queue_head(hctx, head); } -int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret) { - //get head and its size - uint64_t head_size = 0; + //get head cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); + int ret = get_queue_head(hctx, head); if (ret < 0) { return ret; } - head.queue_size -= head_size; - - CLS_LOG(10, "INFO: cls_get_queue_size: size of queue is %lu\n", head.queue_size); + op_ret.queue_size = head.queue_size - head.max_head_size; - encode(head.queue_size, *out); + CLS_LOG(20, "INFO: get_queue_size: size of queue is %lu\n", op_ret.queue_size); return 0; } -int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head) { - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - - if ((head.front == head.tail) && (! head.is_empty)) { - CLS_LOG(1, "ERROR: No space left in queue\n"); + if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) { + CLS_LOG(0, "ERROR: No space left in queue\n"); return -ENOSPC; } - auto iter = in->cbegin(); - cls_enqueue_op op; - try { - decode(op, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n"); - return -EINVAL; - } - - for (auto bl_data : op.bl_data_vec) { + for (auto& bl_data : op.bl_data_vec) { bufferlist bl; + uint16_t entry_start = QUEUE_ENTRY_START; + encode(entry_start, bl); uint64_t data_size = bl_data.length(); encode(data_size, bl); bl.claim_append(bl_data); + + CLS_LOG(10, "INFO: enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length()); - CLS_LOG(1, "INFO: cls_enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length()); - - if (head.tail >= head.front) { + if (head.tail.offset >= head.front.offset) { // check if data can fit in the remaining space in queue - if ((head.tail + bl.length()) <= head.queue_size) { - CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n", head.tail, bl.length()); - head.last_entry_offset = head.tail; + if ((head.tail.offset + bl.length()) <= head.queue_size) { + CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length()); //write data size and data at tail offset - ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); if (ret < 0) { return ret; } - head.tail += bl.length(); + head.tail.offset += bl.length(); } else { - CLS_LOG(1, "INFO: Wrapping around and checking for free space\n"); - uint64_t free_space_available = (head.queue_size - head.tail) + (head.front - head_size); + uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size); //Split data if there is free space available if (bl.length() <= free_space_available) { - uint64_t size_before_wrap = head.queue_size - head.tail; + uint64_t size_before_wrap = head.queue_size - head.tail.offset; bufferlist bl_data_before_wrap; bl.splice(0, size_before_wrap, &bl_data_before_wrap); - head.last_entry_offset = head.tail; //write spliced (data size and data) at tail offset - CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.tail, bl_data_before_wrap.length()); - ret = cls_cxx_write2(hctx, head.tail, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + CLS_LOG(5, "INFO: enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length()); + auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); if (ret < 0) { return ret; } - head.tail = head_size; + head.tail.offset = head.max_head_size; + head.tail.gen += 1; //write remaining data at tail offset after wrapping around - CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head.tail, bl.length()); - ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + CLS_LOG(5, "INFO: enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length()); + ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); if (ret < 0) { return ret; } - head.tail += bl.length(); + head.tail.offset += bl.length(); } else { - CLS_LOG(1, "ERROR: No space left in queue\n"); + CLS_LOG(0, "ERROR: No space left in queue\n"); // return queue full error return -ENOSPC; } } - } else if (head.front > head.tail) { - if ((head.tail + bl.length()) <= head.front) { - CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n\n", head.tail, bl.length()); - head.last_entry_offset = head.tail; + } else if (head.front.offset > head.tail.offset) { + if ((head.tail.offset + bl.length()) <= head.front.offset) { + CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length()); //write data size and data at tail offset - ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); if (ret < 0) { return ret; } - head.tail += bl.length(); + head.tail.offset += bl.length(); } else { - CLS_LOG(1, "ERROR: No space left in queue\n"); + CLS_LOG(0, "ERROR: No space left in queue\n"); // return queue full error return -ENOSPC; } } - if (head.tail == head.queue_size) { - head.tail = head_size; + if (head.tail.offset == head.queue_size) { + head.tail.offset = head.max_head_size; + head.tail.gen += 1; } - CLS_LOG(1, "INFO: cls_enqueue: New tail offset: %lu \n", head.tail); + CLS_LOG(20, "INFO: enqueue: New tail offset: %s \n", head.tail.to_str().c_str()); } //end - for - head.is_empty = false; - - //Update urgent data if set - if (op.bl_urgent_data.length() > 0) { - head.bl_urgent_data = op.bl_urgent_data; - } - - bufferlist bl_head; - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u \n", bl_head.length()); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); - return ret; - } - return 0; } -int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head) { - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - // If queue is empty, return from here - if (head.is_empty) { - return -ENOENT; + if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) { + CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head.front.to_str().c_str()); + op_ret.next_marker = head.front.to_str(); + op_ret.is_truncated = false; + return 0; } - cls_queue_list_ret op_ret; - CLS_LOG(1, "INFO: cls_queue_list_entries: Is urgent data present: %d\n", head.has_urgent_data); - //Info related to urgent data - op_ret.bl_urgent_data = head.bl_urgent_data; - - auto in_iter = in->cbegin(); - - cls_queue_list_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n"); - return -EINVAL; - } + cls_queue_marker start_marker; + start_marker.from_str(op.start_marker.c_str()); + cls_queue_marker next_marker = {0, 0}; - uint64_t start_offset = 0; - if (op.start_offset == 0) { - start_offset = head.front; + uint64_t start_offset = 0, gen = 0; + if (start_marker.offset == 0) { + start_offset = head.front.offset; + gen = head.front.gen; } else { - start_offset = op.start_offset; + start_offset = start_marker.offset; + gen = start_marker.gen; } op_ret.is_truncated = true; @@ -364,25 +264,26 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist bool wrap_around = false; //Calculate length of contiguous data to be read depending on front, tail and start offset - if (head.tail > head.front) { - contiguous_data_size = head.tail - start_offset; - } else if (head.front >= head.tail) { - if (start_offset >= head.front) { + if (head.tail.offset > head.front.offset) { + contiguous_data_size = head.tail.offset - start_offset; + } else if (head.front.offset >= head.tail.offset) { + if (start_offset >= head.front.offset) { contiguous_data_size = head.queue_size - start_offset; wrap_around = true; - } else if (start_offset <= head.tail) { - contiguous_data_size = head.tail - start_offset; + } else if (start_offset <= head.tail.offset) { + contiguous_data_size = head.tail.offset - start_offset; } } - CLS_LOG(1, "INFO: cls_queue_list_entries(): front is: %lu, tail is %lu\n", head.front, head.tail); + CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str()); - bool offset_populated = false; - uint64_t num_ops = 0; + bool offset_populated = false, entry_start_processed = false; + uint64_t data_size = 0, num_ops = 0; + uint16_t entry_start = 0; bufferlist bl; do { - CLS_LOG(1, "INFO: cls_queue_list_entries(): start_offset is %lu\n", start_offset); + CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset); bufferlist bl_chunk; //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size @@ -391,87 +292,105 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist } else { size_to_read = contiguous_data_size; } - CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is %lu\n", size_to_read); + CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read); if (size_to_read == 0) { + next_marker = head.tail; op_ret.is_truncated = false; - CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is 0, hence breaking out!\n"); + CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n"); break; } - ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); + auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); if (ret < 0) { return ret; } //If there is leftover data from previous iteration, append new data to leftover data bl.claim_append(bl_chunk); - bl_chunk = bl; - bl.clear(); + bl_chunk = std::move(bl); - CLS_LOG(1, "INFO: cls_queue_list_entries(): size of chunk %u\n", bl_chunk.length()); + CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk.length()); //Process the chunk of data read unsigned index = 0; auto it = bl_chunk.cbegin(); uint64_t size_to_process = bl_chunk.length(); do { - CLS_LOG(1, "INFO: cls_queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process); + CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process); it.seek(index); - uint64_t data_size = 0; - //Populate offset if not done, else don't if already done in previous iteration + //Populate offset if not done in previous iteration if (! offset_populated) { - uint64_t data_offset = start_offset + index; - op_ret.offsets.emplace_back(data_offset); - CLS_LOG(1, "INFO: cls_queue_list_entries(): offset: %lu\n", data_offset); + cls_queue_marker marker = {start_offset + index, gen}; + CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str()); + string opaque_marker = marker.to_str(); + op_ret.markers.emplace_back(opaque_marker); } - if (size_to_process >= sizeof(uint64_t)) { - try { - decode(data_size, it); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode data size \n"); - return -EINVAL; + // Magic number + Data size - process if not done in previous iteration + if (! entry_start_processed ) { + if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) { + // Decode magic number at start + try { + decode(entry_start, it); + } catch (buffer::error& err) { + CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start \n"); + return -EINVAL; + } + if (entry_start != QUEUE_ENTRY_START) { + CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start); + return -EINVAL; + } + index += sizeof(uint16_t); + size_to_process -= sizeof(uint16_t); + // Decode data size + try { + decode(data_size, it); + } catch (buffer::error& err) { + CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size \n"); + return -EINVAL; + } + } else { + // Copy unprocessed data to bl + bl_chunk.splice(index, size_to_process, &bl); + offset_populated = true; + CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n"); + break; } - } else { - // Copy unprocessed data to bl - bl_chunk.copy(index, size_to_process, bl); - offset_populated = true; - CLS_LOG(1, "INFO: cls_queue_list_entries: not enough data to read data size, breaking out!\n"); - break; + CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size); + index += sizeof(uint64_t); + size_to_process -= sizeof(uint64_t); } - CLS_LOG(1, "INFO: cls_queue_list_entries(): data size: %lu\n", data_size); - index += sizeof(uint64_t); - size_to_process -= sizeof(uint64_t); - bufferlist bl_data; + // Data if (data_size <= size_to_process) { + bufferlist bl_data; bl_chunk.copy(index, data_size, bl_data); //Return data here op_ret.data.emplace_back(bl_data); index += bl_data.length(); size_to_process -= bl_data.length(); } else { - index -= sizeof(uint64_t); - size_to_process += sizeof(uint64_t); bl_chunk.copy(index, size_to_process, bl); offset_populated = true; - CLS_LOG(1, "INFO: cls_queue_list_entries(): not enough data to read data, breaking out!\n"); + entry_start_processed = true; + CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n"); break; } + // Resetting some values offset_populated = false; + entry_start_processed = false; + data_size = 0; + entry_start = 0; num_ops++; if (num_ops == op.max) { - CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n"); - break; - } - if (index == bl_chunk.length()) { + CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n"); break; } } while(index < bl_chunk.length()); - CLS_LOG(1, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); + CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); if (num_ops == op.max) { - op_ret.next_offset = start_offset + index; - CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", op_ret.next_offset); + next_marker = cls_queue_marker{(start_offset + index), gen}; + CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker.offset); break; } @@ -480,12 +399,13 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist contiguous_data_size -= size_to_read; if (contiguous_data_size == 0) { if (wrap_around) { - start_offset = head_size; - contiguous_data_size = head.tail - head_size; + start_offset = head.max_head_size; + contiguous_data_size = head.tail.offset - head.max_head_size; + gen += 1; wrap_around = false; } else { - CLS_LOG(1, "INFO: cls_queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n"); - op_ret.next_offset = head.front; + CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n"); + next_marker = head.tail; op_ret.is_truncated = false; break; } @@ -494,408 +414,78 @@ int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist } while(num_ops < op.max); //Wrap around next offset if it has reached end of queue - if (op_ret.next_offset == head.queue_size) { - op_ret.next_offset = head_size; + if (next_marker.offset == head.queue_size) { + next_marker.offset = head.max_head_size; + next_marker.gen += 1; } - if (op_ret.next_offset == head.tail) { + if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) { op_ret.is_truncated = false; } - encode(op_ret, *out); + CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker.to_str().c_str()); + op_ret.next_marker = next_marker.to_str(); return 0; } -int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head) { - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - - if (head.is_empty) { - return -ENOENT; - } - - auto in_iter = in->cbegin(); - cls_queue_remove_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n"); - return -EINVAL; - } - - // If start offset is not set or set to zero, then we need to shift it to actual front of queue - if (op.start_offset == 0) { - op.start_offset = head.front; - } - - if (op.start_offset != head.front) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: invalid start offset\n"); - CLS_LOG(1, "ERROR: cls_queue_remove_entries: start offset = %lu\n", op.start_offset); - CLS_LOG(1, "ERROR: cls_queue_remove_entries: front = %lu\n", head.front); - return -EINVAL; + //Queue is empty + if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) { + return 0; } - uint64_t end_offset = 0; - uint64_t data_size = 0; - - //Check for data size wrap around - if ((head.queue_size - op.end_offset) >= sizeof(uint64_t)) { - // Read the size from the end offset - bufferlist bl_size; - ret = cls_cxx_read(hctx, op.end_offset, sizeof(uint64_t), &bl_size); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read data size \n"); - return ret; - } - auto iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); - return -EINVAL; - } - //Check for data wrap around - if ((head.queue_size - (op.end_offset + sizeof(uint64_t))) >= data_size) { - end_offset = op.end_offset + sizeof(uint64_t) + data_size; - } else { - uint64_t rem_size = data_size - (head.queue_size - (op.end_offset + sizeof(uint64_t))); - end_offset = head_size + rem_size; - } - } else { - bufferlist bl_size; - ret = cls_cxx_read(hctx, op.end_offset, (head.queue_size - op.end_offset), &bl_size); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n"); - return ret; - } - uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - op.end_offset); - bufferlist bl; - ret = cls_cxx_read(hctx, head_size, rem_size, &bl); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n"); - return ret; - } - bl_size.claim_append(bl); - auto iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); - return -EINVAL; - } - end_offset = head_size + rem_size + data_size; - } + cls_queue_marker end_marker; + end_marker.from_str(op.end_marker.c_str()); - CLS_LOG(1, "INFO: cls_queue_remove_entries: op.end_offset = %lu\n", op.end_offset); - CLS_LOG(1, "INFO: cls_queue_remove_entries: data_size = %lu\n", data_size); - CLS_LOG(1, "INFO: cls_queue_remove_entries: end_offset = %lu\n", end_offset); + CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker.to_str().c_str()); //Zero out the entries that have been removed, to reclaim storage space - if (end_offset > op.start_offset) { - ret = cls_cxx_write_zero(hctx, op.start_offset, (end_offset - op.start_offset)); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n"); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - op.start_offset)); - return ret; - } - } else { //start offset > end offset - ret = cls_cxx_write_zero(hctx, op.start_offset, (head.queue_size - op.start_offset)); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n"); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (head.queue_size - op.start_offset)); - return ret; - } - ret = cls_cxx_write_zero(hctx, head_size, (end_offset - head_size)); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n"); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", head_size); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - head_size)); - return ret; - } - } - - head.front = end_offset; - - // Check if it is the end, then wrap around - if (head.front == head.queue_size) { - head.front = head_size; - } - - CLS_LOG(1, "INFO: cls_queue_remove_entries: front offset is: %lu and tail offset is %lu\n", head.front, head.tail); - - // We've reached the last element - if (head.front == head.tail) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Queue is empty now!\n"); - head.is_empty = true; - } - - //Update urgent data map - if (op.bl_urgent_data.length() > 0) { - head.bl_urgent_data = op.bl_urgent_data; - } - - //Write head back - bufferlist bl_head; - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head returned error: %d \n", ret); - return ret; - } - - return 0; -} - -int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - - uint64_t data_size = 0, last_entry_offset = head.last_entry_offset; - - //Check for data size wrap around - if ((head.queue_size - last_entry_offset) >= sizeof(uint64_t)) { - // Read the size from the end offset - bufferlist bl_size; - ret = cls_cxx_read(hctx, last_entry_offset, sizeof(uint64_t), &bl_size); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to read data size \n"); - return ret; - } - auto iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode data size \n"); - return -EINVAL; - } - last_entry_offset += sizeof(uint64_t); - //Check for data wrap around - if ((head.queue_size - (last_entry_offset + sizeof(uint64_t))) >= data_size) { - CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset); - ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - } else { - CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset); - ret = cls_cxx_read2(hctx, last_entry_offset, (head.queue_size - (last_entry_offset + sizeof(uint64_t))), out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) { + uint64_t len = end_marker.offset - head.front.offset; + if (len > 0) { + auto ret = cls_cxx_write_zero(hctx, head.front.offset, len); if (ret < 0) { + CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n"); + CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str()); return ret; } - uint64_t rem_size = data_size - (head.queue_size - (last_entry_offset + sizeof(uint64_t))); - bufferlist bl; - ret = cls_cxx_read(hctx, head_size, rem_size, &bl); - if (ret < 0) { - return ret; - } - out->claim_append(bl); - } - } else { - bufferlist bl_size; - ret = cls_cxx_read(hctx, last_entry_offset, (head.queue_size - last_entry_offset), &bl_size); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n"); - return ret; - } - uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - last_entry_offset); - bufferlist bl; - ret = cls_cxx_read(hctx, head_size, rem_size, &bl); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n"); - return ret; } - bl_size.claim_append(bl); - auto iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); - return -EINVAL; - } - last_entry_offset = head_size + rem_size; - CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset); - ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - } - - return 0; -} - -int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - - auto in_iter = in->cbegin(); - - cls_queue_update_last_entry_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode input data\n"); - return -EINVAL; - } - - bufferlist bl; - uint64_t data_size = op.bl_data.length(); - encode(data_size, bl); - bl.claim_append(op.bl_data); - - CLS_LOG(1, "INFO: cls_queue_update_last_entry_op: Updating data at last offset: %lu and total data size is %u\n", head.last_entry_offset, bl.length()); - - // check if data can fit in the remaining space in queue - if ((head.last_entry_offset + bl.length()) <= head.queue_size) { - //write data size and data at tail offset - ret = cls_cxx_write2(hctx, head.last_entry_offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - } else { - CLS_LOG(1, "INFO: Wrapping around and checking for free space\n"); - uint64_t free_space_available = (head.queue_size - head.last_entry_offset) + (head.front - head_size); - //Split data if there is free space available - if (bl.length() <= free_space_available) { - uint64_t size_before_wrap = head.queue_size - head.last_entry_offset; - bufferlist bl_data_before_wrap; - bl.splice(0, size_before_wrap, &bl_data_before_wrap); - //write spliced (data size and data) at last entry offset - CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.last_entry_offset, bl_data_before_wrap.length()); - ret = cls_cxx_write2(hctx, head.last_entry_offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset + uint64_t len = head.queue_size - head.front.offset; + if (len > 0) { + auto ret = cls_cxx_write_zero(hctx, head.front.offset, len); if (ret < 0) { + CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n"); + CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str()); return ret; } - //write remaining data after wrapping around - CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head_size, bl.length()); - ret = cls_cxx_write2(hctx, head_size, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + } + len = end_marker.offset - head.max_head_size; + if (len > 0) { + auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len); if (ret < 0) { + CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n"); + CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head.max_head_size); return ret; } } - else { - CLS_LOG(1, "ERROR: No space left in queue\n"); - // return queue full error - return -ENOSPC; - } - } - - if (op.bl_urgent_data.length() > 0) { - head.bl_urgent_data = op.bl_urgent_data; - } - - bufferlist bl_head; - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head of size: %u \n", bl_head.length()); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head returned error: %d \n", ret); - return ret; - } - return 0; -} - -int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - - CLS_LOG(1, "INFO: cls_queue_read_urgent_data: tail offset %lu\n", head.tail); - - cls_queue_urgent_data_ret op_ret; - - op_ret.bl_urgent_data = head.bl_urgent_data; - - encode(op_ret, *out); - - return 0; -} - -int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - - CLS_LOG(1, "INFO: cls_queue_write_urgent_data: tail offset %lu\n", head.tail); - - auto in_iter = in->cbegin(); - - cls_queue_write_urgent_data_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode input data\n"); + } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) { + //no-op + } else { + CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker.to_str().c_str(), end_marker.gen); return -EINVAL; } - //Write urgent data - head.bl_urgent_data = op.bl_urgent_data; - - //Write head back - bufferlist bl_head; - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head of size: %u\n", bl_head.length()); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head returned error: %d \n", ret); - return ret; - } - - return 0; -} - -int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - bool can_fit = true; - - //get head and its size - uint64_t head_size = 0; - cls_queue_head head; - int ret = get_queue_head_and_size(hctx, head, head_size); - if (ret < 0) { - return ret; - } - head.bl_urgent_data = *in; + head.front = end_marker; - bufferlist bl_head; - encode(head, bl_head); - - if(bl_head.length() > head_size) { - can_fit = false; + // Check if it is the end, then wrap around + if (head.front.offset == head.queue_size) { + head.front.offset = head.max_head_size; + head.front.gen += 1; } - encode(can_fit, *out); + CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str()); return 0; } - diff --git a/src/cls/queue/cls_queue_src.h b/src/cls/queue/cls_queue_src.h new file mode 100644 index 00000000000..2b225f60143 --- /dev/null +++ b/src/cls/queue/cls_queue_src.h @@ -0,0 +1,12 @@ +#ifndef CEPH_CLS_QUEUE_SRC_H +#define CEPH_CLS_QUEUE_SRC_H + +int write_queue_head(cls_method_context_t hctx, cls_queue_head& head); +int get_queue_head(cls_method_context_t hctx, cls_queue_head& head); +int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op); +int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret); +int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head); +int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head); +int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head); + +#endif /* CEPH_CLS_QUEUE_SRC_H */ \ No newline at end of file diff --git a/src/cls/queue/cls_queue_types.cc b/src/cls/queue/cls_queue_types.cc index b5374b28c19..33319d9a34c 100644 --- a/src/cls/queue/cls_queue_types.cc +++ b/src/cls/queue/cls_queue_types.cc @@ -5,17 +5,3 @@ #include "common/ceph_json.h" #include "include/utime.h" -void cls_queue_head::dump(Formatter *f) const -{ - f->dump_bool("is_empty", is_empty); - f->dump_unsigned("front", front); - f->dump_unsigned("tail", tail); - f->dump_unsigned("size", queue_size); - f->dump_unsigned("has_urgent_data", front); -} - -void cls_queue_head::generate_test_instances(list& ls) -{ - ls.push_back(new cls_queue_head); - ls.push_back(new cls_queue_head); -} diff --git a/src/cls/queue/cls_queue_types.h b/src/cls/queue/cls_queue_types.h index 96630ea65f9..0caa269ed25 100644 --- a/src/cls/queue/cls_queue_types.h +++ b/src/cls/queue/cls_queue_types.h @@ -1,6 +1,7 @@ #ifndef CEPH_CLS_QUEUE_TYPES_H #define CEPH_CLS_QUEUE_TYPES_H +#include #include "include/types.h" #include "common/ceph_time.h" #include "common/Formatter.h" @@ -13,42 +14,87 @@ //Actual start offset of queue data #define QUEUE_START_OFFSET_4K QUEUE_HEAD_SIZE_4K +constexpr unsigned int QUEUE_HEAD_START = 0xDEAD; +constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF; + +struct cls_queue_marker +{ + uint64_t offset{0}; + uint64_t gen{0}; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(gen, bl); + encode(offset, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(gen, bl); + decode(offset, bl); + DECODE_FINISH(bl); + } + + string to_str() { + string marker = std::to_string(gen) + '/' + std::to_string(offset); + return marker; + } + + int from_str(const char* str) { + errno = 0; + char* end = nullptr; + gen = ::strtoull(str, &end, 10); + if (errno) { + return errno; + } + if (str == end || *end != '/') { // expects delimiter + return -EINVAL; + } + str = end + 1; + offset = ::strtoull(str, &end, 10); + if (errno) { + return errno; + } + if (str == end || *end != 0) { // expects null terminator + return -EINVAL; + } + return 0; + } + +}; +WRITE_CLASS_ENCODER(cls_queue_marker) + struct cls_queue_head { - uint64_t front = QUEUE_START_OFFSET_1K; - uint64_t tail = QUEUE_START_OFFSET_1K; + uint64_t max_head_size = QUEUE_HEAD_SIZE_1K; + cls_queue_marker front{QUEUE_START_OFFSET_1K, 0}; + cls_queue_marker tail{QUEUE_START_OFFSET_1K, 0}; uint64_t queue_size{0}; // size of queue requested by user, with head size added to it - uint64_t last_entry_offset = QUEUE_START_OFFSET_1K; - bool is_empty{true}; - bool has_urgent_data{false}; + uint64_t max_urgent_data_size{0}; bufferlist bl_urgent_data; // special data known to application using queue void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); + encode(max_head_size, bl); encode(front, bl); encode(tail, bl); encode(queue_size, bl); - encode(last_entry_offset, bl); - encode(is_empty, bl); - encode(has_urgent_data, bl); + encode(max_urgent_data_size, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); + decode(max_head_size, bl); decode(front, bl); decode(tail, bl); decode(queue_size, bl); - decode(last_entry_offset, bl); - decode(is_empty, bl); - decode(has_urgent_data, bl); + decode(max_urgent_data_size, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } - - void dump(Formatter *f) const; - static void generate_test_instances(list& o); }; WRITE_CLASS_ENCODER(cls_queue_head) diff --git a/src/cls/queue/cls_rgw_queue.cc b/src/cls/queue/cls_rgw_queue.cc index 563d8213d4d..bea69fa6617 100644 --- a/src/cls/queue/cls_rgw_queue.cc +++ b/src/cls/queue/cls_rgw_queue.cc @@ -13,10 +13,11 @@ #include "cls/queue/cls_queue_ops.h" #include "cls/queue/cls_rgw_queue_ops.h" #include "cls/queue/cls_queue_const.h" -#include "cls/queue/cls_queue.h" +#include "cls/queue/cls_queue_src.h" #include #include +#include #include "common/ceph_context.h" #include "global/global_context.h" @@ -29,36 +30,6 @@ CLS_VER(1,0) CLS_NAME(rgw_queue) -static int cls_gc_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - auto in_iter = in->cbegin(); - - cls_gc_create_queue_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_create_queue: failed to decode entry\n"); - return -EINVAL; - } - - cls_gc_urgent_data urgent_data; - urgent_data.num_urgent_data_entries = op.num_urgent_data_entries; - - cls_create_queue_op create_op; - - CLS_LOG(10, "INFO: cls_gc_create_queue: queue size is %lu\n", op.size); - CLS_LOG(10, "INFO: cls_gc_create_queue: queue name is %s\n", op.name.c_str()); - create_op.head.queue_size = op.size; - create_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size; - create_op.head.has_urgent_data = true; - encode(urgent_data, create_op.head.bl_urgent_data); - - in->clear(); - encode(create_op, *in); - - return cls_create_queue(hctx, in, out); -} - static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); @@ -74,25 +45,21 @@ static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferli cls_gc_urgent_data urgent_data; urgent_data.num_urgent_data_entries = op.num_urgent_data_entries; - cls_init_queue_op init_op; + cls_queue_init_op init_op; CLS_LOG(10, "INFO: cls_gc_init_queue: queue size is %lu\n", op.size); CLS_LOG(10, "INFO: cls_gc_init_queue: queue name is %s\n", op.name.c_str()); init_op.head.queue_size = op.size; init_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size; - init_op.head.has_urgent_data = true; + init_op.has_urgent_data = true; encode(urgent_data, init_op.head.bl_urgent_data); - in->clear(); - encode(init_op, *in); - - return cls_init_queue(hctx, in, out); + return init_queue(hctx, init_op); } static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); - cls_rgw_gc_set_entry_op op; try { decode(op, in_iter); @@ -104,24 +71,33 @@ static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist op.info.time = ceph::real_clock::now(); op.info.time += make_timespan(op.expiration_secs); - cls_enqueue_op enqueue_op; + //get head + cls_queue_head head; + int ret = get_queue_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_queue_enqueue_op enqueue_op; bufferlist bl_data; encode(op.info, bl_data); enqueue_op.bl_data_vec.emplace_back(bl_data); CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length()); - in->clear(); - encode(enqueue_op, *in); + ret = enqueue(hctx, enqueue_op, head); + if (ret < 0) { + return ret; + } - return cls_enqueue(hctx, in, out); + //Write back head + return write_queue_head(hctx, head); } static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n"); auto in_iter = in->cbegin(); - cls_rgw_gc_list_op op; try { decode(op, in_iter); @@ -130,55 +106,46 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli return -EINVAL; } - cls_queue_list_op list_op; - if (op.marker.empty()) { - list_op.start_offset = 0; - } else { - list_op.start_offset = boost::lexical_cast(op.marker.c_str()); + cls_queue_head head; + auto ret = get_queue_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_gc_urgent_data urgent_data; + if (head.bl_urgent_data.length() > 0) { + auto iter_urgent_data = head.bl_urgent_data.cbegin(); + try { + decode(urgent_data, iter_urgent_data); + } catch (buffer::error& err) { + CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n"); + return -EINVAL; + } } + cls_queue_list_op list_op; if (! op.max) { op.max = GC_LIST_DEFAULT_MAX; } list_op.max = op.max; + list_op.start_marker = op.marker; - cls_queue_list_ret op_ret; cls_rgw_gc_list_ret list_ret; uint32_t num_entries = 0; //Entries excluding the deferred ones - bool urgent_data_decoded = false; - cls_gc_urgent_data urgent_data; + bool is_truncated = true; + string next_marker; do { - in->clear(); - encode(list_op, *in); - - CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering cls_queue_list_entries \n"); - int ret = cls_queue_list_entries(hctx, in, out); + CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering queue_list_entries \n"); + cls_queue_list_ret op_ret; + int ret = queue_list_entries(hctx, list_op, op_ret, head); if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret); + CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret); return ret; } - - auto iter = out->cbegin(); - try { - decode(op_ret, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); - return -EINVAL; - } - - //Each cls_queue_list_entries will fetch the same urgent data, decode it only once - if (! urgent_data_decoded) { - auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); - try { - decode(urgent_data, iter_urgent_data); - urgent_data_decoded = true; - } catch (buffer::error& err) { - CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n"); - return -EINVAL; - } - } - + is_truncated = op_ret.is_truncated; + next_marker = op_ret.next_marker; + if (op_ret.data.size()) { for (auto it : op_ret.data) { cls_rgw_gc_obj_info info; @@ -190,18 +157,21 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli } bool found = false; //Check for info tag in urgent data map - if (urgent_data.urgent_data_map.size() > 0) { - auto it = urgent_data.urgent_data_map.find(info.tag); - if (it != urgent_data.urgent_data_map.end()) { - found = true; - if (it->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str()); - continue; - } + auto iter = urgent_data.urgent_data_map.find(info.tag); + if (iter != urgent_data.urgent_data_map.end()) { + found = true; + stringstream ss1, ss2; + ss1 << iter->second; + ss2 << info.time; + if (iter->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str()); + CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in urgent data: %s\n", ss1.str().c_str()); + CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in queue data: %s\n", ss2.str().c_str()); + continue; } } //Search in xattrs - if (! found) { + if (! found && urgent_data.num_xattr_urgent_entries > 0) { bufferlist bl_xattrs; int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { @@ -217,15 +187,13 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n"); return -EINVAL; } //end - catch - if (xattr_urgent_data_map.size() > 0) { - auto it = xattr_urgent_data_map.find(info.tag); - if (it != xattr_urgent_data_map.end()) { - if (it->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); - continue; - } + auto xattr_iter = xattr_urgent_data_map.find(info.tag); + if (xattr_iter != xattr_urgent_data_map.end()) { + if (xattr_iter->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); + continue; } - } // end - if xattrs size ... + } } // end - ret != ENOENT && ENODATA } // end - if not found if (op.expired_only) { @@ -242,7 +210,7 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max); if (num_entries < op.max) { list_op.max = (op.max - num_entries); - list_op.start_offset = op_ret.next_offset; + list_op.start_marker = op_ret.next_marker; out->clear(); } else { //We've reached the max number of entries needed @@ -252,11 +220,11 @@ static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferli //We dont have data to process break; } - } while(op_ret.is_truncated); + } while(is_truncated); - list_ret.truncated = op_ret.is_truncated; + list_ret.truncated = is_truncated; if (list_ret.truncated) { - list_ret.next_marker = boost::lexical_cast(op_ret.next_offset); + list_ret.next_marker = next_marker; } out->clear(); encode(list_ret, *out); @@ -277,55 +245,43 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer return -EINVAL; } - // List entries and calculate total number of entries (including invalid entries) - cls_queue_list_op list_op; - if (op.marker.empty()) { - list_op.start_offset = 0; - } else { - list_op.start_offset = boost::lexical_cast(op.marker.c_str()); + cls_queue_head head; + auto ret = get_queue_head(hctx, head); + if (ret < 0) { + return ret; } + cls_gc_urgent_data urgent_data; + if (head.bl_urgent_data.length() > 0) { + auto iter_urgent_data = head.bl_urgent_data.cbegin(); + try { + decode(urgent_data, iter_urgent_data); + } catch (buffer::error& err) { + CLS_LOG(5, "ERROR: cls_gc_queue_remove(): failed to decode urgent data\n"); + return -EINVAL; + } + } + + // List entries and calculate total number of entries (including invalid entries) if (! op.num_entries) { op.num_entries = GC_LIST_DEFAULT_MAX; } - - list_op.max = op.num_entries; + cls_queue_list_op list_op; + list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry bool is_truncated = true; uint32_t total_num_entries = 0, num_entries = 0; - cls_gc_urgent_data urgent_data; - bool urgent_data_decoded = false; - uint64_t end_offset = 0; + string end_marker; do { - in->clear(); - encode(list_op, *in); - - CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_list_entries \n"); - int ret = cls_queue_list_entries(hctx, in, out); + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_list_entries \n"); + cls_queue_list_ret op_ret; + int ret = queue_list_entries(hctx, list_op, op_ret, head); if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret); + CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret); return ret; } - cls_queue_list_ret op_ret; - auto iter = out->cbegin(); - try { - decode(op_ret, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); - return -EINVAL; - } is_truncated = op_ret.is_truncated; unsigned int index = 0; - if (! urgent_data_decoded) { - auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); - try { - decode(urgent_data, iter_urgent_data); - urgent_data_decoded = true; - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n"); - return -EINVAL; - } - } // If data is not empty if (op_ret.data.size()) { for (auto it : op_ret.data) { @@ -341,21 +297,19 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer index++; bool found = false; //Search for tag in urgent data map - if (urgent_data.urgent_data_map.size() > 0) { - auto it = urgent_data.urgent_data_map.find(info.tag); - if (it != urgent_data.urgent_data_map.end()) { - found = true; - if (it->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str()); - continue; - } else if (it->second == info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str()); - urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue - urgent_data.num_head_urgent_entries -= 1; - } - }//end-if map end - }//end-if urgent data - if (! found) { + auto iter = urgent_data.urgent_data_map.find(info.tag); + if (iter != urgent_data.urgent_data_map.end()) { + found = true; + if (iter->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str()); + continue; + } else if (iter->second == info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str()); + urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue + urgent_data.num_head_urgent_entries -= 1; + } + }//end-if map end + if (! found && urgent_data.num_xattr_urgent_entries > 0) { //Search in xattrs bufferlist bl_xattrs; int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); @@ -372,31 +326,34 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); return -EINVAL; } //end - catch - if (xattr_urgent_data_map.size() > 0) { - auto found = xattr_urgent_data_map.find(info.tag); - if (found != xattr_urgent_data_map.end()) { - if (found->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); - continue; - } else if (found->second == info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str()); - xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later - urgent_data.num_xattr_urgent_entries -= 1; - } + auto xattr_iter = xattr_urgent_data_map.find(info.tag); + if (xattr_iter != xattr_urgent_data_map.end()) { + if (xattr_iter->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); + continue; + } else if (xattr_iter->second == info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str()); + xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later + urgent_data.num_xattr_urgent_entries -= 1; } - } // end - if xattrs size ... + } } // end - ret != ENOENT && ENODATA }// search in xattrs num_entries++; }//end-for - if (num_entries < op.num_entries) { - list_op.max = (op.num_entries - num_entries); - list_op.start_offset = op_ret.next_offset; + if (! op_ret.is_truncated && num_entries < (op.num_entries + 1)) { + end_marker = op_ret.next_marker; + CLS_LOG(1, "INFO: cls_gc_queue_remove(): truncated and end offset is %s\n", end_marker.c_str()); + break; + } + if (num_entries < (op.num_entries + 1)) { + list_op.max = ((op.num_entries + 1) - num_entries); + list_op.start_marker = op_ret.next_marker; out->clear(); } else { - end_offset = op_ret.offsets[index - 1]; - CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %lu\n", index, end_offset); + end_marker = op_ret.markers[index - 1]; + CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %s\n", index, end_marker.c_str()); break; } } //end-if @@ -404,33 +361,26 @@ static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, buffer break; } } while(is_truncated); + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries); + CLS_LOG(1, "INFO: cls_gc_queue_remove(): End offset is %s\n", end_marker.c_str()); - if (end_offset != 0) { + if (! end_marker.empty()) { cls_queue_remove_op rem_op; - if (op.marker.empty()) { - rem_op.start_offset = 0; - } else { - rem_op.start_offset = boost::lexical_cast(op.marker.c_str()); - } - - rem_op.end_offset = end_offset; - CLS_LOG(1, "INFO: cls_gc_queue_remove(): start offset: %lu and end offset: %lu\n", rem_op.start_offset, rem_op.end_offset); + rem_op.end_marker = end_marker; - encode(urgent_data, rem_op.bl_urgent_data); - - in->clear(); - encode(rem_op, *in); - - CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_remove_entries \n"); - int ret = cls_queue_remove_entries(hctx, in, out); + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_remove_entries \n"); + int ret = queue_remove_entries(hctx, rem_op, head); if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries(): returned error %d\n", ret); + CLS_LOG(1, "ERROR: queue_remove_entries(): returned error %d\n", ret); return ret; } } - return 0; + //Update urgent data map + encode(urgent_data, head.bl_urgent_data); + + return write_queue_head(hctx, head); } static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) @@ -449,53 +399,22 @@ static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, op.info.time = ceph::real_clock::now(); op.info.time += make_timespan(op.expiration_secs); - //Read urgent data - in->clear(); - out->clear(); - - ret = cls_queue_read_urgent_data(hctx, in, out); - - auto out_iter = out->cbegin(); - - cls_queue_urgent_data_ret op_ret; - try { - decode(op_ret, out_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode ouput\n"); - return -EINVAL; - } - - auto bl_iter = op_ret.bl_urgent_data.cbegin(); - cls_gc_urgent_data urgent_data; - try { - decode(urgent_data, bl_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode urgent data\n"); - return -EINVAL; - } - - bool is_last_entry = false; - in->clear(); - out->clear(); - ret = cls_queue_get_last_entry(hctx, in, out); + // Read head + cls_queue_head head; + ret = get_queue_head(hctx, head); if (ret < 0) { return ret; } - cls_rgw_gc_obj_info info; - auto iter = out->cbegin(); + auto bl_iter = head.bl_urgent_data.cbegin(); + cls_gc_urgent_data urgent_data; try { - decode(info, iter); + decode(urgent_data, bl_iter); } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode entry\n"); + CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode urgent data\n"); return -EINVAL; } - CLS_LOG(1, "INFO: tag of gc info is %s\n", info.tag.c_str()); - if (info.tag == op.info.tag) { - is_last_entry = true; - } - //has_urgent_data signifies whether urgent data in queue has changed bool has_urgent_data = false, tag_found = false; //search in unordered map in head @@ -517,49 +436,39 @@ static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, try { decode(xattr_urgent_data_map, iter); } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); + CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode xattrs urgent data map\n"); return -EINVAL; } //end - catch - if (xattr_urgent_data_map.size() > 0) { - auto found = xattr_urgent_data_map.find(info.tag); - if (found != xattr_urgent_data_map.end()) { - it->second = op.info.time; - tag_found = true; - //write the updated map back - bufferlist bl_map; - encode(xattr_urgent_data_map, bl_map); - ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map); - CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data"); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret); - return ret; - } + auto xattr_iter = xattr_urgent_data_map.find(op.info.tag); + if (xattr_iter != xattr_urgent_data_map.end()) { + it->second = op.info.time; + tag_found = true; + //write the updated map back + bufferlist bl_map; + encode(xattr_urgent_data_map, bl_map); + ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map); + CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data"); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret); + return ret; } - } // end - if xattrs size ... + } }// end ret != ENOENT ... } - bufferlist bl_urgent_data; if (! tag_found) { + stringstream ss1; + ss1 << op.info.time; + CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in urgent data: %s\n", ss1.str().c_str()); //try inserting in queue head urgent_data.urgent_data_map.insert({op.info.tag, op.info.time}); urgent_data.num_head_urgent_entries += 1; has_urgent_data = true; - //check if urgent data can fit in head - out->clear(); - bool can_fit = false; + bufferlist bl_urgent_data; encode(urgent_data, bl_urgent_data); - ret = cls_queue_can_urgent_data_fit(hctx, &bl_urgent_data, out); - if (ret < 0) { - return ret; - } - iter = out->cbegin(); - decode(can_fit, iter); - CLS_LOG(1, "INFO: Can urgent data fit: %d \n", can_fit); - //insert as xattrs - if (! can_fit) { + if (bl_urgent_data.length() > head.max_urgent_data_size) { //remove inserted entry from urgent data urgent_data.urgent_data_map.erase(op.info.tag); urgent_data.num_head_urgent_entries -= 1; @@ -603,38 +512,26 @@ static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, return -ENOSPC; } - bl_urgent_data.clear(); - encode(urgent_data, bl_urgent_data); - in->clear(); - if (! is_last_entry) { - cls_enqueue_op enqueue_op; - bufferlist bl_data; - encode(op.info, bl_data); - enqueue_op.bl_data_vec.emplace_back(bl_data); - CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length()); - if (has_urgent_data) { - enqueue_op.bl_urgent_data = bl_urgent_data; - } - encode(enqueue_op, *in); - ret = cls_enqueue(hctx, in, out); - if (ret < 0) { - return ret; - } - } else { - cls_queue_update_last_entry_op update_op; - encode(op.info, update_op.bl_data); - CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", update_op.bl_data.length()); - if (has_urgent_data) { - update_op.bl_urgent_data = bl_urgent_data; - } - encode(update_op, *in); - ret = cls_queue_update_last_entry(hctx, in, out); - if (ret < 0) { - return ret; - } + cls_queue_enqueue_op enqueue_op; + bufferlist bl_data; + stringstream ss1; + ss1 << op.info.time; + CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in queue: %s\n", ss1.str().c_str()); + encode(op.info, bl_data); + enqueue_op.bl_data_vec.emplace_back(bl_data); + CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length()); + + ret = enqueue(hctx, enqueue_op, head); + if (ret < 0) { + return ret; } - return 0; + if (has_urgent_data) { + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + } + + return write_queue_head(hctx, head); } CLS_INIT(rgw_queue) @@ -642,7 +539,6 @@ CLS_INIT(rgw_queue) CLS_LOG(1, "Loaded rgw queue class!"); cls_handle_t h_class; - cls_method_handle_t h_gc_create_queue; cls_method_handle_t h_gc_init_queue; cls_method_handle_t h_gc_enqueue; cls_method_handle_t h_gc_queue_list_entries; @@ -652,7 +548,6 @@ CLS_INIT(rgw_queue) cls_register(RGW_QUEUE_CLASS, &h_class); /* gc */ - cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue); cls_register_cxx_method(h_class, GC_INIT_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_init_queue, &h_gc_init_queue); cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue); cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries); diff --git a/src/cls/queue/cls_rgw_queue_ops.h b/src/cls/queue/cls_rgw_queue_ops.h index e2b3ce3405e..78ea1af21b0 100644 --- a/src/cls/queue/cls_rgw_queue_ops.h +++ b/src/cls/queue/cls_rgw_queue_ops.h @@ -4,12 +4,12 @@ #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_ops.h" -struct cls_gc_create_queue_op { +struct cls_gc_init_queue_op { uint64_t size; uint64_t num_urgent_data_entries{0}; string name; //for debugging, to be removed later - cls_gc_create_queue_op() {} + cls_gc_init_queue_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -26,31 +26,24 @@ struct cls_gc_create_queue_op { decode(name, bl); DECODE_FINISH(bl); } -}; -WRITE_CLASS_ENCODER(cls_gc_create_queue_op) - -struct cls_gc_init_queue_op : cls_gc_create_queue_op { }; WRITE_CLASS_ENCODER(cls_gc_init_queue_op) struct cls_rgw_gc_queue_remove_op { uint64_t num_entries; - string marker; cls_rgw_gc_queue_remove_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(num_entries, bl); - encode(marker, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(num_entries, bl); - decode(marker, bl); DECODE_FINISH(bl); } }; diff --git a/src/rgw/rgw_gc.cc b/src/rgw/rgw_gc.cc index aca2f29143a..fbc7b4589ee 100644 --- a/src/rgw/rgw_gc.cc +++ b/src/rgw/rgw_gc.cc @@ -44,15 +44,10 @@ void RGWGC::initialize(CephContext *_cct, RGWRados *_store) { for (int i = 0; i < max_objs; i++) { ldpp_dout(this, 20) << "RGWGC::initialize initing gc queue with name = " << obj_names[i] << dendl; librados::ObjectWriteOperation op; - op.assert_exists(); + op.create(true); uint64_t queue_size = 1048576, num_urgent_data_entries = 50; cls_rgw_gc_init_queue(op, obj_names[i], queue_size, num_urgent_data_entries); - int ret = store->gc_operate(obj_names[i], &op); - if (ret == -ENOENT) { - ldpp_dout(this, 20) << "RGWGC::initialize creating gc queue with name = " << obj_names[i] << dendl; - cls_rgw_gc_create_queue(op, obj_names[i], queue_size, num_urgent_data_entries); - store->gc_operate(obj_names[i], &op); - } + store->gc_operate(obj_names[i], &op); } } @@ -115,10 +110,10 @@ int RGWGC::remove(int index, const std::vector& tags, AioCompletion **pc return store->gc_aio_operate(obj_names[index], &op, pc); } -int RGWGC::remove(int index, string& marker, int num_entries, librados::AioCompletion **pc) +int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc) { ObjectWriteOperation op; - cls_rgw_gc_remove_entries_queue(op, marker, num_entries); + cls_rgw_gc_remove_entries_queue(op, num_entries); return store->gc_aio_operate(obj_names[index], &op, pc); } @@ -343,11 +338,11 @@ public: } } - int remove_queue_entries(int index, string& marker, int num_entries) { + int remove_queue_entries(int index, int num_entries) { IO index_io; index_io.type = IO::IndexIO; index_io.index = index; - int ret = gc->remove(index, marker, num_entries, &index_io.c); + int ret = gc->remove(index, num_entries, &index_io.c); if (ret < 0) { ldpp_dout(dpp, 0) << "WARNING: failed to remove queue entries on index=" << index << " ret=" << ret << dendl; @@ -478,7 +473,7 @@ int RGWGC::process(int index, int max_secs, bool expired_only, if (entries.size() > 0) { //Remove the entries from the queue ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl; - ret = io_manager.remove_queue_entries(index, marker, entries.size()); + ret = io_manager.remove_queue_entries(index, entries.size()); if (ret < 0) { ldpp_dout(this, 0) << "WARNING: failed to remove queue entries" << dendl; diff --git a/src/rgw/rgw_gc.h b/src/rgw/rgw_gc.h index 3624b3d0a46..6311fa98a97 100644 --- a/src/rgw/rgw_gc.h +++ b/src/rgw/rgw_gc.h @@ -52,7 +52,7 @@ public: int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync); int defer_chain(const string& tag, cls_rgw_obj_chain& info, bool sync); int remove(int index, const std::vector& tags, librados::AioCompletion **pc); - int remove(int index, string& marker, int num_entries, librados::AioCompletion **pc); + int remove(int index, int num_entries, librados::AioCompletion **pc); void initialize(CephContext *_cct, RGWRados *_store); void finalize(); diff --git a/src/test/cls_queue/test_cls_queue.cc b/src/test/cls_queue/test_cls_queue.cc index 521a18b997c..b8d4bae1f17 100644 --- a/src/test/cls_queue/test_cls_queue.cc +++ b/src/test/cls_queue/test_cls_queue.cc @@ -56,20 +56,14 @@ static void create_obj(cls_rgw_obj& obj, int i, int j) obj.loc.append(buf); } -static bool cmp_objs(cls_rgw_obj& obj1, cls_rgw_obj& obj2) -{ - return (obj1.pool == obj2.pool) && - (obj1.key == obj2.key) && - (obj1.loc == obj2.loc); -} - TEST(cls_queue, gc_queue_ops1) { //Testing queue ops when data size is NOT a multiple of queue size string queue_name = "my-queue"; - uint64_t queue_size = 320, num_urgent_data_entries = 10; + uint64_t queue_size = 322, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -102,7 +96,7 @@ TEST(cls_queue, gc_queue_ops1) librados::ObjectWriteOperation remove_op; string marker1; uint64_t num_entries = 1; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Test enqueue again @@ -140,9 +134,10 @@ TEST(cls_queue, gc_queue_ops2) { //Testing list queue string queue_name = "my-second-queue"; - uint64_t queue_size = 330, num_urgent_data_entries = 10; + uint64_t queue_size = 334, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -215,9 +210,10 @@ TEST(cls_queue, gc_queue_ops3) { //Testing remove queue entries string queue_name = "my-third-queue"; - uint64_t queue_size = 495, num_urgent_data_entries = 10; + uint64_t queue_size = 501, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -229,8 +225,8 @@ TEST(cls_queue, gc_queue_ops3) librados::ObjectWriteOperation remove_op; string marker1; uint64_t num_entries = 2; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); - ASSERT_EQ(-ENOENT, ioctx.operate(queue_name, &remove_op)); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); + ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); cls_rgw_gc_obj_info defer_info; @@ -280,7 +276,7 @@ TEST(cls_queue, gc_queue_ops3) //Test remove entries num_entries = 2; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Test list queue again @@ -293,9 +289,10 @@ TEST(cls_queue, gc_queue_ops4) { //Testing remove queue entries string queue_name = "my-fourth-queue"; - uint64_t queue_size = 495, num_urgent_data_entries = 10; + uint64_t queue_size = 501, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -308,8 +305,8 @@ TEST(cls_queue, gc_queue_ops4) string marker1; uint64_t num_entries = 2; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); - ASSERT_EQ(-ENOENT, ioctx.operate(queue_name, &remove_op)); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); + ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); cls_rgw_gc_obj_info defer_info; @@ -353,7 +350,7 @@ TEST(cls_queue, gc_queue_ops4) //Test remove entries num_entries = 2; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Test list queue again @@ -366,9 +363,10 @@ TEST(cls_queue, gc_queue_ops5) { //Testing remove queue entries string queue_name = "my-fifth-queue"; - uint64_t queue_size = 495, num_urgent_data_entries = 10; + uint64_t queue_size = 501, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -414,7 +412,7 @@ TEST(cls_queue, gc_queue_ops5) //Test remove entries librados::ObjectWriteOperation remove_op; auto num_entries = list_info1.size(); - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Test list queue again for all entries @@ -428,9 +426,10 @@ TEST(cls_queue, gc_queue_ops6) { //Testing list queue, when data size is split at the end of the queue string queue_name = "my-sixth-queue"; - uint64_t queue_size = 337, num_urgent_data_entries = 10; + uint64_t queue_size = 341, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -461,7 +460,7 @@ TEST(cls_queue, gc_queue_ops6) string marker1; uint64_t num_entries = 1; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Enqueue one more element @@ -499,9 +498,10 @@ TEST(cls_queue, gc_queue_ops7) { //Testing list queue, when data size is written at the end of queue and data is written after wrap around string queue_name = "my-seventh-queue"; - uint64_t queue_size = 338, num_urgent_data_entries = 10; + uint64_t queue_size = 342, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -532,7 +532,7 @@ TEST(cls_queue, gc_queue_ops7) string marker1; uint64_t num_entries = 1; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Enqueue one more element @@ -570,9 +570,10 @@ TEST(cls_queue, gc_queue_ops8) { //Testing list queue, when data is split at the end of the queue string queue_name = "my-eighth-queue"; - uint64_t queue_size = 340, num_urgent_data_entries = 10; + uint64_t queue_size = 344, num_urgent_data_entries = 10; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; @@ -603,7 +604,7 @@ TEST(cls_queue, gc_queue_ops8) string marker1; uint64_t num_entries = 1; - cls_rgw_gc_remove_entries_queue(remove_op, marker1, num_entries); + cls_rgw_gc_remove_entries_queue(remove_op, num_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op)); //Enqueue one more element @@ -641,9 +642,10 @@ TEST(cls_queue, gc_queue_ops9) { //Testing remove queue entries string queue_name = "my-ninth-queue"; - uint64_t queue_size = 660, num_urgent_data_entries = 1; + uint64_t queue_size = 668, num_urgent_data_entries = 1; librados::ObjectWriteOperation op; - cls_rgw_gc_create_queue(op, queue_name, queue_size, num_urgent_data_entries); + op.create(true); + cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); uint64_t size = 0; -- 2.39.5