--- /dev/null
+#include <common/perf_counters_collection.h>
+
+#include "test/librados/test_cxx.h"
+#include "test/librados/testcase_cxx.h"
+#include "crimson_utils.h"
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/version/cls_version_ops.h"
+
+using namespace std;
+using namespace librados;
+
+typedef RadosTestPP LibRadosSplitOpPP;
+typedef RadosTestECPP LibRadosSplitOpECPP;
+
+// After a write is committed, it isn't necessarily true that the log is
+// committed. We do a read of the written area, which allows us to be
+// sure that the shards have all received the message that the log can be
+// committed, allowing us to test split ops with certainty that it won't be
+// bounced due to unstability.
+void RadosTestPPBase::ensure_log_committed(const char* oid, uint64_t offset, uint64_t length) {
+ ObjectReadOperation read;
+ read.read(offset, length, NULL, NULL);
+
+ bufferlist bl;
+ int rc = ioctx.operate(oid, &read, &bl);
+ ASSERT_EQ(0, rc);
+}
+
+TEST_P(LibRadosSplitOpPP, BigRead) {
+ bufferlist bl;
+ bl.append_zero(512*1024);
+ ObjectWriteOperation write1, write2;
+ write1.write(0, bl);
+ uint32_t hash_position;
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+ ASSERT_EQ(0, ioctx.get_object_pg_hash_position2("foo", &hash_position));
+
+ std::string other_object = "other";
+ while (true) {
+ uint32_t hash_position2;
+ ASSERT_EQ(0, ioctx.get_object_pg_hash_position2(other_object, &hash_position2));
+ if (hash_position == hash_position2) {
+ break;
+ }
+ other_object += ".";
+ }
+ // The second write flushes the commit of the first.
+ write2.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, other_object, &write2));
+
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, 3, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+}
+
+TEST_P(LibRadosSplitOpPP, ReadTwoShards) {
+ // Read the osd_min_split_replica_read_size config value
+ std::string min_split_size_str;
+ ASSERT_EQ(0, cluster.conf_get("osd_min_split_replica_read_size", min_split_size_str));
+ uint64_t min_split_size = std::stoull(min_split_size_str);
+
+ // Write data large enough to cover multiple shards
+ bufferlist bl;
+ bl.append_zero(min_split_size * 3);
+ ObjectWriteOperation write1, write2;
+ write1.write(0, bl);
+ uint32_t hash_position;
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+ ASSERT_EQ(0, ioctx.get_object_pg_hash_position2("foo", &hash_position));
+
+ std::string other_object = "other";
+ while (true) {
+ uint32_t hash_position2;
+ ASSERT_EQ(0, ioctx.get_object_pg_hash_position2(other_object, &hash_position2));
+ if (hash_position == hash_position2) {
+ break;
+ }
+ other_object += ".";
+ }
+ // The second write flushes the commit of the first.
+ write2.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, other_object, &write2));
+
+ // Test 1: Read exactly osd_min_split_replica_read_size - should NOT split
+ {
+ ObjectReadOperation read;
+ bufferlist read_bl;
+ read.read(0, min_split_size, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &read, &read_bl, librados::OPERATION_BALANCE_READS));
+ }
+
+ // Test 2: Read osd_min_split_replica_read_size * 2 - 1 - should NOT split
+ {
+ ObjectReadOperation read;
+ bufferlist read_bl;
+ read.read(0, min_split_size * 2 - 1, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &read, &read_bl, librados::OPERATION_BALANCE_READS));
+ }
+
+ // Test 3: Read exactly osd_min_split_replica_read_size * 2 - should split into 2
+ {
+ ObjectReadOperation read;
+ bufferlist read_bl;
+ read.read(0, min_split_size * 2, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, 2, "foo", &read, &read_bl, librados::OPERATION_BALANCE_READS));
+ }
+
+ // Test 4: Read osd_min_split_replica_read_size * 3 - 1 - should split into 2
+ {
+ ObjectReadOperation read;
+ bufferlist read_bl;
+ read.read(0, min_split_size * 3 - 1, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, 2, "foo", &read, &read_bl, librados::OPERATION_BALANCE_READS));
+ }
+
+ // Test 5: Read osd_min_split_replica_read_size * 3 - should split into 3
+ {
+ ObjectReadOperation read;
+ bufferlist read_bl;
+ read.read(0, min_split_size * 3, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, 3, "foo", &read, &read_bl, librados::OPERATION_BALANCE_READS));
+ }
+}
+
+TEST_P(LibRadosSplitOpECPP, ReadWithVersion) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+
+ bufferlist exec_inbl, exec_outbl;
+ int exec_rval;
+ read.exec("version", "read", exec_inbl, &exec_outbl, &exec_rval);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "ceph", 4));
+ ASSERT_EQ(0, exec_rval);
+ cls_version_read_ret exec_version;
+ auto iter = exec_outbl.cbegin();
+ decode(exec_version, iter);
+ ASSERT_EQ(0, exec_version.objv.ver);
+ ASSERT_EQ("", exec_version.objv.tag);
+}
+
+TEST_P(LibRadosSplitOpECPP, SmallRead) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ioctx.set_no_version_on_read(true);
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+ ioctx.set_no_version_on_read(false);
+}
+
+TEST_P(LibRadosSplitOpECPP, ReadTwoShards) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl;
+ bl.append_zero(8*1024);
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ensure_log_committed("foo", 0, bl.length());
+
+ ioctx.set_no_version_on_read(true);
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, 2, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+ ioctx.set_no_version_on_read(false);
+}
+
+TEST_P(LibRadosSplitOpECPP, ReadSecondShard) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl;
+ bl.append_zero(8*1024);
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ensure_log_committed("foo", 0, bl.length());
+
+ ioctx.set_no_version_on_read(true);
+ ObjectReadOperation read;
+ read.read(4*1024, 4*1024, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+ ioctx.set_no_version_on_read(false);
+}
+
+TEST_P(LibRadosSplitOpECPP, ReadSecondShardWithVersion) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl;
+ bl.append_zero(8*1024);
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ensure_log_committed("foo", 0, bl.length());
+
+ ObjectReadOperation read;
+ read.read(4*1024, 4*1024, NULL, NULL);
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, 2, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+}
+
+TEST_P(LibRadosSplitOpECPP, ReadWithIllegalClsOp) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ bufferlist new_bl;
+ new_bl.append("CEPH");
+ ObjectWriteOperation write2;
+ bufferlist exec_inbl, exec_outbl;
+ int exec_rval;
+ rados::cls::fifo::op::init_part op;
+ encode(op, exec_inbl);
+ write2.exec("fifo", "init_part", exec_inbl, &exec_outbl, &exec_rval);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(-EOPNOTSUPP, "foo", &write2));
+}
+
+TEST_P(LibRadosSplitOpECPP, XattrReads) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl, attr_bl, attr_read_bl;
+ std::string attr_key = "my_key";
+ std::string attr_value = "my_attr";
+
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ encode(attr_value, attr_bl);
+ write1.setxattr(attr_key.c_str(), attr_bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+
+ int getxattr_rval, getxattrs_rval;
+ read.getxattr(attr_key.c_str(), &attr_read_bl, &getxattr_rval);
+ std::map<string, bufferlist> pattrs{ {"", {}}, {attr_key, {}}};
+ read.getxattrs(&pattrs, &getxattrs_rval);
+ read.cmpxattr(attr_key.c_str(), CEPH_OSD_CMPXATTR_OP_EQ, attr_bl);
+
+ ASSERT_TRUE(AssertOperateWithSplitOp(1, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "ceph", 4));
+ ASSERT_EQ(0, getxattr_rval);
+ ASSERT_EQ(0, getxattrs_rval);
+}
+
+TEST_P(LibRadosSplitOpECPP, Stat) {
+ SKIP_IF_CRIMSON();
+ bufferlist bl, attr_bl, attr_read_bl;
+ std::string attr_key = "my_key";
+ std::string attr_value = "my_attr";
+
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ encode(attr_value, attr_bl);
+ write1.setxattr(attr_key.c_str(), attr_bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, "foo", &write1));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+
+ uint64_t size;
+ timespec time;
+ time.tv_nsec = 0;
+ time.tv_sec = 0;
+ int stat_rval;
+ read.stat2(&size, &time, &stat_rval);
+
+ ASSERT_TRUE(AssertOperateWithSplitOp(0, "foo", &read, &bl, librados::OPERATION_BALANCE_READS));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "ceph", 4));
+ ASSERT_EQ(0, stat_rval);
+ ASSERT_EQ(4, size);
+ ASSERT_NE(0, time.tv_nsec);
+ ASSERT_NE(0, time.tv_sec);
+}
+
+TEST_P(LibRadosSplitOpPP, CancelReplica)
+{
+ SKIP_IF_CRIMSON();
+ if (!split_ops) {
+ GTEST_SKIP() << "Inject requires split_ops!";
+ }
+ bufferlist bl, attr_bl, attr_read_bl;
+ uint64_t length = 512 * 1024;
+ const std::string oid = "foo";
+
+ bl.append_zero(length);
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_TRUE(AssertOperateWithoutSplitOp(0, oid, &write1));
+
+ int ret = 0;
+ auto c = std::unique_ptr<AioCompletion>{Rados::aio_create_completion()};
+ ObjectReadOperation op;
+ int osd_ret;
+ bufferlist outval;
+ op.read(0, length, &outval, &osd_ret);
+ ioctx.aio_operate(oid, c.get(), &op, librados::OPERATION_BALANCE_READS, nullptr);
+
+ EXPECT_EQ(0, c->cancel());
+ {
+ TestAlarm alarm;
+ EXPECT_EQ(0, c->wait_for_complete());
+ }
+ ret = c->get_return_value();
+
+ EXPECT_EQ(-ECANCELED, ret);
+}
+
+INSTANTIATE_TEST_SUITE_P_REPLICA(LibRadosSplitOpPP);
+INSTANTIATE_TEST_SUITE_P_EC(LibRadosSplitOpECPP);