return pin;
}
+ using _overwrite_pin_iertr = TransactionManager::get_pin_iertr;
+ using _overwrite_pin_ret = _overwrite_pin_iertr::future<
+ std::tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>>;
+ _overwrite_pin_ret _overwrite_pin(
+ Transaction &t,
+ LBAMappingRef &&opin,
+ extent_len_t new_offset,
+ extent_len_t new_len,
+ ceph::bufferlist &bl) {
+ auto o_laddr = opin->get_key();
+ auto o_len = opin->get_length();
+ if (new_offset != 0 && o_len != new_offset + new_len) {
+ return tm->remap_pin<TestBlock, 2>(
+ t,
+ std::move(opin),
+ std::array{
+ remap_entry(
+ 0,
+ new_offset),
+ remap_entry(
+ new_offset + new_len,
+ o_len - new_offset - new_len)
+ }
+ ).si_then([this, new_offset, new_len, o_laddr, &t, &bl](auto ret) {
+ return tm->alloc_extent<TestBlock>(t, o_laddr + new_offset, new_len
+ ).si_then([this, ret = std::move(ret), new_len,
+ new_offset, o_laddr, &t, &bl](auto ext) mutable {
+ ceph_assert(ret.size() == 2);
+ auto iter = bl.cbegin();
+ iter.copy(new_len, ext->get_bptr().c_str());
+ auto r_laddr = o_laddr + new_offset + new_len;
+ // old pins expired after alloc new extent, need to get it.
+ return tm->get_pin(t, o_laddr
+ ).si_then([this, &t, ext = std::move(ext), r_laddr](auto lpin) mutable {
+ return tm->get_pin(t, r_laddr
+ ).si_then([lpin = std::move(lpin), ext = std::move(ext)]
+ (auto rpin) mutable {
+ return _overwrite_pin_iertr::make_ready_future<
+ std::tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>>(
+ std::make_tuple(
+ std::move(lpin), std::move(ext), std::move(rpin)));
+ });
+ });
+ });
+ });
+ } else if (new_offset == 0 && o_len != new_offset + new_len) {
+ return tm->remap_pin<TestBlock, 1>(
+ t,
+ std::move(opin),
+ std::array{
+ remap_entry(
+ new_offset + new_len,
+ o_len - new_offset - new_len)
+ }
+ ).si_then([this, new_offset, new_len, o_laddr, &t, &bl](auto ret) {
+ return tm->alloc_extent<TestBlock>(t, o_laddr + new_offset, new_len
+ ).si_then([this, ret = std::move(ret), new_offset, new_len,
+ o_laddr, &t, &bl](auto ext) mutable {
+ ceph_assert(ret.size() == 1);
+ auto iter = bl.cbegin();
+ iter.copy(new_len, ext->get_bptr().c_str());
+ auto r_laddr = o_laddr + new_offset + new_len;
+ return tm->get_pin(t, r_laddr
+ ).si_then([ext = std::move(ext)](auto rpin) mutable {
+ return _overwrite_pin_iertr::make_ready_future<
+ std::tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>>(
+ std::make_tuple(
+ nullptr, std::move(ext), std::move(rpin)));
+ });
+ });
+ });
+ } else if (new_offset != 0 && o_len == new_offset + new_len) {
+ return tm->remap_pin<TestBlock, 1>(
+ t,
+ std::move(opin),
+ std::array{
+ remap_entry(
+ 0,
+ new_offset)
+ }
+ ).si_then([this, new_offset, new_len, o_laddr, &t, &bl](auto ret) {
+ return tm->alloc_extent<TestBlock>(t, o_laddr + new_offset, new_len
+ ).si_then([this, ret = std::move(ret), new_len, o_laddr, &t, &bl]
+ (auto ext) mutable {
+ ceph_assert(ret.size() == 1);
+ auto iter = bl.cbegin();
+ iter.copy(new_len, ext->get_bptr().c_str());
+ return tm->get_pin(t, o_laddr
+ ).si_then([ext = std::move(ext)](auto lpin) mutable {
+ return _overwrite_pin_iertr::make_ready_future<
+ std::tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>>(
+ std::make_tuple(
+ std::move(lpin), std::move(ext), nullptr));
+ });
+ });
+ });
+ } else {
+ ceph_abort("impossible");
+ return _overwrite_pin_iertr::make_ready_future<
+ std::tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>>(
+ std::make_tuple(nullptr, nullptr, nullptr));
+ }
+ }
+
+ using overwrite_pin_ret = std::tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>;
+ overwrite_pin_ret overwrite_pin(
+ test_transaction_t &t,
+ LBAMappingRef &&opin,
+ extent_len_t new_offset,
+ extent_len_t new_len,
+ ceph::bufferlist &bl) {
+ if (t.t->is_conflicted()) {
+ return std::make_tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>(
+ nullptr, nullptr, nullptr);
+ }
+ auto o_laddr = opin->get_key();
+ auto o_paddr = opin->get_val();
+ auto o_len = opin->get_length();
+ auto res = with_trans_intr(*(t.t), [&](auto& trans) {
+ return _overwrite_pin(
+ trans, std::move(opin), new_offset, new_len, bl);
+ }).handle_error(crimson::ct_error::eagain::handle([] {
+ return std::make_tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>(
+ nullptr, nullptr, nullptr);
+ }), crimson::ct_error::pass_further_all{}).unsafe_get0();
+ if (t.t->is_conflicted()) {
+ return std::make_tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>(
+ nullptr, nullptr, nullptr);
+ }
+ test_mappings.dec_ref(o_laddr, t.mapping_delta);
+ EXPECT_FALSE(test_mappings.contains(o_laddr, t.mapping_delta));
+ auto &[lpin, ext, rpin] = res;
+
+ EXPECT_TRUE(ext);
+ EXPECT_TRUE(lpin || rpin);
+ EXPECT_TRUE(o_len > ext->get_length());
+ if (lpin) {
+ EXPECT_EQ(lpin->get_key(), o_laddr);
+ EXPECT_EQ(lpin->get_val(), o_paddr);
+ EXPECT_EQ(lpin->get_length(), new_offset);
+ auto lext = try_read_pin(t, lpin->duplicate());
+ if (lext) {
+ test_mappings.alloced(lpin->get_key(), *lext, t.mapping_delta);
+ EXPECT_TRUE(lext->is_exist_clean());
+ } else {
+ ceph_assert(t.t->is_conflicted());
+ return std::make_tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>(
+ nullptr, nullptr, nullptr);
+ }
+ }
+ EXPECT_EQ(ext->get_laddr(), o_laddr + new_offset);
+ EXPECT_EQ(ext->get_length(), new_len);
+ test_mappings.alloced(ext->get_laddr(), *ext, t.mapping_delta);
+ if (rpin) {
+ EXPECT_EQ(rpin->get_key(), o_laddr + new_offset + new_len);
+ EXPECT_EQ(rpin->get_val(), o_paddr.add_offset(new_offset)
+ .add_offset(new_len));
+ EXPECT_EQ(rpin->get_length(), o_len - new_offset - new_len);
+ auto rext = try_read_pin(t, rpin->duplicate());
+ if (rext) {
+ test_mappings.alloced(rpin->get_key(), *rext, t.mapping_delta);
+ EXPECT_TRUE(rext->is_exist_clean());
+ } else {
+ ceph_assert(t.t->is_conflicted());
+ return std::make_tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>(
+ nullptr, nullptr, nullptr);
+ }
+ }
+ return std::make_tuple<LBAMappingRef, TestBlockRef, LBAMappingRef>(
+ std::move(lpin), std::move(ext), std::move(rpin));
+ }
+
void test_remap_pin() {
run_async([this] {
constexpr size_t l_offset = 32 << 10;
});
}
+ void test_overwrite_pin() {
+ run_async([this] {
+ constexpr size_t m_offset = 8 << 10;
+ constexpr size_t m_len = 56 << 10;
+ constexpr size_t l_offset = 64 << 10;
+ constexpr size_t l_len = 64 << 10;
+ constexpr size_t r_offset = 128 << 10;
+ constexpr size_t r_len = 64 << 10;
+ {
+ auto t = create_transaction();
+ auto m_ext = alloc_extent(t, m_offset, m_len);
+ m_ext->set_contents('a', 0 << 10, 8 << 10);
+ m_ext->set_contents('b', 16 << 10, 4 << 10);
+ m_ext->set_contents('c', 36 << 10, 4 << 10);
+ m_ext->set_contents('d', 52 << 10, 4 << 10);
+
+ auto l_ext = alloc_extent(t, l_offset, l_len);
+ auto r_ext = alloc_extent(t, r_offset, r_len);
+ submit_transaction(std::move(t));
+ }
+ {
+ auto t = create_transaction();
+ auto mpin = get_pin(t, m_offset);
+ auto lpin = get_pin(t, l_offset);
+ auto rpin = get_pin(t, r_offset);
+
+ bufferlist mbl1, mbl2, mbl3;
+ mbl1.append(ceph::bufferptr(ceph::buffer::create(8 << 10, 0)));
+ mbl2.append(ceph::bufferptr(ceph::buffer::create(16 << 10, 0)));
+ mbl3.append(ceph::bufferptr(ceph::buffer::create(12 << 10, 0)));
+ auto [mlp1, mext1, mrp1] = overwrite_pin(
+ t, std::move(mpin), 8 << 10 , 8 << 10, mbl1);
+ auto [mlp2, mext2, mrp2] = overwrite_pin(
+ t, std::move(mrp1), 4 << 10 , 16 << 10, mbl2);
+ auto [mlpin3, me3, mrpin3] = overwrite_pin(
+ t, std::move(mrp2), 4 << 10 , 12 << 10, mbl3);
+ auto mlext1 = get_extent(t, mlp1->get_key(), mlp1->get_length());
+ auto mlext2 = get_extent(t, mlp2->get_key(), mlp2->get_length());
+ auto mlext3 = get_extent(t, mlpin3->get_key(), mlpin3->get_length());
+ auto mrext3 = get_extent(t, mrpin3->get_key(), mrpin3->get_length());
+ EXPECT_EQ('a', mlext1->get_bptr().c_str()[0]);
+ EXPECT_EQ('b', mlext2->get_bptr().c_str()[0]);
+ EXPECT_EQ('c', mlext3->get_bptr().c_str()[0]);
+ EXPECT_EQ('d', mrext3->get_bptr().c_str()[0]);
+ auto mutate_mlext1 = mutate_extent(t, mlext1);
+ auto mutate_mlext2 = mutate_extent(t, mlext2);
+ auto mutate_mlext3 = mutate_extent(t, mlext3);
+ auto mutate_mrext3 = mutate_extent(t, mrext3);
+ ASSERT_TRUE(mutate_mlext1->is_exist_mutation_pending());
+ ASSERT_TRUE(mutate_mlext2->is_exist_mutation_pending());
+ ASSERT_TRUE(mutate_mlext3->is_exist_mutation_pending());
+ ASSERT_TRUE(mutate_mrext3->is_exist_mutation_pending());
+ ASSERT_TRUE(mutate_mlext1.get() == mlext1.get());
+ ASSERT_TRUE(mutate_mlext2.get() == mlext2.get());
+ ASSERT_TRUE(mutate_mlext3.get() == mlext3.get());
+ ASSERT_TRUE(mutate_mrext3.get() == mrext3.get());
+
+ bufferlist lbl1, rbl1;
+ lbl1.append(ceph::bufferptr(ceph::buffer::create(32 << 10, 0)));
+ auto [llp1, lext1, lrp1] = overwrite_pin(
+ t, std::move(lpin), 0 , 32 << 10, lbl1);
+ EXPECT_FALSE(llp1);
+ EXPECT_TRUE(lrp1);
+ EXPECT_TRUE(lext1);
+
+ rbl1.append(ceph::bufferptr(ceph::buffer::create(32 << 10, 0)));
+ auto [rlp1, rext1, rrp1] = overwrite_pin(
+ t, std::move(rpin), 32 << 10 , 32 << 10, rbl1);
+ EXPECT_TRUE(rlp1);
+ EXPECT_TRUE(rext1);
+ EXPECT_FALSE(rrp1);
+
+ submit_transaction(std::move(t));
+ check();
+ }
+ replay();
+ check();
+ });
+ }
+
void test_remap_pin_concurrent() {
run_async([this] {
constexpr unsigned REMAP_NUM = 32;
check();
});
}
+
+ void test_overwrite_pin_concurrent() {
+ run_async([this] {
+ constexpr unsigned REMAP_NUM = 32;
+ constexpr size_t offset = 0;
+ constexpr size_t length = 256 << 10;
+ {
+ auto t = create_transaction();
+ auto extent = alloc_extent(t, offset, length);
+ ASSERT_EQ(length, extent->get_length());
+ submit_transaction(std::move(t));
+ }
+ int success = 0;
+ int early_exit = 0;
+ int conflicted = 0;
+
+ seastar::parallel_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(REMAP_NUM),
+ [&](auto) {
+ return seastar::async([&] {
+ uint32_t pieces = std::uniform_int_distribution<>(6, 31)(gen);
+ if (pieces % 2 == 1) {
+ pieces++;
+ }
+ std::list<uint32_t> split_points;
+ for (uint32_t i = 0; i < pieces; i++) {
+ auto p = std::uniform_int_distribution<>(1, 120)(gen);
+ split_points.push_back(p - p % 4);
+ }
+ split_points.sort();
+
+ auto t = create_transaction();
+ auto pin0 = try_get_pin(t, offset);
+ if (!pin0 || pin0->get_length() != length) {
+ early_exit++;
+ return;
+ }
+
+ auto empty_transaction = true;
+ auto last_rpin = pin0->duplicate();
+ ASSERT_TRUE(!split_points.empty());
+ while(!split_points.empty()) {
+ // new overwrite area: start_off ~ end_off
+ auto start_off = split_points.front();
+ split_points.pop_front();
+ auto end_off = split_points.front();
+ split_points.pop_front();
+ ASSERT_TRUE(start_off <= end_off);
+ if (((end_off << 10) == pin0->get_key() + pin0->get_length())
+ || (start_off == end_off)) {
+ if (split_points.empty() && empty_transaction) {
+ early_exit++;
+ return;
+ }
+ continue;
+ }
+ empty_transaction = false;
+ auto new_off = (start_off << 10) - last_rpin->get_key();
+ auto new_len = (end_off - start_off) << 10;
+ bufferlist bl;
+ bl.append(ceph::bufferptr(ceph::buffer::create(new_len, 0)));
+ auto [lpin, ext, rpin] = overwrite_pin(
+ t, last_rpin->duplicate(), new_off, new_len, bl);
+ if (!ext) {
+ conflicted++;
+ return;
+ }
+ // lpin is nullptr might not cause by confliction,
+ // it might just not exist.
+ if (lpin) {
+ auto lext = try_get_extent(t, lpin->get_key());
+ if (!lext) {
+ conflicted++;
+ return;
+ }
+ if (get_random_contents() % 2 == 0) {
+ auto lext1 = mutate_extent(t, lext);
+ ASSERT_TRUE(lext1->is_exist_mutation_pending());
+ }
+ }
+ ASSERT_TRUE(rpin);
+ last_rpin = rpin->duplicate();
+ }
+ auto last_rext = try_get_extent(t, last_rpin->get_key());
+ if (!last_rext) {
+ conflicted++;
+ return;
+ }
+ if (get_random_contents() % 2 == 0) {
+ auto last_rext1 = mutate_extent(t, last_rext);
+ ASSERT_TRUE(last_rext1->is_exist_mutation_pending());
+ }
+
+ if (try_submit_transaction(std::move(t))) {
+ success++;
+ logger().info("transaction {} submit the transction",
+ static_cast<void*>(t.t.get()));
+ } else {
+ conflicted++;
+ }
+ });
+ }).handle_exception([](std::exception_ptr e) {
+ logger().info("{}", e);
+ }).get0();
+ logger().info("test_overwrite_pin_concurrent: "
+ "early_exit {} conflicted {} success {}",
+ early_exit, conflicted, success);
+ ASSERT_TRUE(success == 1 || early_exit == REMAP_NUM);
+ ASSERT_EQ(success + conflicted + early_exit, REMAP_NUM);
+ replay();
+ check();
+ });
+ }
};
struct tm_single_device_test_t :
{
test_remap_pin();
}
+TEST_P(tm_single_device_test_t, test_overwrite_pin)
+{
+ test_overwrite_pin();
+}
TEST_P(tm_single_device_test_t, test_remap_pin_concurrent)
{
test_remap_pin_concurrent();
}
+TEST_P(tm_single_device_test_t, test_overwrite_pin_concurrent)
+{
+ test_overwrite_pin_concurrent();
+}
INSTANTIATE_TEST_SUITE_P(
transaction_manager_test,
tm_single_device_test_t,