From: Pritha Srivastava
Date: Thu, 2 May 2019 09:41:41 +0000 (+0530)
Subject: Added cls ops for queue, gc(using queue) and test cases.
X-Git-Tag: v15.1.0~1185^2~17
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2ce449c4056f43d488eee71e40dea1c3ef2fbf3c;p=ceph-ci.git

Added cls ops for queue, gc(using queue) and test cases.

Signed-off-by: Pritha Srivastava
---

diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt
index 127af83881b..8a6703a7dd9 100644
--- a/src/cls/CMakeLists.txt
+++ b/src/cls/CMakeLists.txt
@@ -264,3 +264,25 @@ install(TARGETS cls_cas DESTINATION ${cls_dir})
 set(cls_cas_client_srcs cas/cls_cas_client.cc)
 add_library(cls_cas_client STATIC ${cls_cas_client_srcs})
+
+# cls_queue
+if (WITH_RADOSGW)
+  set(cls_queue_srcs
+    queue/cls_queue.cc
+    queue/cls_queue_ops.cc
+    queue/cls_queue_types.cc
+    ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
+  add_library(cls_queue SHARED ${cls_queue_srcs})
+  set_target_properties(cls_queue PROPERTIES
+    VERSION "1.0.0"
+    SOVERSION "1"
+    INSTALL_RPATH ""
+    CXX_VISIBILITY_PRESET hidden)
+  install(TARGETS cls_queue DESTINATION ${cls_dir})
+
+  set(cls_queue_client_srcs
+    queue/cls_queue_client.cc
+    queue/cls_queue_types.cc
+    queue/cls_queue_ops.cc)
+  add_library(cls_queue_client STATIC ${cls_queue_client_srcs})
+endif (WITH_RADOSGW)
diff --git a/src/cls/queue/cls_queue.cc b/src/cls/queue/cls_queue.cc
new file mode 100644
index 00000000000..40427c1b4ee
--- /dev/null
+++ b/src/cls/queue/cls_queue.cc
@@ -0,0 +1,1566 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include
+
+#include "objclass/objclass.h"
+#include "cls/rgw/cls_rgw_ops.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "common/Clock.h"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+#include "common/Clock.h"
+#include "common/strtol.h"
+#include "common/escape.h"
+
+#include "include/compat.h"
+#include
+#include
+
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+#define GC_LIST_DEFAULT_MAX 128
+
+CLS_VER(1,0)
+CLS_NAME(queue)
+
+static int get_queue_head_size(cls_method_context_t hctx, uint64_t& head_size)
+{
+  //read head size
+  bufferlist bl_head_size;
+  int ret = cls_cxx_read(hctx, 0, sizeof(uint64_t), &bl_head_size);
+  if (ret < 0) {
+    CLS_LOG(0, "ERROR: get_queue_head_size: failed to read head\n");
+    return ret;
+  }
+  //decode head size
+  auto iter = bl_head_size.cbegin();
+  try {
+    decode(head_size, iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(0, "ERROR: get_queue_head_size: failed to decode head size \n");
+    return -EINVAL;
+  }
+
+  CLS_LOG(10, "INFO: get_queue_head_size: head size is %lu\n", head_size);
+
+  return 0;
+}
+
+static int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  auto in_iter = in->cbegin();
+
+  cls_create_queue_op op;
+  try {
+    decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_create_queue(): failed to decode entry\n");
+    return -EINVAL;
+  }
+
+  // create the object
+  int ret = cls_cxx_create(hctx, true);
+  if (ret < 0) {
+    CLS_LOG(0, "ERROR: %s(): cls_cxx_create returned %d", __func__, ret);
+    return ret;
+  }
+  CLS_LOG(10, "INFO: cls_create_queue create queue of size %lu", op.head.size);
+  CLS_LOG(10, "INFO: cls_create_queue: Is urgent data present: %d\n", op.head.has_urgent_data);
+  CLS_LOG(10, "INFO: 
cls_create_queue: Is urgent data present: %d\n", op.head.num_urgent_data_entries); + + uint64_t head_size = QUEUE_HEAD_SIZE_1K; + + if (op.head.num_head_urgent_entries) { + if (! op.head_size) { + head_size = QUEUE_HEAD_SIZE_4K; + op.head.tail = op.head.front = QUEUE_START_OFFSET_4K; + op.head.last_entry_offset = QUEUE_START_OFFSET_4K; + } else { + head_size = op.head_size; + op.head.tail = op.head.front = head_size; + op.head.last_entry_offset = head_size; + } + } else { + head_size = QUEUE_HEAD_SIZE_1K; + op.head.tail = op.head.front = QUEUE_START_OFFSET_1K; + op.head.last_entry_offset = QUEUE_START_OFFSET_1K; + } + op.head.size += head_size; + + CLS_LOG(10, "INFO: cls_create_queue queue actual size %lu", op.head.size); + CLS_LOG(10, "INFO: cls_create_queue head size %lu", head_size); + CLS_LOG(10, "INFO: cls_create_queue queue front offset %lu", op.head.front); + + + //encode head size + bufferlist bl; + encode(head_size, bl); + CLS_LOG(0, "INFO: cls_create_queue head size %u", bl.length()); + + //encode head + bufferlist bl_head; + encode(op.head, bl_head); + + bl.claim_append(bl_head); + + CLS_LOG(0, "INFO: cls_create_queue writing head of size %u", bl.length()); + ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_write returned %d", __func__, ret); + return ret; + } + return 0; +} + +static int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + // read the head + cls_queue_head head; + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + + try { + decode(head, bl_head); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_get_queue_size: failed to decode entry\n"); + return -EINVAL; + } + + head.size -= head_size; + + CLS_LOG(10, "INFO: cls_get_queue_size: size of queue is %lu\n", head.size); + + encode(head.size, *out); + + return 0; +} + +static int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + CLS_LOG(1, "INFO: cls_enqueue: Read Head of size: %lu\n", head_size); + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_enqueue: failed to decode head\n"); + return -EINVAL; + } + + if (head.front == head.tail && ! 
head.is_empty) { + // return queue full error + return -ENOSPC; + } + + iter = in->cbegin(); + cls_enqueue_op op; + try { + decode(op, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n"); + return -EINVAL; + } + + for (auto bl_data : op.bl_data_vec) { + uint64_t total_size = sizeof(uint64_t) + bl_data.length(); + CLS_LOG(1, "INFO: cls_enqueue(): Total size to be written is %lu and data size is %u\n", total_size, bl_data.length()); + + bufferlist bl; + uint64_t data_size = bl_data.length(); + encode(data_size, bl); + CLS_LOG(1, "INFO: cls_enqueue(): bufferlist length after encoding is %u\n", bl.length()); + bl.claim_append(bl_data); + + if (head.tail >= head.front) { + // check if data can fit in the remaining space in queue + if ((head.tail + total_size) <= head.size) { + CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n", head.tail, bl.length()); + head.last_entry_offset = head.tail; + //write data size and data at tail offset + ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail += total_size; + } else { + CLS_LOG(1, "INFO: Wrapping around and checking for free space\n"); + uint64_t free_space_available = (head.size - head.tail) + (head.front - head_size); + //Split data if there is free space available + if (total_size <= free_space_available) { + uint64_t size_before_wrap = head.size - head.tail; + bufferlist bl_data_before_wrap; + bl.splice(0, size_before_wrap, &bl_data_before_wrap); + head.last_entry_offset = head.tail; + //write spliced (data size and data) at tail offset + CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.tail, bl_data_before_wrap.length()); + ret = cls_cxx_write2(hctx, head.tail, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail = head_size; + //write remaining data at tail offset + CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head.tail, bl.length()); + ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail = bl.length(); + } + else { + CLS_LOG(1, "ERROR: No space left in queue\n"); + // return queue full error + return -ENOSPC; + } + } + } else if (head.front > head.tail) { + if ((head.tail + total_size) < head.front) { + CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n\n", head.tail, bl.length()); + head.last_entry_offset = head.tail; + //write data size and data at tail offset + ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail += total_size; + } else { + CLS_LOG(1, "ERROR: No space left in queue\n"); + // return queue full error + return -ENOSPC; + } + } + + bl_head.clear(); + if (head.tail == head.size) { + head.tail = head_size; + } + CLS_LOG(1, "INFO: cls_enqueue: New tail offset: %lu \n", head.tail); + } //end - for + + head.is_empty = false; + + //Update urgent data if set + if (op.has_urgent_data) { + head.has_urgent_data = true; + head.bl_urgent_data = op.bl_urgent_data; + } + + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u \n", bl_head.length()); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if 
(ret < 0) { + CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +static int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + CLS_LOG(1, "INFO: cls_dequeue: Reading head of size: %lu\n", head_size); + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + if (head.front == head.tail && head.is_empty) { + CLS_LOG(1, "ERROR: Queue is empty\n"); + return -ENOENT; + } + + uint64_t data_size = 0; + bufferlist bl_size; + + if (head.front < head.tail) { + //Read size of data first + ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size); + if (ret < 0) { + return ret; + } + iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front); + head.front += sizeof(uint64_t); + //Read data based on size obtained above + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front += data_size; + } else if (head.front >= head.tail) { + uint64_t actual_data_size = head.size - head.front; + if (actual_data_size < sizeof(uint64_t)) { + //Case 1. 
Data size has been spliced, first reconstruct data size + CLS_LOG(1, "INFO: cls_dequeue: Spliced data size is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, actual_data_size, &bl_size, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front = head_size; + uint64_t remainder_data_size = sizeof(uint64_t) - actual_data_size; + bufferlist bl_rem_data_size; + CLS_LOG(1, "INFO: cls_dequeue: Remainder Spliced data size is read from from front offset %lu\n", head.front); + ret = cls_cxx_read(hctx, head.front, remainder_data_size, &bl_rem_data_size); + if (ret < 0) { + return ret; + } + bl_size.claim_append(bl_rem_data_size); + iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); + return -EINVAL; + } + head.front += remainder_data_size; + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front += data_size; + } else { + ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size); + if (ret < 0) { + return ret; + } + iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front); + head.front += sizeof(uint64_t); + + actual_data_size = head.size - head.front; + + if (actual_data_size < data_size) { + if (actual_data_size != 0) { + //Case 2. Data has been spliced + CLS_LOG(1, "INFO: cls_dequeue: Spliced data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, actual_data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + } + head.front = head_size; + bufferlist bl_remainder; + uint64_t remainder_size = data_size - actual_data_size; + CLS_LOG(1, "INFO: cls_dequeue: Remaining Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, remainder_size, &bl_remainder, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + out->claim_append(bl_remainder); + head.front += remainder_size; + } else { + //Case 3. 
No splicing + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front += data_size; + } + } + } + + //front has reached the end, wrap it around + if (head.front == head.size) { + head.front = head_size; + } + + if (head.front == head.tail) { + head.is_empty = true; + } + //Write head back + bl_head.clear(); + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + CLS_LOG(1, "INFO: cls_queue_list_entries: Reading head at offset %lu\n", head_size); + uint64_t start_offset = 0; + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + // If queue is empty, return from here + if (head.is_empty) { + return -ENOENT; + } + + cls_queue_list_ret op_ret; + CLS_LOG(1, "INFO: cls_queue_list_entries: Is urgent data present: %d\n", head.has_urgent_data); + //Info related to urgent data + op_ret.has_urgent_data = head.has_urgent_data; + op_ret.bl_urgent_data = head.bl_urgent_data; + if ((head.front == head.tail) && head.is_empty) { + op_ret.is_truncated = false; + encode(op_ret, *out); + return 0; + } + + auto in_iter = in->cbegin(); + + cls_queue_list_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n"); + return -EINVAL; + } + if (op.start_offset == 0) { + start_offset = head.front; + } else { + start_offset = op.start_offset; + } + + op_ret.is_truncated = true; + uint64_t chunk_size = 1024; + uint64_t contiguous_data_size = 0, size_to_read = 0; + bool wrap_around = false; + + //Calculate length of contiguous data to be read depending on front, tail and start offset + if (head.tail > head.front) { + contiguous_data_size = head.tail - start_offset; + } else if (head.front >= head.tail) { + if (start_offset >= head.front) { + contiguous_data_size = head.size - start_offset; + wrap_around = true; + } else if (start_offset <= head.tail) { + contiguous_data_size = head.tail - start_offset; + } + } + + uint64_t num_ops = 0; + bufferlist bl; + do + { + CLS_LOG(1, "INFO: cls_queue_list_entries(): front is: %lu, tail is %lu, and start_offset is %lu\n", head.front, head.tail, start_offset); + + bufferlist bl_chunk; + //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size + if (contiguous_data_size > chunk_size) { + size_to_read = chunk_size; + } else { + size_to_read = contiguous_data_size; + } + CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is %lu\n", size_to_read); + if 
(size_to_read == 0) { + op_ret.is_truncated = false; + CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is 0, hence breaking out!\n"); + break; + } + + ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); + if (ret < 0) { + return ret; + } + + //If there is leftover data from previous iteration, append new data to leftover data + bl.claim_append(bl_chunk); + bl_chunk = bl; + bl.clear(); + + CLS_LOG(1, "INFO: cls_queue_list_entries(): size of chunk %u\n", bl_chunk.length()); + + //Process the chunk of data read + unsigned index = 0; + auto it = bl_chunk.cbegin(); + uint64_t size_to_process = bl_chunk.length(); + do { + CLS_LOG(1, "INFO: cls_queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process); + it.seek(index); + uint64_t data_size = 0; + if (size_to_process >= sizeof(uint64_t)) { + try { + decode(data_size, it); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode data size \n"); + return -EINVAL; + } + } else { + // Copy unprocessed data to bl + bl_chunk.copy(index, size_to_process, bl); + CLS_LOG(1, "INFO: cls_queue_list_entries: not enough data to read data size, breaking out!\n"); + break; + } + CLS_LOG(1, "INFO: cls_queue_list_entries(): data size: %lu\n", data_size); + index += sizeof(uint64_t); + size_to_process -= sizeof(uint64_t); + bufferlist bl_data; + if (data_size <= size_to_process) { + bl_chunk.copy(index, data_size, bl_data); + //Return data and offset here + op_ret.data.emplace_back(bl_data); + uint64_t data_offset = start_offset + (index - sizeof(uint64_t)); + op_ret.offsets.emplace_back(data_offset); + CLS_LOG(1, "INFO: cls_queue_list_entries(): offset: %lu\n", data_offset); + index += bl_data.length(); + size_to_process -= bl_data.length(); + } else { + index -= sizeof(uint64_t); + size_to_process += sizeof(uint64_t); + bl_chunk.copy(index, size_to_process, bl); + CLS_LOG(1, "INFO: cls_queue_list_entries(): not enough data to read data, breaking out!\n"); + break; + } + num_ops++; + if (num_ops == op.max) { + CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n"); + break; + } + if (index == bl_chunk.length()) { + break; + } + } while(index < bl_chunk.length()); + + CLS_LOG(1, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); + + if (num_ops == op.max) { + op_ret.next_offset = start_offset + index; + CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", op_ret.next_offset); + break; + } + + //Calculate new start_offset and contiguous data size + start_offset += size_to_read; + contiguous_data_size -= size_to_read; + if (contiguous_data_size == 0) { + if (wrap_around) { + start_offset = head_size; + contiguous_data_size = head.tail - head_size; + wrap_around = false; + } else { + CLS_LOG(1, "INFO: cls_queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n"); + op_ret.next_offset = head.front; + op_ret.is_truncated = false; + break; + } + } + + } while(num_ops < op.max); + + //Wrap around next offset if it has reached end of queue + if (op_ret.next_offset == head.size) { + op_ret.next_offset = head_size; + } + if (op_ret.next_offset == head.tail) { + op_ret.is_truncated = false; + } + + encode(op_ret, *out); + + return 0; +} + +static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, 
head_size); + if (ret < 0) { + return ret; + } + + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + if ((head.front == head.tail) && head.is_empty) { + return -ENOENT; + } + + auto in_iter = in->cbegin(); + + cls_queue_remove_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n"); + return -EINVAL; + } + + if (op.start_offset == op.end_offset) { + return -EINVAL; + } + + // If start offset is not set or set to zero, then we need to shift it to actual front of queue + if (op.start_offset == 0) { + op.start_offset = head_size; + } + + if (op.start_offset != head.front) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: invalid start offset\n"); + return -EINVAL; + } + + // Read the size from the end offset + bufferlist bl_size; + uint64_t data_size = 0; + ret = cls_cxx_read(hctx, op.end_offset, sizeof(uint64_t), &bl_size); + if (ret < 0) { + return ret; + } + iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); + return -EINVAL; + } + + //offset obtained by adding last entry's size + uint64_t end_offset = op.end_offset + sizeof(uint64_t) + data_size; + + //Zero out the entries that have been removed, to reclaim storage space + if (end_offset > op.start_offset) { + ret = cls_cxx_write_zero(hctx, op.start_offset, (end_offset - op.start_offset)); + if (ret < 0) { + return ret; + } + } else { //start offset > end offset + ret = cls_cxx_write_zero(hctx, op.start_offset, (head.size - op.start_offset)); + if (ret < 0) { + return ret; + } + ret = cls_cxx_write_zero(hctx, head_size, (end_offset - head_size)); + if (ret < 0) { + return ret; + } + } + + head.front = end_offset; + + // Check if it is the end, then wrap around + if (head.front == head.size) { + head.front = head_size; + } + + CLS_LOG(1, "INFO: cls_queue_remove_entries: front offset is: %lu and tail offset is %lu\n", head.front, head.tail); + + // We've reached the last element + if (head.front == head.tail) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Queue is empty now!\n"); + head.is_empty = true; + } + + //Update urgent data map + head.bl_urgent_data = op.bl_urgent_data; + head.has_urgent_data = op.has_urgent_data; + + //Write head back + bl_head.clear(); + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +static int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, 
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + uint64_t data_size = 0, last_entry_offset = head.last_entry_offset; + bufferlist bl_size; + //Read size of data first + ret = cls_cxx_read(hctx, last_entry_offset, sizeof(uint64_t), &bl_size); + if (ret < 0) { + return ret; + } + iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode data size \n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data size: %lu, last data offset: %lu\n", data_size, last_entry_offset); + + //Read data based on size obtained above + last_entry_offset += sizeof(uint64_t); + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from last entry offset %lu\n", last_entry_offset); + ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + return 0; +} + +static int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + auto in_iter = in->cbegin(); + + cls_queue_update_last_entry_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode input data\n"); + return -EINVAL; + } + + bufferlist bl; + uint64_t data_size = op.bl_data.length(); + encode(data_size, bl); + bl.claim_append(op.bl_data); + + CLS_LOG(1, "INFO: cls_queue_update_last_entry_op: Updating data at last offset: %lu and total data size is %u\n", head.last_entry_offset, bl.length()); + + //write data size + data at offset + ret = cls_cxx_write(hctx, head.last_entry_offset, bl.length(), &bl); + if (ret < 0) { + return ret; + } + + if (op.has_urgent_data) { + head.has_urgent_data = true; + head.bl_urgent_data = op.bl_urgent_data; + } + + bl_head.clear(); + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head of size: %u \n", bl_head.length()); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head returned error: %d \n", ret); + return ret; + } + return 0; +} + +static int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + 
} catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_read_urgent_data: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + CLS_LOG(1, "INFO: cls_queue_read_urgent_data: tail offset %lu\n", head.tail); + + cls_queue_urgent_data_ret op_ret; + if(head.has_urgent_data) { + op_ret.has_urgent_data = true; + op_ret.bl_urgent_data = head.bl_urgent_data; + } + + encode(op_ret, *out); + + return 0; +} + +static int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + CLS_LOG(1, "INFO: cls_queue_write_urgent_data: tail offset %lu\n", head.tail); + + auto in_iter = in->cbegin(); + + cls_queue_write_urgent_data_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode input data\n"); + return -EINVAL; + } + //Write urgent data + head.has_urgent_data = op.has_urgent_data; + head.bl_urgent_data = op.bl_urgent_data; + + //Write head back + bl_head.clear(); + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head of size: %u\n", bl_head.length()); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +static int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bool can_fit = true; + + //get head size + uint64_t head_size = 0; + int ret = get_queue_head_size(hctx, head_size); + if (ret < 0) { + return ret; + } + + // read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + cls_queue_head head; + auto iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode entry %s\n", bl_head.c_str()); + return -EINVAL; + } + + head.has_urgent_data = true; + head.bl_urgent_data = *in; + + bl_head.clear(); + encode(head, bl_head); + + if(bl_head.length() > head_size) { + can_fit = false; + } + + encode(can_fit, *out); + + return 0; +} + +static int cls_gc_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_gc_create_queue_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_create_queue: failed to decode entry\n"); + return -EINVAL; + } + + cls_create_queue_op create_op; + + if (op.num_urgent_data_entries > 0) { + std::unordered_map urgent_data_map; + urgent_data_map.reserve(op.num_urgent_data_entries); + encode(urgent_data_map, create_op.head.bl_urgent_data); + } + + CLS_LOG(10, "INFO: cls_gc_create_queue: queue size is %lu\n", op.size); + create_op.head.size = op.size; + 
create_op.head.num_urgent_data_entries = op.num_urgent_data_entries; + create_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size; + + in->clear(); + encode(create_op, *in); + + return cls_create_queue(hctx, in, out); +} + +static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_rgw_gc_set_entry_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_enqueue: failed to decode entry\n"); + return -EINVAL; + } + + op.info.time = ceph::real_clock::now(); + op.info.time += make_timespan(op.expiration_secs); + + cls_enqueue_op enqueue_op; + bufferlist bl_data; + encode(op.info, bl_data); + enqueue_op.bl_data_vec.emplace_back(bl_data); + enqueue_op.has_urgent_data = false; + + CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length()); + + in->clear(); + encode(enqueue_op, *in); + + return cls_enqueue(hctx, in, out); +} + +static int cls_gc_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int r = cls_dequeue(hctx, in, out); + if (r < 0) + return r; + + cls_rgw_gc_obj_info data; + auto iter = out->cbegin(); + try { + decode(data, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_dequeue(): failed to decode entry\n"); + return -EINVAL; + } + + CLS_LOG(1, "INFO: tag of gc info is %s\n", data.tag.c_str()); + + return 0; +} + +static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n"); + auto in_iter = in->cbegin(); + + cls_rgw_gc_list_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode input\n"); + return -EINVAL; + } + + cls_queue_list_op list_op; + if (op.marker.empty()) { + list_op.start_offset = 0; + } else { + list_op.start_offset = boost::lexical_cast(op.marker.c_str()); + } + + if (! op.max) { + op.max = GC_LIST_DEFAULT_MAX; + } + + list_op.max = op.max; + + cls_queue_list_ret op_ret; + cls_rgw_gc_list_ret list_ret; + uint32_t num_entries = 0; + bool urgent_data_decoded = false; + std::unordered_map urgent_data_map; + do { + in->clear(); + encode(list_op, *in); + + CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering cls_queue_list_entries \n"); + int ret = cls_queue_list_entries(hctx, in, out); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret); + return ret; + } + + auto iter = out->cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); + return -EINVAL; + } + + if (op_ret.has_urgent_data && ! 
urgent_data_decoded) { + auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); + decode(urgent_data_map, iter_urgent_data); + urgent_data_decoded = true; + } + + if (op_ret.data.size()) { + for (auto it : op_ret.data) { + cls_rgw_gc_obj_info info; + try { + decode(info, it); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode gc info\n"); + return -EINVAL; + } + //Check for info tag in urgent data map + if (urgent_data_map.size() > 0) { + auto found = urgent_data_map.find(info.tag); + if (found != urgent_data_map.end()) { + if (found->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str()); + continue; + } + } + } else { + //Search in xattrs + bufferlist bl_xattrs; + int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); + return ret; + } + if (ret != -ENOENT && ret != -ENODATA) { + std::unordered_map xattr_urgent_data_map; + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_urgent_data_map, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n"); + return -EINVAL; + } //end - catch + if (xattr_urgent_data_map.size() > 0) { + auto found = xattr_urgent_data_map.find(info.tag); + if (found != xattr_urgent_data_map.end()) { + if (found->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); + continue; + } + } + } // end - if xattrs size ... + } // end - ret != ENOENT && ENODATA + } + if (op.expired_only) { + real_time now = ceph::real_clock::now(); + if (info.time <= now) { + list_ret.entries.emplace_back(info); + } + } else { + list_ret.entries.emplace_back(info); + } + num_entries++; + } + CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max); + if (num_entries < op.max) { + list_op.max = (op.max - num_entries); + list_op.start_offset = op_ret.next_offset; + out->clear(); + } else { + break; + } + } else { + break; + } + } while(op_ret.is_truncated); + + list_ret.truncated = op_ret.is_truncated; + if (list_ret.truncated) { + list_ret.next_marker = boost::lexical_cast(op_ret.next_offset); + } + out->clear(); + encode(list_ret, *out); + return 0; +} + +static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entered cls_gc_queue_remove \n"); + + auto in_iter = in->cbegin(); + + cls_rgw_gc_queue_remove_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode input\n"); + return -EINVAL; + } + + // List entries and calculate total number of entries (including invalid entries) + cls_queue_list_op list_op; + if (op.marker.empty()) { + list_op.start_offset = 0; + } else { + list_op.start_offset = boost::lexical_cast(op.marker.c_str()); + } + + if (! 
op.num_entries) { + op.num_entries = GC_LIST_DEFAULT_MAX; + } + + list_op.max = op.num_entries; + bool is_truncated = true; + uint32_t total_num_entries = 0, num_entries = 0; + std::unordered_map urgent_data_map; + bool urgent_data_decoded = false; + uint64_t end_offset = 0; + do { + in->clear(); + encode(list_op, *in); + + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_list_entries \n"); + int ret = cls_queue_list_entries(hctx, in, out); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): returned error %d\n", ret); + return ret; + } + + cls_queue_list_ret op_ret; + auto iter = out->cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); + return -EINVAL; + } + is_truncated = op_ret.is_truncated; + unsigned int index = 0; + if (op_ret.has_urgent_data && ! urgent_data_decoded) { + auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); + try { + decode(urgent_data_map, iter_urgent_data); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode urgent data map\n"); + return -EINVAL; + } + urgent_data_decoded = true; + } + // If data is not empty + if (op_ret.data.size()) { + for (auto it : op_ret.data) { + cls_rgw_gc_obj_info info; + try { + decode(info, it); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode gc info\n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_gc_queue_remove(): entry: %s\n", info.tag.c_str()); + total_num_entries++; + index++; + //Search for tag in urgent data map + if (urgent_data_map.size() > 0) { + auto found = urgent_data_map.find(info.tag); + if (found != urgent_data_map.end()) { + if (found->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str()); + continue; + } else if (found->second == info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str()); + urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later + } + }//end-if map end + }//end-if urgent data + else { + //Search in xattrs + bufferlist bl_xattrs; + int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); + return ret; + } + if (ret != -ENOENT && ret != -ENODATA) { + std::unordered_map xattr_urgent_data_map; + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_urgent_data_map, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); + return -EINVAL; + } //end - catch + if (xattr_urgent_data_map.size() > 0) { + auto found = xattr_urgent_data_map.find(info.tag); + if (found != xattr_urgent_data_map.end()) { + if (found->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); + continue; + } else if (found->second == info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str()); + xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later + } + } + } // end - if xattrs size ... + if (xattr_urgent_data_map.size() == 0) { + //remove from xattrs ??? 
+ } + } // end - ret != ENOENT && ENODATA + }// search in xattrs + num_entries++; + }//end-for + + if (num_entries < op.num_entries) { + list_op.max = (op.num_entries - num_entries); + list_op.start_offset = op_ret.next_offset; + out->clear(); + } else { + end_offset = op_ret.offsets[index - 1]; + CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %lu\n", index, end_offset); + break; + } + } //end-if + else { + break; + } + } while(is_truncated); + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries); + + cls_queue_remove_op rem_op; + if (op.marker.empty()) { + rem_op.start_offset = 0; + } else { + rem_op.start_offset = boost::lexical_cast(op.marker.c_str()); + } + + rem_op.end_offset = end_offset; + CLS_LOG(1, "INFO: cls_gc_queue_remove(): start offset: %lu and end offset: %lu\n", rem_op.start_offset, rem_op.end_offset); + if(urgent_data_map.size() == 0) { + rem_op.has_urgent_data = false; + } + encode(urgent_data_map, rem_op.bl_urgent_data); + + in->clear(); + encode(rem_op, *in); + + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_remove_entries \n"); + int ret = cls_queue_remove_entries(hctx, in, out); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries(): returned error %d\n", ret); + return ret; + } + + return 0; +} + +static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int ret = 0; + auto in_iter = in->cbegin(); + + cls_gc_defer_entry_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode input\n"); + return -EINVAL; + } + + op.info.time = ceph::real_clock::now(); + op.info.time += make_timespan(op.expiration_secs); + + //Read urgent data + in->clear(); + out->clear(); + + ret = cls_queue_read_urgent_data(hctx, in, out); + + auto out_iter = out->cbegin(); + + cls_queue_urgent_data_ret op_ret; + try { + decode(op_ret, out_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode ouput\n"); + return -EINVAL; + } + + auto bl_iter = op_ret.bl_urgent_data.cbegin(); + std::unordered_map urgent_data_map; + if (op_ret.has_urgent_data) { + try { + decode(urgent_data_map, bl_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode urgent data map\n"); + return -EINVAL; + } + } + + bool is_last_entry = false; + in->clear(); + out->clear(); + ret = cls_queue_get_last_entry(hctx, in, out); + if (ret < 0) { + return ret; + } + + cls_rgw_gc_obj_info info; + auto iter = out->cbegin(); + try { + decode(info, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode entry\n"); + return -EINVAL; + } + + CLS_LOG(1, "INFO: tag of gc info is %s\n", info.tag.c_str()); + if (info.tag == op.info.tag) { + is_last_entry = true; + } + + bool has_urgent_data = false; + auto it = urgent_data_map.find(op.info.tag); + if (it != urgent_data_map.end()) { + it->second = op.info.time; + } else { + urgent_data_map.insert({op.info.tag, op.info.time}); + has_urgent_data = true; + } + + out->clear(); + bool can_fit = false; + bufferlist bl_urgent_data; + encode(urgent_data_map, bl_urgent_data); + ret = cls_queue_can_urgent_data_fit(hctx, &bl_urgent_data, out); + if (ret < 0) { + return ret; + } + iter = out->cbegin(); + decode(can_fit, iter); + CLS_LOG(1, "INFO: Can urgent data fit: %d \n", can_fit); + + if (can_fit) { + in->clear(); + if (! 
is_last_entry) { + cls_enqueue_op enqueue_op; + bufferlist bl_data; + encode(op.info, bl_data); + enqueue_op.bl_data_vec.emplace_back(bl_data); + CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length()); + enqueue_op.bl_urgent_data = bl_urgent_data; + enqueue_op.has_urgent_data = has_urgent_data; + encode(enqueue_op, *in); + ret = cls_enqueue(hctx, in, out); + if (ret < 0) { + return ret; + } + } else { + cls_queue_update_last_entry_op update_op; + encode(op.info, update_op.bl_data); + CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", update_op.bl_data.length()); + update_op.bl_urgent_data = bl_urgent_data; + update_op.has_urgent_data = has_urgent_data; + encode(update_op, *in); + ret = cls_queue_update_last_entry(hctx, in, out); + if (ret < 0) { + return ret; + } + } + } + // Else write urgent data as xattrs + else { + std::unordered_map xattr_urgent_data_map; + xattr_urgent_data_map.insert({op.info.tag, op.info.time}); + bufferlist bl_map; + encode(xattr_urgent_data_map, bl_map); + ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map); + CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data"); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret); + return ret; + } + } + return 0; +} + +CLS_INIT(queue) +{ + CLS_LOG(1, "Loaded queue class!"); + + cls_handle_t h_class; + cls_method_handle_t h_create_queue; + cls_method_handle_t h_get_queue_size; + cls_method_handle_t h_enqueue; + cls_method_handle_t h_dequeue; + cls_method_handle_t h_queue_list_entries; + cls_method_handle_t h_queue_remove_entries; + cls_method_handle_t h_queue_update_last_entry; + cls_method_handle_t h_queue_get_last_entry; + cls_method_handle_t h_queue_read_urgent_data; + cls_method_handle_t h_queue_write_urgent_data; + cls_method_handle_t h_queue_can_urgent_data_fit; + + cls_method_handle_t h_gc_create_queue; + cls_method_handle_t h_gc_enqueue; + cls_method_handle_t h_gc_dequeue; + cls_method_handle_t h_gc_queue_list_entries; + cls_method_handle_t h_gc_queue_remove_entries; + cls_method_handle_t h_gc_queue_update_entry; + + cls_register(QUEUE_CLASS, &h_class); + + /* queue*/ + cls_register_cxx_method(h_class, CREATE_QUEUE, CLS_METHOD_WR, cls_create_queue, &h_create_queue); + cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size); + cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue); + cls_register_cxx_method(h_class, DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_dequeue, &h_dequeue); + cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_list_entries, &h_queue_list_entries); + cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries); + cls_register_cxx_method(h_class, QUEUE_UPDATE_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_update_last_entry, &h_queue_update_last_entry); + cls_register_cxx_method(h_class, QUEUE_GET_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_get_last_entry, &h_queue_get_last_entry); + cls_register_cxx_method(h_class, QUEUE_READ_URGENT_DATA, CLS_METHOD_RD, cls_queue_read_urgent_data, &h_queue_read_urgent_data); + cls_register_cxx_method(h_class, QUEUE_WRITE_URGENT_DATA, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_write_urgent_data, &h_queue_write_urgent_data); + cls_register_cxx_method(h_class, QUEUE_CAN_URGENT_DATA_FIT, CLS_METHOD_RD | CLS_METHOD_WR, 
cls_queue_can_urgent_data_fit, &h_queue_can_urgent_data_fit); + + /* gc */ + cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue); + cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue); + cls_register_cxx_method(h_class, GC_DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_dequeue, &h_gc_dequeue); + cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries); + cls_register_cxx_method(h_class, GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_remove, &h_gc_queue_remove_entries); + cls_register_cxx_method(h_class, GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_update_entry, &h_gc_queue_update_entry); + + return; +} + diff --git a/src/cls/queue/cls_queue_client.cc b/src/cls/queue/cls_queue_client.cc new file mode 100644 index 00000000000..3fd5933d0fd --- /dev/null +++ b/src/cls/queue/cls_queue_client.cc @@ -0,0 +1,118 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include + +#include "cls/rgw/cls_rgw_ops.h" +#include "cls/queue/cls_queue_const.h" +#include "cls/queue/cls_queue_client.h" + +#include "common/debug.h" + +using namespace librados; + +void cls_rgw_gc_create_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries) +{ + bufferlist in; + cls_gc_create_queue_op call; + call.size = size; + call.num_urgent_data_entries = num_urgent_data_entries; + encode(call, in); + op.exec(QUEUE_CLASS, GC_CREATE_QUEUE, in); +} + +int cls_rgw_gc_get_queue_size(IoCtx& io_ctx, string& oid, uint64_t& size) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, QUEUE_CLASS, GET_QUEUE_SIZE, in, out); + if (r < 0) + return r; + + auto iter = out.cbegin(); + try { + decode(size, iter); + } catch (buffer::error& err) { + return -EIO; + } + + return 0; +} + +void cls_rgw_gc_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) +{ + bufferlist in; + cls_rgw_gc_set_entry_op call; + call.expiration_secs = expiration_secs; + call.info = info; + encode(call, in); + op.exec(QUEUE_CLASS, GC_ENQUEUE, in); +} + +int cls_rgw_gc_dequeue(IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info) +{ + bufferlist in, out; + + int r = io_ctx.exec(oid, QUEUE_CLASS, GC_DEQUEUE, in, out); + if (r < 0) + return r; + + auto iter = out.cbegin(); + try { + decode(info, iter); + } catch (buffer::error& err) { + return -EIO; + } + + return 0; +} + +int cls_rgw_gc_list_queue(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only, + list& entries, bool *truncated, string& next_marker) +{ + bufferlist in, out; + cls_rgw_gc_list_op op; + op.marker = marker; + op.max = max; + op.expired_only = expired_only; + encode(op, in); + + int r = io_ctx.exec(oid, QUEUE_CLASS, GC_QUEUE_LIST_ENTRIES, in, out); + if (r < 0) + return r; + + cls_rgw_gc_list_ret ret; + auto iter = out.cbegin(); + try { + decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + entries.swap(ret.entries); + + if (truncated) + *truncated = ret.truncated; + + next_marker = std::move(ret.next_marker); + + return 0; +} + +void cls_rgw_gc_remove_queue(ObjectWriteOperation& op, string& marker, uint32_t num_entries) +{ + bufferlist in, out; + cls_rgw_gc_queue_remove_op rem_op; + rem_op.marker = marker; + rem_op.num_entries = num_entries; + encode(rem_op, in); + op.exec(QUEUE_CLASS, 
GC_QUEUE_REMOVE_ENTRIES, in); +} + +void cls_rgw_gc_defer_entry_queue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) +{ + bufferlist in; + cls_gc_defer_entry_op defer_op; + defer_op.expiration_secs = expiration_secs; + defer_op.info = info; + encode(defer_op, in); + op.exec(QUEUE_CLASS, GC_QUEUE_UPDATE_ENTRY, in); +} diff --git a/src/cls/queue/cls_queue_client.h b/src/cls/queue/cls_queue_client.h new file mode 100644 index 00000000000..f509aabbed9 --- /dev/null +++ b/src/cls/queue/cls_queue_client.h @@ -0,0 +1,22 @@ +#ifndef CEPH_CLS_QUEUE_CLIENT_H +#define CEPH_CLS_QUEUE_CLIENT_H + +#include "include/str_list.h" +#include "include/rados/librados.hpp" +#include "cls/queue/cls_queue_types.h" +#include "cls_queue_ops.h" +#include "common/RefCountedObj.h" +#include "include/compat.h" +#include "common/ceph_time.h" +#include "common/Cond.h" + +void cls_rgw_gc_create_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries); +int cls_rgw_gc_get_queue_size(librados::IoCtx& io_ctx, string& oid, uint64_t& size); +void cls_rgw_gc_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); +int cls_rgw_gc_dequeue(librados::IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info); +int cls_rgw_gc_list_queue(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only, + list& entries, bool *truncated, string& next_marker); +void cls_rgw_gc_remove_queue(librados::ObjectWriteOperation& op, string& marker, uint32_t num_entries); +void cls_rgw_gc_defer_entry_queue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); + +#endif \ No newline at end of file diff --git a/src/cls/queue/cls_queue_const.h b/src/cls/queue/cls_queue_const.h new file mode 100644 index 00000000000..a9fac61ebe0 --- /dev/null +++ b/src/cls/queue/cls_queue_const.h @@ -0,0 +1,25 @@ +#ifndef CEPH_CLS_QUEUE_CONSTS_H +#define CEPH_CLS_QUEUE_CONSTS_H + +#define QUEUE_CLASS "queue" + +#define CREATE_QUEUE "create_queue" +#define GET_QUEUE_SIZE "get_queue_size" +#define ENQUEUE "enqueue" +#define DEQUEUE "dequeue" +#define QUEUE_LIST_ENTRIES "queue_list_entries" +#define QUEUE_REMOVE_ENTRIES "queue_remove_entries" +#define QUEUE_UPDATE_LAST_ENTRY "queue_update_last_entry" +#define QUEUE_GET_LAST_ENTRY "queue_get_last_entry" +#define QUEUE_READ_URGENT_DATA "queue_read_urgent_data" +#define QUEUE_WRITE_URGENT_DATA "queue_write_urgent_data" +#define QUEUE_CAN_URGENT_DATA_FIT "queue_can_urgent_data_fit" + +#define GC_CREATE_QUEUE "gc_create_queue" +#define GC_ENQUEUE "gc_enqueue" +#define GC_DEQUEUE "gc_dequeue" +#define GC_QUEUE_LIST_ENTRIES "gc_queue_list_entries" +#define GC_QUEUE_REMOVE_ENTRIES "gc_queue_remove_entries" +#define GC_QUEUE_UPDATE_ENTRY "gc_queue_update_entry" + +#endif \ No newline at end of file diff --git a/src/cls/queue/cls_queue_ops.cc b/src/cls/queue/cls_queue_ops.cc new file mode 100644 index 00000000000..52792c9b70d --- /dev/null +++ b/src/cls/queue/cls_queue_ops.cc @@ -0,0 +1,18 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "cls/queue/cls_queue_ops.h" + +#include "common/Formatter.h" +#include "common/ceph_json.h" +#include "include/utime.h" + +void cls_create_queue_op::dump(Formatter *f) const +{ + head.dump(f); +} + +void cls_create_queue_op::generate_test_instances(list& ls) +{ + ls.push_back(new cls_create_queue_op); + ls.push_back(new cls_create_queue_op); +} \ No 
newline at end of file diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h new file mode 100644 index 00000000000..6e0d1f17399 --- /dev/null +++ b/src/cls/queue/cls_queue_ops.h @@ -0,0 +1,283 @@ +#ifndef CEPH_CLS_QUEUE_OPS_H +#define CEPH_CLS_QUEUE_OPS_H + +#include "cls/rgw/cls_rgw_types.h" +#include "cls/queue/cls_queue_types.h" + +struct cls_gc_create_queue_op { + uint64_t size; + uint64_t num_urgent_data_entries; + + cls_gc_create_queue_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(size, bl); + encode(num_urgent_data_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(size, bl); + decode(num_urgent_data_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_gc_create_queue_op) + +struct cls_create_queue_op { + cls_queue_head head; + uint64_t head_size{0}; + + cls_create_queue_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(head, bl); + encode(head_size, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(head, bl); + decode(head_size, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(list& ls); +}; +WRITE_CLASS_ENCODER(cls_create_queue_op) + +struct cls_enqueue_op { + vector bl_data_vec; + bool has_urgent_data{false}; + bufferlist bl_urgent_data; + + cls_enqueue_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(bl_data_vec, bl); + encode(has_urgent_data, bl); + encode(bl_urgent_data, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(bl_data_vec, bl); + decode(has_urgent_data, bl); + decode(bl_urgent_data, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_enqueue_op) + +struct cls_dequeue_op { + bufferlist bl; + + cls_dequeue_op() {} + + static void generate_test_instances(list& ls); +}; + +struct cls_queue_list_op { + uint64_t max; + uint64_t start_offset; + + cls_queue_list_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(max, bl); + encode(start_offset, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(max, bl); + decode(start_offset, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_list_op) + +struct cls_queue_list_ret { + bool is_truncated; + uint64_t next_offset; + vector data; + vector offsets; + bool has_urgent_data; + bufferlist bl_urgent_data; + + cls_queue_list_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(is_truncated, bl); + encode(next_offset, bl); + encode(data, bl); + encode(offsets, bl); + encode(has_urgent_data, bl); + encode(bl_urgent_data, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(is_truncated, bl); + decode(next_offset, bl); + decode(data, bl); + decode(offsets, bl); + decode(has_urgent_data, bl); + decode(bl_urgent_data, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_list_ret) + +struct cls_queue_remove_op { + uint64_t start_offset; + uint64_t end_offset; + bool has_urgent_data; + bufferlist bl_urgent_data; + + cls_queue_remove_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(start_offset, bl); + encode(end_offset, bl); + encode(has_urgent_data, bl); + encode(bl_urgent_data, bl); + ENCODE_FINISH(bl); + } + + 
diff --git a/src/cls/queue/cls_queue_types.cc b/src/cls/queue/cls_queue_types.cc
new file mode 100644
index 00000000000..6c4bac725b4
--- /dev/null
+++ b/src/cls/queue/cls_queue_types.cc
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "cls/queue/cls_queue_types.h"
+#include "common/ceph_json.h"
+#include "include/utime.h"
+
+void cls_queue_head::dump(Formatter *f) const
+{
+  f->dump_bool("is_empty", is_empty);
+  f->dump_unsigned("front", front);
+  f->dump_unsigned("tail", tail);
+  f->dump_unsigned("size", size);
+  f->dump_bool("has_urgent_data", has_urgent_data);
+  f->dump_unsigned("bl_urgent_data_length", bl_urgent_data.length());
+}
+
+void cls_queue_head::generate_test_instances(list<cls_queue_head*>& ls)
+{
+  ls.push_back(new cls_queue_head);
+  ls.push_back(new cls_queue_head);
+}
diff --git a/src/cls/queue/cls_queue_types.h b/src/cls/queue/cls_queue_types.h
new file mode 100644
index 00000000000..dd09b0acebd
--- /dev/null
+++ b/src/cls/queue/cls_queue_types.h
@@ -0,0 +1,65 @@
+#ifndef CEPH_CLS_QUEUE_TYPES_H
+#define CEPH_CLS_QUEUE_TYPES_H
+
+#include "common/ceph_time.h"
+#include "common/Formatter.h"
+
+#include "rgw/rgw_basic_types.h"
+
+#define QUEUE_HEAD_SIZE_1K 1024
+//Actual start offset of queue data
+#define QUEUE_START_OFFSET_1K QUEUE_HEAD_SIZE_1K
+
+#define QUEUE_HEAD_SIZE_4K (4 * 1024)
+//Actual start offset of queue data
+#define QUEUE_START_OFFSET_4K QUEUE_HEAD_SIZE_4K
+
+struct cls_queue_head
+{
+  uint64_t front = QUEUE_START_OFFSET_1K;
+  uint64_t tail = QUEUE_START_OFFSET_1K;
+  uint64_t size{0}; // size of queue requested by user, with head size added to it
+  uint64_t last_entry_offset = QUEUE_START_OFFSET_1K;
+  uint32_t num_urgent_data_entries{0}; // requested by user
+  uint32_t num_head_urgent_entries{0}; // actual number of entries in head
+  uint32_t num_xattr_urgent_entries{0}; // actual number of entries in xattr in case of spill over
+  bool is_empty{true};
+  bool has_urgent_data{false};
+  bufferlist bl_urgent_data; // special data known to application using queue
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(front, bl);
+    encode(tail, bl);
+    encode(size, bl);
+    encode(last_entry_offset, bl);
+    encode(num_urgent_data_entries, bl);
+    encode(num_head_urgent_entries, bl);
+    encode(num_xattr_urgent_entries, bl);
+    encode(is_empty, bl);
+    encode(has_urgent_data, bl);
+    encode(bl_urgent_data, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(front, bl);
+    decode(tail, bl);
+    decode(size, bl);
+    decode(last_entry_offset, bl);
+    decode(num_urgent_data_entries, bl);
+    decode(num_head_urgent_entries, bl);
+    decode(num_xattr_urgent_entries, bl);
+    decode(is_empty, bl);
+    decode(has_urgent_data, bl);
+    decode(bl_urgent_data, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+  static void generate_test_instances(list<cls_queue_head*>& o);
+};
+WRITE_CLASS_ENCODER(cls_queue_head)
+
+#endif
\ No newline at end of file
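The comment on cls_queue_head::size above notes that the stored size is the caller-requested size with the head size added on top, and that entries start at the QUEUE_START_OFFSET_* boundary. A tiny sketch of that bookkeeping (illustrative only; the variable names and the 1M request are placeholders, and the 1K-vs-4K choice depends on whether urgent head entries are requested):

  // Illustrative only -- not in this patch.
  #include "cls/queue/cls_queue_types.h"

  uint64_t requested_size  = 1024 * 1024;            // size asked for by the caller
  uint64_t head_size       = QUEUE_HEAD_SIZE_1K;     // or QUEUE_HEAD_SIZE_4K / a caller-supplied head size
  uint64_t total_size      = requested_size + head_size;  // value stored in cls_queue_head::size
  uint64_t first_entry_off = QUEUE_START_OFFSET_1K;  // initial front, tail and last_entry_offset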
diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc
index 46949f46ea7..f8100a36ac1 100644
--- a/src/cls/rgw/cls_rgw.cc
+++ b/src/cls/rgw/cls_rgw.cc
@@ -13,6 +13,7 @@
 #include "common/escape.h"
 
 #include "include/compat.h"
+#include
 
 CLS_VER(1,0)
 CLS_NAME(rgw)
@@ -3944,7 +3945,6 @@ CLS_INIT(rgw)
   cls_method_handle_t h_rgw_guard_bucket_resharding;
   cls_method_handle_t h_rgw_get_bucket_resharding;
 
-
   cls_register(RGW_CLASS, &h_class);
 
   /* bucket index */
diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h
index fc3537ea880..5957d2ffbfd 100644
--- a/src/cls/rgw/cls_rgw_const.h
+++ b/src/cls/rgw/cls_rgw_const.h
@@ -72,5 +72,4 @@
 #define RGW_GUARD_BUCKET_RESHARDING "guard_bucket_resharding"
 #define RGW_GET_BUCKET_RESHARDING "get_bucket_resharding"
 
-
 #endif
diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc
index 713cee413f8..afca9a28169 100644
--- a/src/cls/rgw/cls_rgw_ops.cc
+++ b/src/cls/rgw/cls_rgw_ops.cc
@@ -541,8 +541,3 @@ void cls_rgw_get_bucket_resharding_op::generate_test_instances(
 void cls_rgw_get_bucket_resharding_op::dump(Formatter *f) const
 {
 }
-
-
-
-
-
diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h
index cdeb1301f61..2b2907ba6fa 100644
--- a/src/cls/rgw/cls_rgw_types.h
+++ b/src/cls/rgw/cls_rgw_types.h
@@ -1173,4 +1173,4 @@ struct cls_rgw_reshard_entry
 };
 WRITE_CLASS_ENCODER(cls_rgw_reshard_entry)
 
-#endif
+#endif
\ No newline at end of file
diff --git a/src/common/legacy_config_opts.h b/src/common/legacy_config_opts.h
index 35d4895c3b5..6f9414bc1b6 100644
--- a/src/common/legacy_config_opts.h
+++ b/src/common/legacy_config_opts.h
@@ -1564,3 +1564,4 @@ OPTION(rgw_sts_token_introspection_url, OPT_STR) // url for introspecting web t
 OPTION(rgw_sts_client_id, OPT_STR) // Client Id
 OPTION(rgw_sts_client_secret, OPT_STR) // Client Secret
 OPTION(debug_allow_any_pool_priority, OPT_BOOL)
+OPTION(rgw_gc_queue_head_size, OPT_U32) // GC queue head size
diff --git a/src/common/options.cc b/src/common/options.cc
index 77d04ed6231..70a01b7c033 100644
--- a/src/common/options.cc
+++ b/src/common/options.cc
@@ -6462,6 +6462,10 @@ std::vector