set(cls_queue_srcs
queue/cls_queue.cc
queue/cls_queue_src.cc
- queue/cls_queue_ops.cc
- queue/cls_queue_types.cc
${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
add_library(cls_queue SHARED ${cls_queue_srcs})
set_target_properties(cls_queue PROPERTIES
CXX_VISIBILITY_PRESET hidden)
install(TARGETS cls_queue DESTINATION ${cls_dir})
-# cls_rgw_queue
+set(cls_queue_client_srcs
+ queue/cls_queue_client.cc)
+add_library(cls_queue_client STATIC ${cls_queue_client_srcs})
+
+# cls_rgw_gc
if (WITH_RADOSGW)
- set(cls_rgw_queue_srcs
- queue/cls_rgw_queue.cc
+ set(cls_rgw_gc_srcs
+ rgw_gc/cls_rgw_gc.cc
queue/cls_queue_src.cc
- queue/cls_queue_ops.cc
- queue/cls_queue_types.cc
${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
- add_library(cls_rgw_queue SHARED ${cls_rgw_queue_srcs})
- set_target_properties(cls_rgw_queue PROPERTIES
+ add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs})
+ set_target_properties(cls_rgw_gc PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
INSTALL_RPATH ""
CXX_VISIBILITY_PRESET hidden)
- install(TARGETS cls_rgw_queue DESTINATION ${cls_dir})
+ install(TARGETS cls_rgw_gc DESTINATION ${cls_dir})
- set(cls_queue_client_srcs
- queue/cls_queue_client.cc
- queue/cls_queue_types.cc
- queue/cls_queue_ops.cc)
- add_library(cls_queue_client STATIC ${cls_queue_client_srcs})
+ set(cls_rgw_gc_client_srcs
+ rgw_gc/cls_rgw_gc_client.cc)
+ add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
endif (WITH_RADOSGW)
CLS_VER(1,0)
CLS_NAME(queue)
-static int cls_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int cls_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto in_iter = in->cbegin();
cls_queue_init_op op;
return -EINVAL;
}
- return init_queue(hctx, op);
+ return queue_init(hctx, op);
}
-static int cls_get_queue_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int cls_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
- cls_queue_get_size_ret op_ret;
- auto ret = get_queue_size(hctx, op_ret);
+ cls_queue_get_capacity_ret op_ret;
+ auto ret = queue_get_capacity(hctx, op_ret);
if (ret < 0) {
return ret;
}
return 0;
}
-static int cls_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int cls_queue_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto iter = in->cbegin();
cls_queue_enqueue_op op;
try {
decode(op, iter);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_enqueue: failed to decode input data \n");
+ CLS_LOG(1, "ERROR: cls_queue_enqueue: failed to decode input data \n");
return -EINVAL;
}
cls_queue_head head;
- auto ret = get_queue_head(hctx, head);
+ auto ret = queue_read_head(hctx, head);
if (ret < 0) {
return ret;
}
- ret = enqueue(hctx, op, head);
+ ret = queue_enqueue(hctx, op, head);
if (ret < 0) {
return ret;
}
//Write back head
- return write_queue_head(hctx, head);
+ return queue_write_head(hctx, head);
}
static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
try {
decode(op, in_iter);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
+ CLS_LOG(5, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
return -EINVAL;
}
cls_queue_head head;
- auto ret = get_queue_head(hctx, head);
+ auto ret = queue_read_head(hctx, head);
if (ret < 0) {
return ret;
}
try {
decode(op, in_iter);
} catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
+ CLS_LOG(5, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
return -EINVAL;
}
cls_queue_head head;
- auto ret = get_queue_head(hctx, head);
+ auto ret = queue_read_head(hctx, head);
if (ret < 0) {
return ret;
}
if (ret < 0) {
return ret;
}
- return write_queue_head(hctx, head);
+ return queue_write_head(hctx, head);
}
CLS_INIT(queue)
CLS_LOG(1, "Loaded queue class!");
cls_handle_t h_class;
- cls_method_handle_t h_init_queue;
- cls_method_handle_t h_get_queue_size;
- cls_method_handle_t h_enqueue;
+ cls_method_handle_t h_queue_init;
+ cls_method_handle_t h_queue_get_capacity;
+ cls_method_handle_t h_queue_enqueue;
cls_method_handle_t h_queue_list_entries;
cls_method_handle_t h_queue_remove_entries;
cls_register(QUEUE_CLASS, &h_class);
/* queue*/
- cls_register_cxx_method(h_class, INIT_QUEUE, CLS_METHOD_WR, cls_init_queue, &h_init_queue);
- cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size);
- cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue);
+ cls_register_cxx_method(h_class, QUEUE_INIT, CLS_METHOD_WR, cls_queue_init, &h_queue_init);
+ cls_register_cxx_method(h_class, QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_queue_get_capacity, &h_queue_get_capacity);
+ cls_register_cxx_method(h_class, QUEUE_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_enqueue, &h_queue_enqueue);
cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_queue_list_entries, &h_queue_list_entries);
cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries);
// vim: ts=8 sw=2 smarttab
#include <errno.h>
-#include "cls/rgw/cls_rgw_ops.h"
-#include "cls/queue/cls_rgw_queue_ops.h"
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_client.h"
-#include "common/debug.h"
-
using namespace librados;
-void cls_rgw_gc_init_queue(ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries)
+void cls_queue_init(ObjectWriteOperation& op, const string& queue_name, uint64_t size)
{
bufferlist in;
- cls_gc_init_queue_op call;
- call.name = queue_name;
- call.size = size;
- call.num_urgent_data_entries = num_urgent_data_entries;
+ cls_queue_init_op call;
+ call.has_urgent_data = false;
+ call.head.queue_size = size;
encode(call, in);
- op.exec(RGW_QUEUE_CLASS, GC_INIT_QUEUE, in);
+ op.exec(QUEUE_CLASS, QUEUE_INIT, in);
}
-int cls_rgw_gc_get_queue_size(IoCtx& io_ctx, string& oid, uint64_t& size)
+int cls_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& size)
{
bufferlist in, out;
- int r = io_ctx.exec(oid, QUEUE_CLASS, GET_QUEUE_SIZE, in, out);
+ int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out);
if (r < 0)
return r;
- cls_queue_get_size_ret op_ret;
+ cls_queue_get_capacity_ret op_ret;
auto iter = out.cbegin();
try {
decode(op_ret, iter);
return -EIO;
}
- size = op_ret.queue_size;
+ size = op_ret.queue_capacity;
return 0;
}
-void cls_rgw_gc_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
+void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vector<bufferlist> bl_data_vec)
{
bufferlist in;
- cls_rgw_gc_set_entry_op call;
- call.expiration_secs = expiration_secs;
- call.info = info;
+ cls_queue_enqueue_op call;
+ call.bl_data_vec = std::move(bl_data_vec);
encode(call, in);
- op.exec(RGW_QUEUE_CLASS, GC_ENQUEUE, in);
+ op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in);
}
-int cls_rgw_gc_list_queue(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
- list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker)
+int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
+ vector<cls_queue_entry>& entries,
+ bool *truncated, string& next_marker)
{
bufferlist in, out;
- cls_rgw_gc_list_op op;
- op.marker = marker;
+ cls_queue_list_op op;
+ op.start_marker = marker;
op.max = max;
- op.expired_only = expired_only;
encode(op, in);
- int r = io_ctx.exec(oid, RGW_QUEUE_CLASS, GC_QUEUE_LIST_ENTRIES, in, out);
+ int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
if (r < 0)
return r;
- cls_rgw_gc_list_ret ret;
+ cls_queue_list_ret ret;
auto iter = out.cbegin();
try {
decode(ret, iter);
return -EIO;
}
- entries.swap(ret.entries);
-
- *truncated = ret.truncated;
+ entries = std::move(ret.entries);
+ *truncated = ret.is_truncated;
next_marker = std::move(ret.next_marker);
return 0;
}
-void cls_rgw_gc_remove_entries_queue(ObjectWriteOperation& op, uint32_t num_entries)
+void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker)
{
bufferlist in, out;
- cls_rgw_gc_queue_remove_op rem_op;
- rem_op.num_entries = num_entries;
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = end_marker;
encode(rem_op, in);
- op.exec(RGW_QUEUE_CLASS, GC_QUEUE_REMOVE_ENTRIES, in);
-}
-
-void cls_rgw_gc_defer_entry_queue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
-{
- bufferlist in;
- cls_gc_defer_entry_op defer_op;
- defer_op.expiration_secs = expiration_secs;
- defer_op.info = info;
- encode(defer_op, in);
- op.exec(RGW_QUEUE_CLASS, GC_QUEUE_UPDATE_ENTRY, in);
+ op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in);
}
#ifndef CEPH_CLS_QUEUE_CLIENT_H
#define CEPH_CLS_QUEUE_CLIENT_H
-#include "include/str_list.h"
#include "include/rados/librados.hpp"
#include "cls/queue/cls_queue_types.h"
#include "cls_queue_ops.h"
-#include "common/RefCountedObj.h"
-#include "include/compat.h"
#include "common/ceph_time.h"
-#include "common/Cond.h"
-void cls_rgw_gc_init_queue(librados::ObjectWriteOperation& op, string& queue_name, uint64_t& size, uint64_t& num_urgent_data_entries);
-int cls_rgw_gc_get_queue_size(librados::IoCtx& io_ctx, string& oid, uint64_t& size);
-void cls_rgw_gc_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
-int cls_rgw_gc_list_queue(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
- list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker);
-void cls_rgw_gc_remove_entries_queue(librados::ObjectWriteOperation& op, uint32_t num_entries);
-void cls_rgw_gc_defer_entry_queue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
+void cls_queue_init(librados::ObjectWriteOperation& op, const string& queue_name, uint64_t size);
+int cls_queue_get_capacity(librados::IoCtx& io_ctx, const string& oid, uint64_t& size);
+void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, vector<bufferlist> bl_data_vec);
+int cls_queue_list_entries(librados::IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
+ vector<cls_queue_entry>& entries, bool *truncated, string& next_marker);
+void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const string& end_marker);
#endif
\ No newline at end of file
#define CEPH_CLS_QUEUE_CONSTS_H
#define QUEUE_CLASS "queue"
-#define RGW_QUEUE_CLASS "rgw_queue"
-#define INIT_QUEUE "init_queue"
-#define GET_QUEUE_SIZE "get_queue_size"
-#define ENQUEUE "enqueue"
+#define QUEUE_INIT "queue_init"
+#define QUEUE_GET_CAPACITY "queue_get_capacity"
+#define QUEUE_ENQUEUE "queue_enqueue"
#define QUEUE_LIST_ENTRIES "queue_list_entries"
#define QUEUE_REMOVE_ENTRIES "queue_remove_entries"
-#define QUEUE_READ_URGENT_DATA "queue_read_urgent_data"
-#define QUEUE_WRITE_URGENT_DATA "queue_write_urgent_data"
-#define QUEUE_CAN_URGENT_DATA_FIT "queue_can_urgent_data_fit"
-
-#define GC_INIT_QUEUE "gc_init_queue"
-#define GC_ENQUEUE "gc_enqueue"
-#define GC_DEQUEUE "gc_dequeue"
-#define GC_QUEUE_LIST_ENTRIES "gc_queue_list_entries"
-#define GC_QUEUE_REMOVE_ENTRIES "gc_queue_remove_entries"
-#define GC_QUEUE_UPDATE_ENTRY "gc_queue_update_entry"
#endif
\ No newline at end of file
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#include "cls/queue/cls_queue_ops.h"
-
-#include "common/Formatter.h"
-#include "common/ceph_json.h"
-#include "include/utime.h"
struct cls_queue_list_ret {
bool is_truncated;
string next_marker;
- vector<bufferlist> data;
- vector<string> markers;
+ vector<cls_queue_entry> entries;
cls_queue_list_ret() {}
ENCODE_START(1, 1, bl);
encode(is_truncated, bl);
encode(next_marker, bl);
- encode(data, bl);
- encode(markers, bl);
+ encode(entries, bl);
ENCODE_FINISH(bl);
}
DECODE_START(1, bl);
decode(is_truncated, bl);
decode(next_marker, bl);
- decode(data, bl);
- decode(markers, bl);
+ decode(entries, bl);
DECODE_FINISH(bl);
}
};
};
WRITE_CLASS_ENCODER(cls_queue_remove_op)
-struct cls_queue_get_size_ret {
- uint64_t queue_size;
+struct cls_queue_get_capacity_ret {
+ uint64_t queue_capacity;
- cls_queue_get_size_ret() {}
+ cls_queue_get_capacity_ret() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(queue_size, bl);
+ encode(queue_capacity, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(queue_size, bl);
+ decode(queue_capacity, bl);
DECODE_FINISH(bl);
}
};
-WRITE_CLASS_ENCODER(cls_queue_get_size_ret)
+WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret)
#endif /* CEPH_CLS_QUEUE_OPS_H */
\ No newline at end of file
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_src.h"
-int write_queue_head(cls_method_context_t hctx, cls_queue_head& head)
+int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
{
bufferlist bl;
uint16_t entry_start = QUEUE_HEAD_START;
int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (ret < 0) {
- CLS_LOG(5, "ERROR: write_queue_head: failed to write head \n");
+ CLS_LOG(5, "ERROR: queue_write_head: failed to write head \n");
return ret;
}
return 0;
}
-int get_queue_head(cls_method_context_t hctx, cls_queue_head& head)
+int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
{
uint64_t chunk_size = 1024, start_offset = 0;
bufferlist bl_head;
int ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
if (ret < 0) {
- CLS_LOG(5, "ERROR: get_queue_head: failed to read head \n");
+ CLS_LOG(5, "ERROR: queue_read_head: failed to read head \n");
return ret;
}
try {
decode(queue_head_start, it);
} catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: get_queue_head: failed to decode queue start \n");
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start \n");
return -EINVAL;
}
if (queue_head_start != QUEUE_HEAD_START) {
- CLS_LOG(0, "ERROR: get_queue_head: invalid queue start \n");
+ CLS_LOG(0, "ERROR: queue_read_head: invalid queue start \n");
return -EINVAL;
}
try {
decode(encoded_len, it);
} catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: get_queue_head: failed to decode encoded head size \n");
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size \n");
return -EINVAL;
}
bufferlist bl_remaining_head;
int ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
- CLS_LOG(5, "ERROR: get_queue_head: failed to read remaining part of head \n");
+ CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head \n");
return ret;
}
bl_head.claim_append(bl_remaining_head);
try {
decode(head, it);
} catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: get_queue_head: failed to decode head\n");
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode head\n");
return -EINVAL;
}
return 0;
}
-int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op)
+int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
{
//get head and its size
cls_queue_head head;
- int ret = get_queue_head(hctx, head);
+ int ret = queue_read_head(hctx, head);
//head is already initialized
if (ret == 0) {
CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
- return write_queue_head(hctx, head);
+ return queue_write_head(hctx, head);
}
-int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret)
+int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
{
//get head
cls_queue_head head;
- int ret = get_queue_head(hctx, head);
+ int ret = queue_read_head(hctx, head);
if (ret < 0) {
return ret;
}
- op_ret.queue_size = head.queue_size - head.max_head_size;
+ op_ret.queue_capacity = head.queue_size - head.max_head_size;
- CLS_LOG(20, "INFO: get_queue_size: size of queue is %lu\n", op_ret.queue_size);
+ CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu\n", op_ret.queue_capacity);
return 0;
}
-int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
+int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
CLS_LOG(0, "ERROR: No space left in queue\n");
encode(data_size, bl);
bl.claim_append(bl_data);
- CLS_LOG(10, "INFO: enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length());
+ CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %u\n", bl.length(), bl_data.length());
if (head.tail.offset >= head.front.offset) {
// check if data can fit in the remaining space in queue
if ((head.tail.offset + bl.length()) <= head.queue_size) {
- CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
//write data size and data at tail offset
auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
bufferlist bl_data_before_wrap;
bl.splice(0, size_before_wrap, &bl_data_before_wrap);
//write spliced (data size and data) at tail offset
- CLS_LOG(5, "INFO: enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
head.tail.offset = head.max_head_size;
head.tail.gen += 1;
//write remaining data at tail offset after wrapping around
- CLS_LOG(5, "INFO: enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
}
} else if (head.front.offset > head.tail.offset) {
if ((head.tail.offset + bl.length()) <= head.front.offset) {
- CLS_LOG(5, "INFO: enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length());
//write data size and data at tail offset
auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
head.tail.offset = head.max_head_size;
head.tail.gen += 1;
}
- CLS_LOG(20, "INFO: enqueue: New tail offset: %s \n", head.tail.to_str().c_str());
+ CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s \n", head.tail.to_str().c_str());
} //end - for
return 0;
uint64_t size_to_process = bl_chunk.length();
do {
CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
+ cls_queue_entry entry;
it.seek(index);
//Populate offset if not done in previous iteration
if (! offset_populated) {
cls_queue_marker marker = {start_offset + index, gen};
CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
- string opaque_marker = marker.to_str();
- op_ret.markers.emplace_back(opaque_marker);
+ entry.marker = marker.to_str();
}
// Magic number + Data size - process if not done in previous iteration
if (! entry_start_processed ) {
}
// Data
if (data_size <= size_to_process) {
- bufferlist bl_data;
- bl_chunk.copy(index, data_size, bl_data);
- //Return data here
- op_ret.data.emplace_back(bl_data);
- index += bl_data.length();
- size_to_process -= bl_data.length();
+ bl_chunk.copy(index, data_size, entry.data);
+ index += entry.data.length();
+ size_to_process -= entry.data.length();
} else {
bl_chunk.copy(index, size_to_process, bl);
offset_populated = true;
CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
break;
}
+ op_ret.entries.emplace_back(entry);
// Resetting some values
offset_populated = false;
entry_start_processed = false;
#ifndef CEPH_CLS_QUEUE_SRC_H
#define CEPH_CLS_QUEUE_SRC_H
-int write_queue_head(cls_method_context_t hctx, cls_queue_head& head);
-int get_queue_head(cls_method_context_t hctx, cls_queue_head& head);
-int init_queue(cls_method_context_t hctx, const cls_queue_init_op& op);
-int get_queue_size(cls_method_context_t hctx, cls_queue_get_size_ret& op_ret);
-int enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head);
+int queue_write_head(cls_method_context_t hctx, cls_queue_head& head);
+int queue_read_head(cls_method_context_t hctx, cls_queue_head& head);
+int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op);
+int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret);
+int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head);
int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head);
int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head);
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "cls/queue/cls_queue_types.h"
-#include "common/ceph_json.h"
-#include "include/utime.h"
-
#include <errno.h>
#include "include/types.h"
-#include "common/ceph_time.h"
-#include "common/Formatter.h"
#define QUEUE_HEAD_SIZE_1K 1024
//Actual start offset of queue data
constexpr unsigned int QUEUE_HEAD_START = 0xDEAD;
constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF;
+struct cls_queue_entry
+{
+ bufferlist data;
+ std::string marker;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(data, bl);
+ encode(marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(data, bl);
+ decode(marker, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_entry)
+
struct cls_queue_marker
{
uint64_t offset{0};
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "include/types.h"
-
-#include <errno.h>
-
-#include "objclass/objclass.h"
-#include "cls/rgw/cls_rgw_ops.h"
-#include "cls/rgw/cls_rgw_types.h"
-#include "cls/queue/cls_queue_types.h"
-#include "cls/queue/cls_rgw_queue_types.h"
-#include "cls/queue/cls_queue_ops.h"
-#include "cls/queue/cls_rgw_queue_ops.h"
-#include "cls/queue/cls_queue_const.h"
-#include "cls/queue/cls_queue_src.h"
-
-#include <boost/lexical_cast.hpp>
-#include <unordered_map>
-#include <sstream>
-
-#include "common/ceph_context.h"
-#include "global/global_context.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rgw
-
-#define GC_LIST_DEFAULT_MAX 128
-
-CLS_VER(1,0)
-CLS_NAME(rgw_queue)
-
-static int cls_gc_init_queue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- auto in_iter = in->cbegin();
-
- cls_gc_init_queue_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_init_queue: failed to decode entry\n");
- return -EINVAL;
- }
-
- cls_gc_urgent_data urgent_data;
- urgent_data.num_urgent_data_entries = op.num_urgent_data_entries;
-
- cls_queue_init_op init_op;
-
- CLS_LOG(10, "INFO: cls_gc_init_queue: queue size is %lu\n", op.size);
- CLS_LOG(10, "INFO: cls_gc_init_queue: queue name is %s\n", op.name.c_str());
- init_op.head.queue_size = op.size;
- init_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size;
- init_op.has_urgent_data = true;
- encode(urgent_data, init_op.head.bl_urgent_data);
-
- return init_queue(hctx, init_op);
-}
-
-static int cls_gc_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- auto in_iter = in->cbegin();
- cls_rgw_gc_set_entry_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_enqueue: failed to decode entry\n");
- return -EINVAL;
- }
-
- op.info.time = ceph::real_clock::now();
- op.info.time += make_timespan(op.expiration_secs);
-
- //get head
- cls_queue_head head;
- int ret = get_queue_head(hctx, head);
- if (ret < 0) {
- return ret;
- }
-
- cls_queue_enqueue_op enqueue_op;
- bufferlist bl_data;
- encode(op.info, bl_data);
- enqueue_op.bl_data_vec.emplace_back(bl_data);
-
- CLS_LOG(1, "INFO: cls_gc_enqueue: Data size is: %u \n", bl_data.length());
-
- ret = enqueue(hctx, enqueue_op, head);
- if (ret < 0) {
- return ret;
- }
-
- //Write back head
- return write_queue_head(hctx, head);
-}
-
-static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n");
- auto in_iter = in->cbegin();
- cls_rgw_gc_list_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode input\n");
- return -EINVAL;
- }
-
- cls_queue_head head;
- auto ret = get_queue_head(hctx, head);
- if (ret < 0) {
- return ret;
- }
-
- cls_gc_urgent_data urgent_data;
- if (head.bl_urgent_data.length() > 0) {
- auto iter_urgent_data = head.bl_urgent_data.cbegin();
- try {
- decode(urgent_data, iter_urgent_data);
- } catch (buffer::error& err) {
- CLS_LOG(5, "ERROR: cls_gc_queue_list(): failed to decode urgent data\n");
- return -EINVAL;
- }
- }
-
- cls_queue_list_op list_op;
- if (! op.max) {
- op.max = GC_LIST_DEFAULT_MAX;
- }
-
- list_op.max = op.max;
- list_op.start_marker = op.marker;
-
- cls_rgw_gc_list_ret list_ret;
- uint32_t num_entries = 0; //Entries excluding the deferred ones
- bool is_truncated = true;
- string next_marker;
- do {
- CLS_LOG(1, "INFO: cls_gc_queue_list(): Entering queue_list_entries \n");
- cls_queue_list_ret op_ret;
- int ret = queue_list_entries(hctx, list_op, op_ret, head);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret);
- return ret;
- }
- is_truncated = op_ret.is_truncated;
- next_marker = op_ret.next_marker;
-
- if (op_ret.data.size()) {
- for (auto it : op_ret.data) {
- cls_rgw_gc_obj_info info;
- try {
- decode(info, it);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode gc info\n");
- return -EINVAL;
- }
- bool found = false;
- //Check for info tag in urgent data map
- auto iter = urgent_data.urgent_data_map.find(info.tag);
- if (iter != urgent_data.urgent_data_map.end()) {
- found = true;
- stringstream ss1, ss2;
- ss1 << iter->second;
- ss2 << info.time;
- if (iter->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in urgent data: %s\n", info.tag.c_str());
- CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in urgent data: %s\n", ss1.str().c_str());
- CLS_LOG(1, "INFO: cls_gc_queue_list(): time found in queue data: %s\n", ss2.str().c_str());
- continue;
- }
- }
- //Search in xattrs
- if (! found && urgent_data.num_xattr_urgent_entries > 0) {
- bufferlist bl_xattrs;
- int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
- if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
- return ret;
- }
- if (ret != -ENOENT && ret != -ENODATA) {
- std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
- auto iter = bl_xattrs.cbegin();
- try {
- decode(xattr_urgent_data_map, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_list(): failed to decode xattrs urgent data map\n");
- return -EINVAL;
- } //end - catch
- auto xattr_iter = xattr_urgent_data_map.find(info.tag);
- if (xattr_iter != xattr_urgent_data_map.end()) {
- if (xattr_iter->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_list(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
- continue;
- }
- }
- } // end - ret != ENOENT && ENODATA
- } // end - if not found
- if (op.expired_only) {
- real_time now = ceph::real_clock::now();
- if (info.time <= now) {
- list_ret.entries.emplace_back(info);
- }
- //Can break out here if info.time > now, since all subsequent entries won't have expired
- } else {
- list_ret.entries.emplace_back(info);
- }
- num_entries++;
- }
- CLS_LOG(1, "INFO: cls_gc_queue_list(): num_entries: %u and op.max: %u\n", num_entries, op.max);
- if (num_entries < op.max) {
- list_op.max = (op.max - num_entries);
- list_op.start_marker = op_ret.next_marker;
- out->clear();
- } else {
- //We've reached the max number of entries needed
- break;
- }
- } else {
- //We dont have data to process
- break;
- }
- } while(is_truncated);
-
- list_ret.truncated = is_truncated;
- if (list_ret.truncated) {
- list_ret.next_marker = next_marker;
- }
- out->clear();
- encode(list_ret, *out);
- return 0;
-}
-
-static int cls_gc_queue_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entered cls_gc_queue_remove \n");
-
- auto in_iter = in->cbegin();
-
- cls_rgw_gc_queue_remove_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode input\n");
- return -EINVAL;
- }
-
- cls_queue_head head;
- auto ret = get_queue_head(hctx, head);
- if (ret < 0) {
- return ret;
- }
-
- cls_gc_urgent_data urgent_data;
- if (head.bl_urgent_data.length() > 0) {
- auto iter_urgent_data = head.bl_urgent_data.cbegin();
- try {
- decode(urgent_data, iter_urgent_data);
- } catch (buffer::error& err) {
- CLS_LOG(5, "ERROR: cls_gc_queue_remove(): failed to decode urgent data\n");
- return -EINVAL;
- }
- }
-
- // List entries and calculate total number of entries (including invalid entries)
- if (! op.num_entries) {
- op.num_entries = GC_LIST_DEFAULT_MAX;
- }
- cls_queue_list_op list_op;
- list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry
- bool is_truncated = true;
- uint32_t total_num_entries = 0, num_entries = 0;
- string end_marker;
- do {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_list_entries \n");
- cls_queue_list_ret op_ret;
- int ret = queue_list_entries(hctx, list_op, op_ret, head);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: queue_list_entries(): returned error %d\n", ret);
- return ret;
- }
-
- is_truncated = op_ret.is_truncated;
- unsigned int index = 0;
- // If data is not empty
- if (op_ret.data.size()) {
- for (auto it : op_ret.data) {
- cls_rgw_gc_obj_info info;
- try {
- decode(info, it);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode gc info\n");
- return -EINVAL;
- }
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): entry: %s\n", info.tag.c_str());
- total_num_entries++;
- index++;
- bool found = false;
- //Search for tag in urgent data map
- auto iter = urgent_data.urgent_data_map.find(info.tag);
- if (iter != urgent_data.urgent_data_map.end()) {
- found = true;
- if (iter->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in urgent data: %s\n", info.tag.c_str());
- continue;
- } else if (iter->second == info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from urgent data: %s\n", info.tag.c_str());
- urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
- urgent_data.num_head_urgent_entries -= 1;
- }
- }//end-if map end
- if (! found && urgent_data.num_xattr_urgent_entries > 0) {
- //Search in xattrs
- bufferlist bl_xattrs;
- int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
- if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
- return ret;
- }
- if (ret != -ENOENT && ret != -ENODATA) {
- std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
- auto iter = bl_xattrs.cbegin();
- try {
- decode(xattr_urgent_data_map, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n");
- return -EINVAL;
- } //end - catch
- auto xattr_iter = xattr_urgent_data_map.find(info.tag);
- if (xattr_iter != xattr_urgent_data_map.end()) {
- if (xattr_iter->second > info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
- continue;
- } else if (xattr_iter->second == info.time) {
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
- xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
- urgent_data.num_xattr_urgent_entries -= 1;
- }
- }
- } // end - ret != ENOENT && ENODATA
- }// search in xattrs
- num_entries++;
- }//end-for
-
- if (! op_ret.is_truncated && num_entries < (op.num_entries + 1)) {
- end_marker = op_ret.next_marker;
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): truncated and end offset is %s\n", end_marker.c_str());
- break;
- }
- if (num_entries < (op.num_entries + 1)) {
- list_op.max = ((op.num_entries + 1) - num_entries);
- list_op.start_marker = op_ret.next_marker;
- out->clear();
- } else {
- end_marker = op_ret.markers[index - 1];
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): index is %u and end_offset is: %s\n", index, end_marker.c_str());
- break;
- }
- } //end-if
- else {
- break;
- }
- } while(is_truncated);
-
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): Total number of entries to remove: %d\n", total_num_entries);
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): End offset is %s\n", end_marker.c_str());
-
- if (! end_marker.empty()) {
- cls_queue_remove_op rem_op;
- rem_op.end_marker = end_marker;
-
- CLS_LOG(1, "INFO: cls_gc_queue_remove(): Entering queue_remove_entries \n");
- int ret = queue_remove_entries(hctx, rem_op, head);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: queue_remove_entries(): returned error %d\n", ret);
- return ret;
- }
- }
-
- //Update urgent data map
- encode(urgent_data, head.bl_urgent_data);
-
- return write_queue_head(hctx, head);
-}
-
-static int cls_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- int ret = 0;
- auto in_iter = in->cbegin();
-
- cls_gc_defer_entry_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode input\n");
- return -EINVAL;
- }
-
- op.info.time = ceph::real_clock::now();
- op.info.time += make_timespan(op.expiration_secs);
-
- // Read head
- cls_queue_head head;
- ret = get_queue_head(hctx, head);
- if (ret < 0) {
- return ret;
- }
-
- auto bl_iter = head.bl_urgent_data.cbegin();
- cls_gc_urgent_data urgent_data;
- try {
- decode(urgent_data, bl_iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode urgent data\n");
- return -EINVAL;
- }
-
- //has_urgent_data signifies whether urgent data in queue has changed
- bool has_urgent_data = false, tag_found = false;
- //search in unordered map in head
- auto it = urgent_data.urgent_data_map.find(op.info.tag);
- if (it != urgent_data.urgent_data_map.end()) {
- it->second = op.info.time;
- tag_found = true;
- has_urgent_data = true;
- } else { //search in xattrs
- bufferlist bl_xattrs;
- int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
- if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
- return ret;
- }
- if (ret != -ENOENT && ret != -ENODATA) {
- std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
- auto iter = bl_xattrs.cbegin();
- try {
- decode(xattr_urgent_data_map, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
- return -EINVAL;
- } //end - catch
- auto xattr_iter = xattr_urgent_data_map.find(op.info.tag);
- if (xattr_iter != xattr_urgent_data_map.end()) {
- it->second = op.info.time;
- tag_found = true;
- //write the updated map back
- bufferlist bl_map;
- encode(xattr_urgent_data_map, bl_map);
- ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
- CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
- return ret;
- }
- }
- }// end ret != ENOENT ...
- }
-
- if (! tag_found) {
- stringstream ss1;
- ss1 << op.info.time;
- CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in urgent data: %s\n", ss1.str().c_str());
- //try inserting in queue head
- urgent_data.urgent_data_map.insert({op.info.tag, op.info.time});
- urgent_data.num_head_urgent_entries += 1;
- has_urgent_data = true;
-
- bufferlist bl_urgent_data;
- encode(urgent_data, bl_urgent_data);
- //insert as xattrs
- if (bl_urgent_data.length() > head.max_urgent_data_size) {
- //remove inserted entry from urgent data
- urgent_data.urgent_data_map.erase(op.info.tag);
- urgent_data.num_head_urgent_entries -= 1;
- has_urgent_data = false;
-
- bufferlist bl_xattrs;
- int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
- if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
- return ret;
- }
- std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
- if (ret != -ENOENT && ret != -ENODATA) {
- auto iter = bl_xattrs.cbegin();
- try {
- decode(xattr_urgent_data_map, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_queue_remove(): failed to decode xattrs urgent data map\n");
- return -EINVAL;
- } //end - catch
- }
- xattr_urgent_data_map.insert({op.info.tag, op.info.time});
- urgent_data.num_xattr_urgent_entries += 1;
- has_urgent_data = true;
- bufferlist bl_map;
- encode(xattr_urgent_data_map, bl_map);
- ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
- CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
- if (ret < 0) {
- CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
- return ret;
- }
- }
- }
-
- if ((urgent_data.num_head_urgent_entries + urgent_data.num_xattr_urgent_entries) > urgent_data.num_urgent_data_entries) {
- CLS_LOG(0, "Total num entries %u", urgent_data.num_urgent_data_entries);
- CLS_LOG(0, "Num xattr entries %u", urgent_data.num_xattr_urgent_entries);
- CLS_LOG(0, "Num head entries %u", urgent_data.num_head_urgent_entries);
- CLS_LOG(0, "ERROR: Number of urgent data entries exceeded that requested by user, returning no space!");
- return -ENOSPC;
- }
-
- cls_queue_enqueue_op enqueue_op;
- bufferlist bl_data;
- stringstream ss1;
- ss1 << op.info.time;
- CLS_LOG(1, "INFO: cls_gc_queue_update_entry(): time inserted in queue: %s\n", ss1.str().c_str());
- encode(op.info, bl_data);
- enqueue_op.bl_data_vec.emplace_back(bl_data);
- CLS_LOG(1, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
-
- ret = enqueue(hctx, enqueue_op, head);
- if (ret < 0) {
- return ret;
- }
-
- if (has_urgent_data) {
- head.bl_urgent_data.clear();
- encode(urgent_data, head.bl_urgent_data);
- }
-
- return write_queue_head(hctx, head);
-}
-
-CLS_INIT(rgw_queue)
-{
- CLS_LOG(1, "Loaded rgw queue class!");
-
- cls_handle_t h_class;
- cls_method_handle_t h_gc_init_queue;
- cls_method_handle_t h_gc_enqueue;
- cls_method_handle_t h_gc_queue_list_entries;
- cls_method_handle_t h_gc_queue_remove_entries;
- cls_method_handle_t h_gc_queue_update_entry;
-
- cls_register(RGW_QUEUE_CLASS, &h_class);
-
- /* gc */
- cls_register_cxx_method(h_class, GC_INIT_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_init_queue, &h_gc_init_queue);
- cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue);
- cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries);
- cls_register_cxx_method(h_class, GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_remove, &h_gc_queue_remove_entries);
- cls_register_cxx_method(h_class, GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_update_entry, &h_gc_queue_update_entry);
-
- return;
-}
-
+++ /dev/null
-#ifndef CEPH_CLS_RGW_QUEUE_OPS_H
-#define CEPH_CLS_RGW_QUEUE_OPS_H
-
-#include "cls/rgw/cls_rgw_types.h"
-#include "cls/rgw/cls_rgw_ops.h"
-
-struct cls_gc_init_queue_op {
- uint64_t size;
- uint64_t num_urgent_data_entries{0};
- string name; //for debugging, to be removed later
-
- cls_gc_init_queue_op() {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(size, bl);
- encode(num_urgent_data_entries, bl);
- encode(name, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(size, bl);
- decode(num_urgent_data_entries, bl);
- decode(name, bl);
- DECODE_FINISH(bl);
- }
-
-};
-WRITE_CLASS_ENCODER(cls_gc_init_queue_op)
-
-struct cls_rgw_gc_queue_remove_op {
- uint64_t num_entries;
-
- cls_rgw_gc_queue_remove_op() {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(num_entries, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(num_entries, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(cls_rgw_gc_queue_remove_op)
-
-struct cls_gc_defer_entry_op {
- uint32_t expiration_secs;
- cls_rgw_gc_obj_info info;
- cls_gc_defer_entry_op() : expiration_secs(0) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(expiration_secs, bl);
- encode(info, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(expiration_secs, bl);
- decode(info, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(cls_gc_defer_entry_op)
-#endif /* CEPH_CLS_RGW_QUEUE_OPS_H */
\ No newline at end of file
+++ /dev/null
-#ifndef CEPH_CLS_RGW_QUEUE_TYPES_H
-#define CEPH_CLS_RGW_QUEUE_TYPES_H
-
-#include "include/types.h"
-#include "common/ceph_time.h"
-#include "common/Formatter.h"
-
-#include <unordered_map>
-
-struct cls_gc_urgent_data
-{
- std::unordered_map<string,ceph::real_time> urgent_data_map;
- uint32_t num_urgent_data_entries{0}; // requested by user
- uint32_t num_head_urgent_entries{0}; // actual number of entries in queue head
- uint32_t num_xattr_urgent_entries{0}; // actual number of entries in xattr in case of spill over
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(urgent_data_map, bl);
- encode(num_urgent_data_entries, bl);
- encode(num_head_urgent_entries, bl);
- encode(num_xattr_urgent_entries, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(urgent_data_map, bl);
- decode(num_urgent_data_entries, bl);
- decode(num_head_urgent_entries, bl);
- decode(num_xattr_urgent_entries, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(cls_gc_urgent_data)
-
-#endif
\ No newline at end of file
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include <errno.h>
+
+#include "objclass/objclass.h"
+#include "cls/rgw/cls_rgw_ops.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "cls/rgw_gc/cls_rgw_gc_types.h"
+#include "cls/rgw_gc/cls_rgw_gc_ops.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/rgw_gc/cls_rgw_gc_const.h"
+#include "cls/queue/cls_queue_src.h"
+
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+#define GC_LIST_DEFAULT_MAX 128
+
+CLS_VER(1,0)
+CLS_NAME(rgw_gc)
+
+static int cls_rgw_gc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+
+ cls_rgw_gc_queue_init_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_init: failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ cls_rgw_gc_urgent_data urgent_data;
+ urgent_data.num_urgent_data_entries = op.num_urgent_data_entries;
+
+ cls_queue_init_op init_op;
+
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_init: queue size is %lu\n", op.size);
+ init_op.head.queue_size = op.size;
+ init_op.head_size = g_ceph_context->_conf->rgw_gc_queue_head_size;
+ init_op.has_urgent_data = true;
+ encode(urgent_data, init_op.head.bl_urgent_data);
+
+ return queue_init(hctx, init_op);
+}
+
+static int cls_rgw_gc_queue_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_rgw_gc_set_entry_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_rgw_gc_queue_enqueue: failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ op.info.time = ceph::real_clock::now();
+ op.info.time += make_timespan(op.expiration_secs);
+
+ //get head
+ cls_queue_head head;
+ int ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_queue_enqueue_op enqueue_op;
+ bufferlist bl_data;
+ encode(op.info, bl_data);
+ enqueue_op.bl_data_vec.emplace_back(bl_data);
+
+ CLS_LOG(20, "INFO: cls_rgw_gc_queue_enqueue: Data size is: %u \n", bl_data.length());
+
+ ret = queue_enqueue(hctx, enqueue_op, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ //Write back head
+ return queue_write_head(hctx, head);
+}
+
+static int cls_rgw_gc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_rgw_gc_list_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode input\n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_rgw_gc_urgent_data urgent_data;
+ if (head.bl_urgent_data.length() > 0) {
+ auto iter_urgent_data = head.bl_urgent_data.cbegin();
+ try {
+ decode(urgent_data, iter_urgent_data);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode urgent data\n");
+ return -EINVAL;
+ }
+ }
+
+ cls_queue_list_op list_op;
+ if (! op.max) {
+ op.max = GC_LIST_DEFAULT_MAX;
+ }
+
+ list_op.max = op.max;
+ list_op.start_marker = op.marker;
+
+ cls_rgw_gc_list_ret list_ret;
+ uint32_t num_entries = 0; //Entries excluding the deferred ones
+ bool is_truncated = true;
+ string next_marker;
+ do {
+ cls_queue_list_ret op_ret;
+ int ret = queue_list_entries(hctx, list_op, op_ret, head);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret);
+ return ret;
+ }
+ is_truncated = op_ret.is_truncated;
+ next_marker = op_ret.next_marker;
+
+ if (op_ret.entries.size()) {
+ for (auto it : op_ret.entries) {
+ cls_rgw_gc_obj_info info;
+ try {
+ decode(info, it.data);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode gc info\n");
+ return -EINVAL;
+ }
+ bool found = false;
+ //Check for info tag in urgent data map
+ auto iter = urgent_data.urgent_data_map.find(info.tag);
+ if (iter != urgent_data.urgent_data_map.end()) {
+ found = true;
+ if (iter->second > info.time) {
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): tag found in urgent data: %s\n", info.tag.c_str());
+ continue;
+ }
+ }
+ //Search in xattrs
+ if (! found && urgent_data.num_xattr_urgent_entries > 0) {
+ bufferlist bl_xattrs;
+ int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
+ return ret;
+ }
+ if (ret != -ENOENT && ret != -ENODATA) {
+ std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_urgent_data_map, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode xattrs urgent data map\n");
+ return -EINVAL;
+ } //end - catch
+ auto xattr_iter = xattr_urgent_data_map.find(info.tag);
+ if (xattr_iter != xattr_urgent_data_map.end()) {
+ if (xattr_iter->second > info.time) {
+ CLS_LOG(1, "INFO: cls_rgw_gc_queue_list_entries(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
+ continue;
+ }
+ }
+ } // end - ret != ENOENT && ENODATA
+ } // end - if not found
+ if (op.expired_only) {
+ real_time now = ceph::real_clock::now();
+ if (info.time <= now) {
+ list_ret.entries.emplace_back(info);
+ }
+ //Can break out here if info.time > now, since all subsequent entries won't have expired
+ } else {
+ list_ret.entries.emplace_back(info);
+ }
+ num_entries++;
+ }
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): num_entries: %u and op.max: %u\n", num_entries, op.max);
+ if (num_entries < op.max) {
+ list_op.max = (op.max - num_entries);
+ list_op.start_marker = op_ret.next_marker;
+ out->clear();
+ } else {
+ //We've reached the max number of entries needed
+ break;
+ }
+ } else {
+ //We dont have data to process
+ break;
+ }
+ } while(is_truncated);
+
+ list_ret.truncated = is_truncated;
+ if (list_ret.truncated) {
+ list_ret.next_marker = next_marker;
+ }
+ out->clear();
+ encode(list_ret, *out);
+ return 0;
+}
+
+static int cls_rgw_gc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+
+ cls_rgw_gc_queue_remove_entries_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode input\n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_rgw_gc_urgent_data urgent_data;
+ if (head.bl_urgent_data.length() > 0) {
+ auto iter_urgent_data = head.bl_urgent_data.cbegin();
+ try {
+ decode(urgent_data, iter_urgent_data);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode urgent data\n");
+ return -EINVAL;
+ }
+ }
+
+ // List entries and calculate total number of entries (including invalid entries)
+ if (! op.num_entries) {
+ op.num_entries = GC_LIST_DEFAULT_MAX;
+ }
+ cls_queue_list_op list_op;
+ list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry
+ bool is_truncated = true;
+ uint32_t total_num_entries = 0, num_entries = 0;
+ string end_marker;
+ do {
+ cls_queue_list_ret op_ret;
+ int ret = queue_list_entries(hctx, list_op, op_ret, head);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret);
+ return ret;
+ }
+
+ is_truncated = op_ret.is_truncated;
+ unsigned int index = 0;
+ // If data is not empty
+ if (op_ret.entries.size()) {
+ for (auto it : op_ret.entries) {
+ cls_rgw_gc_obj_info info;
+ try {
+ decode(info, it.data);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode gc info\n");
+ return -EINVAL;
+ }
+ CLS_LOG(20, "INFO: cls_rgw_gc_queue_remove_entries(): entry: %s\n", info.tag.c_str());
+ total_num_entries++;
+ index++;
+ bool found = false;
+ //Search for tag in urgent data map
+ auto iter = urgent_data.urgent_data_map.find(info.tag);
+ if (iter != urgent_data.urgent_data_map.end()) {
+ found = true;
+ if (iter->second > info.time) {
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in urgent data: %s\n", info.tag.c_str());
+ continue;
+ } else if (iter->second == info.time) {
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from urgent data: %s\n", info.tag.c_str());
+ urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
+ urgent_data.num_head_urgent_entries -= 1;
+ }
+ }//end-if map end
+ if (! found && urgent_data.num_xattr_urgent_entries > 0) {
+ //Search in xattrs
+ bufferlist bl_xattrs;
+ int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
+ return ret;
+ }
+ if (ret != -ENOENT && ret != -ENODATA) {
+ std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_urgent_data_map, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
+ return -EINVAL;
+ } //end - catch
+ auto xattr_iter = xattr_urgent_data_map.find(info.tag);
+ if (xattr_iter != xattr_urgent_data_map.end()) {
+ if (xattr_iter->second > info.time) {
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
+ continue;
+ } else if (xattr_iter->second == info.time) {
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
+ xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
+ urgent_data.num_xattr_urgent_entries -= 1;
+ }
+ }
+ } // end - ret != ENOENT && ENODATA
+ }// search in xattrs
+ num_entries++;
+ }//end-for
+
+ if (num_entries < (op.num_entries + 1)) {
+ if (! op_ret.is_truncated) {
+ end_marker = op_ret.next_marker;
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): not truncated and end offset is %s\n", end_marker.c_str());
+ break;
+ } else {
+ list_op.max = ((op.num_entries + 1) - num_entries);
+ list_op.start_marker = op_ret.next_marker;
+ out->clear();
+ }
+ } else {
+ end_marker = op_ret.entries[index - 1].marker;
+ CLS_LOG(1, "INFO: cls_rgw_gc_queue_remove_entries(): index is %u and end_offset is: %s\n", index, end_marker.c_str());
+ break;
+ }
+ } //end-if
+ else {
+ break;
+ }
+ } while(is_truncated);
+
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): Total number of entries to remove: %d\n", total_num_entries);
+ CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): End offset is %s\n", end_marker.c_str());
+
+ if (! end_marker.empty()) {
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = end_marker;
+ int ret = queue_remove_entries(hctx, rem_op, head);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: queue_remove_entries(): returned error %d\n", ret);
+ return ret;
+ }
+ }
+
+ //Update urgent data map
+ encode(urgent_data, head.bl_urgent_data);
+
+ return queue_write_head(hctx, head);
+}
+
+static int cls_rgw_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ int ret = 0;
+ auto in_iter = in->cbegin();
+
+ cls_rgw_gc_queue_defer_entry_op op;
+ try {
+ decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode input\n");
+ return -EINVAL;
+ }
+
+ op.info.time = ceph::real_clock::now();
+ op.info.time += make_timespan(op.expiration_secs);
+
+ // Read head
+ cls_queue_head head;
+ ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ auto bl_iter = head.bl_urgent_data.cbegin();
+ cls_rgw_gc_urgent_data urgent_data;
+ try {
+ decode(urgent_data, bl_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode urgent data\n");
+ return -EINVAL;
+ }
+
+ //has_urgent_data signifies whether urgent data in queue has changed
+ bool has_urgent_data = false, tag_found = false;
+ //search in unordered map in head
+ auto it = urgent_data.urgent_data_map.find(op.info.tag);
+ if (it != urgent_data.urgent_data_map.end()) {
+ it->second = op.info.time;
+ tag_found = true;
+ has_urgent_data = true;
+ } else { //search in xattrs
+ bufferlist bl_xattrs;
+ int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
+ return ret;
+ }
+ if (ret != -ENOENT && ret != -ENODATA) {
+ std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_urgent_data_map, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
+ return -EINVAL;
+ } //end - catch
+ auto xattr_iter = xattr_urgent_data_map.find(op.info.tag);
+ if (xattr_iter != xattr_urgent_data_map.end()) {
+ it->second = op.info.time;
+ tag_found = true;
+ //write the updated map back
+ bufferlist bl_map;
+ encode(xattr_urgent_data_map, bl_map);
+ ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
+ CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
+ return ret;
+ }
+ }
+ }// end ret != ENOENT ...
+ }
+
+ if (! tag_found) {
+ //try inserting in queue head
+ urgent_data.urgent_data_map.insert({op.info.tag, op.info.time});
+ urgent_data.num_head_urgent_entries += 1;
+ has_urgent_data = true;
+
+ bufferlist bl_urgent_data;
+ encode(urgent_data, bl_urgent_data);
+ //insert as xattrs
+ if (bl_urgent_data.length() > head.max_urgent_data_size) {
+ //remove inserted entry from urgent data
+ urgent_data.urgent_data_map.erase(op.info.tag);
+ urgent_data.num_head_urgent_entries -= 1;
+ has_urgent_data = false;
+
+ bufferlist bl_xattrs;
+ int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
+ return ret;
+ }
+ std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
+ if (ret != -ENOENT && ret != -ENODATA) {
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_urgent_data_map, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
+ return -EINVAL;
+ } //end - catch
+ }
+ xattr_urgent_data_map.insert({op.info.tag, op.info.time});
+ urgent_data.num_xattr_urgent_entries += 1;
+ has_urgent_data = true;
+ bufferlist bl_map;
+ encode(xattr_urgent_data_map, bl_map);
+ ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
+ CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
+ return ret;
+ }
+ }
+ }
+
+ if ((urgent_data.num_head_urgent_entries + urgent_data.num_xattr_urgent_entries) > urgent_data.num_urgent_data_entries) {
+ CLS_LOG(20, "Total num entries %u", urgent_data.num_urgent_data_entries);
+ CLS_LOG(20, "Num xattr entries %u", urgent_data.num_xattr_urgent_entries);
+ CLS_LOG(20, "Num head entries %u", urgent_data.num_head_urgent_entries);
+ CLS_LOG(0, "ERROR: Number of urgent data entries exceeded that requested by user, returning no space!");
+ return -ENOSPC;
+ }
+
+ cls_queue_enqueue_op enqueue_op;
+ bufferlist bl_data;
+ encode(op.info, bl_data);
+ enqueue_op.bl_data_vec.emplace_back(bl_data);
+ CLS_LOG(10, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
+
+ ret = queue_enqueue(hctx, enqueue_op, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (has_urgent_data) {
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+ }
+
+ return queue_write_head(hctx, head);
+}
+
+CLS_INIT(rgw_gc)
+{
+ CLS_LOG(1, "Loaded rgw gc class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_rgw_gc_queue_init;
+ cls_method_handle_t h_rgw_gc_queue_enqueue;
+ cls_method_handle_t h_rgw_gc_queue_list_entries;
+ cls_method_handle_t h_rgw_gc_queue_remove_entries;
+ cls_method_handle_t h_rgw_gc_queue_update_entry;
+
+ cls_register(RGW_GC_CLASS, &h_class);
+
+ /* gc */
+ cls_register_cxx_method(h_class, RGW_GC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_init, &h_rgw_gc_queue_init);
+ cls_register_cxx_method(h_class, RGW_GC_QUEUE_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_enqueue, &h_rgw_gc_queue_enqueue);
+ cls_register_cxx_method(h_class, RGW_GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_rgw_gc_queue_list_entries, &h_rgw_gc_queue_list_entries);
+ cls_register_cxx_method(h_class, RGW_GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_remove_entries, &h_rgw_gc_queue_remove_entries);
+ cls_register_cxx_method(h_class, RGW_GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_update_entry, &h_rgw_gc_queue_update_entry);
+
+ return;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include <errno.h>
+
+#include "cls/rgw/cls_rgw_ops.h"
+#include "cls/rgw_gc/cls_rgw_gc_ops.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/rgw_gc/cls_rgw_gc_const.h"
+#include "cls/queue/cls_queue_const.h"
+#include "cls/rgw_gc/cls_rgw_gc_client.h"
+
+using namespace librados;
+
+void cls_rgw_gc_queue_init(ObjectWriteOperation& op, uint64_t size, uint64_t num_urgent_data_entries)
+{
+ bufferlist in;
+ cls_rgw_gc_queue_init_op call;
+ call.size = size;
+ call.num_urgent_data_entries = num_urgent_data_entries;
+ encode(call, in);
+ op.exec(RGW_GC_CLASS, RGW_GC_QUEUE_INIT, in);
+}
+
+int cls_rgw_gc_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& size)
+{
+ bufferlist in, out;
+ int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out);
+ if (r < 0)
+ return r;
+
+ cls_queue_get_capacity_ret op_ret;
+ auto iter = out.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ size = op_ret.queue_capacity;
+
+ return 0;
+}
+
+void cls_rgw_gc_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
+{
+ bufferlist in;
+ cls_rgw_gc_set_entry_op call;
+ call.expiration_secs = expiration_secs;
+ call.info = info;
+ encode(call, in);
+ op.exec(RGW_GC_CLASS, RGW_GC_QUEUE_ENQUEUE, in);
+}
+
+int cls_rgw_gc_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, bool expired_only,
+ list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker)
+{
+ bufferlist in, out;
+ cls_rgw_gc_list_op op;
+ op.marker = marker;
+ op.max = max;
+ op.expired_only = expired_only;
+ encode(op, in);
+
+ int r = io_ctx.exec(oid, RGW_GC_CLASS, RGW_GC_QUEUE_LIST_ENTRIES, in, out);
+ if (r < 0)
+ return r;
+
+ cls_rgw_gc_list_ret ret;
+ auto iter = out.cbegin();
+ try {
+ decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ entries.swap(ret.entries);
+
+ *truncated = ret.truncated;
+
+ next_marker = std::move(ret.next_marker);
+
+ return 0;
+}
+
+void cls_rgw_gc_queue_remove_entries(ObjectWriteOperation& op, uint32_t num_entries)
+{
+ bufferlist in, out;
+ cls_rgw_gc_queue_remove_entries_op rem_op;
+ rem_op.num_entries = num_entries;
+ encode(rem_op, in);
+ op.exec(RGW_GC_CLASS, RGW_GC_QUEUE_REMOVE_ENTRIES, in);
+}
+
+void cls_rgw_gc_queue_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
+{
+ bufferlist in;
+ cls_rgw_gc_queue_defer_entry_op defer_op;
+ defer_op.expiration_secs = expiration_secs;
+ defer_op.info = info;
+ encode(defer_op, in);
+ op.exec(RGW_GC_CLASS, RGW_GC_QUEUE_UPDATE_ENTRY, in);
+}
--- /dev/null
+#ifndef CEPH_CLS_RGW_GC_CLIENT_H
+#define CEPH_CLS_RGW_GC_CLIENT_H
+
+#include "include/rados/librados.hpp"
+#include "cls/rgw_gc/cls_rgw_gc_types.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "common/ceph_time.h"
+
+void cls_rgw_gc_queue_init(librados::ObjectWriteOperation& op, uint64_t size, uint64_t num_urgent_data_entries);
+int cls_rgw_gc_queue_get_capacity(librados::IoCtx& io_ctx, const string& oid, uint64_t& size);
+void cls_rgw_gc_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
+int cls_rgw_gc_queue_list_entries(librados::IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, bool expired_only,
+ list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker);
+void cls_rgw_gc_queue_remove_entries(librados::ObjectWriteOperation& op, uint32_t num_entries);
+void cls_rgw_gc_queue_defer_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
+
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef CEPH_CLS_RGW_GC_CONSTS_H
+#define CEPH_CLS_RGW_GC_CONSTS_H
+
+#define RGW_GC_CLASS "rgw_gc"
+
+#define RGW_GC_QUEUE_INIT "rgw_gc_queue_init"
+#define RGW_GC_QUEUE_ENQUEUE "rgw_gc_queue_enqueue"
+#define RGW_GC_QUEUE_LIST_ENTRIES "rgw_gc_queue_list_entries"
+#define RGW_GC_QUEUE_REMOVE_ENTRIES "rgw_gc_queue_remove_entries"
+#define RGW_GC_QUEUE_UPDATE_ENTRY "rgw_gc_queue_update_entry"
+
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef CEPH_CLS_RGW_GC_OPS_H
+#define CEPH_CLS_RGW_GC_OPS_H
+
+#include "cls/rgw/cls_rgw_types.h"
+
+struct cls_rgw_gc_queue_init_op {
+ uint64_t size;
+ uint64_t num_urgent_data_entries{0};
+
+ cls_rgw_gc_queue_init_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(size, bl);
+ encode(num_urgent_data_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(size, bl);
+ decode(num_urgent_data_entries, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_gc_queue_init_op)
+
+struct cls_rgw_gc_queue_remove_entries_op {
+ uint64_t num_entries;
+
+ cls_rgw_gc_queue_remove_entries_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(num_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(num_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_rgw_gc_queue_remove_entries_op)
+
+struct cls_rgw_gc_queue_defer_entry_op {
+ uint32_t expiration_secs;
+ cls_rgw_gc_obj_info info;
+ cls_rgw_gc_queue_defer_entry_op() : expiration_secs(0) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(expiration_secs, bl);
+ encode(info, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(expiration_secs, bl);
+ decode(info, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_rgw_gc_queue_defer_entry_op)
+#endif /* CEPH_CLS_RGW_GC_OPS_H */
\ No newline at end of file
--- /dev/null
+#ifndef CEPH_CLS_RGW_GC_TYPES_H
+#define CEPH_CLS_RGW_GC_TYPES_H
+
+#include "include/types.h"
+#include <unordered_map>
+
+struct cls_rgw_gc_urgent_data
+{
+ std::unordered_map<string,ceph::real_time> urgent_data_map;
+ uint32_t num_urgent_data_entries{0}; // requested by user
+ uint32_t num_head_urgent_entries{0}; // actual number of entries in queue head
+ uint32_t num_xattr_urgent_entries{0}; // actual number of entries in xattr in case of spill over
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(urgent_data_map, bl);
+ encode(num_urgent_data_entries, bl);
+ encode(num_head_urgent_entries, bl);
+ encode(num_xattr_urgent_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(urgent_data_map, bl);
+ decode(num_urgent_data_entries, bl);
+ decode(num_head_urgent_entries, bl);
+ decode(num_xattr_urgent_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_rgw_gc_urgent_data)
+
+#endif
\ No newline at end of file
add_dependencies(osd cls_rbd)
endif()
if(WITH_RADOSGW)
- add_dependencies(osd cls_otp cls_rgw cls_queue)
+ add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc)
endif()
PRIVATE
librados cls_otp_client cls_lock_client cls_rgw_client cls_refcount_client
cls_log_client cls_timeindex_client cls_version_client
- cls_user_client cls_queue_client ceph-common common_utf8 global
+ cls_user_client cls_rgw_gc_client ceph-common common_utf8 global
${CURL_LIBRARIES}
${EXPAT_LIBRARIES}
${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS}
cls_timeindex_client
cls_version_client
cls_user_client
- cls_queue_client
+ cls_rgw_gc_client
global
${CURL_LIBRARIES}
${EXPAT_LIBRARIES}
#include "include/scope_guard.h"
#include "include/rados/librados.hpp"
#include "cls/rgw/cls_rgw_client.h"
-#include "cls/queue/cls_queue_client.h"
+#include "cls/rgw_gc/cls_rgw_gc_client.h"
#include "cls/refcount/cls_refcount_client.h"
#include "rgw_perf_counters.h"
#include "cls/lock/cls_lock_client.h"
librados::ObjectWriteOperation op;
op.create(true);
uint64_t queue_size = 1048576, num_urgent_data_entries = 50;
- cls_rgw_gc_init_queue(op, obj_names[i], queue_size, num_urgent_data_entries);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
store->gc_operate(obj_names[i], &op);
}
}
info.tag = tag;
//cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
- cls_rgw_gc_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
+ cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
}
int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
ObjectWriteOperation op;
//cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
- cls_rgw_gc_defer_entry_queue(op, cct->_conf->rgw_gc_obj_min_wait, info);
+ cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
int i = tag_index(tag);
int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc)
{
ObjectWriteOperation op;
- cls_rgw_gc_remove_entries_queue(op, num_entries);
+ cls_rgw_gc_queue_remove_entries(op, num_entries);
return store->gc_aio_operate(obj_names[index], &op, pc);
}
for (; *index < max_objs && result.size() < max; (*index)++, marker.clear()) {
std::list<cls_rgw_gc_obj_info> entries;
//int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
- int ret = cls_rgw_gc_list_queue(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
+ int ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
if (ret == -ENOENT)
continue;
if (ret < 0)
//ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max,
// expired_only, entries, &truncated, next_marker);
- ret = cls_rgw_gc_list_queue(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
+ ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
ldpp_dout(this, 20) <<
- "RGWGC::process cls_rgw_gc_list_queue returned with returned:" << ret <<
+ "RGWGC::process cls_rgw_gc_queue_list_entries returned with returned:" << ret <<
", entries.size=" << entries.size() << ", truncated=" << truncated <<
", next_marker='" << next_marker << "'" << dendl;
ldout(cct, 0) << "defer chain tag=" << tag << dendl;
cls_rgw_obj_chain chain;
- update_gc_chain(state->obj, state->manifest, &chain);
+ update_gc_chain(state->obj, *state->manifest, &chain);
return gc->defer_chain(tag, chain, false);
}
add_subdirectory(cls_rgw)
add_subdirectory(cls_version)
add_subdirectory(cls_lua)
-add_subdirectory(cls_queue)
+add_subdirectory(cls_rgw_gc)
add_subdirectory(common)
add_subdirectory(compressor)
add_subdirectory(crush)
+++ /dev/null
-if(${WITH_RADOSGW})
- add_executable(ceph_test_cls_queue
- test_cls_queue.cc
- )
- target_link_libraries(ceph_test_cls_queue
- cls_queue_client
- librados
- global
- ${UNITTEST_LIBS}
- ${EXTRALIBS}
- ${BLKID_LIBRARIES}
- ${CMAKE_DL_LIBS}
- radostest-cxx)
- install(TARGETS
- ceph_test_cls_queue
- DESTINATION ${CMAKE_INSTALL_BINDIR})
-endif(${WITH_RADOSGW})
-
+++ /dev/null
-// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "include/types.h"
-
-#include "cls/rgw/cls_rgw_types.h"
-#include "cls/queue/cls_queue_client.h"
-#include "cls/queue/cls_queue_ops.h"
-
-#include "gtest/gtest.h"
-#include "test/librados/test_cxx.h"
-#include "global/global_context.h"
-
-#include <errno.h>
-#include <string>
-#include <vector>
-#include <map>
-#include <set>
-
-using namespace librados;
-
-librados::Rados rados;
-librados::IoCtx ioctx;
-string pool_name;
-
-
-/* must be the first test! */
-TEST(cls_queue, init)
-{
- pool_name = get_temp_pool_name();
- /* create pool */
- ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
- ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
-}
-
-
-string str_int(string s, int i)
-{
- char buf[32];
- snprintf(buf, sizeof(buf), "-%d", i);
- s.append(buf);
-
- return s;
-}
-
-/* test garbage collection */
-static void create_obj(cls_rgw_obj& obj, int i, int j)
-{
- char buf[32];
- snprintf(buf, sizeof(buf), "-%d.%d", i, j);
- obj.pool = "pool";
- obj.pool.append(buf);
- obj.key.name = "oid";
- obj.key.name.append(buf);
- obj.loc = "loc";
- obj.loc.append(buf);
-}
-
-TEST(cls_queue, gc_queue_ops1)
-{
- //Testing queue ops when data size is NOT a multiple of queue size
- string queue_name = "my-queue";
- uint64_t queue_size = 322, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(queue_size, size);
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 0, info);
- if (i == 1) {
- ASSERT_EQ(-ENOSPC, ioctx.operate(queue_name, &op));
- } else {
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
- }
-
- //Test remove queue entries
- librados::ObjectWriteOperation remove_op;
- string marker1;
- uint64_t num_entries = 1;
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Test enqueue again
- for (int i = 0; i < 1; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 0, info);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1;
- string marker, next_marker;
- uint64_t max = 1;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(1, list_info1.size());
-
- for (auto it : list_info1) {
- std::cerr << "[ ] list info tag = " << it.tag << std::endl;
- ASSERT_EQ("chain-0", it.tag);
- }
-}
-
-TEST(cls_queue, gc_queue_ops2)
-{
- //Testing list queue
- string queue_name = "my-second-queue";
- uint64_t queue_size = 334, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test list queue, when queue is empty
- list<cls_rgw_gc_obj_info> list_info;
- string marker1, next_marker1;
- uint64_t max1 = 2;
- bool expired_only1 = false, truncated1;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker1, max1, expired_only1, list_info, &truncated1, next_marker1);
- ASSERT_EQ(0, list_info.size());
-
- //Test enqueue
- for (int i = 0; i < 3; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 0, info);
- if (i == 2) {
- ASSERT_EQ(-ENOSPC, ioctx.operate(queue_name, &op));
- } else {
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
- }
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
- string marker, next_marker;
- uint64_t max = 2;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 0;
- for (auto it : list_info1) {
- string tag = "chain-" + to_string(i);
- ASSERT_EQ(tag, it.tag);
- i++;
- }
-
- max = 1;
- truncated = false;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
- auto it = list_info2.front();
- ASSERT_EQ(1, list_info2.size());
- ASSERT_EQ(true, truncated);
- ASSERT_EQ("chain-0", it.tag);
- std::cerr << "[ ] next_marker is: = " << next_marker << std::endl;
-
- marker = next_marker;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info3, &truncated, next_marker);
- it = list_info3.front();
- ASSERT_EQ(1, list_info3.size());
- ASSERT_EQ(false, truncated);
- ASSERT_EQ("chain-1", it.tag);
-}
-
-TEST(cls_queue, gc_queue_ops3)
-{
- //Testing remove queue entries
- string queue_name = "my-third-queue";
- uint64_t queue_size = 501, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test remove queue, when queue is empty
- librados::ObjectWriteOperation remove_op;
- string marker1;
- uint64_t num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- cls_rgw_gc_obj_info defer_info;
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 5, info);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- if (i == 0)
- defer_info = info;
- }
-
- //Test defer entry for 1st element
- librados::ObjectWriteOperation defer_op;
- cls_rgw_gc_defer_entry_queue(defer_op, 10, defer_info);
- ASSERT_EQ(0, ioctx.operate(queue_name, &defer_op));
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1, list_info2;
- string marker, next_marker;
- uint64_t max = 2;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 0;
- for (auto it : list_info1) {
- std::cerr << "[ ] list info tag = " << it.tag << std::endl;
- if (i == 0) {
- ASSERT_EQ("chain-1", it.tag);
- }
- if (i == 1) {
- ASSERT_EQ("chain-0", it.tag);
- }
- i++;
- }
-
- //Test remove entries
- num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Test list queue again
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
- ASSERT_EQ(0, list_info2.size());
-
-}
-
-TEST(cls_queue, gc_queue_ops4)
-{
- //Testing remove queue entries
- string queue_name = "my-fourth-queue";
- uint64_t queue_size = 501, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test remove queue, when queue is empty
- librados::ObjectWriteOperation remove_op;
- string marker1;
- uint64_t num_entries = 2;
-
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- cls_rgw_gc_obj_info defer_info;
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 5, info);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- defer_info = info;
- }
-
- //Test defer entry for last element
- librados::ObjectWriteOperation defer_op;
- cls_rgw_gc_defer_entry_queue(defer_op, 10, defer_info);
- ASSERT_EQ(0, ioctx.operate(queue_name, &defer_op));
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1, list_info2;
- string marker, next_marker;
- uint64_t max = 2;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 0;
- for (auto it : list_info1) {
- string tag = "chain-" + to_string(i);
- ASSERT_EQ(tag, it.tag);
- i++;
- }
-
- //Test remove entries
- num_entries = 2;
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Test list queue again
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
- ASSERT_EQ(0, list_info2.size());
-
-}
-
-TEST(cls_queue, gc_queue_ops5)
-{
- //Testing remove queue entries
- string queue_name = "my-fifth-queue";
- uint64_t queue_size = 501, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test enqueue
- for (int i = 0; i < 3; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- if (i == 2) {
- cls_rgw_gc_enqueue(op, 300, info);
- } else {
- cls_rgw_gc_enqueue(op, 0, info);
- }
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
- //Test list queue for expired entries only
- list<cls_rgw_gc_obj_info> list_info1, list_info2;
- string marker, next_marker, marker1;
- uint64_t max = 10;
- bool expired_only = true, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 0;
- for (auto it : list_info1) {
- string tag = "chain-" + to_string(i);
- ASSERT_EQ(tag, it.tag);
- i++;
- }
-
- //Test remove entries
- librados::ObjectWriteOperation remove_op;
- auto num_entries = list_info1.size();
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Test list queue again for all entries
- expired_only = false;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
- ASSERT_EQ(1, list_info2.size());
-
-}
-
-TEST(cls_queue, gc_queue_ops6)
-{
- //Testing list queue, when data size is split at the end of the queue
- string queue_name = "my-sixth-queue";
- uint64_t queue_size = 341, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 0, info);
-
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
-
- //Remove one element from queue
- librados::ObjectWriteOperation remove_op;
- string marker1;
- uint64_t num_entries = 1;
-
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Enqueue one more element
- librados::ObjectWriteOperation enq_op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, 2, 1);
- create_obj(obj2, 2, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = "chain-2";
- cls_rgw_gc_enqueue(enq_op, 0, info);
-
- ASSERT_EQ(0, ioctx.operate(queue_name, &enq_op));
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
- string marker, next_marker;
- uint64_t max = 2;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 1;
- for (auto it : list_info1) {
- string tag = "chain-" + to_string(i);
- ASSERT_EQ(tag, it.tag);
- i++;
- }
-}
-
-TEST(cls_queue, gc_queue_ops7)
-{
- //Testing list queue, when data size is written at the end of queue and data is written after wrap around
- string queue_name = "my-seventh-queue";
- uint64_t queue_size = 342, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 0, info);
-
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
-
- //Remove one element from queue
- librados::ObjectWriteOperation remove_op;
- string marker1;
- uint64_t num_entries = 1;
-
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Enqueue one more element
- librados::ObjectWriteOperation enq_op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, 2, 1);
- create_obj(obj2, 2, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = "chain-2";
- cls_rgw_gc_enqueue(enq_op, 0, info);
-
- ASSERT_EQ(0, ioctx.operate(queue_name, &enq_op));
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
- string marker, next_marker;
- uint64_t max = 2;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 1;
- for (auto it : list_info1) {
- string tag = "chain-" + to_string(i);
- ASSERT_EQ(tag, it.tag);
- i++;
- }
-}
-
-TEST(cls_queue, gc_queue_ops8)
-{
- //Testing list queue, when data is split at the end of the queue
- string queue_name = "my-eighth-queue";
- uint64_t queue_size = 344, num_urgent_data_entries = 10;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 0, info);
-
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- }
-
- //Remove one element from queue
- librados::ObjectWriteOperation remove_op;
- string marker1;
- uint64_t num_entries = 1;
-
- cls_rgw_gc_remove_entries_queue(remove_op, num_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
-
- //Enqueue one more element
- librados::ObjectWriteOperation enq_op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, 2, 1);
- create_obj(obj2, 2, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = "chain-2";
- cls_rgw_gc_enqueue(enq_op, 0, info);
-
- ASSERT_EQ(0, ioctx.operate(queue_name, &enq_op));
-
- //Test list queue
- list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
- string marker, next_marker;
- uint64_t max = 2;
- bool expired_only = false, truncated;
- cls_rgw_gc_list_queue(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
- ASSERT_EQ(2, list_info1.size());
-
- int i = 1;
- for (auto it : list_info1) {
- string tag = "chain-" + to_string(i);
- ASSERT_EQ(tag, it.tag);
- i++;
- }
-}
-
-TEST(cls_queue, gc_queue_ops9)
-{
- //Testing remove queue entries
- string queue_name = "my-ninth-queue";
- uint64_t queue_size = 668, num_urgent_data_entries = 1;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_gc_init_queue(op, queue_name, queue_size, num_urgent_data_entries);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
-
- uint64_t size = 0;
- int ret = cls_rgw_gc_get_queue_size(ioctx, queue_name, size);
- ASSERT_EQ(0, ret);
- ASSERT_EQ(size, queue_size);
-
- cls_rgw_gc_obj_info defer_info1, defer_info2;
-
- //Test enqueue
- for (int i = 0; i < 2; i++) {
- string tag = "chain-" + to_string(i);
- librados::ObjectWriteOperation op;
- cls_rgw_gc_obj_info info;
-
- cls_rgw_obj obj1, obj2;
- create_obj(obj1, i, 1);
- create_obj(obj2, i, 2);
- info.chain.objs.push_back(obj1);
- info.chain.objs.push_back(obj2);
-
- info.tag = tag;
- cls_rgw_gc_enqueue(op, 5, info);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
- if (i == 0) {
- defer_info1 = info;
- }
- if (i == 1) {
- defer_info2 = info;
- }
- }
-
- //Test defer entry for last element
- librados::ObjectWriteOperation defer_op;
- cls_rgw_gc_defer_entry_queue(defer_op, 10, defer_info2);
- ASSERT_EQ(0, ioctx.operate(queue_name, &defer_op));
-
- //Test defer entry for first element
- cls_rgw_gc_defer_entry_queue(defer_op, 10, defer_info1);
- ASSERT_EQ(-ENOSPC, ioctx.operate(queue_name, &defer_op));
-}
-
-/* must be last test! */
-TEST(cls_queue, finalize)
-{
- /* remove pool */
- ioctx.close();
- ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
-}
--- /dev/null
+if(${WITH_RADOSGW})
+ add_executable(ceph_test_cls_rgw_gc
+ test_cls_rgw_gc.cc
+ )
+ target_link_libraries(ceph_test_cls_rgw_gc
+ cls_rgw_gc_client
+ librados
+ global
+ ${UNITTEST_LIBS}
+ ${EXTRALIBS}
+ ${BLKID_LIBRARIES}
+ ${CMAKE_DL_LIBS}
+ radostest-cxx)
+ install(TARGETS
+ ceph_test_cls_rgw_gc
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
+endif(${WITH_RADOSGW})
+
--- /dev/null
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include "cls/rgw/cls_rgw_types.h"
+#include "cls/rgw_gc/cls_rgw_gc_client.h"
+#include "cls/rgw_gc/cls_rgw_gc_ops.h"
+
+#include "gtest/gtest.h"
+#include "test/librados/test_cxx.h"
+#include "global/global_context.h"
+
+#include <errno.h>
+#include <string>
+#include <vector>
+#include <map>
+#include <set>
+
+using namespace librados;
+
+librados::Rados rados;
+librados::IoCtx ioctx;
+string pool_name;
+
+
+/* must be the first test! */
+TEST(cls_rgw_gc, init)
+{
+ pool_name = get_temp_pool_name();
+ /* create pool */
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+}
+
+
+string str_int(string s, int i)
+{
+ char buf[32];
+ snprintf(buf, sizeof(buf), "-%d", i);
+ s.append(buf);
+
+ return s;
+}
+
+/* test garbage collection */
+static void create_obj(cls_rgw_obj& obj, int i, int j)
+{
+ char buf[32];
+ snprintf(buf, sizeof(buf), "-%d.%d", i, j);
+ obj.pool = "pool";
+ obj.pool.append(buf);
+ obj.key.name = "oid";
+ obj.key.name.append(buf);
+ obj.loc = "loc";
+ obj.loc.append(buf);
+}
+
+TEST(cls_rgw_gc, gc_queue_ops1)
+{
+ //Testing queue ops when data size is NOT a multiple of queue size
+ string queue_name = "my-queue";
+ uint64_t queue_size = 322, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(queue_size, size);
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+ if (i == 1) {
+ ASSERT_EQ(-ENOSPC, ioctx.operate(queue_name, &op));
+ } else {
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+ }
+
+ //Test remove queue entries
+ librados::ObjectWriteOperation remove_op;
+ string marker1;
+ uint64_t num_entries = 1;
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Test enqueue again
+ for (int i = 0; i < 1; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1;
+ string marker, next_marker;
+ uint64_t max = 1;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(1, list_info1.size());
+
+ for (auto it : list_info1) {
+ std::cerr << "[ ] list info tag = " << it.tag << std::endl;
+ ASSERT_EQ("chain-0", it.tag);
+ }
+}
+
+TEST(cls_rgw_gc, gc_queue_ops2)
+{
+ //Testing list queue
+ string queue_name = "my-second-queue";
+ uint64_t queue_size = 334, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test list queue, when queue is empty
+ list<cls_rgw_gc_obj_info> list_info;
+ string marker1, next_marker1;
+ uint64_t max1 = 2;
+ bool expired_only1 = false, truncated1;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker1, max1, expired_only1, list_info, &truncated1, next_marker1);
+ ASSERT_EQ(0, list_info.size());
+
+ //Test enqueue
+ for (int i = 0; i < 3; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+ if (i == 2) {
+ ASSERT_EQ(-ENOSPC, ioctx.operate(queue_name, &op));
+ } else {
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+ }
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
+ string marker, next_marker;
+ uint64_t max = 2;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 0;
+ for (auto it : list_info1) {
+ string tag = "chain-" + to_string(i);
+ ASSERT_EQ(tag, it.tag);
+ i++;
+ }
+
+ max = 1;
+ truncated = false;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
+ auto it = list_info2.front();
+ ASSERT_EQ(1, list_info2.size());
+ ASSERT_EQ(true, truncated);
+ ASSERT_EQ("chain-0", it.tag);
+ std::cerr << "[ ] next_marker is: = " << next_marker << std::endl;
+
+ marker = next_marker;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info3, &truncated, next_marker);
+ it = list_info3.front();
+ ASSERT_EQ(1, list_info3.size());
+ ASSERT_EQ(false, truncated);
+ ASSERT_EQ("chain-1", it.tag);
+}
+
+TEST(cls_rgw_gc, gc_queue_ops3)
+{
+ //Testing remove queue entries
+ string queue_name = "my-third-queue";
+ uint64_t queue_size = 501, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test remove queue, when queue is empty
+ librados::ObjectWriteOperation remove_op;
+ string marker1;
+ uint64_t num_entries = 2;
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ cls_rgw_gc_obj_info defer_info;
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 5, info);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ if (i == 0)
+ defer_info = info;
+ }
+
+ //Test defer entry for 1st element
+ librados::ObjectWriteOperation defer_op;
+ cls_rgw_gc_queue_defer_entry(defer_op, 10, defer_info);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &defer_op));
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1, list_info2;
+ string marker, next_marker;
+ uint64_t max = 2;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 0;
+ for (auto it : list_info1) {
+ std::cerr << "[ ] list info tag = " << it.tag << std::endl;
+ if (i == 0) {
+ ASSERT_EQ("chain-1", it.tag);
+ }
+ if (i == 1) {
+ ASSERT_EQ("chain-0", it.tag);
+ }
+ i++;
+ }
+
+ //Test remove entries
+ num_entries = 2;
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Test list queue again
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
+ ASSERT_EQ(0, list_info2.size());
+
+}
+
+TEST(cls_rgw_gc, gc_queue_ops4)
+{
+ //Testing remove queue entries
+ string queue_name = "my-fourth-queue";
+ uint64_t queue_size = 501, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test remove queue, when queue is empty
+ librados::ObjectWriteOperation remove_op;
+ string marker1;
+ uint64_t num_entries = 2;
+
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ cls_rgw_gc_obj_info defer_info;
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 5, info);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ defer_info = info;
+ }
+
+ //Test defer entry for last element
+ librados::ObjectWriteOperation defer_op;
+ cls_rgw_gc_queue_defer_entry(defer_op, 10, defer_info);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &defer_op));
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1, list_info2;
+ string marker, next_marker;
+ uint64_t max = 2;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 0;
+ for (auto it : list_info1) {
+ string tag = "chain-" + to_string(i);
+ ASSERT_EQ(tag, it.tag);
+ i++;
+ }
+
+ //Test remove entries
+ num_entries = 2;
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Test list queue again
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
+ ASSERT_EQ(0, list_info2.size());
+
+}
+
+TEST(cls_rgw_gc, gc_queue_ops5)
+{
+ //Testing remove queue entries
+ string queue_name = "my-fifth-queue";
+ uint64_t queue_size = 501, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test enqueue
+ for (int i = 0; i < 3; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ if (i == 2) {
+ cls_rgw_gc_queue_enqueue(op, 300, info);
+ } else {
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+ }
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+ //Test list queue for expired entries only
+ list<cls_rgw_gc_obj_info> list_info1, list_info2;
+ string marker, next_marker, marker1;
+ uint64_t max = 10;
+ bool expired_only = true, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 0;
+ for (auto it : list_info1) {
+ string tag = "chain-" + to_string(i);
+ ASSERT_EQ(tag, it.tag);
+ i++;
+ }
+
+ //Test remove entries
+ librados::ObjectWriteOperation remove_op;
+ auto num_entries = list_info1.size();
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Test list queue again for all entries
+ expired_only = false;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info2, &truncated, next_marker);
+ ASSERT_EQ(1, list_info2.size());
+
+}
+
+TEST(cls_rgw_gc, gc_queue_ops6)
+{
+ //Testing list queue, when data size is split at the end of the queue
+ string queue_name = "my-sixth-queue";
+ uint64_t queue_size = 341, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+
+ //Remove one element from queue
+ librados::ObjectWriteOperation remove_op;
+ string marker1;
+ uint64_t num_entries = 1;
+
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Enqueue one more element
+ librados::ObjectWriteOperation enq_op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, 2, 1);
+ create_obj(obj2, 2, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = "chain-2";
+ cls_rgw_gc_queue_enqueue(enq_op, 0, info);
+
+ ASSERT_EQ(0, ioctx.operate(queue_name, &enq_op));
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
+ string marker, next_marker;
+ uint64_t max = 2;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 1;
+ for (auto it : list_info1) {
+ string tag = "chain-" + to_string(i);
+ ASSERT_EQ(tag, it.tag);
+ i++;
+ }
+}
+
+TEST(cls_rgw_gc, gc_queue_ops7)
+{
+ //Testing list queue, when data size is written at the end of queue and data is written after wrap around
+ string queue_name = "my-seventh-queue";
+ uint64_t queue_size = 342, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+
+ //Remove one element from queue
+ librados::ObjectWriteOperation remove_op;
+ string marker1;
+ uint64_t num_entries = 1;
+
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Enqueue one more element
+ librados::ObjectWriteOperation enq_op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, 2, 1);
+ create_obj(obj2, 2, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = "chain-2";
+ cls_rgw_gc_queue_enqueue(enq_op, 0, info);
+
+ ASSERT_EQ(0, ioctx.operate(queue_name, &enq_op));
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
+ string marker, next_marker;
+ uint64_t max = 2;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 1;
+ for (auto it : list_info1) {
+ string tag = "chain-" + to_string(i);
+ ASSERT_EQ(tag, it.tag);
+ i++;
+ }
+}
+
+TEST(cls_rgw_gc, gc_queue_ops8)
+{
+ //Testing list queue, when data is split at the end of the queue
+ string queue_name = "my-eighth-queue";
+ uint64_t queue_size = 344, num_urgent_data_entries = 10;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 0, info);
+
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+
+ //Remove one element from queue
+ librados::ObjectWriteOperation remove_op;
+ string marker1;
+ uint64_t num_entries = 1;
+
+ cls_rgw_gc_queue_remove_entries(remove_op, num_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &remove_op));
+
+ //Enqueue one more element
+ librados::ObjectWriteOperation enq_op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, 2, 1);
+ create_obj(obj2, 2, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = "chain-2";
+ cls_rgw_gc_queue_enqueue(enq_op, 0, info);
+
+ ASSERT_EQ(0, ioctx.operate(queue_name, &enq_op));
+
+ //Test list queue
+ list<cls_rgw_gc_obj_info> list_info1, list_info2, list_info3;
+ string marker, next_marker;
+ uint64_t max = 2;
+ bool expired_only = false, truncated;
+ cls_rgw_gc_queue_list_entries(ioctx, queue_name, marker, max, expired_only, list_info1, &truncated, next_marker);
+ ASSERT_EQ(2, list_info1.size());
+
+ int i = 1;
+ for (auto it : list_info1) {
+ string tag = "chain-" + to_string(i);
+ ASSERT_EQ(tag, it.tag);
+ i++;
+ }
+}
+
+TEST(cls_rgw_gc, gc_queue_ops9)
+{
+ //Testing remove queue entries
+ string queue_name = "my-ninth-queue";
+ uint64_t queue_size = 668, num_urgent_data_entries = 1;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_rgw_gc_queue_init(op, queue_size, num_urgent_data_entries);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+ uint64_t size = 0;
+ int ret = cls_rgw_gc_queue_get_capacity(ioctx, queue_name, size);
+ ASSERT_EQ(0, ret);
+ ASSERT_EQ(size, queue_size);
+
+ cls_rgw_gc_obj_info defer_info1, defer_info2;
+
+ //Test enqueue
+ for (int i = 0; i < 2; i++) {
+ string tag = "chain-" + to_string(i);
+ librados::ObjectWriteOperation op;
+ cls_rgw_gc_obj_info info;
+
+ cls_rgw_obj obj1, obj2;
+ create_obj(obj1, i, 1);
+ create_obj(obj2, i, 2);
+ info.chain.objs.push_back(obj1);
+ info.chain.objs.push_back(obj2);
+
+ info.tag = tag;
+ cls_rgw_gc_queue_enqueue(op, 5, info);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ if (i == 0) {
+ defer_info1 = info;
+ }
+ if (i == 1) {
+ defer_info2 = info;
+ }
+ }
+
+ //Test defer entry for last element
+ librados::ObjectWriteOperation defer_op;
+ cls_rgw_gc_queue_defer_entry(defer_op, 10, defer_info2);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &defer_op));
+
+ //Test defer entry for first element
+ cls_rgw_gc_queue_defer_entry(defer_op, 10, defer_info1);
+ ASSERT_EQ(-ENOSPC, ioctx.operate(queue_name, &defer_op));
+}
+
+/* must be last test! */
+TEST(cls_rgw_gc, finalize)
+{
+ /* remove pool */
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}