]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson/seastore/test_transaction_manager: add tests for concurrent IO
authorSamuel Just <sjust@redhat.com>
Mon, 11 Jan 2021 23:03:24 +0000 (15:03 -0800)
committerSamuel Just <sjust@redhat.com>
Mon, 1 Feb 2021 20:48:36 +0000 (12:48 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/test/crimson/seastore/test_transaction_manager.cc

index 9906f938a63ba6873419f75c37b95a1582961a13..91b39917dad48468f7fbff1e7fc2a7c388987489 100644 (file)
@@ -3,6 +3,8 @@
 
 #include <random>
 
+#include <boost/iterator/counting_iterator.hpp>
+
 #include "test/crimson/gtest_seastar.h"
 #include "test/crimson/seastore/transaction_manager_test_state.h"
 
@@ -79,23 +81,171 @@ struct transaction_manager_test_t :
   }
 
   struct test_extents_t : std::map<laddr_t, test_extent_record_t> {
-  private:
-    void check_available(laddr_t addr, extent_len_t len) {
-      auto iter = upper_bound(addr);
-      if (iter != begin()) {
-       auto liter = iter;
-       liter--;
-       EXPECT_FALSE(liter->first + liter->second.desc.len > addr);
+    using delta_t = std::map<laddr_t, std::optional<test_extent_record_t>>;
+
+    struct delta_overlay_t {
+      const test_extents_t &extents;
+      const delta_t &delta;
+
+      delta_overlay_t(
+       const test_extents_t &extents,
+       const delta_t &delta)
+       : extents(extents), delta(delta) {}
+
+
+      class iterator {
+       friend class test_extents_t;
+
+       const delta_overlay_t &parent;
+       test_extents_t::const_iterator biter;
+       delta_t::const_iterator oiter;
+       std::optional<std::pair<laddr_t, test_extent_record_t>> cur;
+
+       iterator(
+         const delta_overlay_t &parent,
+         test_extents_t::const_iterator biter,
+         delta_t::const_iterator oiter)
+         : parent(parent), biter(biter), oiter(oiter) {}
+
+       laddr_t get_bkey() {
+         return biter == parent.extents.end() ? L_ADDR_MAX : biter->first;
+       }
+
+       laddr_t get_okey() {
+         return oiter == parent.delta.end() ? L_ADDR_MAX : oiter->first;
+       }
+
+       bool is_end() {
+         return oiter == parent.delta.end() && biter == parent.extents.end();
+       }
+
+       bool is_valid() {
+         return is_end() ||
+           ((get_okey() < get_bkey()) && (oiter->second)) ||
+           (get_okey() > get_bkey());
+       }
+
+       auto get_pair() {
+         assert(is_valid());
+         assert(!is_end());
+         auto okey = get_okey();
+         auto bkey = get_bkey();
+         return (
+           bkey < okey ?
+           std::pair<laddr_t, test_extent_record_t>(*biter) :
+           std::make_pair(okey, *(oiter->second)));
+       }
+
+       void adjust() {
+         while (!is_valid()) {
+           if (get_okey() < get_bkey()) {
+             assert(!oiter->second);
+             ++oiter;
+           } else {
+             assert(get_okey() == get_bkey());
+             ++biter;
+           }
+         }
+         assert(is_valid());
+         if (!is_end()) {
+           cur = get_pair();
+         } else {
+           cur = std::nullopt;
+         }
+       }
+
+      public:
+       iterator(const iterator &) = default;
+       iterator(iterator &&) = default;
+       iterator &operator=(const iterator &) = default;
+       iterator &operator=(iterator &&) = default;
+
+       iterator &operator++() {
+         assert(is_valid());
+         assert(!is_end());
+         if (get_bkey() < get_okey()) {
+           ++biter;
+         } else {
+           ++oiter;
+         }
+         adjust();
+         return *this;
+       }
+
+       bool operator==(const iterator &o) const {
+         return o.biter == biter && o.oiter == oiter;
+       }
+       bool operator!=(const iterator &o) const {
+         return !(*this == o);
+       }
+
+       auto operator*() {
+         assert(!is_end());
+         return *cur;
+       }
+       auto operator->() {
+         assert(!is_end());
+         return &*cur;
+       }
+      };
+
+      iterator begin() {
+       auto ret = iterator{*this, extents.begin(), delta.begin()};
+       ret.adjust();
+       return ret;
+      }
+
+      iterator end() {
+       auto ret = iterator{*this, extents.end(), delta.end()};
+       // adjust unnecessary
+       return ret;
+      }
+
+      iterator lower_bound(laddr_t l) {
+       auto ret = iterator{*this, extents.lower_bound(l), delta.lower_bound(l)};
+       ret.adjust();
+       return ret;
+      }
+
+      iterator upper_bound(laddr_t l) {
+       auto ret = iterator{*this, extents.upper_bound(l), delta.upper_bound(l)};
+       ret.adjust();
+       return ret;
       }
-      if (iter != end()) {
-       EXPECT_FALSE(iter->first < addr + len);
+
+      iterator find(laddr_t l) {
+       auto ret = lower_bound(l);
+       if (ret == end() || ret->first != l) {
+         return end();
+       } else {
+         return ret;
+       }
+      }
+    };
+  private:
+    void check_available(
+      laddr_t addr, extent_len_t len, const delta_t &delta
+    ) const {
+      delta_overlay_t overlay(*this, delta);
+      for (const auto &i: overlay) {
+       if (i.first < addr) {
+         EXPECT_FALSE(i.first + i.second.desc.len > addr);
+       } else {
+         EXPECT_FALSE(addr + len > i.first);
+       }
       }
     }
-    void check_hint(laddr_t hint, laddr_t addr, extent_len_t len) {
-      auto iter = lower_bound(hint);
+
+    void check_hint(
+      laddr_t hint,
+      laddr_t addr,
+      extent_len_t len,
+      delta_t &delta) const {
+      delta_overlay_t overlay(*this, delta);
+      auto iter = overlay.lower_bound(hint);
       laddr_t last = hint;
       while (true) {
-       if (iter == end() || iter->first > addr) {
+       if (iter == overlay.end() || iter->first > addr) {
          EXPECT_EQ(addr, last);
          break;
        }
@@ -104,32 +254,109 @@ struct transaction_manager_test_t :
        ++iter;
       }
     }
+
+    std::optional<test_extent_record_t> &populate_delta(
+      laddr_t addr, delta_t &delta, const test_extent_desc_t *desc) const {
+      auto diter = delta.find(addr);
+      if (diter != delta.end())
+       return diter->second;
+
+      auto iter = find(addr);
+      if (iter == end()) {
+       assert(desc);
+       auto ret = delta.emplace(
+         std::make_pair(addr, test_extent_record_t{*desc, 0}));
+       assert(ret.second);
+       return ret.first->second;
+      } else {
+       auto ret = delta.emplace(*iter);
+       assert(ret.second);
+       return ret.first->second;
+      }
+    }
   public:
-    void insert(TestBlock &extent) {
-      check_available(extent.get_laddr(), extent.get_length());
-      emplace(
-       std::make_pair(
-         extent.get_laddr(),
-         test_extent_record_t{extent.get_desc(), 1}
-       ));
+    delta_overlay_t get_overlay(const delta_t &delta) const {
+      return delta_overlay_t{*this, delta};
+    }
+
+    void insert(TestBlock &extent, delta_t &delta) const {
+      check_available(extent.get_laddr(), extent.get_length(), delta);
+      delta[extent.get_laddr()] =
+       test_extent_record_t{extent.get_desc(), 1};
+    }
+
+    void alloced(laddr_t hint, TestBlock &extent, delta_t &delta) const {
+      check_hint(hint, extent.get_laddr(), extent.get_length(), delta);
+      insert(extent, delta);
+    }
+
+    bool contains(laddr_t addr, const delta_t &delta) const {
+      delta_overlay_t overlay(*this, delta);
+      return overlay.find(addr) != overlay.end();
     }
-    void alloced(laddr_t hint, TestBlock &extent) {
-      check_hint(hint, extent.get_laddr(), extent.get_length());
-      insert(extent);
+
+    test_extent_record_t get(laddr_t addr, const delta_t &delta) const {
+      delta_overlay_t overlay(*this, delta);
+      auto iter = overlay.find(addr);
+      assert(iter != overlay.end());
+      return iter->second;
+    }
+
+    void update(
+      laddr_t addr,
+      const test_extent_desc_t &desc,
+      delta_t &delta) const {
+      auto &rec = populate_delta(addr, delta, &desc);
+      assert(rec);
+      rec->desc = desc;
+    }
+
+    int inc_ref(
+      laddr_t addr,
+      delta_t &delta) const {
+      auto &rec = populate_delta(addr, delta, nullptr);
+      assert(rec);
+      return ++rec->refcount;
+    }
+
+    int dec_ref(
+      laddr_t addr,
+      delta_t &delta) const {
+      auto &rec = populate_delta(addr, delta, nullptr);
+      assert(rec);
+      assert(rec->refcount > 0);
+      rec->refcount--;
+      if (rec->refcount == 0) {
+       delta[addr] = std::nullopt;
+       return 0;
+      } else {
+       return rec->refcount;
+      }
+    }
+
+    void consume(const delta_t &delta) {
+      for (const auto &i : delta) {
+       if (i.second) {
+         (*this)[i.first] = *i.second;
+       } else {
+         erase(i.first);
+       }
+      }
     }
+
   } test_mappings;
 
   struct test_transaction_t {
     TransactionRef t;
-    test_extents_t mappings;
+    test_extents_t::delta_t mapping_delta;
   };
 
   test_transaction_t create_transaction() {
-    return { tm->create_transaction(), test_mappings };
+    return { tm->create_transaction(), {} };
   }
 
   test_transaction_t create_weak_transaction() {
-    return { tm->create_weak_transaction(), test_mappings };
+    return { tm->create_weak_transaction(), {} };
   }
 
   TestBlockRef alloc_extent(
@@ -142,9 +369,9 @@ struct transaction_manager_test_t :
       hint,
       len).unsafe_get0();
     extent->set_contents(contents);
-    EXPECT_FALSE(t.mappings.count(extent->get_laddr()));
+    EXPECT_FALSE(test_mappings.contains(extent->get_laddr(), t.mapping_delta));
     EXPECT_EQ(len, extent->get_length());
-    t.mappings.alloced(hint, *extent);
+    test_mappings.alloced(hint, *extent, t.mapping_delta);
     return extent;
   }
 
@@ -194,8 +421,8 @@ struct transaction_manager_test_t :
     test_transaction_t &t,
     laddr_t addr,
     extent_len_t len) {
-    ceph_assert(t.mappings.count(addr));
-    ceph_assert(t.mappings[addr].desc.len == len);
+    ceph_assert(test_mappings.contains(addr, t.mapping_delta));
+    ceph_assert(test_mappings.get(addr, t.mapping_delta).desc.len == len);
 
     auto ret_list = tm->read_extents<TestBlock>(
       *t.t, addr, len
@@ -212,56 +439,97 @@ struct transaction_manager_test_t :
   TestBlockRef mutate_extent(
     test_transaction_t &t,
     TestBlockRef ref) {
-    ceph_assert(t.mappings.count(ref->get_laddr()));
-    ceph_assert(t.mappings[ref->get_laddr()].desc.len == ref->get_length());
+    ceph_assert(test_mappings.contains(ref->get_laddr(), t.mapping_delta));
+    ceph_assert(
+      test_mappings.get(ref->get_laddr(), t.mapping_delta).desc.len ==
+      ref->get_length());
+
     auto ext = tm->get_mutable_extent(*t.t, ref)->cast<TestBlock>();
     EXPECT_EQ(ext->get_laddr(), ref->get_laddr());
     EXPECT_EQ(ext->get_desc(), ref->get_desc());
     mutator.mutate(*ext, gen);
-    t.mappings[ext->get_laddr()].update(ext->get_desc());
+
+    test_mappings.update(ext->get_laddr(), ext->get_desc(), t.mapping_delta);
+    return ext;
+  }
+
+  TestBlockRef mutate_addr(
+    test_transaction_t &t,
+    laddr_t offset,
+    size_t length) {
+    auto ext = get_extent(t, offset, length);
+    mutate_extent(t, ext);
     return ext;
   }
 
   void inc_ref(test_transaction_t &t, laddr_t offset) {
-    ceph_assert(t.mappings.count(offset));
-    ceph_assert(t.mappings[offset].refcount > 0);
+    ceph_assert(test_mappings.contains(offset, t.mapping_delta));
+    ceph_assert(test_mappings.get(offset, t.mapping_delta).refcount > 0);
+
     auto refcnt = tm->inc_ref(*t.t, offset).unsafe_get0();
-    t.mappings[offset].refcount++;
-    EXPECT_EQ(refcnt, t.mappings[offset].refcount);
+    auto check_refcnt = test_mappings.inc_ref(offset, t.mapping_delta);
+    EXPECT_EQ(refcnt, check_refcnt);
   }
 
   void dec_ref(test_transaction_t &t, laddr_t offset) {
-    ceph_assert(t.mappings.count(offset));
-    ceph_assert(t.mappings[offset].refcount > 0);
+    ceph_assert(test_mappings.contains(offset, t.mapping_delta));
+    ceph_assert(test_mappings.get(offset, t.mapping_delta).refcount > 0);
+
     auto refcnt = tm->dec_ref(*t.t, offset).unsafe_get0();
-    t.mappings[offset].refcount--;
-    EXPECT_EQ(refcnt, t.mappings[offset].refcount);
-    if (t.mappings[offset].refcount == 0) {
-      t.mappings.erase(offset);
-    }
+    auto check_refcnt = test_mappings.dec_ref(offset, t.mapping_delta);
+    EXPECT_EQ(refcnt, check_refcnt);
+    if (refcnt == 0)
+      logger().debug("dec_ref: {} at refcount 0", offset);
   }
 
   void check_mappings(test_transaction_t &t) {
-    for (auto &i: t.mappings) {
+    auto overlay = test_mappings.get_overlay(t.mapping_delta);
+    for (const auto &i: overlay) {
       logger().debug("check_mappings: {}->{}", i.first, i.second);
       auto ext = get_extent(t, i.first, i.second.desc.len);
       EXPECT_EQ(i.second, ext->get_desc());
     }
-    auto lt = create_weak_transaction();
     lba_manager->scan_mappings(
-      *lt.t,
+      *t.t,
       0,
       L_ADDR_MAX,
-      [iter=lt.mappings.begin(), &lt](auto l, auto p, auto len) mutable {
-       EXPECT_NE(iter, lt.mappings.end());
+      [iter=overlay.begin(), &overlay](auto l, auto p, auto len) mutable {
+       EXPECT_NE(iter, overlay.end());
+       logger().debug(
+         "check_mappings: scan {}",
+         l);
        EXPECT_EQ(l, iter->first);
        ++iter;
       }).unsafe_get0();
   }
 
-  void submit_transaction(test_transaction_t t) {
-    tm->submit_transaction(std::move(t.t)).unsafe_get();
-    test_mappings = t.mappings;
+  bool try_submit_transaction(test_transaction_t t) {
+    using ertr = TransactionManager::submit_transaction_ertr;
+    using ret = ertr::future<bool>;
+    bool success = tm->submit_transaction(std::move(t.t)
+    ).safe_then([]() -> ret {
+      return ertr::make_ready_future<bool>(true);
+    }).handle_error(
+      [](const crimson::ct_error::eagain &e) {
+       return seastar::make_ready_future<bool>(false);
+      },
+      crimson::ct_error::pass_further_all{}
+    ).unsafe_get0();
+
+    if (success)
+      test_mappings.consume(t.mapping_delta);
+
+    return success;
+  }
+
+  void submit_transaction(test_transaction_t &&t) {
+    bool success = try_submit_transaction(std::move(t));
+    EXPECT_TRUE(success);
+  }
+
+  void submit_transaction_expect_conflict(test_transaction_t &&t) {
+    bool success = try_submit_transaction(std::move(t));
+    EXPECT_FALSE(success);
   }
 };
 
@@ -324,6 +592,110 @@ TEST_F(transaction_manager_test_t, mutate)
   });
 }
 
+TEST_F(transaction_manager_test_t, allocate_lba_conflict)
+{
+  constexpr laddr_t SIZE = 4096;
+  run_async([this] {
+    constexpr laddr_t ADDR = 0xFF * SIZE;
+    constexpr laddr_t ADDR2 = 0xFE * SIZE;
+    auto t = create_transaction();
+    auto t2 = create_transaction();
+
+    // These should conflict as they should both modify the lba root
+    auto extent = alloc_extent(
+      t,
+      ADDR,
+      SIZE,
+      'a');
+    ASSERT_EQ(ADDR, extent->get_laddr());
+    check_mappings(t);
+    check();
+
+    auto extent2 = alloc_extent(
+      t2,
+      ADDR2,
+      SIZE,
+      'a');
+    ASSERT_EQ(ADDR2, extent2->get_laddr());
+    check_mappings(t2);
+    extent2.reset();
+
+    submit_transaction(std::move(t2));
+    submit_transaction_expect_conflict(std::move(t));
+  });
+}
+
+TEST_F(transaction_manager_test_t, mutate_lba_conflict)
+{
+  constexpr laddr_t SIZE = 4096;
+  run_async([this] {
+    {
+      auto t = create_transaction();
+      for (unsigned i = 0; i < 300; ++i) {
+       auto extent = alloc_extent(
+         t,
+         laddr_t(i * SIZE),
+         SIZE);
+      }
+      check_mappings(t);
+      submit_transaction(std::move(t));
+      check();
+    }
+
+    constexpr laddr_t ADDR = 150 * SIZE;
+    {
+      auto t = create_transaction();
+      auto t2 = create_transaction();
+
+      mutate_addr(t, ADDR, SIZE);
+      mutate_addr(t2, ADDR, SIZE);
+
+      submit_transaction(std::move(t));
+      submit_transaction_expect_conflict(std::move(t2));
+    }
+    check();
+
+    {
+      auto t = create_transaction();
+      mutate_addr(t, ADDR, SIZE);
+      submit_transaction(std::move(t));
+    }
+    check();
+  });
+}
+
+TEST_F(transaction_manager_test_t, concurrent_mutate_lba_no_conflict)
+{
+  constexpr laddr_t SIZE = 4096;
+  constexpr size_t NUM = 500;
+  constexpr laddr_t addr = 0;
+  constexpr laddr_t addr2 = SIZE * (NUM - 1);
+  run_async([this] {
+    {
+      auto t = create_transaction();
+      for (unsigned i = 0; i < NUM; ++i) {
+       auto extent = alloc_extent(
+         t,
+         laddr_t(i * SIZE),
+         SIZE);
+      }
+      submit_transaction(std::move(t));
+    }
+
+    {
+      auto t = create_transaction();
+      auto t2 = create_transaction();
+
+      mutate_addr(t, addr, SIZE);
+      mutate_addr(t2, addr2, SIZE);
+
+      submit_transaction(std::move(t));
+      submit_transaction(std::move(t2));
+    }
+    check();
+  });
+}
+
 TEST_F(transaction_manager_test_t, create_remove_same_transaction)
 {
   constexpr laddr_t SIZE = 4096;
@@ -385,7 +757,6 @@ TEST_F(transaction_manager_test_t, split_merge_read_same_transaction)
   });
 }
 
-
 TEST_F(transaction_manager_test_t, inc_dec_ref)
 {
   constexpr laddr_t SIZE = 4096;
@@ -493,3 +864,60 @@ TEST_F(transaction_manager_test_t, random_writes)
     }
   });
 }
+
+TEST_F(transaction_manager_test_t, random_writes_concurrent)
+{
+  constexpr unsigned WRITE_STREAMS = 256;
+
+  constexpr size_t TOTAL = 4<<20;
+  constexpr size_t BSIZE = 4<<10;
+  constexpr size_t BLOCKS = TOTAL / BSIZE;
+  run_async([this] {
+    seastar::parallel_for_each(
+      boost::make_counting_iterator(0u),
+      boost::make_counting_iterator(WRITE_STREAMS),
+      [&](auto idx) {
+       for (unsigned i = idx; i < BLOCKS; i += WRITE_STREAMS) {
+         while (true) {
+           auto t = create_transaction();
+           auto extent = alloc_extent(
+             t,
+             i * BSIZE,
+             BSIZE);
+           ASSERT_EQ(i * BSIZE, extent->get_laddr());
+           if (try_submit_transaction(std::move(t)))
+             break;
+         }
+       }
+      }).get0();
+
+    int writes = 0;
+    unsigned failures = 0;
+    seastar::parallel_for_each(
+      boost::make_counting_iterator(0u),
+      boost::make_counting_iterator(WRITE_STREAMS),
+      [&](auto) {
+       return seastar::async([&] {
+         while (writes < 300) {
+           auto t = create_transaction();
+           auto ext = get_extent(
+             t,
+             get_random_laddr(BSIZE, TOTAL),
+             BSIZE);
+           auto mut = mutate_extent(t, ext);
+           auto success = try_submit_transaction(std::move(t));
+           writes += success;
+           failures += !success;
+         }
+       });
+      }).get0();
+    replay();
+    logger().debug("random_writes: checking");
+    check();
+    logger().debug(
+      "random_writes: {} suceeded, {} failed",
+      writes,
+      failures
+    );
+  });
+}