cls_method_handle_t h_init_queue;
cls_method_handle_t h_get_queue_size;
cls_method_handle_t h_enqueue;
- cls_method_handle_t h_dequeue;
cls_method_handle_t h_queue_list_entries;
cls_method_handle_t h_queue_remove_entries;
cls_method_handle_t h_queue_update_last_entry;
cls_register_cxx_method(h_class, INIT_QUEUE, CLS_METHOD_WR, cls_init_queue, &h_init_queue);
cls_register_cxx_method(h_class, GET_QUEUE_SIZE, CLS_METHOD_RD, cls_get_queue_size, &h_get_queue_size);
cls_register_cxx_method(h_class, ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_enqueue, &h_enqueue);
- cls_register_cxx_method(h_class, DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_dequeue, &h_dequeue);
cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_list_entries, &h_queue_list_entries);
cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries);
cls_register_cxx_method(h_class, QUEUE_UPDATE_LAST_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_update_last_entry, &h_queue_update_last_entry);
return 0;
}
-int cls_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- //get head and its size
- uint64_t head_size = 0;
- cls_queue_head head;
- int ret = get_queue_head_and_size(hctx, head, head_size);
- if (ret < 0) {
- return ret;
- }
-
- if (head.front == head.tail && head.is_empty) {
- CLS_LOG(1, "ERROR: Queue is empty\n");
- return -ENOENT;
- }
-
- uint64_t data_size = 0;
- bufferlist bl_size;
-
- if (head.front < head.tail) {
- //Read size of data first
- ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size);
- if (ret < 0) {
- return ret;
- }
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n");
- return -EINVAL;
- }
- CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front);
- head.front += sizeof(uint64_t);
- //Read data based on size obtained above
- CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- head.front += data_size;
- } else if (head.front >= head.tail) {
- uint64_t actual_data_size = head.queue_size - head.front;
- if (actual_data_size < sizeof(uint64_t)) {
- //Case 1. Data size has been spliced, first reconstruct data size
- CLS_LOG(1, "INFO: cls_dequeue: Spliced data size is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read2(hctx, head.front, actual_data_size, &bl_size, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- head.front = head_size;
- uint64_t remainder_data_size = sizeof(uint64_t) - actual_data_size;
- bufferlist bl_rem_data_size;
- CLS_LOG(1, "INFO: cls_dequeue: Remainder Spliced data size is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read(hctx, head.front, remainder_data_size, &bl_rem_data_size);
- if (ret < 0) {
- return ret;
- }
- bl_size.claim_append(bl_rem_data_size);
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n");
- return -EINVAL;
- }
- head.front += remainder_data_size;
- CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- head.front += data_size;
- } else {
- ret = cls_cxx_read(hctx, head.front, sizeof(uint64_t), &bl_size);
- if (ret < 0) {
- return ret;
- }
- auto iter = bl_size.cbegin();
- try {
- decode(data_size, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_dequeue: failed to decode data size \n");
- return -EINVAL;
- }
- CLS_LOG(1, "INFO: cls_dequeue: Data size: %lu, front offset: %lu\n", data_size, head.front);
- head.front += sizeof(uint64_t);
-
- actual_data_size = head.queue_size - head.front;
-
- if (actual_data_size < data_size) {
- if (actual_data_size != 0) {
- //Case 2. Data has been spliced
- CLS_LOG(1, "INFO: cls_dequeue: Spliced data is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read2(hctx, head.front, actual_data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- }
- head.front = head_size;
- bufferlist bl_remainder;
- uint64_t remainder_size = data_size - actual_data_size;
- CLS_LOG(1, "INFO: cls_dequeue: Remaining Data is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read2(hctx, head.front, remainder_size, &bl_remainder, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- out->claim_append(bl_remainder);
- head.front += remainder_size;
- } else {
- //Case 3. No splicing
- CLS_LOG(1, "INFO: cls_dequeue: Data is read from from front offset %lu\n", head.front);
- ret = cls_cxx_read2(hctx, head.front, data_size, out, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
- if (ret < 0) {
- return ret;
- }
- head.front += data_size;
- }
- }
- }
-
- //front has reached the end, wrap it around
- if (head.front == head.queue_size) {
- head.front = head_size;
- }
-
- if (head.front == head.tail) {
- head.is_empty = true;
- }
- //Write head back
- bufferlist bl_head;
- encode(head, bl_head);
- CLS_LOG(1, "INFO: cls_enqueue: Writing head of size: %u and front offset is: %lu\n", bl_head.length(), head.front);
- ret = cls_cxx_write2(hctx, sizeof(uint64_t), bl_head.length(), &bl_head, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
- if (ret < 0) {
- CLS_LOG(1, "INFO: cls_enqueue: Writing head returned error: %d \n", ret);
- return ret;
- }
-
- return 0;
-}
-
int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
//get head and its size
return cls_enqueue(hctx, in, out);
}
-static int cls_gc_dequeue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- int r = cls_dequeue(hctx, in, out);
- if (r < 0)
- return r;
-
- cls_rgw_gc_obj_info data;
- auto iter = out->cbegin();
- try {
- decode(data, iter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_gc_dequeue(): failed to decode entry\n");
- return -EINVAL;
- }
-
- CLS_LOG(1, "INFO: tag of gc info is %s\n", data.tag.c_str());
-
- return 0;
-}
-
static int cls_gc_queue_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(1, "INFO: cls_gc_queue_list(): Entered cls_gc_queue_list \n");
cls_method_handle_t h_gc_create_queue;
cls_method_handle_t h_gc_init_queue;
cls_method_handle_t h_gc_enqueue;
- cls_method_handle_t h_gc_dequeue;
cls_method_handle_t h_gc_queue_list_entries;
cls_method_handle_t h_gc_queue_remove_entries;
cls_method_handle_t h_gc_queue_update_entry;
cls_register_cxx_method(h_class, GC_CREATE_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_create_queue, &h_gc_create_queue);
cls_register_cxx_method(h_class, GC_INIT_QUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_init_queue, &h_gc_init_queue);
cls_register_cxx_method(h_class, GC_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_enqueue, &h_gc_enqueue);
- cls_register_cxx_method(h_class, GC_DEQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_dequeue, &h_gc_dequeue);
cls_register_cxx_method(h_class, GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_gc_queue_list, &h_gc_queue_list_entries);
cls_register_cxx_method(h_class, GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_remove, &h_gc_queue_remove_entries);
cls_register_cxx_method(h_class, GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_gc_queue_update_entry, &h_gc_queue_update_entry);