return 0;
}
+/**
+ * Input:
+ * @param soft_max_size (uint64_t)
+ * @param data (bufferlist) data to append
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ * @returns -EOVERFLOW if object size is equal or more than soft_max_size
+ */
+int journal_object_append(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t soft_max_size;
+ bufferlist data;
+ try {
+ auto iter = in->cbegin();
+ decode(soft_max_size, iter);
+ decode(data, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ uint64_t size = 0;
+ int r = cls_cxx_stat(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("append: failed to stat object: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ if (size >= soft_max_size) {
+ CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
+ size, soft_max_size);
+ return -EOVERFLOW;
+ }
+
+ auto offset = size;
+ r = cls_cxx_write2(hctx, offset, data.length(), &data,
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+ if (r < 0) {
+ CLS_ERR("append: error when writing: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ if (cls_get_min_compatible_client(hctx) < ceph_release_t::octopus) {
+ return 0;
+ }
+
+ auto min_alloc_size = cls_get_osd_min_alloc_size(hctx);
+ if (min_alloc_size == 0) {
+ min_alloc_size = 8;
+ }
+
+ CLS_LOG(20, "pad to %" PRIu64, min_alloc_size);
+
+ auto end = offset + data.length();
+ auto new_end = round_up_to(end, min_alloc_size);
+ if (new_end == end) {
+ return 0;
+ }
+
+ r = cls_cxx_truncate(hctx, new_end);
+ if (r < 0) {
+ CLS_ERR("append: error when truncating: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ return 0;
+}
+
CLS_INIT(journal)
{
CLS_LOG(20, "Loaded journal class!");
cls_method_handle_t h_journal_tag_create;
cls_method_handle_t h_journal_tag_list;
cls_method_handle_t h_journal_object_guard_append;
+ cls_method_handle_t h_journal_object_append;
cls_register("journal", &h_class);
CLS_METHOD_RD | CLS_METHOD_WR,
journal_object_guard_append,
&h_journal_object_guard_append);
+ cls_register_cxx_method(h_class, "append", CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_object_append, &h_journal_object_append);
}
using namespace cls::journal;
+static bool is_sparse_read_supported(librados::IoCtx &ioctx,
+ const std::string &oid) {
+ EXPECT_EQ(0, ioctx.create(oid, true));
+ bufferlist inbl;
+ inbl.append(std::string(1, 'X'));
+ EXPECT_EQ(0, ioctx.write(oid, inbl, inbl.length(), 1));
+ EXPECT_EQ(0, ioctx.write(oid, inbl, inbl.length(), 3));
+
+ std::map<uint64_t, uint64_t> m;
+ bufferlist outbl;
+ int r = ioctx.sparse_read(oid, m, outbl, 4, 0);
+ ioctx.remove(oid);
+
+ int expected_r = 2;
+ std::map<uint64_t, uint64_t> expected_m = {{1, 1}, {3, 1}};
+ bufferlist expected_outbl;
+ expected_outbl.append(std::string(2, 'X'));
+
+ return (r == expected_r && m == expected_m &&
+ outbl.contents_equal(expected_outbl));
+}
+
class TestClsJournal : public ::testing::Test {
public:
client::guard_append(&op2, 1);
ASSERT_EQ(-EOVERFLOW, ioctx.operate(oid, &op2));
}
+
+TEST_F(TestClsJournal, Append) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+ ioctx.remove(oid);
+
+ bool sparse_read_supported = is_sparse_read_supported(ioctx, oid);
+
+ bufferlist bl;
+ bl.append("journal entry!");
+
+ librados::ObjectWriteOperation op1;
+ client::append(&op1, 1, bl);
+ ASSERT_EQ(0, ioctx.operate(oid, &op1));
+
+ librados::ObjectWriteOperation op2;
+ client::append(&op2, 1, bl);
+ ASSERT_EQ(-EOVERFLOW, ioctx.operate(oid, &op2));
+
+ bufferlist outbl;
+ ASSERT_LE(bl.length(), ioctx.read(oid, outbl, 0, 0));
+
+ bufferlist tmpbl;
+ tmpbl.substr_of(outbl, 0, bl.length());
+ ASSERT_TRUE(bl.contents_equal(tmpbl));
+
+ if (outbl.length() == bl.length()) {
+ std::cout << "padding is not enabled" << std::endl;
+ return;
+ }
+
+ tmpbl.clear();
+ tmpbl.substr_of(outbl, bl.length(), outbl.length() - bl.length());
+ ASSERT_TRUE(tmpbl.is_zero());
+
+ if (!sparse_read_supported) {
+ std::cout << "sparse_read is not supported" << std::endl;
+ return;
+ }
+
+ librados::ObjectWriteOperation op3;
+ client::append(&op3, 1 << 24, bl);
+ ASSERT_EQ(0, ioctx.operate(oid, &op3));
+
+ std::map<uint64_t, uint64_t> m;
+ uint64_t pad_len = outbl.length();
+ outbl.clear();
+ std::map<uint64_t, uint64_t> expected_m =
+ {{0, bl.length()}, {pad_len, bl.length()}};
+ ASSERT_EQ(expected_m.size(), ioctx.sparse_read(oid, m, outbl, 2 * pad_len, 0));
+ ASSERT_EQ(m, expected_m);
+
+ uint64_t buffer_offset = 0;
+ for (auto &it : m) {
+ tmpbl.clear();
+ tmpbl.substr_of(outbl, buffer_offset, it.second);
+ ASSERT_TRUE(bl.contents_equal(tmpbl));
+ buffer_offset += it.second;
+ }
+}