From: Pritha Srivastava Date: Tue, 25 Jun 2019 10:34:06 +0000 (+0530) Subject: Moving out gc related queue code to another module(cls_rgw_queue). X-Git-Tag: v15.1.0~1185^2~16 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d1c70c64b270778ce74335b98e905625d46e27ad;p=ceph-ci.git Moving out gc related queue code to another module(cls_rgw_queue). Signed-off-by: Pritha Srivastava --- diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index 8a6703a7dd9..add5ec0f7bb 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -265,20 +265,36 @@ set(cls_cas_client_srcs cas/cls_cas_client.cc) add_library(cls_cas_client STATIC ${cls_cas_client_srcs}) -# cls_queue +#cls_queue +set(cls_queue_srcs + queue/cls_queue.cc + queue/cls_queue_src.cc + queue/cls_queue_ops.cc + queue/cls_queue_types.cc + ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc) +add_library(cls_queue SHARED ${cls_queue_srcs}) +set_target_properties(cls_queue PROPERTIES + VERSION "1.0.0" + SOVERSION "1" + INSTALL_RPATH "" + CXX_VISIBILITY_PRESET hidden) +install(TARGETS cls_queue DESTINATION ${cls_dir}) + +# cls_rgw_queue if (WITH_RADOSGW) - set(cls_queue_srcs - queue/cls_queue.cc + set(cls_rgw_queue_srcs + queue/cls_rgw_queue.cc + queue/cls_queue_src.cc queue/cls_queue_ops.cc queue/cls_queue_types.cc ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc) - add_library(cls_queue SHARED ${cls_queue_srcs}) - set_target_properties(cls_queue PROPERTIES + add_library(cls_rgw_queue SHARED ${cls_rgw_queue_srcs}) + set_target_properties(cls_rgw_queue PROPERTIES VERSION "1.0.0" SOVERSION "1" INSTALL_RPATH "" CXX_VISIBILITY_PRESET hidden) - install(TARGETS cls_queue DESTINATION ${cls_dir}) + install(TARGETS cls_rgw_queue DESTINATION ${cls_dir}) set(cls_queue_client_srcs queue/cls_queue_client.cc diff --git a/src/cls/queue/cls_queue.cc b/src/cls/queue/cls_queue.cc index 40427c1b4ee..32873b89aaa 100644 --- a/src/cls/queue/cls_queue.cc +++ b/src/cls/queue/cls_queue.cc @@ -6,1520 +6,21 @@ #include #include "objclass/objclass.h" -#include "cls/rgw/cls_rgw_ops.h" -#include "cls/rgw/cls_rgw_types.h" -#include "common/Clock.h" #include "cls/queue/cls_queue_types.h" #include "cls/queue/cls_queue_ops.h" #include "cls/queue/cls_queue_const.h" -#include "common/Clock.h" -#include "common/strtol.h" -#include "common/escape.h" - -#include "include/compat.h" -#include -#include - -#include "common/ceph_context.h" -#include "global/global_context.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rgw - -#define GC_LIST_DEFAULT_MAX 128 +#include "cls/queue/cls_queue.h" CLS_VER(1,0) CLS_NAME(queue) -static int get_queue_head_size(cls_method_context_t hctx, uint64_t& head_size) -{ - //read head size - bufferlist bl_head_size; - int ret = cls_cxx_read(hctx, 0, sizeof(uint64_t), &bl_head_size); - if (ret < 0) { - CLS_LOG(0, "ERROR: get_queue_head_size: failed to read head\n"); - return ret; - } - //decode head size - auto iter = bl_head_size.cbegin(); - try { - decode(head_size, iter); - } catch (buffer::error& err) { - CLS_LOG(0, "ERROR: get_queue_head_size: failed to decode head size \n"); - return -EINVAL; - } - - CLS_LOG(10, "INFO: get_queue_head_size: head size is %lu\n", head_size); - - return 0; -} - -static int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - auto in_iter = in->cbegin(); - - cls_create_queue_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_create_queue(): failed to decode entry\n"); - return -EINVAL; - } - - // create the object - int ret = cls_cxx_create(hctx, true); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_create returned %d", __func__, ret); - return ret; - } - CLS_LOG(10, "INFO: cls_create_queue create queue of size %lu", op.head.size); - CLS_LOG(10, "INFO: cls_create_queue: Is urgent data present: %d\n", op.head.has_urgent_data); - CLS_LOG(10, "INFO: cls_create_queue: Is urgent data present: %d\n", op.head.num_urgent_data_entries); - - uint64_t head_size = QUEUE_HEAD_SIZE_1K; - - if (op.head.num_head_urgent_entries) { - if (! op.head_size) { - head_size = QUEUE_HEAD_SIZE_4K; - op.head.tail = op.head.front = QUEUE_START_OFFSET_4K; - op.head.last_entry_offset = QUEUE_START_OFFSET_4K; - } else { - head_size = op.head_size; - op.head.tail = op.head.front = head_size; - op.head.last_entry_offset = head_size; - } - } else { - head_size = QUEUE_HEAD_SIZE_1K; - op.head.tail = op.head.front = QUEUE_START_OFFSET_1K; - op.head.last_entry_offset = QUEUE_START_OFFSET_1K; - } - op.head.size += head_size; - - CLS_LOG(10, "INFO: cls_create_queue queue actual size %lu", op.head.size); - CLS_LOG(10, "INFO: cls_create_queue head size %lu", head_size); - CLS_LOG(10, "INFO: cls_create_queue queue front offset %lu", op.head.front); - - - //encode head size - bufferlist bl; - encode(head_size, bl); - CLS_LOG(0, "INFO: cls_create_queue head size %u", bl.length()); - - //encode head - bufferlist bl_head; - encode(op.head, bl_head); - - bl.claim_append(bl_head); - - CLS_LOG(0, "INFO: cls_create_queue writing head of size %u", bl.length()); - ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_write returned %d", __func__, ret); - return ret; - } - return 0; -} - -static int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - cls_queue_head head; - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - - try { - decode(head, bl_head); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_get_queue_size: failed to decode entry\n"); - return -EINVAL; - } - - head.size -= head_size; - - CLS_LOG(10, "INFO: cls_get_queue_size: size of queue is %lu\n", head.size); - - encode(head.size, *out); - - return 0; -} - -static int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - CLS_LOG(1, "INFO: cls_enqueue: Read Head of size: %lu\n", head_size); - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_enqueue: failed to decode head\n"); - return -EINVAL; - } - - if (head.front == head.tail && ! head.is_empty) { - // return queue full error - return -ENOSPC; - } - - iter = in->cbegin(); - cls_enqueue_op op; - try { - decode(op, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n"); - return -EINVAL; - } - - for (auto bl_data : op.bl_data_vec) { - uint64_t total_size = sizeof(uint64_t) + bl_data.length(); - CLS_LOG(1, "INFO: cls_enqueue(): Total size to be written is %lu and data size is %u\n", total_size, bl_data.length()); - - bufferlist bl; - uint64_t data_size = bl_data.length(); - encode(data_size, bl); - CLS_LOG(1, "INFO: cls_enqueue(): bufferlist length after encoding is %u\n", bl.length()); - bl.claim_append(bl_data); - - if (head.tail >= head.front) { - // check if data can fit in the remaining space in queue - if ((head.tail + total_size) <= head.size) { - CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n", head.tail, bl.length()); - head.last_entry_offset = head.tail; - //write data size and data at tail offset - ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.tail += total_size; - } else { - CLS_LOG(1, "INFO: Wrapping around and checking for free space\n"); - uint64_t free_space_available = (head.size - head.tail) + (head.front - head_size); - //Split data if there is free space available - if (total_size <= free_space_available) { - uint64_t size_before_wrap = head.size - head.tail; - bufferlist bl_data_before_wrap; - bl.splice(0, size_before_wrap, &bl_data_before_wrap); - head.last_entry_offset = head.tail; - //write spliced (data size and data) at tail offset - CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.tail, bl_data_before_wrap.length()); - ret = cls_cxx_write2(hctx, head.tail, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.tail = head_size; - //write remaining data at tail offset - CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head.tail, bl.length()); - ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.tail = bl.length(); - } - else { - CLS_LOG(1, "ERROR: No space left in queue\n"); - // return queue full error - return -ENOSPC; - } - } - } else if (head.front > head.tail) { - if ((head.tail + total_size) < head.front) { - CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n\n", head.tail, bl.length()); - head.last_entry_offset = head.tail; - //write data size and data at tail offset - ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.tail += total_size; - } else { - CLS_LOG(1, "ERROR: No space left in queue\n"); - // return queue full error - return -ENOSPC; - } - } - - bl_head.clear(); - if (head.tail == head.size) { - head.tail = head_size; - } - CLS_LOG(1, "INFO: cls_enqueue: New tail offset: %lu \n", head.tail); - } //end - for - - head.is_empty = false; - - //Update urgent data if set - if (op.has_urgent_data) { - head.has_urgent_data = true; - head.bl_urgent_data = op.bl_urgent_data; - } - - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u \n", bl_head.length()); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); - return ret; - } - - return 0; -} - -static int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - CLS_LOG(1, "INFO: cls_dequeue: Reading head of size: %lu\n", head_size); - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_dequeue: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - if (head.front == head.tail && head.is_empty) { - CLS_LOG(1, "ERROR: Queue is empty\n"); - return -ENOENT; - } - - uint64_t data_size = 0; - bufferlist bl_size; - - if (head.front < head.tail) { - //Read size of data first - ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size); - if (ret < 0) { - return ret; - } - iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); - return -EINVAL; - } - CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front); - head.front += sizeof(uint64_t); - //Read data based on size obtained above - CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); - ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.front += data_size; - } else if (head.front >= head.tail) { - uint64_t actual_data_size = head.size - head.front; - if (actual_data_size < sizeof(uint64_t)) { - //Case 1. Data size has been spliced, first reconstruct data size - CLS_LOG(1, "INFO: cls_dequeue: Spliced data size is read from from front offset %lu\n", head.front); - ret = cls_cxx_read2(hctx, head.front, actual_data_size, &bl_size, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.front = head_size; - uint64_t remainder_data_size = sizeof(uint64_t) - actual_data_size; - bufferlist bl_rem_data_size; - CLS_LOG(1, "INFO: cls_dequeue: Remainder Spliced data size is read from from front offset %lu\n", head.front); - ret = cls_cxx_read(hctx, head.front, remainder_data_size, &bl_rem_data_size); - if (ret < 0) { - return ret; - } - bl_size.claim_append(bl_rem_data_size); - iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); - return -EINVAL; - } - head.front += remainder_data_size; - CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); - ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.front += data_size; - } else { - ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size); - if (ret < 0) { - return ret; - } - iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); - return -EINVAL; - } - CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front); - head.front += sizeof(uint64_t); - - actual_data_size = head.size - head.front; - - if (actual_data_size < data_size) { - if (actual_data_size != 0) { - //Case 2. Data has been spliced - CLS_LOG(1, "INFO: cls_dequeue: Spliced data is read from from front offset %lu\n", head.front); - ret = cls_cxx_read2(hctx, head.front, actual_data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - } - head.front = head_size; - bufferlist bl_remainder; - uint64_t remainder_size = data_size - actual_data_size; - CLS_LOG(1, "INFO: cls_dequeue: Remaining Data is read from from front offset %lu\n", head.front); - ret = cls_cxx_read2(hctx, head.front, remainder_size, &bl_remainder, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - out->claim_append(bl_remainder); - head.front += remainder_size; - } else { - //Case 3. No splicing - CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); - ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - head.front += data_size; - } - } - } - - //front has reached the end, wrap it around - if (head.front == head.size) { - head.front = head_size; - } - - if (head.front == head.tail) { - head.is_empty = true; - } - //Write head back - bl_head.clear(); - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); - return ret; - } - - return 0; -} - -static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - CLS_LOG(1, "INFO: cls_queue_list_entries: Reading head at offset %lu\n", head_size); - uint64_t start_offset = 0; - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - // If queue is empty, return from here - if (head.is_empty) { - return -ENOENT; - } - - cls_queue_list_ret op_ret; - CLS_LOG(1, "INFO: cls_queue_list_entries: Is urgent data present: %d\n", head.has_urgent_data); - //Info related to urgent data - op_ret.has_urgent_data = head.has_urgent_data; - op_ret.bl_urgent_data = head.bl_urgent_data; - if ((head.front == head.tail) && head.is_empty) { - op_ret.is_truncated = false; - encode(op_ret, *out); - return 0; - } - - auto in_iter = in->cbegin(); - - cls_queue_list_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n"); - return -EINVAL; - } - if (op.start_offset == 0) { - start_offset = head.front; - } else { - start_offset = op.start_offset; - } - - op_ret.is_truncated = true; - uint64_t chunk_size = 1024; - uint64_t contiguous_data_size = 0, size_to_read = 0; - bool wrap_around = false; - - //Calculate length of contiguous data to be read depending on front, tail and start offset - if (head.tail > head.front) { - contiguous_data_size = head.tail - start_offset; - } else if (head.front >= head.tail) { - if (start_offset >= head.front) { - contiguous_data_size = head.size - start_offset; - wrap_around = true; - } else if (start_offset <= head.tail) { - contiguous_data_size = head.tail - start_offset; - } - } - - uint64_t num_ops = 0; - bufferlist bl; - do - { - CLS_LOG(1, "INFO: cls_queue_list_entries(): front is: %lu, tail is %lu, and start_offset is %lu\n", head.front, head.tail, start_offset); - - bufferlist bl_chunk; - //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size - if (contiguous_data_size > chunk_size) { - size_to_read = chunk_size; - } else { - size_to_read = contiguous_data_size; - } - CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is %lu\n", size_to_read); - if (size_to_read == 0) { - op_ret.is_truncated = false; - CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is 0, hence breaking out!\n"); - break; - } - - ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); - if (ret < 0) { - return ret; - } - - //If there is leftover data from previous iteration, append new data to leftover data - bl.claim_append(bl_chunk); - bl_chunk = bl; - bl.clear(); - - CLS_LOG(1, "INFO: cls_queue_list_entries(): size of chunk %u\n", bl_chunk.length()); - - //Process the chunk of data read - unsigned index = 0; - auto it = bl_chunk.cbegin(); - uint64_t size_to_process = bl_chunk.length(); - do { - CLS_LOG(1, "INFO: cls_queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process); - it.seek(index); - uint64_t data_size = 0; - if (size_to_process >= sizeof(uint64_t)) { - try { - decode(data_size, it); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode data size \n"); - return -EINVAL; - } - } else { - // Copy unprocessed data to bl - bl_chunk.copy(index, size_to_process, bl); - CLS_LOG(1, "INFO: cls_queue_list_entries: not enough data to read data size, breaking out!\n"); - break; - } - CLS_LOG(1, "INFO: cls_queue_list_entries(): data size: %lu\n", data_size); - index += sizeof(uint64_t); - size_to_process -= sizeof(uint64_t); - bufferlist bl_data; - if (data_size <= size_to_process) { - bl_chunk.copy(index, data_size, bl_data); - //Return data and offset here - op_ret.data.emplace_back(bl_data); - uint64_t data_offset = start_offset + (index - sizeof(uint64_t)); - op_ret.offsets.emplace_back(data_offset); - CLS_LOG(1, "INFO: cls_queue_list_entries(): offset: %lu\n", data_offset); - index += bl_data.length(); - size_to_process -= bl_data.length(); - } else { - index -= sizeof(uint64_t); - size_to_process += sizeof(uint64_t); - bl_chunk.copy(index, size_to_process, bl); - CLS_LOG(1, "INFO: cls_queue_list_entries(): not enough data to read data, breaking out!\n"); - break; - } - num_ops++; - if (num_ops == op.max) { - CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n"); - break; - } - if (index == bl_chunk.length()) { - break; - } - } while(index < bl_chunk.length()); - - CLS_LOG(1, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); - - if (num_ops == op.max) { - op_ret.next_offset = start_offset + index; - CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", op_ret.next_offset); - break; - } - - //Calculate new start_offset and contiguous data size - start_offset += size_to_read; - contiguous_data_size -= size_to_read; - if (contiguous_data_size == 0) { - if (wrap_around) { - start_offset = head_size; - contiguous_data_size = head.tail - head_size; - wrap_around = false; - } else { - CLS_LOG(1, "INFO: cls_queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n"); - op_ret.next_offset = head.front; - op_ret.is_truncated = false; - break; - } - } - - } while(num_ops < op.max); - - //Wrap around next offset if it has reached end of queue - if (op_ret.next_offset == head.size) { - op_ret.next_offset = head_size; - } - if (op_ret.next_offset == head.tail) { - op_ret.is_truncated = false; - } - - encode(op_ret, *out); - - return 0; -} - -static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - if ((head.front == head.tail) && head.is_empty) { - return -ENOENT; - } - - auto in_iter = in->cbegin(); - - cls_queue_remove_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n"); - return -EINVAL; - } - - if (op.start_offset == op.end_offset) { - return -EINVAL; - } - - // If start offset is not set or set to zero, then we need to shift it to actual front of queue - if (op.start_offset == 0) { - op.start_offset = head_size; - } - - if (op.start_offset != head.front) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: invalid start offset\n"); - return -EINVAL; - } - - // Read the size from the end offset - bufferlist bl_size; - uint64_t data_size = 0; - ret = cls_cxx_read(hctx, op.end_offset, sizeof(uint64_t), &bl_size); - if (ret < 0) { - return ret; - } - iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); - return -EINVAL; - } - - //offset obtained by adding last entry's size - uint64_t end_offset = op.end_offset + sizeof(uint64_t) + data_size; - - //Zero out the entries that have been removed, to reclaim storage space - if (end_offset > op.start_offset) { - ret = cls_cxx_write_zero(hctx, op.start_offset, (end_offset - op.start_offset)); - if (ret < 0) { - return ret; - } - } else { //start offset > end offset - ret = cls_cxx_write_zero(hctx, op.start_offset, (head.size - op.start_offset)); - if (ret < 0) { - return ret; - } - ret = cls_cxx_write_zero(hctx, head_size, (end_offset - head_size)); - if (ret < 0) { - return ret; - } - } - - head.front = end_offset; - - // Check if it is the end, then wrap around - if (head.front == head.size) { - head.front = head_size; - } - - CLS_LOG(1, "INFO: cls_queue_remove_entries: front offset is: %lu and tail offset is %lu\n", head.front, head.tail); - - // We've reached the last element - if (head.front == head.tail) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Queue is empty now!\n"); - head.is_empty = true; - } - - //Update urgent data map - head.bl_urgent_data = op.bl_urgent_data; - head.has_urgent_data = op.has_urgent_data; - - //Write head back - bl_head.clear(); - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head returned error: %d \n", ret); - return ret; - } - - return 0; -} - -static int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - uint64_t data_size = 0, last_entry_offset = head.last_entry_offset; - bufferlist bl_size; - //Read size of data first - ret = cls_cxx_read(hctx, last_entry_offset, sizeof(uint64_t), &bl_size); - if (ret < 0) { - return ret; - } - iter = bl_size.cbegin(); - try { - decode(data_size, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode data size \n"); - return -EINVAL; - } - CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data size: %lu, last data offset: %lu\n", data_size, last_entry_offset); - - //Read data based on size obtained above - last_entry_offset += sizeof(uint64_t); - CLS_LOG(1, "INFO: cls_dequeue: Data is read from from last entry offset %lu\n", last_entry_offset); - ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); - if (ret < 0) { - return ret; - } - return 0; -} - -static int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - auto in_iter = in->cbegin(); - - cls_queue_update_last_entry_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode input data\n"); - return -EINVAL; - } - - bufferlist bl; - uint64_t data_size = op.bl_data.length(); - encode(data_size, bl); - bl.claim_append(op.bl_data); - - CLS_LOG(1, "INFO: cls_queue_update_last_entry_op: Updating data at last offset: %lu and total data size is %u\n", head.last_entry_offset, bl.length()); - - //write data size + data at offset - ret = cls_cxx_write(hctx, head.last_entry_offset, bl.length(), &bl); - if (ret < 0) { - return ret; - } - - if (op.has_urgent_data) { - head.has_urgent_data = true; - head.bl_urgent_data = op.bl_urgent_data; - } - - bl_head.clear(); - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head of size: %u \n", bl_head.length()); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head returned error: %d \n", ret); - return ret; - } - return 0; -} - -static int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_read_urgent_data: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - CLS_LOG(1, "INFO: cls_queue_read_urgent_data: tail offset %lu\n", head.tail); - - cls_queue_urgent_data_ret op_ret; - if(head.has_urgent_data) { - op_ret.has_urgent_data = true; - op_ret.bl_urgent_data = head.bl_urgent_data; - } - - encode(op_ret, *out); - - return 0; -} - -static int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - CLS_LOG(1, "INFO: cls_queue_write_urgent_data: tail offset %lu\n", head.tail); - - auto in_iter = in->cbegin(); - - cls_queue_write_urgent_data_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode input data\n"); - return -EINVAL; - } - //Write urgent data - head.has_urgent_data = op.has_urgent_data; - head.bl_urgent_data = op.bl_urgent_data; - - //Write head back - bl_head.clear(); - encode(head, bl_head); - CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head of size: %u\n", bl_head.length()); - ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head returned error: %d \n", ret); - return ret; - } - - return 0; -} - -static int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - bool can_fit = true; - - //get head size - uint64_t head_size = 0; - int ret = get_queue_head_size(hctx, head_size); - if (ret < 0) { - return ret; - } - - // read the head - bufferlist bl_head; - ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (ret < 0) { - return ret; - } - cls_queue_head head; - auto iter = bl_head.cbegin(); - try { - decode(head, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode entry %s\n", bl_head.c_str()); - return -EINVAL; - } - - head.has_urgent_data = true; - head.bl_urgent_data = *in; - - bl_head.clear(); - encode(head, bl_head); - - if(bl_head.length() > head_size) { - can_fit = false; - } - - encode(can_fit, *out); - - return 0; -} - -static int cls_gc_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - auto in_iter = in->cbegin(); - - cls_gc_create_queue_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_create_queue: failed to decode entry\n"); - return -EINVAL; - } - - cls_create_queue_op create_op; - - if (op.num_urgent_data_entries > 0) { - std::unordered_map urgent_data_map; - urgent_data_map.reserve(op.num_urgent_data_entries); - encode(urgent_data_map, create_op.head.bl_urgent_data); - } - - CLS_LOG(10, "INFO: cls_gc_create_queue: queue size is %lu\n", op.size); - create_op.head.size = op.size; - create_op.head.num_urgent_data_entries = op.num_urgent_data_entries; - create_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size; - - in->clear(); - encode(create_op, *in); - - return cls_create_queue(hctx, in, out); -} - -static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - auto in_iter = in->cbegin(); - - cls_rgw_gc_set_entry_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_enqueue: failed to decode entry\n"); - return -EINVAL; - } - - op.info.time = ceph::real_clock::now(); - op.info.time += make_timespan(op.expiration_secs); - - cls_enqueue_op enqueue_op; - bufferlist bl_data; - encode(op.info, bl_data); - enqueue_op.bl_data_vec.emplace_back(bl_data); - enqueue_op.has_urgent_data = false; - - CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length()); - - in->clear(); - encode(enqueue_op, *in); - - return cls_enqueue(hctx, in, out); -} - -static int cls_gc_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - int r = cls_dequeue(hctx, in, out); - if (r < 0) - return r; - - cls_rgw_gc_obj_info data; - auto iter = out->cbegin(); - try { - decode(data, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_dequeue(): failed to decode entry\n"); - return -EINVAL; - } - - CLS_LOG(1, "INFO: tag of gc info is %s\n", data.tag.c_str()); - - return 0; -} - -static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n"); - auto in_iter = in->cbegin(); - - cls_rgw_gc_list_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode input\n"); - return -EINVAL; - } - - cls_queue_list_op list_op; - if (op.marker.empty()) { - list_op.start_offset = 0; - } else { - list_op.start_offset = boost::lexical_cast(op.marker.c_str()); - } - - if (! op.max) { - op.max = GC_LIST_DEFAULT_MAX; - } - - list_op.max = op.max; - - cls_queue_list_ret op_ret; - cls_rgw_gc_list_ret list_ret; - uint32_t num_entries = 0; - bool urgent_data_decoded = false; - std::unordered_map urgent_data_map; - do { - in->clear(); - encode(list_op, *in); - - CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering cls_queue_list_entries \n"); - int ret = cls_queue_list_entries(hctx, in, out); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret); - return ret; - } - - auto iter = out->cbegin(); - try { - decode(op_ret, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); - return -EINVAL; - } - - if (op_ret.has_urgent_data && ! urgent_data_decoded) { - auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); - decode(urgent_data_map, iter_urgent_data); - urgent_data_decoded = true; - } - - if (op_ret.data.size()) { - for (auto it : op_ret.data) { - cls_rgw_gc_obj_info info; - try { - decode(info, it); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode gc info\n"); - return -EINVAL; - } - //Check for info tag in urgent data map - if (urgent_data_map.size() > 0) { - auto found = urgent_data_map.find(info.tag); - if (found != urgent_data_map.end()) { - if (found->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str()); - continue; - } - } - } else { - //Search in xattrs - bufferlist bl_xattrs; - int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); - if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); - return ret; - } - if (ret != -ENOENT && ret != -ENODATA) { - std::unordered_map xattr_urgent_data_map; - auto iter = bl_xattrs.cbegin(); - try { - decode(xattr_urgent_data_map, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n"); - return -EINVAL; - } //end - catch - if (xattr_urgent_data_map.size() > 0) { - auto found = xattr_urgent_data_map.find(info.tag); - if (found != xattr_urgent_data_map.end()) { - if (found->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); - continue; - } - } - } // end - if xattrs size ... - } // end - ret != ENOENT && ENODATA - } - if (op.expired_only) { - real_time now = ceph::real_clock::now(); - if (info.time <= now) { - list_ret.entries.emplace_back(info); - } - } else { - list_ret.entries.emplace_back(info); - } - num_entries++; - } - CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max); - if (num_entries < op.max) { - list_op.max = (op.max - num_entries); - list_op.start_offset = op_ret.next_offset; - out->clear(); - } else { - break; - } - } else { - break; - } - } while(op_ret.is_truncated); - - list_ret.truncated = op_ret.is_truncated; - if (list_ret.truncated) { - list_ret.next_marker = boost::lexical_cast(op_ret.next_offset); - } - out->clear(); - encode(list_ret, *out); - return 0; -} - -static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entered cls_gc_queue_remove \n"); - - auto in_iter = in->cbegin(); - - cls_rgw_gc_queue_remove_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode input\n"); - return -EINVAL; - } - - // List entries and calculate total number of entries (including invalid entries) - cls_queue_list_op list_op; - if (op.marker.empty()) { - list_op.start_offset = 0; - } else { - list_op.start_offset = boost::lexical_cast(op.marker.c_str()); - } - - if (! op.num_entries) { - op.num_entries = GC_LIST_DEFAULT_MAX; - } - - list_op.max = op.num_entries; - bool is_truncated = true; - uint32_t total_num_entries = 0, num_entries = 0; - std::unordered_map urgent_data_map; - bool urgent_data_decoded = false; - uint64_t end_offset = 0; - do { - in->clear(); - encode(list_op, *in); - - CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_list_entries \n"); - int ret = cls_queue_list_entries(hctx, in, out); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_gc_queue_remove(): returned error %d\n", ret); - return ret; - } - - cls_queue_list_ret op_ret; - auto iter = out->cbegin(); - try { - decode(op_ret, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); - return -EINVAL; - } - is_truncated = op_ret.is_truncated; - unsigned int index = 0; - if (op_ret.has_urgent_data && ! urgent_data_decoded) { - auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); - try { - decode(urgent_data_map, iter_urgent_data); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode urgent data map\n"); - return -EINVAL; - } - urgent_data_decoded = true; - } - // If data is not empty - if (op_ret.data.size()) { - for (auto it : op_ret.data) { - cls_rgw_gc_obj_info info; - try { - decode(info, it); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode gc info\n"); - return -EINVAL; - } - CLS_LOG(1, "INFO: cls_gc_queue_remove(): entry: %s\n", info.tag.c_str()); - total_num_entries++; - index++; - //Search for tag in urgent data map - if (urgent_data_map.size() > 0) { - auto found = urgent_data_map.find(info.tag); - if (found != urgent_data_map.end()) { - if (found->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str()); - continue; - } else if (found->second == info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str()); - urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later - } - }//end-if map end - }//end-if urgent data - else { - //Search in xattrs - bufferlist bl_xattrs; - int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); - if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); - return ret; - } - if (ret != -ENOENT && ret != -ENODATA) { - std::unordered_map xattr_urgent_data_map; - auto iter = bl_xattrs.cbegin(); - try { - decode(xattr_urgent_data_map, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); - return -EINVAL; - } //end - catch - if (xattr_urgent_data_map.size() > 0) { - auto found = xattr_urgent_data_map.find(info.tag); - if (found != xattr_urgent_data_map.end()) { - if (found->second > info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); - continue; - } else if (found->second == info.time) { - CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str()); - xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later - } - } - } // end - if xattrs size ... - if (xattr_urgent_data_map.size() == 0) { - //remove from xattrs ??? - } - } // end - ret != ENOENT && ENODATA - }// search in xattrs - num_entries++; - }//end-for - - if (num_entries < op.num_entries) { - list_op.max = (op.num_entries - num_entries); - list_op.start_offset = op_ret.next_offset; - out->clear(); - } else { - end_offset = op_ret.offsets[index - 1]; - CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %lu\n", index, end_offset); - break; - } - } //end-if - else { - break; - } - } while(is_truncated); - CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries); - - cls_queue_remove_op rem_op; - if (op.marker.empty()) { - rem_op.start_offset = 0; - } else { - rem_op.start_offset = boost::lexical_cast(op.marker.c_str()); - } - - rem_op.end_offset = end_offset; - CLS_LOG(1, "INFO: cls_gc_queue_remove(): start offset: %lu and end offset: %lu\n", rem_op.start_offset, rem_op.end_offset); - if(urgent_data_map.size() == 0) { - rem_op.has_urgent_data = false; - } - encode(urgent_data_map, rem_op.bl_urgent_data); - - in->clear(); - encode(rem_op, *in); - - CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_remove_entries \n"); - int ret = cls_queue_remove_entries(hctx, in, out); - if (ret < 0) { - CLS_LOG(1, "ERROR: cls_queue_remove_entries(): returned error %d\n", ret); - return ret; - } - - return 0; -} - -static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - int ret = 0; - auto in_iter = in->cbegin(); - - cls_gc_defer_entry_op op; - try { - decode(op, in_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode input\n"); - return -EINVAL; - } - - op.info.time = ceph::real_clock::now(); - op.info.time += make_timespan(op.expiration_secs); - - //Read urgent data - in->clear(); - out->clear(); - - ret = cls_queue_read_urgent_data(hctx, in, out); - - auto out_iter = out->cbegin(); - - cls_queue_urgent_data_ret op_ret; - try { - decode(op_ret, out_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode ouput\n"); - return -EINVAL; - } - - auto bl_iter = op_ret.bl_urgent_data.cbegin(); - std::unordered_map urgent_data_map; - if (op_ret.has_urgent_data) { - try { - decode(urgent_data_map, bl_iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode urgent data map\n"); - return -EINVAL; - } - } - - bool is_last_entry = false; - in->clear(); - out->clear(); - ret = cls_queue_get_last_entry(hctx, in, out); - if (ret < 0) { - return ret; - } - - cls_rgw_gc_obj_info info; - auto iter = out->cbegin(); - try { - decode(info, iter); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode entry\n"); - return -EINVAL; - } - - CLS_LOG(1, "INFO: tag of gc info is %s\n", info.tag.c_str()); - if (info.tag == op.info.tag) { - is_last_entry = true; - } - - bool has_urgent_data = false; - auto it = urgent_data_map.find(op.info.tag); - if (it != urgent_data_map.end()) { - it->second = op.info.time; - } else { - urgent_data_map.insert({op.info.tag, op.info.time}); - has_urgent_data = true; - } - - out->clear(); - bool can_fit = false; - bufferlist bl_urgent_data; - encode(urgent_data_map, bl_urgent_data); - ret = cls_queue_can_urgent_data_fit(hctx, &bl_urgent_data, out); - if (ret < 0) { - return ret; - } - iter = out->cbegin(); - decode(can_fit, iter); - CLS_LOG(1, "INFO: Can urgent data fit: %d \n", can_fit); - - if (can_fit) { - in->clear(); - if (! is_last_entry) { - cls_enqueue_op enqueue_op; - bufferlist bl_data; - encode(op.info, bl_data); - enqueue_op.bl_data_vec.emplace_back(bl_data); - CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length()); - enqueue_op.bl_urgent_data = bl_urgent_data; - enqueue_op.has_urgent_data = has_urgent_data; - encode(enqueue_op, *in); - ret = cls_enqueue(hctx, in, out); - if (ret < 0) { - return ret; - } - } else { - cls_queue_update_last_entry_op update_op; - encode(op.info, update_op.bl_data); - CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", update_op.bl_data.length()); - update_op.bl_urgent_data = bl_urgent_data; - update_op.has_urgent_data = has_urgent_data; - encode(update_op, *in); - ret = cls_queue_update_last_entry(hctx, in, out); - if (ret < 0) { - return ret; - } - } - } - // Else write urgent data as xattrs - else { - std::unordered_map xattr_urgent_data_map; - xattr_urgent_data_map.insert({op.info.tag, op.info.time}); - bufferlist bl_map; - encode(xattr_urgent_data_map, bl_map); - ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map); - CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data"); - if (ret < 0) { - CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret); - return ret; - } - } - return 0; -} - CLS_INIT(queue) { CLS_LOG(1, "Loaded queue class!"); cls_handle_t h_class; cls_method_handle_t h_create_queue; + cls_method_handle_t h_init_queue; cls_method_handle_t h_get_queue_size; cls_method_handle_t h_enqueue; cls_method_handle_t h_dequeue; @@ -1531,17 +32,11 @@ CLS_INIT(queue) cls_method_handle_t h_queue_write_urgent_data; cls_method_handle_t h_queue_can_urgent_data_fit; - cls_method_handle_t h_gc_create_queue; - cls_method_handle_t h_gc_enqueue; - cls_method_handle_t h_gc_dequeue; - cls_method_handle_t h_gc_queue_list_entries; - cls_method_handle_t h_gc_queue_remove_entries; - cls_method_handle_t h_gc_queue_update_entry; - cls_register(QUEUE_CLASS, &h_class); /* queue*/ cls_register_cxx_method(h_class, CREATE_QUEUE, CLS_METHOD_WR, cls_create_queue, &h_create_queue); + cls_register_cxx_method(h_class, INIT_QUEUE, CLS_METHOD_WR, cls_init_queue, &h_init_queue); cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size); cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue); cls_register_cxx_method(h_class, DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_dequeue, &h_dequeue); @@ -1553,14 +48,6 @@ CLS_INIT(queue) cls_register_cxx_method(h_class, QUEUE_WRITE_URGENT_DATA, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_write_urgent_data, &h_queue_write_urgent_data); cls_register_cxx_method(h_class, QUEUE_CAN_URGENT_DATA_FIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_can_urgent_data_fit, &h_queue_can_urgent_data_fit); - /* gc */ - cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue); - cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue); - cls_register_cxx_method(h_class, GC_DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_dequeue, &h_gc_dequeue); - cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries); - cls_register_cxx_method(h_class, GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_remove, &h_gc_queue_remove_entries); - cls_register_cxx_method(h_class, GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_update_entry, &h_gc_queue_update_entry); - return; } diff --git a/src/cls/queue/cls_queue.h b/src/cls/queue/cls_queue.h new file mode 100644 index 00000000000..1f070570b40 --- /dev/null +++ b/src/cls/queue/cls_queue.h @@ -0,0 +1,17 @@ +#ifndef CEPH_CLS_QUEUE_H +#define CEPH_CLS_QUEUE_H + +int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out); +int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out); + +#endif /* CEPH_CLS_QUEUE_H */ \ No newline at end of file diff --git a/src/cls/queue/cls_queue_client.cc b/src/cls/queue/cls_queue_client.cc index 3fd5933d0fd..3f6c721f577 100644 --- a/src/cls/queue/cls_queue_client.cc +++ b/src/cls/queue/cls_queue_client.cc @@ -3,6 +3,7 @@ #include #include "cls/rgw/cls_rgw_ops.h" +#include "cls/queue/cls_rgw_queue_ops.h" #include "cls/queue/cls_queue_const.h" #include "cls/queue/cls_queue_client.h" @@ -14,10 +15,22 @@ void cls_rgw_gc_create_queue(ObjectWriteOperation& op, string& queue_name, uint6 { bufferlist in; cls_gc_create_queue_op call; + call.name = queue_name; call.size = size; call.num_urgent_data_entries = num_urgent_data_entries; encode(call, in); - op.exec(QUEUE_CLASS, GC_CREATE_QUEUE, in); + op.exec(RGW_QUEUE_CLASS, GC_CREATE_QUEUE, in); +} + +void cls_rgw_gc_init_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries) +{ + bufferlist in; + cls_gc_create_queue_op call; + call.name = queue_name; + call.size = size; + call.num_urgent_data_entries = num_urgent_data_entries; + encode(call, in); + op.exec(RGW_QUEUE_CLASS, GC_INIT_QUEUE, in); } int cls_rgw_gc_get_queue_size(IoCtx& io_ctx, string& oid, uint64_t& size) @@ -44,14 +57,14 @@ void cls_rgw_gc_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_ call.expiration_secs = expiration_secs; call.info = info; encode(call, in); - op.exec(QUEUE_CLASS, GC_ENQUEUE, in); + op.exec(RGW_QUEUE_CLASS, GC_ENQUEUE, in); } int cls_rgw_gc_dequeue(IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info) { bufferlist in, out; - int r = io_ctx.exec(oid, QUEUE_CLASS, GC_DEQUEUE, in, out); + int r = io_ctx.exec(oid, RGW_QUEUE_CLASS, GC_DEQUEUE, in, out); if (r < 0) return r; @@ -75,7 +88,7 @@ int cls_rgw_gc_list_queue(IoCtx& io_ctx, string& oid, string& marker, uint32_t m op.expired_only = expired_only; encode(op, in); - int r = io_ctx.exec(oid, QUEUE_CLASS, GC_QUEUE_LIST_ENTRIES, in, out); + int r = io_ctx.exec(oid, RGW_QUEUE_CLASS, GC_QUEUE_LIST_ENTRIES, in, out); if (r < 0) return r; @@ -97,14 +110,14 @@ int cls_rgw_gc_list_queue(IoCtx& io_ctx, string& oid, string& marker, uint32_t m return 0; } -void cls_rgw_gc_remove_queue(ObjectWriteOperation& op, string& marker, uint32_t num_entries) +void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, string& marker, uint32_t num_entries) { bufferlist in, out; cls_rgw_gc_queue_remove_op rem_op; rem_op.marker = marker; rem_op.num_entries = num_entries; encode(rem_op, in); - op.exec(QUEUE_CLASS, GC_QUEUE_REMOVE_ENTRIES, in); + op.exec(RGW_QUEUE_CLASS, GC_QUEUE_REMOVE_ENTRIES, in); } void cls_rgw_gc_defer_entry_queue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) @@ -114,5 +127,5 @@ void cls_rgw_gc_defer_entry_queue(ObjectWriteOperation& op, uint32_t expiration_ defer_op.expiration_secs = expiration_secs; defer_op.info = info; encode(defer_op, in); - op.exec(QUEUE_CLASS, GC_QUEUE_UPDATE_ENTRY, in); + op.exec(RGW_QUEUE_CLASS, GC_QUEUE_UPDATE_ENTRY, in); } diff --git a/src/cls/queue/cls_queue_client.h b/src/cls/queue/cls_queue_client.h index f509aabbed9..09809d3699c 100644 --- a/src/cls/queue/cls_queue_client.h +++ b/src/cls/queue/cls_queue_client.h @@ -11,12 +11,13 @@ #include "common/Cond.h" void cls_rgw_gc_create_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries); +void cls_rgw_gc_init_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries); int cls_rgw_gc_get_queue_size(librados::IoCtx& io_ctx, string& oid, uint64_t& size); void cls_rgw_gc_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); int cls_rgw_gc_dequeue(librados::IoCtx& io_ctx, string& oid, cls_rgw_gc_obj_info& info); int cls_rgw_gc_list_queue(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only, list& entries, bool *truncated, string& next_marker); -void cls_rgw_gc_remove_queue(librados::ObjectWriteOperation& op, string& marker, uint32_t num_entries); +void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, string& marker, uint32_t num_entries); void cls_rgw_gc_defer_entry_queue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); #endif \ No newline at end of file diff --git a/src/cls/queue/cls_queue_const.h b/src/cls/queue/cls_queue_const.h index a9fac61ebe0..0e45739d1c0 100644 --- a/src/cls/queue/cls_queue_const.h +++ b/src/cls/queue/cls_queue_const.h @@ -2,8 +2,10 @@ #define CEPH_CLS_QUEUE_CONSTS_H #define QUEUE_CLASS "queue" +#define RGW_QUEUE_CLASS "rgw_queue" #define CREATE_QUEUE "create_queue" +#define INIT_QUEUE "init_queue" #define GET_QUEUE_SIZE "get_queue_size" #define ENQUEUE "enqueue" #define DEQUEUE "dequeue" @@ -16,6 +18,7 @@ #define QUEUE_CAN_URGENT_DATA_FIT "queue_can_urgent_data_fit" #define GC_CREATE_QUEUE "gc_create_queue" +#define GC_INIT_QUEUE "gc_init_queue" #define GC_ENQUEUE "gc_enqueue" #define GC_DEQUEUE "gc_dequeue" #define GC_QUEUE_LIST_ENTRIES "gc_queue_list_entries" diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h index 6e0d1f17399..d2c90a179eb 100644 --- a/src/cls/queue/cls_queue_ops.h +++ b/src/cls/queue/cls_queue_ops.h @@ -1,31 +1,8 @@ #ifndef CEPH_CLS_QUEUE_OPS_H #define CEPH_CLS_QUEUE_OPS_H -#include "cls/rgw/cls_rgw_types.h" #include "cls/queue/cls_queue_types.h" -struct cls_gc_create_queue_op { - uint64_t size; - uint64_t num_urgent_data_entries; - - cls_gc_create_queue_op() {} - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(size, bl); - encode(num_urgent_data_entries, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(size, bl); - decode(num_urgent_data_entries, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(cls_gc_create_queue_op) - struct cls_create_queue_op { cls_queue_head head; uint64_t head_size{0}; @@ -53,7 +30,6 @@ WRITE_CLASS_ENCODER(cls_create_queue_op) struct cls_enqueue_op { vector bl_data_vec; - bool has_urgent_data{false}; bufferlist bl_urgent_data; cls_enqueue_op() {} @@ -61,7 +37,6 @@ struct cls_enqueue_op { void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(bl_data_vec, bl); - encode(has_urgent_data, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } @@ -69,7 +44,6 @@ struct cls_enqueue_op { void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(bl_data_vec, bl); - decode(has_urgent_data, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } @@ -111,7 +85,6 @@ struct cls_queue_list_ret { uint64_t next_offset; vector data; vector offsets; - bool has_urgent_data; bufferlist bl_urgent_data; cls_queue_list_ret() {} @@ -122,7 +95,6 @@ struct cls_queue_list_ret { encode(next_offset, bl); encode(data, bl); encode(offsets, bl); - encode(has_urgent_data, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } @@ -133,7 +105,6 @@ struct cls_queue_list_ret { decode(next_offset, bl); decode(data, bl); decode(offsets, bl); - decode(has_urgent_data, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } @@ -143,7 +114,6 @@ WRITE_CLASS_ENCODER(cls_queue_list_ret) struct cls_queue_remove_op { uint64_t start_offset; uint64_t end_offset; - bool has_urgent_data; bufferlist bl_urgent_data; cls_queue_remove_op() {} @@ -152,7 +122,6 @@ struct cls_queue_remove_op { ENCODE_START(1, 1, bl); encode(start_offset, bl); encode(end_offset, bl); - encode(has_urgent_data, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } @@ -161,7 +130,6 @@ struct cls_queue_remove_op { DECODE_START(1, bl); decode(start_offset, bl); decode(end_offset, bl); - decode(has_urgent_data, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } @@ -169,21 +137,18 @@ struct cls_queue_remove_op { WRITE_CLASS_ENCODER(cls_queue_remove_op) struct cls_queue_urgent_data_ret { - bool has_urgent_data{false}; bufferlist bl_urgent_data; cls_queue_urgent_data_ret() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(has_urgent_data, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(has_urgent_data, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } @@ -191,21 +156,18 @@ struct cls_queue_urgent_data_ret { WRITE_CLASS_ENCODER(cls_queue_urgent_data_ret) struct cls_queue_write_urgent_data_op { - bool has_urgent_data{false}; bufferlist bl_urgent_data; cls_queue_write_urgent_data_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(has_urgent_data, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(has_urgent_data, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } @@ -214,7 +176,6 @@ WRITE_CLASS_ENCODER(cls_queue_write_urgent_data_op) struct cls_queue_update_last_entry_op { bufferlist bl_data; - bool has_urgent_data{false}; bufferlist bl_urgent_data; cls_queue_update_last_entry_op() {} @@ -222,7 +183,6 @@ struct cls_queue_update_last_entry_op { void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(bl_data, bl); - encode(has_urgent_data, bl); encode(bl_urgent_data, bl); ENCODE_FINISH(bl); } @@ -230,54 +190,15 @@ struct cls_queue_update_last_entry_op { void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(bl_data, bl); - decode(has_urgent_data, bl); decode(bl_urgent_data, bl); DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(cls_queue_update_last_entry_op) -struct cls_rgw_gc_queue_remove_op { - uint64_t num_entries; - string marker; - - cls_rgw_gc_queue_remove_op() {} +struct cls_init_queue_op : cls_create_queue_op{ - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(num_entries, bl); - encode(marker, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(num_entries, bl); - decode(marker, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(cls_rgw_gc_queue_remove_op) - -struct cls_gc_defer_entry_op { - uint32_t expiration_secs; - cls_rgw_gc_obj_info info; - cls_gc_defer_entry_op() : expiration_secs(0) {} - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(expiration_secs, bl); - encode(info, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(expiration_secs, bl); - decode(info, bl); - DECODE_FINISH(bl); - } }; -WRITE_CLASS_ENCODER(cls_gc_defer_entry_op) +WRITE_CLASS_ENCODER(cls_init_queue_op) #endif /* CEPH_CLS_QUEUE_OPS_H */ \ No newline at end of file diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc new file mode 100644 index 00000000000..dda4690f069 --- /dev/null +++ b/src/cls/queue/cls_queue_src.cc @@ -0,0 +1,1042 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + +#include + +#include "objclass/objclass.h" +#include "cls/queue/cls_queue_types.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_const.h" +#include "cls/queue/cls_queue.h" + +static int get_queue_head_and_size(cls_method_context_t hctx, cls_queue_head& head, uint64_t& head_size) +{ + //read head size + bufferlist bl_head_size; + int ret = cls_cxx_read(hctx, 0, sizeof(uint64_t), &bl_head_size); + if (ret < 0) { + CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to read head with error %d\n", ret); + return ret; + } + //decode head size + auto iter = bl_head_size.cbegin(); + try { + decode(head_size, iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: get_queue_head_and_size: failed to decode head size \n"); + return -EINVAL; + } + + CLS_LOG(10, "INFO: get_queue_head_and_size: head size is %lu\n", head_size); + + //read the head + bufferlist bl_head; + ret = cls_cxx_read2(hctx, sizeof(uint64_t), (head_size - sizeof(uint64_t)), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + return ret; + } + + //decode head + iter = bl_head.cbegin(); + try { + decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: get_queue_head_and_size: failed to decode head\n"); + return -EINVAL; + } + + return 0; +} + +int cls_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_create_queue_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_create_queue(): failed to decode entry\n"); + return -EINVAL; + } + + // create the object + int ret = cls_cxx_create(hctx, true); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_create returned %d", __func__, ret); + return ret; + } + + CLS_LOG(10, "INFO: cls_create_queue create queue of head size %lu", op.head_size); + CLS_LOG(10, "INFO: cls_create_queue create queue of size %lu", op.head.queue_size); + + uint64_t head_size = QUEUE_HEAD_SIZE_1K; + + if (op.head.has_urgent_data) { + if (op.head_size == 0) { + head_size = QUEUE_HEAD_SIZE_4K; + op.head.tail = op.head.front = QUEUE_START_OFFSET_4K; + op.head.last_entry_offset = QUEUE_START_OFFSET_4K; + } else { + head_size = op.head_size; + op.head.tail = op.head.front = head_size; + op.head.last_entry_offset = head_size; + } + } else { + head_size = QUEUE_HEAD_SIZE_1K; + op.head.tail = op.head.front = QUEUE_START_OFFSET_1K; + op.head.last_entry_offset = QUEUE_START_OFFSET_1K; + } + op.head.queue_size += head_size; + + CLS_LOG(10, "INFO: cls_create_queue queue actual size %lu", op.head.queue_size); + CLS_LOG(10, "INFO: cls_create_queue head size %lu", head_size); + CLS_LOG(10, "INFO: cls_create_queue queue front offset %lu", op.head.front); + + + //encode head size + bufferlist bl; + encode(head_size, bl); + CLS_LOG(0, "INFO: cls_create_queue head size %u", bl.length()); + + //encode head + bufferlist bl_head; + encode(op.head, bl_head); + + bl.claim_append(bl_head); + + CLS_LOG(0, "INFO: cls_create_queue writing head of size %u", bl.length()); + ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_write returned %d", __func__, ret); + return ret; + } + return 0; +} + +int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + + //head is already initialized + if (ret == 0) { + CLS_LOG(1, "INFO: cls_init_queue_op(): head is already initialized\n"); + return 0; + } + + if (ret < 0 && ret != -EINVAL) { + return ret; + } + + auto in_iter = in->cbegin(); + cls_init_queue_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_init_queue_op(): failed to decode entry\n"); + return -EINVAL; + } + + CLS_LOG(10, "INFO: cls_init_queue_op init queue of head size %lu", op.head_size); + CLS_LOG(10, "INFO: cls_init_queue_op init queue of size %lu", op.head.queue_size); + + head_size = QUEUE_HEAD_SIZE_1K; + + if (op.head.has_urgent_data) { + if (op.head_size == 0) { + head_size = QUEUE_HEAD_SIZE_4K; + op.head.tail = op.head.front = QUEUE_START_OFFSET_4K; + op.head.last_entry_offset = QUEUE_START_OFFSET_4K; + } else { + head_size = op.head_size; + op.head.tail = op.head.front = head_size; + op.head.last_entry_offset = head_size; + } + } else { + head_size = QUEUE_HEAD_SIZE_1K; + op.head.tail = op.head.front = QUEUE_START_OFFSET_1K; + op.head.last_entry_offset = QUEUE_START_OFFSET_1K; + } + op.head.queue_size += head_size; + + CLS_LOG(10, "INFO: cls_init_queue_op queue actual size %lu", op.head.queue_size); + CLS_LOG(10, "INFO: cls_init_queue_op head size %lu", head_size); + CLS_LOG(10, "INFO: cls_init_queue_op queue front offset %lu", op.head.front); + + //encode head size + bufferlist bl; + encode(head_size, bl); + CLS_LOG(0, "INFO: cls_init_queue_op head size %u", bl.length()); + + //encode head + bufferlist bl_head; + encode(op.head, bl_head); + + bl.claim_append(bl_head); + + CLS_LOG(0, "INFO: cls_init_queue_op writing head of size %u", bl.length()); + ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_init_queue_op returned %d", __func__, ret); + return ret; + } + return 0; +} + +int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + head.queue_size -= head_size; + + CLS_LOG(10, "INFO: cls_get_queue_size: size of queue is %lu\n", head.queue_size); + + encode(head.queue_size, *out); + + return 0; +} + +int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + if ((head.front == head.tail) && (! head.is_empty)) { + CLS_LOG(1, "ERROR: No space left in queue\n"); + return -ENOSPC; + } + + auto iter = in->cbegin(); + cls_enqueue_op op; + try { + decode(op, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n"); + return -EINVAL; + } + + for (auto bl_data : op.bl_data_vec) { + bufferlist bl; + uint64_t data_size = bl_data.length(); + encode(data_size, bl); + bl.claim_append(bl_data); + + CLS_LOG(1, "INFO: cls_enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length()); + + if (head.tail >= head.front) { + // check if data can fit in the remaining space in queue + if ((head.tail + bl.length()) <= head.queue_size) { + CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n", head.tail, bl.length()); + head.last_entry_offset = head.tail; + //write data size and data at tail offset + ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail += bl.length(); + } else { + CLS_LOG(1, "INFO: Wrapping around and checking for free space\n"); + uint64_t free_space_available = (head.queue_size - head.tail) + (head.front - head_size); + //Split data if there is free space available + if (bl.length() <= free_space_available) { + uint64_t size_before_wrap = head.queue_size - head.tail; + bufferlist bl_data_before_wrap; + bl.splice(0, size_before_wrap, &bl_data_before_wrap); + head.last_entry_offset = head.tail; + //write spliced (data size and data) at tail offset + CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.tail, bl_data_before_wrap.length()); + ret = cls_cxx_write2(hctx, head.tail, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail = head_size; + //write remaining data at tail offset after wrapping around + CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head.tail, bl.length()); + ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail += bl.length(); + } else { + CLS_LOG(1, "ERROR: No space left in queue\n"); + // return queue full error + return -ENOSPC; + } + } + } else if (head.front > head.tail) { + if ((head.tail + bl.length()) <= head.front) { + CLS_LOG(1, "INFO: cls_enqueue: Writing data size and data: offset: %lu, size: %u\n\n", head.tail, bl.length()); + head.last_entry_offset = head.tail; + //write data size and data at tail offset + ret = cls_cxx_write2(hctx, head.tail, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail += bl.length(); + } else { + CLS_LOG(1, "ERROR: No space left in queue\n"); + // return queue full error + return -ENOSPC; + } + } + + if (head.tail == head.queue_size) { + head.tail = head_size; + } + CLS_LOG(1, "INFO: cls_enqueue: New tail offset: %lu \n", head.tail); + } //end - for + + head.is_empty = false; + + //Update urgent data if set + if (op.bl_urgent_data.length() > 0) { + head.bl_urgent_data = op.bl_urgent_data; + } + + bufferlist bl_head; + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u \n", bl_head.length()); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + if (head.front == head.tail && head.is_empty) { + CLS_LOG(1, "ERROR: Queue is empty\n"); + return -ENOENT; + } + + uint64_t data_size = 0; + bufferlist bl_size; + + if (head.front < head.tail) { + //Read size of data first + ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size); + if (ret < 0) { + return ret; + } + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front); + head.front += sizeof(uint64_t); + //Read data based on size obtained above + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front += data_size; + } else if (head.front >= head.tail) { + uint64_t actual_data_size = head.queue_size - head.front; + if (actual_data_size < sizeof(uint64_t)) { + //Case 1. Data size has been spliced, first reconstruct data size + CLS_LOG(1, "INFO: cls_dequeue: Spliced data size is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, actual_data_size, &bl_size, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front = head_size; + uint64_t remainder_data_size = sizeof(uint64_t) - actual_data_size; + bufferlist bl_rem_data_size; + CLS_LOG(1, "INFO: cls_dequeue: Remainder Spliced data size is read from from front offset %lu\n", head.front); + ret = cls_cxx_read(hctx, head.front, remainder_data_size, &bl_rem_data_size); + if (ret < 0) { + return ret; + } + bl_size.claim_append(bl_rem_data_size); + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); + return -EINVAL; + } + head.front += remainder_data_size; + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front += data_size; + } else { + ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size); + if (ret < 0) { + return ret; + } + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front); + head.front += sizeof(uint64_t); + + actual_data_size = head.queue_size - head.front; + + if (actual_data_size < data_size) { + if (actual_data_size != 0) { + //Case 2. Data has been spliced + CLS_LOG(1, "INFO: cls_dequeue: Spliced data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, actual_data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + } + head.front = head_size; + bufferlist bl_remainder; + uint64_t remainder_size = data_size - actual_data_size; + CLS_LOG(1, "INFO: cls_dequeue: Remaining Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, remainder_size, &bl_remainder, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + out->claim_append(bl_remainder); + head.front += remainder_size; + } else { + //Case 3. No splicing + CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front); + ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.front += data_size; + } + } + } + + //front has reached the end, wrap it around + if (head.front == head.queue_size) { + head.front = head_size; + } + + if (head.front == head.tail) { + head.is_empty = true; + } + //Write head back + bufferlist bl_head; + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + // If queue is empty, return from here + if (head.is_empty) { + return -ENOENT; + } + + cls_queue_list_ret op_ret; + CLS_LOG(1, "INFO: cls_queue_list_entries: Is urgent data present: %d\n", head.has_urgent_data); + //Info related to urgent data + op_ret.bl_urgent_data = head.bl_urgent_data; + + auto in_iter = in->cbegin(); + + cls_queue_list_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n"); + return -EINVAL; + } + + uint64_t start_offset = 0; + if (op.start_offset == 0) { + start_offset = head.front; + } else { + start_offset = op.start_offset; + } + + op_ret.is_truncated = true; + uint64_t chunk_size = 1024; + uint64_t contiguous_data_size = 0, size_to_read = 0; + bool wrap_around = false; + + //Calculate length of contiguous data to be read depending on front, tail and start offset + if (head.tail > head.front) { + contiguous_data_size = head.tail - start_offset; + } else if (head.front >= head.tail) { + if (start_offset >= head.front) { + contiguous_data_size = head.queue_size - start_offset; + wrap_around = true; + } else if (start_offset <= head.tail) { + contiguous_data_size = head.tail - start_offset; + } + } + + CLS_LOG(1, "INFO: cls_queue_list_entries(): front is: %lu, tail is %lu\n", head.front, head.tail); + + bool offset_populated = false; + uint64_t num_ops = 0; + bufferlist bl; + do + { + CLS_LOG(1, "INFO: cls_queue_list_entries(): start_offset is %lu\n", start_offset); + + bufferlist bl_chunk; + //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size + if (contiguous_data_size > chunk_size) { + size_to_read = chunk_size; + } else { + size_to_read = contiguous_data_size; + } + CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is %lu\n", size_to_read); + if (size_to_read == 0) { + op_ret.is_truncated = false; + CLS_LOG(1, "INFO: cls_queue_list_entries(): size_to_read is 0, hence breaking out!\n"); + break; + } + + ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); + if (ret < 0) { + return ret; + } + + //If there is leftover data from previous iteration, append new data to leftover data + bl.claim_append(bl_chunk); + bl_chunk = bl; + bl.clear(); + + CLS_LOG(1, "INFO: cls_queue_list_entries(): size of chunk %u\n", bl_chunk.length()); + + //Process the chunk of data read + unsigned index = 0; + auto it = bl_chunk.cbegin(); + uint64_t size_to_process = bl_chunk.length(); + do { + CLS_LOG(1, "INFO: cls_queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process); + it.seek(index); + uint64_t data_size = 0; + //Populate offset if not done, else don't if already done in previous iteration + if (! offset_populated) { + uint64_t data_offset = start_offset + index; + op_ret.offsets.emplace_back(data_offset); + CLS_LOG(1, "INFO: cls_queue_list_entries(): offset: %lu\n", data_offset); + } + if (size_to_process >= sizeof(uint64_t)) { + try { + decode(data_size, it); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_list_entries: failed to decode data size \n"); + return -EINVAL; + } + } else { + // Copy unprocessed data to bl + bl_chunk.copy(index, size_to_process, bl); + offset_populated = true; + CLS_LOG(1, "INFO: cls_queue_list_entries: not enough data to read data size, breaking out!\n"); + break; + } + CLS_LOG(1, "INFO: cls_queue_list_entries(): data size: %lu\n", data_size); + index += sizeof(uint64_t); + size_to_process -= sizeof(uint64_t); + bufferlist bl_data; + if (data_size <= size_to_process) { + bl_chunk.copy(index, data_size, bl_data); + //Return data here + op_ret.data.emplace_back(bl_data); + index += bl_data.length(); + size_to_process -= bl_data.length(); + } else { + index -= sizeof(uint64_t); + size_to_process += sizeof(uint64_t); + bl_chunk.copy(index, size_to_process, bl); + offset_populated = true; + CLS_LOG(1, "INFO: cls_queue_list_entries(): not enough data to read data, breaking out!\n"); + break; + } + offset_populated = false; + num_ops++; + if (num_ops == op.max) { + CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n"); + break; + } + if (index == bl_chunk.length()) { + break; + } + } while(index < bl_chunk.length()); + + CLS_LOG(1, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); + + if (num_ops == op.max) { + op_ret.next_offset = start_offset + index; + CLS_LOG(1, "INFO: cls_queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", op_ret.next_offset); + break; + } + + //Calculate new start_offset and contiguous data size + start_offset += size_to_read; + contiguous_data_size -= size_to_read; + if (contiguous_data_size == 0) { + if (wrap_around) { + start_offset = head_size; + contiguous_data_size = head.tail - head_size; + wrap_around = false; + } else { + CLS_LOG(1, "INFO: cls_queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n"); + op_ret.next_offset = head.front; + op_ret.is_truncated = false; + break; + } + } + + } while(num_ops < op.max); + + //Wrap around next offset if it has reached end of queue + if (op_ret.next_offset == head.queue_size) { + op_ret.next_offset = head_size; + } + if (op_ret.next_offset == head.tail) { + op_ret.is_truncated = false; + } + + encode(op_ret, *out); + + return 0; +} + +int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + if (head.is_empty) { + return -ENOENT; + } + + auto in_iter = in->cbegin(); + cls_queue_remove_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n"); + return -EINVAL; + } + + // If start offset is not set or set to zero, then we need to shift it to actual front of queue + if (op.start_offset == 0) { + op.start_offset = head.front; + } + + if (op.start_offset != head.front) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: invalid start offset\n"); + CLS_LOG(1, "ERROR: cls_queue_remove_entries: start offset = %lu\n", op.start_offset); + CLS_LOG(1, "ERROR: cls_queue_remove_entries: front = %lu\n", head.front); + return -EINVAL; + } + + uint64_t end_offset = 0; + uint64_t data_size = 0; + + //Check for data size wrap around + if ((head.queue_size - op.end_offset) >= sizeof(uint64_t)) { + // Read the size from the end offset + bufferlist bl_size; + ret = cls_cxx_read(hctx, op.end_offset, sizeof(uint64_t), &bl_size); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read data size \n"); + return ret; + } + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); + return -EINVAL; + } + //Check for data wrap around + if ((head.queue_size - (op.end_offset + sizeof(uint64_t))) >= data_size) { + end_offset = op.end_offset + sizeof(uint64_t) + data_size; + } else { + uint64_t rem_size = data_size - (head.queue_size - (op.end_offset + sizeof(uint64_t))); + end_offset = head_size + rem_size; + } + } else { + bufferlist bl_size; + ret = cls_cxx_read(hctx, op.end_offset, (head.queue_size - op.end_offset), &bl_size); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n"); + return ret; + } + uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - op.end_offset); + bufferlist bl; + ret = cls_cxx_read(hctx, head_size, rem_size, &bl); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n"); + return ret; + } + bl_size.claim_append(bl); + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); + return -EINVAL; + } + end_offset = head_size + rem_size + data_size; + } + + CLS_LOG(1, "INFO: cls_queue_remove_entries: op.end_offset = %lu\n", op.end_offset); + CLS_LOG(1, "INFO: cls_queue_remove_entries: data_size = %lu\n", data_size); + CLS_LOG(1, "INFO: cls_queue_remove_entries: end_offset = %lu\n", end_offset); + + //Zero out the entries that have been removed, to reclaim storage space + if (end_offset > op.start_offset) { + ret = cls_cxx_write_zero(hctx, op.start_offset, (end_offset - op.start_offset)); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n"); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - op.start_offset)); + return ret; + } + } else { //start offset > end offset + ret = cls_cxx_write_zero(hctx, op.start_offset, (head.queue_size - op.start_offset)); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n"); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", op.start_offset); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (head.queue_size - op.start_offset)); + return ret; + } + ret = cls_cxx_write_zero(hctx, head_size, (end_offset - head_size)); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Failed to zero out entries\n"); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Start offset = %lu\n", head_size); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Length = %lu\n", (end_offset - head_size)); + return ret; + } + } + + head.front = end_offset; + + // Check if it is the end, then wrap around + if (head.front == head.queue_size) { + head.front = head_size; + } + + CLS_LOG(1, "INFO: cls_queue_remove_entries: front offset is: %lu and tail offset is %lu\n", head.front, head.tail); + + // We've reached the last element + if (head.front == head.tail) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Queue is empty now!\n"); + head.is_empty = true; + } + + //Update urgent data map + if (op.bl_urgent_data.length() > 0) { + head.bl_urgent_data = op.bl_urgent_data; + } + + //Write head back + bufferlist bl_head; + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_remove_entries: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +int cls_queue_get_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + uint64_t data_size = 0, last_entry_offset = head.last_entry_offset; + + //Check for data size wrap around + if ((head.queue_size - last_entry_offset) >= sizeof(uint64_t)) { + // Read the size from the end offset + bufferlist bl_size; + ret = cls_cxx_read(hctx, last_entry_offset, sizeof(uint64_t), &bl_size); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to read data size \n"); + return ret; + } + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_get_last_entry: failed to decode data size \n"); + return -EINVAL; + } + last_entry_offset += sizeof(uint64_t); + //Check for data wrap around + if ((head.queue_size - (last_entry_offset + sizeof(uint64_t))) >= data_size) { + CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset); + ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + } else { + CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset); + ret = cls_cxx_read2(hctx, last_entry_offset, (head.queue_size - (last_entry_offset + sizeof(uint64_t))), out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + uint64_t rem_size = data_size - (head.queue_size - (last_entry_offset + sizeof(uint64_t))); + bufferlist bl; + ret = cls_cxx_read(hctx, head_size, rem_size, &bl); + if (ret < 0) { + return ret; + } + out->claim_append(bl); + } + } else { + bufferlist bl_size; + ret = cls_cxx_read(hctx, last_entry_offset, (head.queue_size - last_entry_offset), &bl_size); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read first part of data size \n"); + return ret; + } + uint64_t rem_size = sizeof(uint64_t) - (head.queue_size - last_entry_offset); + bufferlist bl; + ret = cls_cxx_read(hctx, head_size, rem_size, &bl); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to read second part of data size \n"); + return ret; + } + bl_size.claim_append(bl); + auto iter = bl_size.cbegin(); + try { + decode(data_size, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode data size \n"); + return -EINVAL; + } + last_entry_offset = head_size + rem_size; + CLS_LOG(1, "INFO: cls_queue_get_last_entry: Data is read from from last entry offset %lu\n", last_entry_offset); + ret = cls_cxx_read2(hctx, last_entry_offset, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + } + + return 0; +} + +int cls_queue_update_last_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + auto in_iter = in->cbegin(); + + cls_queue_update_last_entry_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_update_last_entry: failed to decode input data\n"); + return -EINVAL; + } + + bufferlist bl; + uint64_t data_size = op.bl_data.length(); + encode(data_size, bl); + bl.claim_append(op.bl_data); + + CLS_LOG(1, "INFO: cls_queue_update_last_entry_op: Updating data at last offset: %lu and total data size is %u\n", head.last_entry_offset, bl.length()); + + // check if data can fit in the remaining space in queue + if ((head.last_entry_offset + bl.length()) <= head.queue_size) { + //write data size and data at tail offset + ret = cls_cxx_write2(hctx, head.last_entry_offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + } else { + CLS_LOG(1, "INFO: Wrapping around and checking for free space\n"); + uint64_t free_space_available = (head.queue_size - head.last_entry_offset) + (head.front - head_size); + //Split data if there is free space available + if (bl.length() <= free_space_available) { + uint64_t size_before_wrap = head.queue_size - head.last_entry_offset; + bufferlist bl_data_before_wrap; + bl.splice(0, size_before_wrap, &bl_data_before_wrap); + //write spliced (data size and data) at last entry offset + CLS_LOG(1, "INFO: cls_enqueue: Writing spliced data at offset: %lu and data size: %u\n", head.last_entry_offset, bl_data_before_wrap.length()); + ret = cls_cxx_write2(hctx, head.last_entry_offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + //write remaining data after wrapping around + CLS_LOG(1, "INFO: cls_enqueue: Writing remaining data at offset: %lu and data size: %u\n", head_size, bl.length()); + ret = cls_cxx_write2(hctx, head_size, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + } + else { + CLS_LOG(1, "ERROR: No space left in queue\n"); + // return queue full error + return -ENOSPC; + } + } + + if (op.bl_urgent_data.length() > 0) { + head.bl_urgent_data = op.bl_urgent_data; + } + + bufferlist bl_head; + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head of size: %u \n", bl_head.length()); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_update_last_entry: Writing head returned error: %d \n", ret); + return ret; + } + return 0; +} + +int cls_queue_read_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + CLS_LOG(1, "INFO: cls_queue_read_urgent_data: tail offset %lu\n", head.tail); + + cls_queue_urgent_data_ret op_ret; + + op_ret.bl_urgent_data = head.bl_urgent_data; + + encode(op_ret, *out); + + return 0; +} + +int cls_queue_write_urgent_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + CLS_LOG(1, "INFO: cls_queue_write_urgent_data: tail offset %lu\n", head.tail); + + auto in_iter = in->cbegin(); + + cls_queue_write_urgent_data_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_write_urgent_data: failed to decode input data\n"); + return -EINVAL; + } + //Write urgent data + head.bl_urgent_data = op.bl_urgent_data; + + //Write head back + bufferlist bl_head; + encode(head, bl_head); + CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head of size: %u\n", bl_head.length()); + ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(1, "INFO: cls_queue_write_urgent_data: Writing head returned error: %d \n", ret); + return ret; + } + + return 0; +} + +int cls_queue_can_urgent_data_fit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bool can_fit = true; + + //get head and its size + uint64_t head_size = 0; + cls_queue_head head; + int ret = get_queue_head_and_size(hctx, head, head_size); + if (ret < 0) { + return ret; + } + + head.bl_urgent_data = *in; + + bufferlist bl_head; + encode(head, bl_head); + + if(bl_head.length() > head_size) { + can_fit = false; + } + + encode(can_fit, *out); + + return 0; +} + diff --git a/src/cls/queue/cls_queue_types.cc b/src/cls/queue/cls_queue_types.cc index 6c4bac725b4..b5374b28c19 100644 --- a/src/cls/queue/cls_queue_types.cc +++ b/src/cls/queue/cls_queue_types.cc @@ -10,9 +10,8 @@ void cls_queue_head::dump(Formatter *f) const f->dump_bool("is_empty", is_empty); f->dump_unsigned("front", front); f->dump_unsigned("tail", tail); - f->dump_unsigned("size", size); + f->dump_unsigned("size", queue_size); f->dump_unsigned("has_urgent_data", front); - f->dump_unsigned("bl_urgent_data", size); } void cls_queue_head::generate_test_instances(list& ls) diff --git a/src/cls/queue/cls_queue_types.h b/src/cls/queue/cls_queue_types.h index dd09b0acebd..96630ea65f9 100644 --- a/src/cls/queue/cls_queue_types.h +++ b/src/cls/queue/cls_queue_types.h @@ -1,11 +1,10 @@ #ifndef CEPH_CLS_QUEUE_TYPES_H #define CEPH_CLS_QUEUE_TYPES_H +#include "include/types.h" #include "common/ceph_time.h" #include "common/Formatter.h" -#include "rgw/rgw_basic_types.h" - #define QUEUE_HEAD_SIZE_1K 1024 //Actual start offset of queue data #define QUEUE_START_OFFSET_1K QUEUE_HEAD_SIZE_1K @@ -18,11 +17,8 @@ struct cls_queue_head { uint64_t front = QUEUE_START_OFFSET_1K; uint64_t tail = QUEUE_START_OFFSET_1K; - uint64_t size{0}; // size of queue requested by user, with head size added to it + uint64_t queue_size{0}; // size of queue requested by user, with head size added to it uint64_t last_entry_offset = QUEUE_START_OFFSET_1K; - uint32_t num_urgent_data_entries{0}; // requested by user - uint32_t num_head_urgent_entries{0}; // actual number of entries in head - uint32_t num_xattr_urgent_entries{0}; // actual number of entries in xattr in case of spill over bool is_empty{true}; bool has_urgent_data{false}; bufferlist bl_urgent_data; // special data known to application using queue @@ -31,11 +27,8 @@ struct cls_queue_head ENCODE_START(1, 1, bl); encode(front, bl); encode(tail, bl); - encode(size, bl); + encode(queue_size, bl); encode(last_entry_offset, bl); - encode(num_urgent_data_entries, bl); - encode(num_head_urgent_entries, bl); - encode(num_xattr_urgent_entries, bl); encode(is_empty, bl); encode(has_urgent_data, bl); encode(bl_urgent_data, bl); @@ -46,11 +39,8 @@ struct cls_queue_head DECODE_START(1, bl); decode(front, bl); decode(tail, bl); - decode(size, bl); + decode(queue_size, bl); decode(last_entry_offset, bl); - decode(num_urgent_data_entries, bl); - decode(num_head_urgent_entries, bl); - decode(num_xattr_urgent_entries, bl); decode(is_empty, bl); decode(has_urgent_data, bl); decode(bl_urgent_data, bl); diff --git a/src/cls/queue/cls_rgw_queue.cc b/src/cls/queue/cls_rgw_queue.cc new file mode 100644 index 00000000000..ef289d9bf02 --- /dev/null +++ b/src/cls/queue/cls_rgw_queue.cc @@ -0,0 +1,686 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + +#include + +#include "objclass/objclass.h" +#include "cls/rgw/cls_rgw_ops.h" +#include "cls/rgw/cls_rgw_types.h" +#include "cls/queue/cls_queue_types.h" +#include "cls/queue/cls_rgw_queue_types.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_rgw_queue_ops.h" +#include "cls/queue/cls_queue_const.h" +#include "cls/queue/cls_queue.h" + +#include +#include + +#include "common/ceph_context.h" +#include "global/global_context.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +#define GC_LIST_DEFAULT_MAX 128 + +CLS_VER(1,0) +CLS_NAME(rgw_queue) + +static int cls_gc_create_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_gc_create_queue_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_create_queue: failed to decode entry\n"); + return -EINVAL; + } + + cls_gc_urgent_data urgent_data; + urgent_data.num_urgent_data_entries = op.num_urgent_data_entries; + + cls_create_queue_op create_op; + + CLS_LOG(10, "INFO: cls_gc_create_queue: queue size is %lu\n", op.size); + CLS_LOG(10, "INFO: cls_gc_create_queue: queue name is %s\n", op.name.c_str()); + create_op.head.queue_size = op.size; + create_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size; + create_op.head.has_urgent_data = true; + encode(urgent_data, create_op.head.bl_urgent_data); + + in->clear(); + encode(create_op, *in); + + return cls_create_queue(hctx, in, out); +} + +static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_gc_init_queue_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_init_queue: failed to decode entry\n"); + return -EINVAL; + } + + cls_gc_urgent_data urgent_data; + urgent_data.num_urgent_data_entries = op.num_urgent_data_entries; + + cls_init_queue_op init_op; + + CLS_LOG(10, "INFO: cls_gc_init_queue: queue size is %lu\n", op.size); + CLS_LOG(10, "INFO: cls_gc_init_queue: queue name is %s\n", op.name.c_str()); + init_op.head.queue_size = op.size; + init_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size; + init_op.head.has_urgent_data = true; + encode(urgent_data, init_op.head.bl_urgent_data); + + in->clear(); + encode(init_op, *in); + + return cls_init_queue(hctx, in, out); +} + +static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_rgw_gc_set_entry_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_enqueue: failed to decode entry\n"); + return -EINVAL; + } + + op.info.time = ceph::real_clock::now(); + op.info.time += make_timespan(op.expiration_secs); + + cls_enqueue_op enqueue_op; + bufferlist bl_data; + encode(op.info, bl_data); + enqueue_op.bl_data_vec.emplace_back(bl_data); + + CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length()); + + in->clear(); + encode(enqueue_op, *in); + + return cls_enqueue(hctx, in, out); +} + +static int cls_gc_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int r = cls_dequeue(hctx, in, out); + if (r < 0) + return r; + + cls_rgw_gc_obj_info data; + auto iter = out->cbegin(); + try { + decode(data, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_dequeue(): failed to decode entry\n"); + return -EINVAL; + } + + CLS_LOG(1, "INFO: tag of gc info is %s\n", data.tag.c_str()); + + return 0; +} + +static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n"); + auto in_iter = in->cbegin(); + + cls_rgw_gc_list_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode input\n"); + return -EINVAL; + } + + cls_queue_list_op list_op; + if (op.marker.empty()) { + list_op.start_offset = 0; + } else { + list_op.start_offset = boost::lexical_cast(op.marker.c_str()); + } + + if (! op.max) { + op.max = GC_LIST_DEFAULT_MAX; + } + + list_op.max = op.max; + + cls_queue_list_ret op_ret; + cls_rgw_gc_list_ret list_ret; + uint32_t num_entries = 0; //Entries excluding the deferred ones + bool urgent_data_decoded = false; + cls_gc_urgent_data urgent_data; + do { + in->clear(); + encode(list_op, *in); + + CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering cls_queue_list_entries \n"); + int ret = cls_queue_list_entries(hctx, in, out); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret); + return ret; + } + + auto iter = out->cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); + return -EINVAL; + } + + //Each cls_queue_list_entries will fetch the same urgent data, decode it only once + if (! urgent_data_decoded) { + auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); + try { + decode(urgent_data, iter_urgent_data); + urgent_data_decoded = true; + } catch (buffer::error& err) { + CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n"); + return -EINVAL; + } + } + + if (op_ret.data.size()) { + for (auto it : op_ret.data) { + cls_rgw_gc_obj_info info; + try { + decode(info, it); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode gc info\n"); + return -EINVAL; + } + bool found = false; + //Check for info tag in urgent data map + if (urgent_data.urgent_data_map.size() > 0) { + auto it = urgent_data.urgent_data_map.find(info.tag); + if (it != urgent_data.urgent_data_map.end()) { + found = true; + if (it->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str()); + continue; + } + } + } + //Search in xattrs + if (! found) { + bufferlist bl_xattrs; + int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); + return ret; + } + if (ret != -ENOENT && ret != -ENODATA) { + std::unordered_map xattr_urgent_data_map; + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_urgent_data_map, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n"); + return -EINVAL; + } //end - catch + if (xattr_urgent_data_map.size() > 0) { + auto it = xattr_urgent_data_map.find(info.tag); + if (it != xattr_urgent_data_map.end()) { + if (it->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); + continue; + } + } + } // end - if xattrs size ... + } // end - ret != ENOENT && ENODATA + } // end - if not found + if (op.expired_only) { + real_time now = ceph::real_clock::now(); + if (info.time <= now) { + list_ret.entries.emplace_back(info); + } + //Can break out here if info.time > now, since all subsequent entries won't have expired + } else { + list_ret.entries.emplace_back(info); + } + num_entries++; + } + CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max); + if (num_entries < op.max) { + list_op.max = (op.max - num_entries); + list_op.start_offset = op_ret.next_offset; + out->clear(); + } else { + //We've reached the max number of entries needed + break; + } + } else { + //We dont have data to process + break; + } + } while(op_ret.is_truncated); + + list_ret.truncated = op_ret.is_truncated; + if (list_ret.truncated) { + list_ret.next_marker = boost::lexical_cast(op_ret.next_offset); + } + out->clear(); + encode(list_ret, *out); + return 0; +} + +static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entered cls_gc_queue_remove \n"); + + auto in_iter = in->cbegin(); + + cls_rgw_gc_queue_remove_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode input\n"); + return -EINVAL; + } + + // List entries and calculate total number of entries (including invalid entries) + cls_queue_list_op list_op; + if (op.marker.empty()) { + list_op.start_offset = 0; + } else { + list_op.start_offset = boost::lexical_cast(op.marker.c_str()); + } + + if (! op.num_entries) { + op.num_entries = GC_LIST_DEFAULT_MAX; + } + + list_op.max = op.num_entries; + bool is_truncated = true; + uint32_t total_num_entries = 0, num_entries = 0; + cls_gc_urgent_data urgent_data; + bool urgent_data_decoded = false; + uint64_t end_offset = 0; + do { + in->clear(); + encode(list_op, *in); + + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_list_entries \n"); + int ret = cls_queue_list_entries(hctx, in, out); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_list_entries(): returned error %d\n", ret); + return ret; + } + + cls_queue_list_ret op_ret; + auto iter = out->cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode output\n"); + return -EINVAL; + } + is_truncated = op_ret.is_truncated; + unsigned int index = 0; + if (! urgent_data_decoded) { + auto iter_urgent_data = op_ret.bl_urgent_data.cbegin(); + try { + decode(urgent_data, iter_urgent_data); + urgent_data_decoded = true; + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n"); + return -EINVAL; + } + } + // If data is not empty + if (op_ret.data.size()) { + for (auto it : op_ret.data) { + cls_rgw_gc_obj_info info; + try { + decode(info, it); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode gc info\n"); + return -EINVAL; + } + CLS_LOG(1, "INFO: cls_gc_queue_remove(): entry: %s\n", info.tag.c_str()); + total_num_entries++; + index++; + bool found = false; + //Search for tag in urgent data map + if (urgent_data.urgent_data_map.size() > 0) { + auto it = urgent_data.urgent_data_map.find(info.tag); + if (it != urgent_data.urgent_data_map.end()) { + found = true; + if (it->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str()); + continue; + } else if (it->second == info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str()); + urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue + urgent_data.num_head_urgent_entries -= 1; + } + }//end-if map end + }//end-if urgent data + if (! found) { + //Search in xattrs + bufferlist bl_xattrs; + int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); + return ret; + } + if (ret != -ENOENT && ret != -ENODATA) { + std::unordered_map xattr_urgent_data_map; + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_urgent_data_map, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); + return -EINVAL; + } //end - catch + if (xattr_urgent_data_map.size() > 0) { + auto found = xattr_urgent_data_map.find(info.tag); + if (found != xattr_urgent_data_map.end()) { + if (found->second > info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str()); + continue; + } else if (found->second == info.time) { + CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str()); + xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later + urgent_data.num_xattr_urgent_entries -= 1; + } + } + } // end - if xattrs size ... + } // end - ret != ENOENT && ENODATA + }// search in xattrs + num_entries++; + }//end-for + + if (num_entries < op.num_entries) { + list_op.max = (op.num_entries - num_entries); + list_op.start_offset = op_ret.next_offset; + out->clear(); + } else { + end_offset = op_ret.offsets[index - 1]; + CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %lu\n", index, end_offset); + break; + } + } //end-if + else { + break; + } + } while(is_truncated); + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries); + + if (end_offset != 0) { + cls_queue_remove_op rem_op; + if (op.marker.empty()) { + rem_op.start_offset = 0; + } else { + rem_op.start_offset = boost::lexical_cast(op.marker.c_str()); + } + + rem_op.end_offset = end_offset; + CLS_LOG(1, "INFO: cls_gc_queue_remove(): start offset: %lu and end offset: %lu\n", rem_op.start_offset, rem_op.end_offset); + + encode(urgent_data, rem_op.bl_urgent_data); + + in->clear(); + encode(rem_op, *in); + + CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering cls_queue_remove_entries \n"); + int ret = cls_queue_remove_entries(hctx, in, out); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_queue_remove_entries(): returned error %d\n", ret); + return ret; + } + } + + return 0; +} + +static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int ret = 0; + auto in_iter = in->cbegin(); + + cls_gc_defer_entry_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode input\n"); + return -EINVAL; + } + + op.info.time = ceph::real_clock::now(); + op.info.time += make_timespan(op.expiration_secs); + + //Read urgent data + in->clear(); + out->clear(); + + ret = cls_queue_read_urgent_data(hctx, in, out); + + auto out_iter = out->cbegin(); + + cls_queue_urgent_data_ret op_ret; + try { + decode(op_ret, out_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode ouput\n"); + return -EINVAL; + } + + auto bl_iter = op_ret.bl_urgent_data.cbegin(); + cls_gc_urgent_data urgent_data; + try { + decode(urgent_data, bl_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_urgent_data_ret(): failed to decode urgent data\n"); + return -EINVAL; + } + + bool is_last_entry = false; + in->clear(); + out->clear(); + ret = cls_queue_get_last_entry(hctx, in, out); + if (ret < 0) { + return ret; + } + + cls_rgw_gc_obj_info info; + auto iter = out->cbegin(); + try { + decode(info, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode entry\n"); + return -EINVAL; + } + + CLS_LOG(1, "INFO: tag of gc info is %s\n", info.tag.c_str()); + if (info.tag == op.info.tag) { + is_last_entry = true; + } + + //has_urgent_data signifies whether urgent data in queue has changed + bool has_urgent_data = false, tag_found = false; + //search in unordered map in head + auto it = urgent_data.urgent_data_map.find(op.info.tag); + if (it != urgent_data.urgent_data_map.end()) { + it->second = op.info.time; + tag_found = true; + has_urgent_data = true; + } else { //search in xattrs + bufferlist bl_xattrs; + int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); + return ret; + } + if (ret != -ENOENT && ret != -ENODATA) { + std::unordered_map xattr_urgent_data_map; + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_urgent_data_map, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); + return -EINVAL; + } //end - catch + if (xattr_urgent_data_map.size() > 0) { + auto found = xattr_urgent_data_map.find(info.tag); + if (found != xattr_urgent_data_map.end()) { + it->second = op.info.time; + tag_found = true; + //write the updated map back + bufferlist bl_map; + encode(xattr_urgent_data_map, bl_map); + ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map); + CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data"); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret); + return ret; + } + } + } // end - if xattrs size ... + }// end ret != ENOENT ... + } + + bufferlist bl_urgent_data; + if (! tag_found) { + //try inserting in queue head + urgent_data.urgent_data_map.insert({op.info.tag, op.info.time}); + urgent_data.num_head_urgent_entries += 1; + has_urgent_data = true; + + //check if urgent data can fit in head + out->clear(); + bool can_fit = false; + encode(urgent_data, bl_urgent_data); + ret = cls_queue_can_urgent_data_fit(hctx, &bl_urgent_data, out); + if (ret < 0) { + return ret; + } + iter = out->cbegin(); + decode(can_fit, iter); + CLS_LOG(1, "INFO: Can urgent data fit: %d \n", can_fit); + + //insert as xattrs + if (! can_fit) { + //remove inserted entry from urgent data + urgent_data.urgent_data_map.erase(op.info.tag); + urgent_data.num_head_urgent_entries -= 1; + has_urgent_data = false; + + bufferlist bl_xattrs; + int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret); + return ret; + } + std::unordered_map xattr_urgent_data_map; + if (ret != -ENOENT && ret != -ENODATA) { + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_urgent_data_map, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n"); + return -EINVAL; + } //end - catch + } + xattr_urgent_data_map.insert({op.info.tag, op.info.time}); + urgent_data.num_xattr_urgent_entries += 1; + has_urgent_data = true; + bufferlist bl_map; + encode(xattr_urgent_data_map, bl_map); + ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map); + CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data"); + if (ret < 0) { + CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret); + return ret; + } + } + } + + if ((urgent_data.num_head_urgent_entries + urgent_data.num_xattr_urgent_entries) > urgent_data.num_urgent_data_entries) { + CLS_LOG(0, "Total num entries %u", urgent_data.num_urgent_data_entries); + CLS_LOG(0, "Num xattr entries %u", urgent_data.num_xattr_urgent_entries); + CLS_LOG(0, "Num head entries %u", urgent_data.num_head_urgent_entries); + CLS_LOG(0, "ERROR: Number of urgent data entries exceeded that requested by user, returning no space!"); + return -ENOSPC; + } + + bl_urgent_data.clear(); + encode(urgent_data, bl_urgent_data); + in->clear(); + if (! is_last_entry) { + cls_enqueue_op enqueue_op; + bufferlist bl_data; + encode(op.info, bl_data); + enqueue_op.bl_data_vec.emplace_back(bl_data); + CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length()); + if (has_urgent_data) { + enqueue_op.bl_urgent_data = bl_urgent_data; + } + encode(enqueue_op, *in); + ret = cls_enqueue(hctx, in, out); + if (ret < 0) { + return ret; + } + } else { + cls_queue_update_last_entry_op update_op; + encode(op.info, update_op.bl_data); + CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", update_op.bl_data.length()); + if (has_urgent_data) { + update_op.bl_urgent_data = bl_urgent_data; + } + encode(update_op, *in); + ret = cls_queue_update_last_entry(hctx, in, out); + if (ret < 0) { + return ret; + } + } + + return 0; +} + +CLS_INIT(rgw_queue) +{ + CLS_LOG(1, "Loaded rgw queue class!"); + + cls_handle_t h_class; + cls_method_handle_t h_gc_create_queue; + cls_method_handle_t h_gc_init_queue; + cls_method_handle_t h_gc_enqueue; + cls_method_handle_t h_gc_dequeue; + cls_method_handle_t h_gc_queue_list_entries; + cls_method_handle_t h_gc_queue_remove_entries; + cls_method_handle_t h_gc_queue_update_entry; + + cls_register(RGW_QUEUE_CLASS, &h_class); + + /* gc */ + cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue); + cls_register_cxx_method(h_class, GC_INIT_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_init_queue, &h_gc_init_queue); + cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue); + cls_register_cxx_method(h_class, GC_DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_dequeue, &h_gc_dequeue); + cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries); + cls_register_cxx_method(h_class, GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_remove, &h_gc_queue_remove_entries); + cls_register_cxx_method(h_class, GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_update_entry, &h_gc_queue_update_entry); + + return; +} + diff --git a/src/cls/queue/cls_rgw_queue_ops.h b/src/cls/queue/cls_rgw_queue_ops.h new file mode 100644 index 00000000000..e2b3ce3405e --- /dev/null +++ b/src/cls/queue/cls_rgw_queue_ops.h @@ -0,0 +1,79 @@ +#ifndef CEPH_CLS_RGW_QUEUE_OPS_H +#define CEPH_CLS_RGW_QUEUE_OPS_H + +#include "cls/rgw/cls_rgw_types.h" +#include "cls/rgw/cls_rgw_ops.h" + +struct cls_gc_create_queue_op { + uint64_t size; + uint64_t num_urgent_data_entries{0}; + string name; //for debugging, to be removed later + + cls_gc_create_queue_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(size, bl); + encode(num_urgent_data_entries, bl); + encode(name, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(size, bl); + decode(num_urgent_data_entries, bl); + decode(name, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_gc_create_queue_op) + +struct cls_gc_init_queue_op : cls_gc_create_queue_op { + +}; +WRITE_CLASS_ENCODER(cls_gc_init_queue_op) + +struct cls_rgw_gc_queue_remove_op { + uint64_t num_entries; + string marker; + + cls_rgw_gc_queue_remove_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(num_entries, bl); + encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(num_entries, bl); + decode(marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_gc_queue_remove_op) + +struct cls_gc_defer_entry_op { + uint32_t expiration_secs; + cls_rgw_gc_obj_info info; + cls_gc_defer_entry_op() : expiration_secs(0) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(expiration_secs, bl); + encode(info, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(expiration_secs, bl); + decode(info, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_gc_defer_entry_op) +#endif /* CEPH_CLS_RGW_QUEUE_OPS_H */ \ No newline at end of file diff --git a/src/cls/queue/cls_rgw_queue_types.h b/src/cls/queue/cls_rgw_queue_types.h new file mode 100644 index 00000000000..95a7f00b134 --- /dev/null +++ b/src/cls/queue/cls_rgw_queue_types.h @@ -0,0 +1,37 @@ +#ifndef CEPH_CLS_RGW_QUEUE_TYPES_H +#define CEPH_CLS_RGW_QUEUE_TYPES_H + +#include "include/types.h" +#include "common/ceph_time.h" +#include "common/Formatter.h" + +#include + +struct cls_gc_urgent_data +{ + std::unordered_map urgent_data_map; + uint32_t num_urgent_data_entries{0}; // requested by user + uint32_t num_head_urgent_entries{0}; // actual number of entries in queue head + uint32_t num_xattr_urgent_entries{0}; // actual number of entries in xattr in case of spill over + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(urgent_data_map, bl); + encode(num_urgent_data_entries, bl); + encode(num_head_urgent_entries, bl); + encode(num_xattr_urgent_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(urgent_data_map, bl); + decode(num_urgent_data_entries, bl); + decode(num_head_urgent_entries, bl); + decode(num_xattr_urgent_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_gc_urgent_data) + +#endif \ No newline at end of file