]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: add transaction_manager
authorSamuel Just <sjust@redhat.com>
Wed, 27 May 2020 01:15:09 +0000 (18:15 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 2 Jun 2020 23:56:41 +0000 (16:56 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/transaction_manager.cc [new file with mode: 0644]
src/crimson/os/seastore/transaction_manager.h [new file with mode: 0644]
src/test/crimson/seastore/CMakeLists.txt
src/test/crimson/seastore/test_transaction_manager.cc [new file with mode: 0644]

index eda36b4f2721b51fafc59c9204d10e19f60af037..92edd040e6c1378022ab1175a37d5d65806844f8 100644 (file)
@@ -3,6 +3,7 @@ add_library(crimson-seastore
   seastore_types.cc
   segment_manager/ephemeral.cc
   segment_manager.cc
+  transaction_manager.cc
   journal.cc
   cache.cc
   lba_manager.cc
diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc
new file mode 100644 (file)
index 0000000..89281c9
--- /dev/null
@@ -0,0 +1,144 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/denc.h"
+#include "include/intarith.h"
+
+#include "crimson/common/log.h"
+
+#include "crimson/os/seastore/transaction_manager.h"
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/journal.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_filestore);
+  }
+}
+
+namespace crimson::os::seastore {
+
+TransactionManager::TransactionManager(
+  SegmentManager &segment_manager,
+  Journal &journal,
+  Cache &cache,
+  LBAManager &lba_manager)
+  : segment_manager(segment_manager),
+    cache(cache),
+    lba_manager(lba_manager),
+    journal(journal)
+{
+  journal.set_segment_provider(this);
+}
+
+TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
+{
+  return journal.open_for_write().safe_then([this] {
+    logger().debug("TransactionManager::mkfs: about to do_with");
+    return seastar::do_with(
+      lba_manager.create_transaction(),
+      [this](auto &transaction) {
+       logger().debug("TransactionManager::mkfs: about to cache.mkfs");
+       return cache.mkfs(*transaction
+       ).safe_then([this, &transaction] {
+         return lba_manager.mkfs(*transaction);
+       }).safe_then([this, &transaction] {
+         logger().debug("TransactionManager::mkfs: about to submit_transaction");
+         return submit_transaction(std::move(transaction)).handle_error(
+           crimson::ct_error::eagain::handle([] {
+             ceph_assert(0 == "eagain impossible");
+             return mkfs_ertr::now();
+           }),
+           mkfs_ertr::pass_further{}
+         );
+       });
+      });
+  }).safe_then([this] {
+    return journal.close();
+  });
+}
+
+TransactionManager::mount_ertr::future<> TransactionManager::mount()
+{
+  return journal.replay([this](auto paddr, const auto &e) {
+    return cache.replay_delta(paddr, e);
+  }).safe_then([this] {
+    return journal.open_for_write();
+  }).handle_error(
+    mount_ertr::pass_further{},
+    crimson::ct_error::all_same_way([] {
+      ceph_assert(0 == "unhandled error");
+      return mount_ertr::now();
+    }));
+}
+
+TransactionManager::close_ertr::future<> TransactionManager::close() {
+  return cache.close(
+  ).safe_then([this] {
+    return journal.close();
+  });
+}
+
+TransactionManager::ref_ret TransactionManager::inc_ref(
+  Transaction &t,
+  LogicalCachedExtentRef &ref)
+{
+  return lba_manager.incref_extent(t, ref->get_laddr()
+  ).handle_error(
+    ref_ertr::pass_further{},
+    ct_error::all_same_way([](auto e) {
+      ceph_assert(0 == "unhandled error, TODO");
+    }));
+}
+
+TransactionManager::ref_ret TransactionManager::inc_ref(
+  Transaction &t,
+  laddr_t offset)
+{
+  return lba_manager.incref_extent(t, offset);
+}
+
+TransactionManager::ref_ret TransactionManager::dec_ref(
+  Transaction &t,
+  LogicalCachedExtentRef &ref)
+{
+  return dec_ref(t, ref->get_laddr()
+  ).handle_error(
+    ref_ertr::pass_further{},
+    ct_error::all_same_way([](auto e) {
+      ceph_assert(0 == "unhandled error, TODO");
+    }));
+}
+
+TransactionManager::ref_ret TransactionManager::dec_ref(
+  Transaction &t,
+  laddr_t offset)
+{
+  // TODO: need to retire the extent (only) if it's live, will need cache call
+  return lba_manager.decref_extent(t, offset);
+}
+
+TransactionManager::submit_transaction_ertr::future<>
+TransactionManager::submit_transaction(
+  TransactionRef t)
+{
+  auto record = cache.try_construct_record(*t);
+  if (!record) {
+    return crimson::ct_error::eagain::make();
+  }
+
+  logger().debug("TransactionManager::submit_transaction");
+
+  return journal.submit_record(std::move(*record)).safe_then(
+    [this, t=std::move(t)](paddr_t addr) {
+      cache.complete_commit(*t, addr);
+    },
+    submit_transaction_ertr::pass_further{},
+    crimson::ct_error::all_same_way([](auto e) {
+      ceph_assert(0 == "Hit error submitting to journal");
+    }));
+}
+
+TransactionManager::~TransactionManager() {}
+
+}
diff --git a/src/crimson/os/seastore/transaction_manager.h b/src/crimson/os/seastore/transaction_manager.h
new file mode 100644 (file)
index 0000000..4e10f27
--- /dev/null
@@ -0,0 +1,274 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <optional>
+#include <vector>
+#include <utility>
+#include <functional>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+
+#include <seastar/core/future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+
+#include "crimson/osd/exceptions.h"
+
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/lba_manager.h"
+#include "crimson/os/seastore/journal.h"
+
+namespace crimson::os::seastore {
+class Journal;
+
+/**
+ * LogicalCachedExtent
+ *
+ * CachedExtent with associated lba mapping.
+ *
+ * Users of TransactionManager should be using extents derived from
+ * LogicalCachedExtent.
+ */
+class LogicalCachedExtent : public CachedExtent {
+public:
+  template <typename... T>
+  LogicalCachedExtent(T&&... t) : CachedExtent(std::forward<T>(t)...) {}
+
+  void set_pin(LBAPinRef &&pin) { this->pin = std::move(pin); }
+
+  LBAPin &get_pin() {
+    assert(pin);
+    return *pin;
+  }
+
+  laddr_t get_laddr() const {
+    assert(pin);
+    return pin->get_laddr();
+  }
+
+private:
+  LBAPinRef pin;
+};
+
+using LogicalCachedExtentRef = TCachedExtentRef<LogicalCachedExtent>;
+struct ref_laddr_cmp {
+  using is_transparent = laddr_t;
+  bool operator()(const LogicalCachedExtentRef &lhs,
+                 const LogicalCachedExtentRef &rhs) const {
+    return lhs->get_laddr() < rhs->get_laddr();
+  }
+  bool operator()(const laddr_t &lhs,
+                 const LogicalCachedExtentRef &rhs) const {
+    return lhs < rhs->get_laddr();
+  }
+  bool operator()(const LogicalCachedExtentRef &lhs,
+                 const laddr_t &rhs) const {
+    return lhs->get_laddr() < rhs;
+  }
+};
+
+using lextent_set_t = addr_extent_set_base_t<
+  laddr_t,
+  LogicalCachedExtentRef,
+  ref_laddr_cmp
+  >;
+
+template <typename T>
+using lextent_list_t = addr_extent_list_base_t<
+  laddr_t, TCachedExtentRef<T>>;
+
+/**
+ * TransactionManager
+ *
+ * Abstraction hiding reading and writing to persistence.
+ * Exposes transaction based interface with read isolation.
+ */
+class TransactionManager : public JournalSegmentProvider {
+public:
+  TransactionManager(
+    SegmentManager &segment_manager,
+    Journal &journal,
+    Cache &cache,
+    LBAManager &lba_manager);
+
+  segment_id_t next = 0;
+  get_segment_ret get_segment() final {
+    // TODO -- part of gc
+    return get_segment_ret(
+      get_segment_ertr::ready_future_marker{},
+      next++);
+  }
+
+  void put_segment(segment_id_t segment) final {
+    // TODO -- part of gc
+    return;
+  }
+
+  /// Writes initial metadata to disk
+  using mkfs_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  mkfs_ertr::future<> mkfs();
+
+  /// Reads initial metadata from disk
+  using mount_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  mount_ertr::future<> mount();
+
+  /// Closes transaction_manager
+  using close_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  close_ertr::future<> close();
+
+  /// Creates empty transaction
+  TransactionRef create_transaction() {
+    return lba_manager.create_transaction();
+  }
+
+  /**
+   * Read extents corresponding to specified lba range
+   */
+  using read_extent_ertr = SegmentManager::read_ertr;
+  template <typename T>
+  using read_extent_ret = read_extent_ertr::future<lextent_list_t<T>>;
+  template <typename T>
+  read_extent_ret<T> read_extents(
+    Transaction &t,
+    laddr_t offset,
+    extent_len_t length)
+  {
+    std::unique_ptr<lextent_list_t<T>> ret =
+      std::make_unique<lextent_list_t<T>>();
+    auto &ret_ref = *ret;
+    std::unique_ptr<lba_pin_list_t> pin_list =
+      std::make_unique<lba_pin_list_t>();
+    auto &pin_list_ref = *pin_list;
+    return lba_manager.get_mapping(
+      t, offset, length
+    ).safe_then([this, &t, &pin_list_ref, &ret_ref](auto pins) {
+      crimson::get_logger(ceph_subsys_filestore).debug(
+       "read_extents: mappings {}",
+       pins);
+      pins.swap(pin_list_ref);
+      return crimson::do_for_each(
+       pin_list_ref.begin(),
+       pin_list_ref.end(),
+       [this, &t, &ret_ref](auto &pin) {
+         crimson::get_logger(ceph_subsys_filestore).debug(
+           "read_extents: get_extent {}~{}",
+           pin->get_paddr(),
+           pin->get_length());
+         return cache.get_extent<T>(
+           t,
+           pin->get_paddr(),
+           pin->get_length()
+         ).safe_then([&pin, &ret_ref](auto ref) mutable {
+           ref->set_pin(std::move(pin));
+           ret_ref.push_back(std::make_pair(ref->get_laddr(), ref));
+           crimson::get_logger(ceph_subsys_filestore).debug(
+             "read_extents: got extent {}",
+             *ref);
+           return read_extent_ertr::now();
+         });
+       });
+    }).safe_then([ret=std::move(ret), pin_list=std::move(pin_list)]() mutable {
+      return read_extent_ret<T>(
+       read_extent_ertr::ready_future_marker{},
+       std::move(*ret));
+    });
+  }
+
+  /// Obtain mutable copy of extent
+  LogicalCachedExtentRef get_mutable_extent(Transaction &t, LogicalCachedExtentRef ref) {
+    auto ret = cache.duplicate_for_write(
+      t,
+      ref)->cast<LogicalCachedExtent>();
+    ret->set_pin(ref->get_pin().duplicate());
+    return ret;
+  }
+
+  using ref_ertr = LBAManager::ref_ertr;
+  using ref_ret = LBAManager::ref_ret;
+
+  /// Add refcount for ref
+  ref_ret inc_ref(
+    Transaction &t,
+    LogicalCachedExtentRef &ref);
+
+  /// Add refcount for offset
+  ref_ret inc_ref(
+    Transaction &t,
+    laddr_t offset);
+
+  /// Remove refcount for ref
+  ref_ret dec_ref(
+    Transaction &t,
+    LogicalCachedExtentRef &ref);
+
+  /// Remove refcount for offset
+  ref_ret dec_ref(
+    Transaction &t,
+    laddr_t offset);
+
+  /**
+   * alloc_extent
+   *
+   * Allocates a new block of type T with the minimum lba range of size len
+   * greater than hint.
+   */
+  using alloc_extent_ertr = SegmentManager::read_ertr;
+  template <typename T>
+  using alloc_extent_ret = alloc_extent_ertr::future<TCachedExtentRef<T>>;
+  template <typename T>
+  alloc_extent_ret<T> alloc_extent(
+    Transaction &t,
+    laddr_t hint,
+    extent_len_t len) {
+    auto ext = cache.alloc_new_extent<T>(
+      t,
+      len);
+    return lba_manager.alloc_extent(
+      t,
+      hint,
+      len,
+      ext->get_paddr()
+    ).safe_then([ext=std::move(ext)](auto &&ref) mutable {
+      ext->set_pin(std::move(ref));
+      return alloc_extent_ertr::make_ready_future<TCachedExtentRef<T>>(
+       std::move(ext));
+    });
+  }
+
+  /**
+   * submit_transaction
+   *
+   * Atomically submits transaction to persistence
+   */
+  using submit_transaction_ertr = crimson::errorator<
+    crimson::ct_error::eagain, // Caller should retry transaction from beginning
+    crimson::ct_error::input_output_error // Media error
+    >;
+  submit_transaction_ertr::future<> submit_transaction(TransactionRef);
+
+  ~TransactionManager();
+
+private:
+  friend class Transaction;
+
+  SegmentManager &segment_manager;
+  Cache &cache;
+  LBAManager &lba_manager;
+  Journal &journal;
+};
+using TransactionManagerRef = std::unique_ptr<TransactionManager>;
+
+}
index 0a0863c66332a10b5b403e1e1d78dced8a11eee5..9e801cd37119a6cf9c2e39832060b90a48e94509 100644 (file)
@@ -1,3 +1,12 @@
+add_executable(unittest_transaction_manager
+  test_transaction_manager.cc
+  ../gtest_seastar.cc)
+add_ceph_unittest(unittest_transaction_manager)
+target_link_libraries(
+  unittest_transaction_manager
+  ${CMAKE_DL_LIBS}
+  crimson-seastore)
+
 add_executable(unittest_btree_lba_manager
   test_btree_lba_manager.cc
   ../gtest_seastar.cc)
diff --git a/src/test/crimson/seastore/test_transaction_manager.cc b/src/test/crimson/seastore/test_transaction_manager.cc
new file mode 100644 (file)
index 0000000..e0d8ef7
--- /dev/null
@@ -0,0 +1,365 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/crimson/gtest_seastar.h"
+
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/transaction_manager.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+using namespace crimson;
+using namespace crimson::os;
+using namespace crimson::os::seastore;
+
+namespace {
+  [[maybe_unused]] seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_test);
+  }
+}
+
+struct test_extent_desc_t {
+  size_t len = 0;
+  unsigned checksum = 0;
+
+  bool operator==(const test_extent_desc_t &rhs) const {
+    return (len == rhs.len &&
+           checksum == rhs.checksum);
+  }
+  bool operator!=(const test_extent_desc_t &rhs) const {
+    return !(*this == rhs);
+  }
+};
+
+std::ostream &operator<<(std::ostream &lhs, const test_extent_desc_t &rhs) {
+  return lhs << "test_extent_desc_t(len=" << rhs.len
+            << ", checksum=" << rhs.checksum << ")";
+}
+
+struct test_extent_record_t {
+  test_extent_desc_t desc;
+  unsigned refcount = 0;
+  test_extent_record_t() = default;
+  test_extent_record_t(
+    const test_extent_desc_t &desc,
+    unsigned refcount) : desc(desc), refcount(refcount) {}
+
+  void update(const test_extent_desc_t &to) {
+    desc = to;
+  }
+
+  bool operator==(const test_extent_desc_t &rhs) const {
+    return desc == rhs;
+  }
+  bool operator!=(const test_extent_desc_t &rhs) const {
+    return desc != rhs;
+  }
+};
+
+std::ostream &operator<<(std::ostream &lhs, const test_extent_record_t &rhs) {
+  return lhs << "test_extent_record_t(" << rhs.desc
+            << ", refcount=" << rhs.refcount << ")";
+}
+
+
+struct TestBlock : LogicalCachedExtent {
+  using Ref = TCachedExtentRef<TestBlock>;
+
+  TestBlock(ceph::bufferptr &&ptr) : LogicalCachedExtent(std::move(ptr)) {}
+  TestBlock(const TestBlock &other) : LogicalCachedExtent(other) {}
+
+  CachedExtentRef duplicate_for_write() final {
+    return CachedExtentRef(new TestBlock(*this));
+  };
+
+  static constexpr extent_types_t TYPE = extent_types_t::TEST_BLOCK;
+  extent_types_t get_type() const final {
+    return TYPE;
+  }
+
+  ceph::bufferlist get_delta() final {
+    return ceph::bufferlist();
+  }
+
+  void set_contents(char c) {
+    ::memset(get_bptr().c_str(), c, get_length());
+  }
+
+  test_extent_desc_t get_desc() {
+    return { get_length(), get_crc32c(1) };
+  }
+
+  void apply_delta(paddr_t delta_base, ceph::bufferlist &bl) final {
+    ceph_assert(0 == "TODO");
+  }
+};
+using TestBlockRef = TCachedExtentRef<TestBlock>;
+
+struct transaction_manager_test_t : public seastar_test_suite_t {
+  std::unique_ptr<SegmentManager> segment_manager;
+  Journal journal;
+  Cache cache;
+  LBAManagerRef lba_manager;
+  TransactionManager tm;
+
+  transaction_manager_test_t()
+    : segment_manager(create_ephemeral(segment_manager::DEFAULT_TEST_EPHEMERAL)),
+      journal(*segment_manager),
+      cache(*segment_manager),
+      lba_manager(
+       lba_manager::create_lba_manager(*segment_manager, cache)),
+      tm(*segment_manager, journal, cache, *lba_manager) {}
+
+  seastar::future<> set_up_fut() final {
+    return segment_manager->init().safe_then([this] {
+      return tm.mkfs();
+    }).safe_then([this] {
+      return tm.mount();
+    }).handle_error(
+      crimson::ct_error::all_same_way([] {
+       ASSERT_FALSE("Unable to mount");
+      })
+    );
+  }
+
+  seastar::future<> tear_down_fut() final {
+    return tm.close(
+    ).handle_error(
+      crimson::ct_error::all_same_way([] {
+       ASSERT_FALSE("Unable to close");
+      })
+    );
+  }
+  struct test_extents_t : std::map<laddr_t, test_extent_record_t> {
+  private:
+    void check_available(laddr_t addr, extent_len_t len) {
+      auto iter = upper_bound(addr);
+      if (iter != begin()) {
+       auto liter = iter;
+       liter--;
+       EXPECT_FALSE(liter->first + liter->second.desc.len > addr);
+      }
+      if (iter != end()) {
+       EXPECT_FALSE(iter->first < addr + len);
+      }
+    }
+    void check_hint(laddr_t hint, laddr_t addr, extent_len_t len) {
+      auto iter = lower_bound(hint);
+      laddr_t last = hint;
+      while (true) {
+       if (iter == end() || iter->first > addr) {
+         EXPECT_EQ(addr, last);
+         break;
+       }
+       EXPECT_FALSE(iter->first - last > len);
+       last = iter->first + iter->second.desc.len;
+       ++iter;
+      }
+    }
+  public:
+    void insert(TestBlock &extent) {
+      check_available(extent.get_laddr(), extent.get_length());
+      emplace(
+       std::make_pair(
+         extent.get_laddr(),
+         test_extent_record_t{extent.get_desc(), 1}
+       ));
+    }
+    void alloced(laddr_t hint, TestBlock &extent) {
+      check_hint(hint, extent.get_laddr(), extent.get_length());
+      insert(extent);
+    }
+  } test_mappings;
+
+  struct test_transaction_t {
+    TransactionRef t;
+    test_extents_t mappings;
+  };
+
+  test_transaction_t create_transaction() {
+    return { tm.create_transaction(), test_mappings };
+  }
+
+  TestBlockRef alloc_extent(
+    test_transaction_t &t,
+    laddr_t hint,
+    extent_len_t len,
+    char contents) {
+    auto extent = tm.alloc_extent<TestBlock>(
+      *(t.t),
+      hint,
+      len).unsafe_get0();
+    extent->set_contents(contents);
+    EXPECT_FALSE(t.mappings.count(extent->get_laddr()));
+    EXPECT_EQ(len, extent->get_length());
+    t.mappings.alloced(hint, *extent);
+    return extent;
+  }
+
+  void check_mappings() {
+    auto t = create_transaction();
+    check_mappings(t);
+  }
+
+  TestBlockRef get_extent(
+    test_transaction_t &t,
+    laddr_t addr,
+    extent_len_t len) {
+    ceph_assert(t.mappings.count(addr));
+    ceph_assert(t.mappings[addr].desc.len == len);
+
+    auto ret_list = tm.read_extents<TestBlock>(
+      *t.t, addr, len
+    ).unsafe_get0();
+    EXPECT_EQ(ret_list.size(), 1);
+    auto &ext = ret_list.begin()->second;
+    auto &laddr = ret_list.begin()->first;
+    EXPECT_EQ(addr, laddr);
+    EXPECT_EQ(addr, ext->get_laddr());
+    return ext;
+  }
+
+  TestBlockRef mutate_extent(
+    test_transaction_t &t,
+    TestBlockRef ref,
+    char contents) {
+    ceph_assert(t.mappings.count(ref->get_laddr()));
+    ceph_assert(t.mappings[ref->get_laddr()].desc.len == ref->get_length());
+    auto ext = tm.get_mutable_extent(*t.t, ref)->cast<TestBlock>();
+    EXPECT_EQ(ext->get_laddr(), ref->get_laddr());
+    EXPECT_EQ(ext->get_desc(), ref->get_desc());
+    ext->set_contents(contents);
+    t.mappings[ext->get_laddr()].update(ext->get_desc());
+    return ext;
+  }
+
+  void inc_ref(test_transaction_t &t, laddr_t offset) {
+    ceph_assert(t.mappings.count(offset));
+    ceph_assert(t.mappings[offset].refcount > 0);
+    auto refcnt = tm.inc_ref(*t.t, offset).unsafe_get0();
+    t.mappings[offset].refcount++;
+    EXPECT_EQ(refcnt, t.mappings[offset].refcount);
+  }
+
+  void dec_ref(test_transaction_t &t, laddr_t offset) {
+    ceph_assert(t.mappings.count(offset));
+    ceph_assert(t.mappings[offset].refcount > 0);
+    auto refcnt = tm.dec_ref(*t.t, offset).unsafe_get0();
+    t.mappings[offset].refcount--;
+    EXPECT_EQ(refcnt, t.mappings[offset].refcount);
+    if (t.mappings[offset].refcount == 0) {
+      t.mappings.erase(offset);
+    }
+  }
+
+  void check_mappings(test_transaction_t &t) {
+    for (auto &i: t.mappings) {
+      logger().debug("check_mappings: {}->{}", i.first, i.second);
+      auto ext = get_extent(t, i.first, i.second.desc.len);
+      EXPECT_EQ(i.second, ext->get_desc());
+    }
+  }
+
+  void submit_transaction(test_transaction_t t) {
+    tm.submit_transaction(std::move(t.t)).unsafe_get();
+    test_mappings = t.mappings;
+  }
+};
+
+TEST_F(transaction_manager_test_t, basic)
+{
+  constexpr laddr_t SIZE = 4096;
+  run_async([this] {
+    constexpr laddr_t ADDR = 0xFF * SIZE;
+    {
+      auto t = create_transaction();
+      auto extent = alloc_extent(
+       t,
+       ADDR,
+       SIZE,
+       'a');
+      ASSERT_EQ(ADDR, extent->get_laddr());
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+  });
+}
+
+TEST_F(transaction_manager_test_t, mutate)
+{
+  constexpr laddr_t SIZE = 4096;
+  run_async([this] {
+    constexpr laddr_t ADDR = 0xFF * SIZE;
+    {
+      auto t = create_transaction();
+      auto extent = alloc_extent(
+       t,
+       ADDR,
+       SIZE,
+       'a');
+      ASSERT_EQ(ADDR, extent->get_laddr());
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+    {
+      auto t = create_transaction();
+      auto ext = get_extent(
+       t,
+       ADDR,
+       SIZE);
+      auto mut = mutate_extent(t, ext, 'c');
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+  });
+}
+
+TEST_F(transaction_manager_test_t, inc_dec_ref)
+{
+  constexpr laddr_t SIZE = 4096;
+  run_async([this] {
+    constexpr laddr_t ADDR = 0xFF * SIZE;
+    {
+      auto t = create_transaction();
+      auto extent = alloc_extent(
+       t,
+       ADDR,
+       SIZE,
+       'a');
+      ASSERT_EQ(ADDR, extent->get_laddr());
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+    {
+      auto t = create_transaction();
+      inc_ref(t, ADDR);
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+    {
+      auto t = create_transaction();
+      dec_ref(t, ADDR);
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+    {
+      auto t = create_transaction();
+      dec_ref(t, ADDR);
+      check_mappings(t);
+      check_mappings();
+      submit_transaction(std::move(t));
+      check_mappings();
+    }
+  });
+}