#include "include/types.h"
-#include <errno.h>
-
#include "objclass/objclass.h"
#include "cls/queue/cls_queue_types.h"
#include "cls/queue/cls_queue_ops.h"
// --- fragment: tail of queue_write_head (function signature is outside this chunk) ---
// NOTE(review): the paired "-"/"+" lines below are embedded unified-diff markers
// (old CLS_LOG call with a trailing "\n" vs. its replacement without one). This
// file cannot compile until each pair is resolved to the single "+" line.
// Persist the encoded head bufferlist at object offset 0. WILLNEED hints the OSD
// that this range is likely to be read again soon.
int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (ret < 0) {
- CLS_LOG(5, "ERROR: queue_write_head: failed to write head\n");
+ CLS_LOG(5, "ERROR: queue_write_head: failed to write head");
// Propagate the OSD write error unchanged to the caller.
return ret;
}
return 0;
// --- fragment: body of queue_read_head (function signature is outside this chunk) ---
// NOTE(review): "-"/"+" pairs are embedded diff markers (dropping trailing "\n"
// from CLS_LOG messages); resolve them before building. Also note `ret` is
// declared twice below (L15 and L47 of this chunk) — context lines between the
// two reads were elided by the chunking, so this fragment is not compilable as-is.
bufferlist bl_head;
// Read the first chunk of the on-disk head region.
const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
if (ret < 0) {
- CLS_LOG(5, "ERROR: queue_read_head: failed to read head\n");
+ CLS_LOG(5, "ERROR: queue_read_head: failed to read head");
return ret;
}
// A zero-byte read means the head was never written: treat an uninitialized
// queue as an invalid-argument error rather than decoding garbage.
+ if (ret == 0) {
+ CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet");
+ return -EINVAL;
+ }
//Process the chunk of data read
auto it = bl_head.cbegin();
// Decode the magic start marker and reject anything that does not match
// QUEUE_HEAD_START — guards against reading a non-queue object.
try {
decode(queue_head_start, it);
} catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s \n", err.what());
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what());
return -EINVAL;
}
if (queue_head_start != QUEUE_HEAD_START) {
- CLS_LOG(0, "ERROR: queue_read_head: invalid queue start\n");
+ CLS_LOG(0, "ERROR: queue_read_head: invalid queue start");
return -EINVAL;
}
// Size of the encoded head structure that follows the marker.
try {
decode(encoded_len, it);
} catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s\n", err.what());
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what());
return -EINVAL;
}
// Second read: fetch the rest of the head. Presumably start_offset/chunk_size
// were advanced based on encoded_len in lines elided from this chunk — TODO
// confirm against the full source.
bufferlist bl_remaining_head;
const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
- CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head\n");
+ CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head");
return ret;
}
// Append the second read to the first so the existing iterator `it` can keep
// decoding across the boundary; claim_append moves buffers without copying.
bl_head.claim_append(bl_remaining_head);
try {
decode(head, it);
} catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s\n", err.what());
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what());
return -EINVAL;
}
// --- fragment: tail of queue_get_capacity (signature outside this chunk) ---
// Usable capacity excludes the reserved head region at the front of the object.
op_ret.queue_capacity = head.queue_size - head.max_head_size;
- CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu\n", op_ret.queue_capacity);
+ CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity);
return 0;
}
// Append one or more entries at the queue tail, wrapping around to
// head.max_head_size (the start of the data region) when the end of the
// object is reached.
// NOTE(review): this body is internally truncated by the chunking — e.g. the
// error branch opened after the first cls_cxx_write2 (chunk line L84) jumps
// straight into the wrap-around path, and several closing braces are missing.
// The "-"/"+" pairs are embedded diff markers; resolve before compiling.
int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
// Full-queue check: tail has caught up to front after wrapping exactly once
// more than front (generation counter is one ahead).
if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
- CLS_LOG(0, "ERROR: No space left in queue\n");
+ CLS_LOG(0, "ERROR: No space left in queue");
return -ENOSPC;
}
// On-disk entry frame: encoded data_size followed by the payload bytes.
encode(data_size, bl);
bl.claim_append(bl_data);
- CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu\n", bl.length(), data_size);
+ CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size);
// Case 1: tail is at or past front — free space lies between tail and the
// end of the object (and possibly wraps to before front).
if (head.tail.offset >= head.front.offset) {
// check if data can fit in the remaining space in queue
if ((head.tail.offset + bl.length()) <= head.queue_size) {
- CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
//write data size and data at tail offset
auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
// NOTE(review): error-branch body elided by chunking; wrap-around split
// path follows. splice() moves the first size_before_wrap bytes out of
// bl into bl_data_before_wrap, leaving the remainder in bl.
bufferlist bl_data_before_wrap;
bl.splice(0, size_before_wrap, &bl_data_before_wrap);
//write spliced (data size and data) at tail offset
- CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length());
auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
// Wrap: tail restarts at the beginning of the data region (just past the
// reserved head) and the generation counter records the wrap.
head.tail.offset = head.max_head_size;
head.tail.gen += 1;
//write remaining data at tail offset after wrapping around
- CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length());
ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
return ret;
}
// Case 2: front is ahead of tail (queue already wrapped) — free space is
// the gap between tail and front only.
} else if (head.front.offset > head.tail.offset) {
if ((head.tail.offset + bl.length()) <= head.front.offset) {
- CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length());
+ CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
//write data size and data at tail offset
auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
if (ret < 0) {
}
head.tail.offset += bl.length();
} else {
- CLS_LOG(0, "ERROR: No space left in queue\n");
+ CLS_LOG(0, "ERROR: No space left in queue");
// return queue full error
return -ENOSPC;
}
// NOTE(review): this second wrap of the tail appears to belong to a branch
// whose condition was elided by the chunking — TODO confirm against full source.
head.tail.offset = head.max_head_size;
head.tail.gen += 1;
}
- CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s \n", head.tail.to_str().c_str());
+ CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str());
} //end - for
return 0;
// --- fragment: body of queue_list_entries (signature outside this chunk) ---
// Walks entries from head.front toward head.tail, decoding up to op.max
// entries into op_ret, and reports a next_marker for resumption.
// NOTE(review): the body is internally truncated by the chunking (several
// conditions and closing braces are missing); "-"/"+" pairs are embedded
// diff markers — resolve before compiling.
{
// If queue is empty, return from here
if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
- CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head.front.to_str().c_str());
+ CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str());
op_ret.next_marker = head.front.to_str();
op_ret.is_truncated = false;
return 0;
}
}
- CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());
+ CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());
// offset_populated / entry_start_processed carry partial-decode state across
// chunk reads: an entry's marker, size, and payload may straddle two chunks.
bool offset_populated = false, entry_start_processed = false;
uint64_t data_size = 0, num_ops = 0;
bufferlist bl;
// Outer loop: one chunk read per iteration.
do
{
- CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset);
+ CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset);
bufferlist bl_chunk;
//Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
} else {
size_to_read = contiguous_data_size;
}
- CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read);
+ CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read);
// Nothing left to read: listing is complete, resume point is the tail.
if (size_to_read == 0) {
next_marker = head.tail;
op_ret.is_truncated = false;
//If there is leftover data from previous iteration, append new data to leftover data
// Adjust the logical start so markers computed from `index` account for the
// leftover bytes prepended below.
uint64_t entry_start_offset = start_offset - bl.length();
- CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu\n", entry_start_offset);
+ CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset);
bl.claim_append(bl_chunk);
bl_chunk = std::move(bl);
- CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk.length());
+ CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length());
//Process the chunk of data read
// Inner loop: decode entries from the chunk. Frame layout per entry:
// uint16 QUEUE_ENTRY_START marker, uint64 data_size, then payload bytes.
unsigned index = 0;
auto it = bl_chunk.cbegin();
uint64_t size_to_process = bl_chunk.length();
do {
- CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
+ CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process);
cls_queue_entry entry;
ceph_assert(it.get_off() == index);
//Populate offset if not done in previous iteration
try {
decode(entry_start, it);
} catch (buffer::error& err) {
- CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s\n", err.what());
+ CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what());
return -EINVAL;
}
if (entry_start != QUEUE_ENTRY_START) {
- CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start);
+ CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start);
return -EINVAL;
}
index += sizeof(uint16_t);
try {
decode(data_size, it);
} catch (buffer::error& err) {
- CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s\n", err.what());
+ CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what());
return -EINVAL;
}
} else {
// Copy unprocessed data to bl
// Not enough bytes for marker+size: stash the remainder in `bl` so the
// next chunk read can prepend it and retry the decode.
bl_chunk.splice(index, size_to_process, &bl);
offset_populated = true;
- CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
+ CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
break;
}
- CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size);
+ CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size);
index += sizeof(uint64_t);
size_to_process -= sizeof(uint64_t);
}
// Payload straddles the chunk boundary: save what we have and remember that
// the frame header was already consumed (entry_start_processed).
it.copy(size_to_process, bl);
offset_populated = true;
entry_start_processed = true;
- CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
+ CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
break;
}
op_ret.entries.emplace_back(entry);
entry_start = 0;
num_ops++;
if (num_ops == op.max) {
- CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
+ CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
break;
}
} while(index < bl_chunk.length());
// Hit the caller's page-size limit: report the resume marker after the last
// decoded entry.
if (num_ops == op.max) {
next_marker = cls_queue_marker{(entry_start_offset + index), gen};
- CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker.offset);
+ CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
break;
}
// Presumably the wrap-around branch: bump generation when the scan wraps
// past the end of the object — TODO confirm against full source.
gen += 1;
wrap_around = false;
} else {
- CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
+ CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!");
next_marker = head.tail;
op_ret.is_truncated = false;
break;
op_ret.is_truncated = false;
}
- CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker.to_str().c_str());
+ CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str());
op_ret.next_marker = next_marker.to_str();
return 0;
// --- fragment: body of queue_remove_entries (signature outside this chunk) ---
// Advances head.front to op.end_marker, zeroing the removed byte ranges so
// the OSD can reclaim the storage. Three cls_cxx_write_zero branches appear
// below; the conditions selecting among them were partially elided by the
// chunking — presumably the simple same-generation case vs. the wrapped case
// (zero to end of object, then from max_head_size) — TODO confirm against
// full source. "-"/"+" pairs are embedded diff markers; resolve before building.
cls_queue_marker end_marker;
end_marker.from_str(op.end_marker.c_str());
- CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker.to_str().c_str());
+ CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str());
//Zero out the entries that have been removed, to reclaim storage space
// Same-generation case: end marker lies ahead of front in the same pass.
if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
if (len > 0) {
auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
if (ret < 0) {
- CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
- CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
return ret;
}
}
// Second zeroing pass (condition elided by chunking): zero from front.offset.
if (len > 0) {
auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
if (ret < 0) {
- CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
- CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
return ret;
}
}
// Third zeroing pass: starts at max_head_size (beginning of the data
// region), consistent with removal that wrapped past the end of the object.
if (len > 0) {
auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
if (ret < 0) {
- CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
- CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head.max_head_size);
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size);
return ret;
}
}
// Marker equal to front: nothing to remove.
} else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
//no-op
} else {
- CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker.to_str().c_str(), end_marker.gen);
+ CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen);
return -EINVAL;
}
// NOTE(review): this generation bump sits inside a brace whose opening
// condition is not visible in this chunk — presumably the wrapped-removal path.
head.front.gen += 1;
}
- CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());
+ CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());
return 0;
}