crimson/os/seastore: add extent placement manager
author    Xuehan Xu <xxhdx1985126@gmail.com>
          Thu, 10 Jun 2021 06:21:39 +0000 (14:21 +0800)
committer Xuehan Xu <xxhdx1985126@gmail.com>
          Wed, 8 Sep 2021 03:03:00 +0000 (11:03 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
13 files changed:
src/common/options/crimson.yaml.in
src/crimson/common/errorator.h
src/crimson/common/interruptible_future.h
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/cache.h
src/crimson/os/seastore/cached_extent.h
src/crimson/os/seastore/extent_placement_manager.cc [new file with mode: 0644]
src/crimson/os/seastore/extent_placement_manager.h [new file with mode: 0644]
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.cc
src/crimson/os/seastore/seastore_types.h
src/crimson/os/seastore/transaction.h

diff --git a/src/common/options/crimson.yaml.in b/src/common/options/crimson.yaml.in
index 38fd8c032470a697a17a28268bb2246075bd317d..516f8726bc5a2772434189f608739a246bd83404 100644
@@ -40,3 +40,8 @@ options:
   default: true
   see_also:
   - seastore_device_size
+- name: seastore_init_rewrite_segments_num_per_device
+  type: uint
+  level: dev
+  desc: Initial number of segments for rewriting extents per device
+  default: 6
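The new option is consumed through the existing crimson config accessor; a
minimal sketch of the lookup, assuming the usual crimson includes (see the
SegmentedAllocator constructor in extent_placement_manager.cc below for the
actual call site):

    #include "crimson/common/config_proxy.h"  // crimson::common::get_conf

    // Number of Writer instances a SegmentedAllocator creates per device.
    auto num_writers = crimson::common::get_conf<uint64_t>(
      "seastore_init_rewrite_segments_num_per_device");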
diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h
index 53879cc9a002c68c649b9addb53ce1b89a684a88..502ca32fba105dbc509c8943c6511e9fd33f8241 100644
@@ -782,6 +782,11 @@ public:
     }
   };
 
+  template <typename T>
+  static future<T> make_errorator_future(seastar::future<T>&& fut) {
+    return std::move(fut);
+  }
+
   // assert_all{ "TODO" };
   class assert_all {
     const char* const msg = nullptr;
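make_errorator_future lifts a plain seastar::future into the errorator's
future type so it can join safe_then() chains; a hedged usage sketch, where
ertr stands in for any concrete errorator alias:

    using ertr = crimson::errorator<crimson::ct_error::input_output_error>;

    seastar::future<int> raw = seastar::make_ready_future<int>(42);
    ertr::future<int> fut = ertr::make_errorator_future<int>(std::move(raw));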
diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h
index 5b615057aeaaf80694ce1528e6ebb9edd0be3917..f41bc4f3ededd5af81760ad21ebaf4b6c62a67a5 100644
@@ -612,6 +612,7 @@ private:
 template <typename InterruptCond, typename Errorator>
 struct interruptible_errorator {
   using base_ertr = Errorator;
+  using intr_cond_t = InterruptCond;
 
   template <typename ValueT = void>
   using future = interruptible_future_detail<InterruptCond,
@@ -659,6 +660,9 @@ class [[nodiscard]] interruptible_future_detail<
 public:
   using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>;
   using errorator_type = typename core_type::errorator_type;
+  using interrupt_errorator_type =
+    interruptible_errorator<InterruptCond, errorator_type>;
+  using interrupt_cond_type = InterruptCond;
 
   template <typename U>
   using interrupt_futurize_t =
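The new aliases let generic code recover the interrupt condition and the
errorator from a future type; an illustrative sketch (sibling_future_t is a
hypothetical helper, not part of this patch):

    // Rebuild a future with the same interrupt condition and errorator
    // but a different value type.
    template <typename F, typename T>
    using sibling_future_t =
      typename F::interrupt_errorator_type::template future<T>;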
diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt
index ccd67298328b80c6946e2820e3b7cc48465dbdfd..cdd88ea28a92ad79cc6c1fd1f34bbccdf16286d4 100644
@@ -32,6 +32,7 @@ add_library(crimson-seastore STATIC
   collection_manager.cc
   collection_manager/flat_collection_manager.cc
   collection_manager/collection_flat_node.cc
+  extent_placement_manager.cc
   object_data_handler.cc
   seastore.cc
   random_block_manager/nvme_manager.cc
diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc
index 185e34a84152ba1ca87caac5a4acd2ba409e4548..51e50de086a791ac68d6c2755d0a6b53e0e3a6ec 100644
@@ -710,7 +710,8 @@ void Cache::on_transaction_destruct(Transaction& t)
 CachedExtentRef Cache::alloc_new_extent_by_type(
   Transaction &t,       ///< [in, out] current transaction
   extent_types_t type,  ///< [in] type tag
-  segment_off_t length  ///< [in] length
+  segment_off_t length, ///< [in] length
+  bool delay           ///< [in] whether to delay paddr alloc
 )
 {
   switch (type) {
@@ -718,26 +719,26 @@ CachedExtentRef Cache::alloc_new_extent_by_type(
     assert(0 == "ROOT is never directly alloc'd");
     return CachedExtentRef();
   case extent_types_t::LADDR_INTERNAL:
-    return alloc_new_extent<lba_manager::btree::LBAInternalNode>(t, length);
+    return alloc_new_extent<lba_manager::btree::LBAInternalNode>(t, length, delay);
   case extent_types_t::LADDR_LEAF:
-    return alloc_new_extent<lba_manager::btree::LBALeafNode>(t, length);
+    return alloc_new_extent<lba_manager::btree::LBALeafNode>(t, length, delay);
   case extent_types_t::ONODE_BLOCK_STAGED:
-    return alloc_new_extent<onode::SeastoreNodeExtent>(t, length);
+    return alloc_new_extent<onode::SeastoreNodeExtent>(t, length, delay);
   case extent_types_t::OMAP_INNER:
-    return alloc_new_extent<omap_manager::OMapInnerNode>(t, length);
+    return alloc_new_extent<omap_manager::OMapInnerNode>(t, length, delay);
   case extent_types_t::OMAP_LEAF:
-    return alloc_new_extent<omap_manager::OMapLeafNode>(t, length);
+    return alloc_new_extent<omap_manager::OMapLeafNode>(t, length, delay);
   case extent_types_t::COLL_BLOCK:
-    return alloc_new_extent<collection_manager::CollectionNode>(t, length);
+    return alloc_new_extent<collection_manager::CollectionNode>(t, length, delay);
   case extent_types_t::OBJECT_DATA_BLOCK:
-    return alloc_new_extent<ObjectDataBlock>(t, length);
+    return alloc_new_extent<ObjectDataBlock>(t, length, delay);
   case extent_types_t::RETIRED_PLACEHOLDER:
     ceph_assert(0 == "impossible");
     return CachedExtentRef();
   case extent_types_t::TEST_BLOCK:
-    return alloc_new_extent<TestBlock>(t, length);
+    return alloc_new_extent<TestBlock>(t, length, delay);
   case extent_types_t::TEST_BLOCK_PHYSICAL:
-    return alloc_new_extent<TestBlockPhysical>(t, length);
+    return alloc_new_extent<TestBlockPhysical>(t, length, delay);
   case extent_types_t::NONE: {
     ceph_assert(0 == "NONE is an invalid extent type");
     return CachedExtentRef();
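Every logical extent type now threads the delay flag through to
alloc_new_extent; a hedged call sketch (caller context assumed, length
illustrative):

    // With delay = true the extent receives no real paddr yet; the
    // ExtentPlacementManager decides inline vs. ool at submit time.
    auto extent = cache.alloc_new_extent_by_type(
      t, extent_types_t::OBJECT_DATA_BLOCK, 4096, true);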
diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h
index 9806d315d34b7c555ec38bfa426a472782e6b579..d808a7d7bc1b01eb5075a861299310b102e8d59a 100644
@@ -367,23 +367,31 @@ public:
     }
   }
 
-  /**
-   * alloc_new_extent
-   *
-   * Allocates a fresh extent.  addr will be relative until commit.
-   */
   template <typename T>
   TCachedExtentRef<T> alloc_new_extent(
-    Transaction &t,      ///< [in, out] current transaction
-    segment_off_t length ///< [in] length
+    Transaction &t,       ///< [in, out] current transaction
+    segment_off_t length, ///< [in] length
+    bool delayed = false  ///< [in] whether the extent's paddr allocation is delayed
   ) {
     auto ret = CachedExtent::make_cached_extent_ref<T>(
       alloc_cache_buf(length));
-    t.add_fresh_extent(ret);
+    t.add_fresh_extent(ret, delayed);
     ret->state = CachedExtent::extent_state_t::INITIAL_WRITE_PENDING;
     return ret;
   }
 
+  void mark_delayed_extent_inline(
+    Transaction& t,
+    LogicalCachedExtentRef& ref) {
+    t.mark_delayed_extent_inline(ref);
+  }
+
+  void mark_delayed_extent_ool(
+    Transaction& t,
+    LogicalCachedExtentRef& ref) {
+    t.mark_delayed_extent_ool(ref);
+  }
+
   /**
    * alloc_new_extent
    *
@@ -392,7 +400,8 @@ public:
   CachedExtentRef alloc_new_extent_by_type(
     Transaction &t,       ///< [in, out] current transaction
     extent_types_t type,  ///< [in] type tag
-    segment_off_t length  ///< [in] length
+    segment_off_t length, ///< [in] length
+    bool delayed = false  ///< [in] whether to delay paddr allocation
     );
 
   /**
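The two mark_delayed_extent_* helpers simply forward to the transaction,
which owns the bookkeeping; a sketch of the transition they drive (the
Transaction side appears in the transaction.h hunk below):

    // Once the placement decision is made, a delayed extent moves out of
    // delayed_set into write_set via exactly one of:
    cache.mark_delayed_extent_inline(t, ref); // assigns record-relative paddr
    cache.mark_delayed_extent_ool(t, ref);    // paddr already set by a Writer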
diff --git a/src/crimson/os/seastore/cached_extent.h b/src/crimson/os/seastore/cached_extent.h
index 851446371e63a04dbbe21a3a3c536663f3b58711..d97e28f73124171c0c52c83e754201c5c92433e3 100644
 
 namespace crimson::os::seastore {
 
+class ool_record_t;
 class Transaction;
 class CachedExtent;
 using CachedExtentRef = boost::intrusive_ptr<CachedExtent>;
+class SegmentedAllocator;
+class TransactionManager;
 
 // #define DEBUG_CACHED_EXTENT_REF
 #ifdef DEBUG_CACHED_EXTENT_REF
@@ -320,6 +323,15 @@ public:
 
   virtual ~CachedExtent();
 
+  /// type of the backend device that will hold this extent
+  device_type_t backend_type = device_type_t::NONE;
+
+  /// hint for allocators
+  ool_placement_hint_t hint = ool_placement_hint_t::NONE;
+
+  bool is_relative() const {
+    return poffset.is_relative();
+  }
 private:
   template <typename T>
   friend class read_set_item_t;
@@ -456,6 +468,9 @@ protected:
     }
   }
 
+  friend class crimson::os::seastore::ool_record_t;
+  friend class crimson::os::seastore::SegmentedAllocator;
+  friend class crimson::os::seastore::TransactionManager;
 };
 
 std::ostream &operator<<(std::ostream &, CachedExtent::extent_state_t);
diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc
new file mode 100644 (file)
index 0000000..77ccd1a
--- /dev/null
@@ -0,0 +1,271 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/extent_placement_manager.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_seastore);
+  }
+}
+
+namespace crimson::os::seastore {
+
+SegmentedAllocator::SegmentedAllocator(
+  SegmentProvider& sp,
+  SegmentManager& sm,
+  LBAManager& lba_manager,
+  Journal& journal,
+  Cache& cache)
+  : segment_provider(sp),
+    segment_manager(sm),
+    lba_manager(lba_manager),
+    journal(journal),
+    cache(cache)
+{
+  std::generate_n(
+    std::back_inserter(writers),
+    crimson::common::get_conf<uint64_t>(
+      "seastore_init_rewrite_segments_num_per_device"),
+    [&] {
+      return Writer{
+        segment_provider,
+        segment_manager,
+        lba_manager,
+        journal,
+        cache};
+    });
+}
+
+SegmentedAllocator::Writer::finish_record_ret
+SegmentedAllocator::Writer::finish_write(
+  Transaction& t,
+  ool_record_t& record) {
+  return trans_intr::do_for_each(record.get_extents(),
+    [this, &t](auto& ool_extent) {
+    auto& lextent = ool_extent.get_lextent();
+    logger().debug("SegmentedAllocator::Writer::finish_write: "
+      "extent: {}, ool_paddr: {}",
+      *lextent,
+      ool_extent.get_ool_paddr());
+    return lba_manager.update_mapping(
+      t,
+      lextent->get_laddr(),
+      lextent->get_paddr(),
+      ool_extent.get_ool_paddr()
+    ).si_then([&ool_extent, &t, &lextent, this] {
+      ool_extent.persist_paddr();
+      lextent->backend_type = device_type_t::NONE;
+      lextent->hint = {};
+      cache.mark_delayed_extent_ool(t, lextent);
+      return finish_record_iertr::now();
+    });
+  }).si_then([&record] {
+    record.clear();
+  });
+}
+
+SegmentedAllocator::Writer::write_iertr::future<>
+SegmentedAllocator::Writer::_write(
+  Transaction& t,
+  ool_record_t& record)
+{
+  bufferlist bl = record.encode(current_segment->segment->get_segment_id(), 0);
+  seastar::promise<> pr;
+  current_segment->inflight_writes.emplace_back(pr.get_future());
+
+  logger().debug(
+    "SegmentedAllocator::Writer::write: written {} extents,"
+    " {} bytes to segment {} at {}",
+    record.get_num_extents(),
+    bl.length(),
+    current_segment->segment->get_segment_id(),
+    record.get_base());
+
+  return trans_intr::make_interruptible(
+    current_segment->segment->write(record.get_base(), bl).safe_then(
+      [this, pr=std::move(pr),
+      it=(--current_segment->inflight_writes.end()),
+      cs=current_segment]() mutable {
+        if (cs->outdated) {
+          pr.set_value();
+        } else {
+          current_segment->inflight_writes.erase(it);
+        }
+        return seastar::now();
+    })
+  ).si_then([this, &record, &t]() mutable {
+    return finish_write(t, record);
+  });
+}
+
+void SegmentedAllocator::Writer::add_extent_to_write(
+  ool_record_t& record,
+  LogicalCachedExtentRef& extent) {
+  logger().debug(
+    "SegmentedAllocator::Writer::add_extent_to_write: "
+    "add extent {} to record",
+    extent);
+  extent->prepare_write();
+  record.add_extent(extent);
+}
+
+SegmentedAllocator::Writer::write_iertr::future<>
+SegmentedAllocator::Writer::write(
+  Transaction& t,
+  std::list<LogicalCachedExtentRef>& extents)
+{
+  auto write_func = [this, &extents, &t] {
+    return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
+      [this, &extents, &t](auto& record) {
+      return trans_intr::repeat([this, &record, &t, &extents]()
+        -> write_iertr::future<seastar::stop_iteration> {
+        if (extents.empty()) {
+          return seastar::make_ready_future<
+            seastar::stop_iteration>(seastar::stop_iteration::yes);
+        }
+
+        return segment_rotation_guard.wait(
+          [this] {
+            return !rolling_segment;
+          },
+          [this, &record, &extents, &t]() -> write_iertr::future<> {
+            record.set_base(allocated_to);
+            for (auto it = extents.begin();
+                 it != extents.end();) {
+              auto& extent = *it;
+              auto wouldbe_length =
+                record.get_wouldbe_encoded_record_length(extent);
+              if (_needs_roll(wouldbe_length)) {
+                // reached the segment end, write and roll
+                assert(!rolling_segment);
+                rolling_segment = true;
+                auto num_extents = record.get_num_extents();
+                logger().debug(
+                  "SegmentedAllocator::Writer::write: end of segment, writing {} extents to segment {} at {}",
+                  num_extents,
+                  current_segment->segment->get_segment_id(),
+                  allocated_to);
+                return (num_extents ?
+                        _write(t, record) :
+                        write_iertr::now()
+                ).si_then([this]() mutable {
+                  return roll_segment(false);
+                });
+              }
+              add_extent_to_write(record, extent);
+              it = extents.erase(it);
+            }
+            record_size_t rsize = record.get_encoded_record_length();
+
+            logger().debug(
+              "SegmentedAllocator::Writer::write: writing {} extents to segment {} at {}",
+              record.get_num_extents(),
+              current_segment->segment->get_segment_id(),
+              allocated_to);
+            allocated_to += rsize.mdlength + rsize.dlength;
+            return _write(t, record);
+          }
+        ).si_then([]()
+          -> write_iertr::future<seastar::stop_iteration> {
+          return seastar::make_ready_future<
+            seastar::stop_iteration>(seastar::stop_iteration::no);
+        });
+      });
+    });
+  };
+
+  if (rolling_segment) {
+    return segment_rotation_guard.wait([this] {
+        return !rolling_segment;
+      }, std::move(write_func));
+
+  } else if (!current_segment) {
+    return trans_intr::make_interruptible(roll_segment(true)).si_then(
+      [write_func=std::move(write_func)] {
+      return write_func();
+    });
+  }
+  return write_func();
+}
+
+bool SegmentedAllocator::Writer::_needs_roll(segment_off_t length) const {
+  return allocated_to + length > current_segment->segment->get_write_capacity();
+}
+
+SegmentedAllocator::Writer::init_segment_ertr::future<>
+SegmentedAllocator::Writer::init_segment(Segment& segment) {
+  bufferptr bp(
+    ceph::buffer::create_page_aligned(
+      segment_manager.get_block_size()));
+  bp.zero();
+  auto header = segment_header_t{
+    journal.next_journal_segment_seq - 1, // current seg seq = next seg seq - 1
+    segment.get_segment_id(),
+    NO_DELTAS, 0, true};
+  logger().debug("SegmentedAllocator::Writer::init_segment: initting {}, {}",
+    segment.get_segment_id(),
+    header);
+  ceph::bufferlist bl;
+  encode(header, bl);
+  bl.cbegin().copy(bl.length(), bp.c_str());
+  bl.clear();
+  bl.append(bp);
+  allocated_to = segment_manager.get_block_size();
+  return segment.write(0, bl).handle_error(
+    crimson::ct_error::input_output_error::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error when initing segment"}
+  );
+}
+
+SegmentedAllocator::Writer::roll_segment_ertr::future<>
+SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
+  if (set_rolling) {
+    rolling_segment = true;
+  }
+  assert(rolling_segment);
+  if (current_segment) {
+    (void) seastar::with_gate(writer_guard, [this] {
+      auto fut = seastar::now();
+      if (!current_segment->inflight_writes.empty()) {
+        fut = seastar::when_all_succeed(
+          current_segment->inflight_writes.begin(),
+          current_segment->inflight_writes.end());
+      }
+      current_segment->outdated = true;
+      return fut.then(
+        [cs=std::move(current_segment), this, it=(--open_segments.end())] {
+        return cs->segment->close().safe_then([this, cs, it] {
+          assert((*it).get() == cs.get());
+          segment_provider.close_segment(cs->segment->get_segment_id());
+          open_segments.erase(it);
+        });
+      });
+    }).handle_exception_type([](seastar::gate_closed_exception e) {
+      logger().debug(
+        "SegmentedAllocator::Writer::roll_segment:"
+        " writer_guard closed, should be stopping");
+      return seastar::now();
+    });
+  }
+
+  return segment_provider.get_segment().safe_then([this](auto segment) {
+    return segment_manager.open(segment);
+  }).safe_then([this](auto segref) {
+    return init_segment(*segref).safe_then([segref=std::move(segref), this] {
+      assert(!current_segment.get());
+      current_segment.reset(new open_segment_wrapper_t());
+      current_segment->segment = segref;
+      open_segments.emplace_back(current_segment);
+      rolling_segment = false;
+      segment_rotation_guard.broadcast();
+    });
+  }).handle_error(
+    roll_segment_ertr::pass_further{},
+    crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
+  );
+}
+
+}
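A record's on-disk footprint is its block-aligned metadata plus data; a
worked sketch of the write-pointer arithmetic used above (sizes
illustrative):

    // e.g. block_size = 4096, rsize.mdlength = 4096, rsize.dlength = 8192
    record_size_t rsize = record.get_encoded_record_length();
    allocated_to += rsize.mdlength + rsize.dlength;  // advances by 12288
    // roll to a fresh segment when the next record would overflow:
    bool roll = allocated_to + wouldbe_length
              > current_segment->segment->get_write_capacity();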
diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h
new file mode 100644 (file)
index 0000000..934f012
--- /dev/null
@@ -0,0 +1,400 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "seastar/core/gate.hh"
+
+#include "crimson/common/condition_variable.h"
+#include "crimson/common/log.h"
+#include "crimson/os/seastore/cache.h"
+#include "crimson/os/seastore/cached_extent.h"
+#include "crimson/os/seastore/lba_manager.h"
+
+namespace crimson::os::seastore {
+
+/**
+ * ool_record_t
+ *
+ * Encapsulates logic for building and encoding an ool record destined for
+ * an ool segment.
+ *
+ * Uses a metadata header to enable scanning the ool segment for gc purposes.
+ * Introducing a separate physical->logical mapping would enable removing the
+ * metadata block overhead.
+ */
+class ool_record_t {
+  class OolExtent {
+  public:
+    OolExtent(LogicalCachedExtentRef& lextent)
+      : lextent(lextent) {}
+    void set_ool_paddr(paddr_t addr) {
+      ool_offset = addr;
+    }
+    paddr_t get_ool_paddr() const {
+      return ool_offset;
+    }
+    void persist_paddr() {
+      lextent->set_paddr(ool_offset);
+      ool_offset = P_ADDR_NULL;
+    }
+    bufferptr& get_bptr() {
+      return lextent->get_bptr();
+    }
+    LogicalCachedExtentRef& get_lextent() {
+      return lextent;
+    }
+  private:
+    paddr_t ool_offset;
+    LogicalCachedExtentRef lextent;
+  };
+
+public:
+  ool_record_t(size_t block_size) : block_size(block_size) {}
+  record_size_t get_encoded_record_length() {
+    return crimson::os::seastore::get_encoded_record_length(record, block_size);
+  }
+  size_t get_wouldbe_encoded_record_length(LogicalCachedExtentRef& extent) {
+    auto raw_mdlength = get_encoded_record_raw_mdlength(record, block_size);
+    auto wouldbe_mdlength = p2roundup(
+      raw_mdlength + ceph::encoded_sizeof_bounded<extent_info_t>(),
+      block_size);
+    return wouldbe_mdlength + extent_buf_len + extent->get_bptr().length();
+  }
+  ceph::bufferlist encode(segment_id_t segment, segment_nonce_t nonce) {
+    assert(extents.size() == record.extents.size());
+    auto rsize = get_encoded_record_length();
+    segment_off_t extent_offset = base + rsize.mdlength;
+    for (auto& extent : extents) {
+      extent.set_ool_paddr(
+        {segment, extent_offset});
+      extent_offset += extent.get_bptr().length();
+    }
+    assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
+    return encode_record(rsize, std::move(record), block_size, base, nonce);
+  }
+  void add_extent(LogicalCachedExtentRef& extent) {
+    extents.emplace_back(extent);
+    ceph::bufferlist bl;
+    bl.append(extent->get_bptr());
+    record.extents.emplace_back(extent_t{
+      extent->get_type(),
+      extent->get_laddr(),
+      std::move(bl)});
+    extent_buf_len += extent->get_bptr().length();
+  }
+  std::vector<OolExtent>& get_extents() {
+    return extents;
+  }
+  void set_base(segment_off_t b) {
+    base = b;
+  }
+  segment_off_t get_base() {
+    return base;
+  }
+  void clear() {
+    record.extents.clear();
+    extents.clear();
+    assert(!record.deltas.size());
+    extent_buf_len = 0;
+    base = MAX_SEG_OFF;
+  }
+  uint64_t get_num_extents() {
+    return extents.size();
+  }
+private:
+  std::vector<OolExtent> extents;
+  record_t record;
+  size_t block_size;
+  segment_off_t extent_buf_len = 0;
+  segment_off_t base = MAX_SEG_OFF;
+};
+
+/**
+ * ExtentOolWriter
+ *
+ * Interface through which final write to ool segment is performed.
+ */
+class ExtentOolWriter {
+public:
+  using write_iertr = trans_iertr<crimson::errorator<
+    crimson::ct_error::input_output_error, // media error or corruption
+    crimson::ct_error::invarg,             // if offset is < write pointer or misaligned
+    crimson::ct_error::ebadf,              // segment closed
+    crimson::ct_error::enospc              // write exceeds segment size
+    >>;
+
+  using stop_ertr = Segment::close_ertr;
+  virtual stop_ertr::future<> stop() = 0;
+  virtual write_iertr::future<> write(
+    Transaction& t,
+    std::list<LogicalCachedExtentRef>& extent) = 0;
+  virtual ~ExtentOolWriter() {}
+};
+
+/**
+ * ExtentAllocator
+ *
+ * Handles allocating ool extents from a specific family of targets.
+ */
+class ExtentAllocator {
+public:
+  using alloc_paddr_iertr = trans_iertr<crimson::errorator<
+    crimson::ct_error::input_output_error, // media error or corruption
+    crimson::ct_error::invarg,             // if offset is < write pointer or misaligned
+    crimson::ct_error::ebadf,              // segment closed
+    crimson::ct_error::enospc              // write exceeds segment size
+    >>;
+
+  virtual alloc_paddr_iertr::future<> alloc_ool_extents_paddr(
+    Transaction& t,
+    std::list<LogicalCachedExtentRef>&) = 0;
+
+  using stop_ertr = ExtentOolWriter::stop_ertr;
+  virtual stop_ertr::future<> stop() = 0;
+  virtual ~ExtentAllocator() {}
+};
+using ExtentAllocatorRef = std::unique_ptr<ExtentAllocator>;
+
+struct open_segment_wrapper_t : public boost::intrusive_ref_counter<
+  open_segment_wrapper_t,
+  boost::thread_unsafe_counter> {
+  SegmentRef segment;
+  std::list<seastar::future<>> inflight_writes;
+  bool outdated = false;
+};
+
+using open_segment_wrapper_ref =
+  boost::intrusive_ptr<open_segment_wrapper_t>;
+
+/**
+ * SegmentedAllocator
+ *
+ * Handles out-of-line writes to a SegmentManager device (such as a ZNS device
+ * or conventional flash device where sequential writes are heavily preferred).
+ *
+ * Creates <seastore_init_rewrite_segments_num_per_device> Writer instances
+ * internally to round-robin writes.  Later work will partition allocations
+ * based on hint (age, presumably) among the created Writers.
+ *
+ * Each Writer makes use of SegmentProvider to obtain a new segment for writes
+ * as needed.
+ */
+class SegmentedAllocator : public ExtentAllocator {
+  class Writer : public ExtentOolWriter {
+  public:
+    Writer(
+      SegmentProvider& sp,
+      SegmentManager& sm,
+      LBAManager& lba_manager,
+      Journal& journal,
+      Cache& cache)
+      : segment_provider(sp),
+        segment_manager(sm),
+        lba_manager(lba_manager),
+        journal(journal),
+        cache(cache)
+    {}
+    Writer(Writer &&) = default;
+
+    write_iertr::future<> write(
+      Transaction& t,
+      std::list<LogicalCachedExtentRef>& extent) final;
+    stop_ertr::future<> stop() final {
+      return writer_guard.close().then([this] {
+        return crimson::do_for_each(open_segments, [](auto& seg_wrapper) {
+          return seg_wrapper->segment->close();
+        });
+      });
+    }
+  private:
+    using update_lba_mapping_iertr = LBAManager::update_le_mapping_iertr;
+    using finish_record_iertr = update_lba_mapping_iertr;
+    using finish_record_ret = finish_record_iertr::future<>;
+    finish_record_ret finish_write(
+      Transaction& t,
+      ool_record_t& record);
+    segment_off_t fake_paddr_off = 0;
+    bool _needs_roll(segment_off_t length) const;
+
+    write_iertr::future<> _write(
+      Transaction& t,
+      ool_record_t& record);
+
+    using roll_segment_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+    roll_segment_ertr::future<> roll_segment(bool);
+
+    using init_segment_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+    init_segment_ertr::future<> init_segment(Segment& segment);
+
+    void add_extent_to_write(
+      ool_record_t&,
+      LogicalCachedExtentRef& extent);
+
+    SegmentProvider& segment_provider;
+    SegmentManager& segment_manager;
+    open_segment_wrapper_ref current_segment;
+    std::list<open_segment_wrapper_ref> open_segments;
+    segment_off_t allocated_to = 0;
+    LBAManager& lba_manager;
+    Journal& journal;
+    crimson::condition_variable segment_rotation_guard;
+    seastar::gate writer_guard;
+    bool rolling_segment = false;
+    Cache& cache;
+  };
+public:
+  SegmentedAllocator(
+    SegmentProvider& sp,
+    SegmentManager& sm,
+    LBAManager& lba_manager,
+    Journal& journal,
+    Cache& cache);
+
+  Writer &get_writer(ool_placement_hint_t hint) {
+    return writers[std::rand() % writers.size()];
+  }
+
+  alloc_paddr_iertr::future<> alloc_ool_extents_paddr(
+    Transaction& t,
+    std::list<LogicalCachedExtentRef>& extents) final {
+    return seastar::do_with(
+      std::map<Writer*, std::list<LogicalCachedExtentRef>>(),
+      [this, extents=std::move(extents), &t](auto& alloc_map) {
+      for (auto& extent : extents) {
+        auto writer = &(get_writer(extent->hint));
+        alloc_map[writer].emplace_back(extent);
+      }
+      return trans_intr::do_for_each(alloc_map, [&t](auto& p) {
+        auto writer = p.first;
+        auto& extents_to_persist = p.second;
+        return writer->write(t, extents_to_persist);
+      });
+    });
+  }
+
+  stop_ertr::future<> stop() {
+    return crimson::do_for_each(writers, [](auto& writer) {
+      return writer.stop();
+    });
+  }
+private:
+  SegmentProvider& segment_provider;
+  SegmentManager& segment_manager;
+  std::vector<Writer> writers;
+  LBAManager& lba_manager;
+  Journal& journal;
+  Cache& cache;
+};
+
+class ExtentPlacementManager {
+public:
+  ExtentPlacementManager(
+    Cache& cache,
+    LBAManager& lba_manager
+  ) : cache(cache), lba_manager(lba_manager) {}
+
+  /**
+   * alloc_new_extent_by_type
+   *
+   * Create a new extent; CachedExtent::poffset may not be set
+   * if a delayed allocation is needed.
+   */
+  CachedExtentRef alloc_new_extent_by_type(
+    Transaction& t,
+    extent_types_t type,
+    segment_off_t length,
+    ool_placement_hint_t hint = ool_placement_hint_t::NONE) {
+    // only logical extents should fall in this path
+    assert(is_logical_type(type));
+    auto dtype = get_allocator_type(hint);
+    CachedExtentRef extent;
+    // for extents that would be stored in NVDIMM/PMEM, no delayed
+    // allocation is needed
+    if (need_delayed_allocation(dtype)) {
+      // set a unique temporary paddr; this is necessary because
+      // the transaction's write_set is indexed by paddr
+      extent = cache.alloc_new_extent_by_type(t, type, length, true);
+    } else {
+      extent = cache.alloc_new_extent_by_type(t, type, length);
+    }
+    extent->backend_type = dtype;
+    extent->hint = hint;
+    return extent;
+  }
+
+  /**
+   * delayed_alloc_or_ool_write
+   *
+   * Performs any outstanding ool writes and updates the pending lba
+   * mappings accordingly
+   */
+  using alloc_paddr_iertr = ExtentOolWriter::write_iertr;
+  alloc_paddr_iertr::future<> delayed_alloc_or_ool_write(
+    Transaction& t) {
+    return seastar::do_with(
+      std::map<ExtentAllocator*, std::list<LogicalCachedExtentRef>>(),
+      std::list<std::pair<paddr_t, LogicalCachedExtentRef>>(),
+      [this, &t](auto& alloc_map, auto& inline_list) mutable {
+      auto& alloc_list = t.get_delayed_alloc_list();
+      for (auto& extent : alloc_list) {
+        // extents may be invalidated
+        if (!extent->is_valid()) {
+          continue;
+        }
+        if (should_be_inline(extent)) {
+          auto old_addr = extent->get_paddr();
+          cache.mark_delayed_extent_inline(t, extent);
+          inline_list.emplace_back(old_addr, extent);
+          continue;
+        }
+        auto& allocator_ptr = get_allocator(extent->backend_type, extent->hint);
+        alloc_map[allocator_ptr.get()].emplace_back(extent);
+      }
+      return trans_intr::do_for_each(alloc_map, [&t](auto& p) {
+        auto allocator = p.first;
+        auto& extents = p.second;
+        return allocator->alloc_ool_extents_paddr(t, extents);
+      }).si_then([&inline_list, this, &t] {
+        return trans_intr::do_for_each(inline_list, [this, &t](auto& p) {
+          auto old_addr = p.first;
+          auto& extent = p.second;
+          return lba_manager.update_mapping(
+            t,
+            extent->get_laddr(),
+            old_addr,
+            extent->get_paddr());
+        });
+      });
+    });
+  }
+
+  void add_allocator(device_type_t type, ExtentAllocatorRef&& allocator) {
+    allocators[type].emplace_back(std::move(allocator));
+  }
+
+private:
+  device_type_t get_allocator_type(ool_placement_hint_t hint) {
+    return device_type_t::SEGMENTED;
+  }
+
+  bool should_be_inline(LogicalCachedExtentRef& extent) {
+    return (std::rand() % 2) == 0;
+  }
+
+  ExtentAllocatorRef& get_allocator(
+    device_type_t type,
+    ool_placement_hint_t hint) {
+    auto& devices = allocators[type];
+    return devices[std::rand() % devices.size()];
+  }
+
+  Cache& cache;
+  LBAManager& lba_manager;
+  std::map<device_type_t, std::vector<ExtentAllocatorRef>> allocators;
+};
+using ExtentPlacementManagerRef = std::unique_ptr<ExtentPlacementManager>;
+
+}
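Putting the pieces together, a hedged wiring sketch; the TransactionManager
plumbing around it is assumed, not part of this patch:

    auto epm = std::make_unique<ExtentPlacementManager>(cache, lba_manager);
    epm->add_allocator(
      device_type_t::SEGMENTED,
      std::make_unique<SegmentedAllocator>(
        segment_provider, segment_manager, lba_manager, journal, cache));

    // Allocate with a delayed paddr, then resolve all delayed extents
    // (inline vs. ool) at transaction submission:
    auto extent = epm->alloc_new_extent_by_type(
      t, extent_types_t::OBJECT_DATA_BLOCK, 4096);
    return epm->delayed_alloc_or_ool_write(t);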
diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h
index 057b001e3a7da21d1e81ce888e6be18dff78d5a1..e7552bd13161fefb11c69a489fcc8a8c2033f214 100644
@@ -22,6 +22,7 @@
 namespace crimson::os::seastore {
 
 class SegmentProvider;
+class SegmentedAllocator;
 
 /**
  * Manages stream of atomically written records to a SegmentManager.
@@ -315,6 +316,7 @@ private:
   );
 
   extent_len_t max_record_length() const;
+  friend class crimson::os::seastore::SegmentedAllocator;
 };
 using JournalRef = std::unique_ptr<Journal>;
 
diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc
index ab079c89061da6ea10799774a5d76304f2bd459c..82871df3d9064e9818ab5504006cba5639f87638 100644
@@ -196,4 +196,8 @@ ceph::bufferlist encode_record(
   return bl;
 }
 
+bool need_delayed_allocation(device_type_t type) {
+  return type <= RANDOM_BLOCK;
+}
+
 }
diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h
index cc9b68ec0df00d3dcd7af574a63b523b2690725c..704f2bcea2fd74fa5fbd06c8fe266b5c6c72722f 100644
@@ -238,6 +238,21 @@ using objaddr_t = uint32_t;
 constexpr objaddr_t OBJ_ADDR_MAX = std::numeric_limits<objaddr_t>::max();
 constexpr objaddr_t OBJ_ADDR_NULL = OBJ_ADDR_MAX - 1;
 
+enum class ool_placement_hint_t {
+  NONE,     ///< Denotes empty hint
+  NUM_HINTS ///< Number of hints
+};
+
+enum device_type_t {
+  NONE = 0,
+  SEGMENTED, // i.e. Hard_Disk, SATA_SSD, NAND_NVME
+  RANDOM_BLOCK, // i.e. RANDOM_BD
+  PMEM, // i.e. NVDIMM, PMEM
+  NUM_TYPES
+};
+
+bool need_delayed_allocation(device_type_t type);
+
 /* Monotonically increasing identifier for the location of a
  * journal_record.
  */
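Given the enum order above, need_delayed_allocation (defined in the
seastore_types.cc hunk) is true for NONE, SEGMENTED and RANDOM_BLOCK, and
false for PMEM: PMEM-class devices can be assigned a paddr immediately.

    static_assert(device_type_t::RANDOM_BLOCK < device_type_t::PMEM);
    // need_delayed_allocation(SEGMENTED)    -> true
    // need_delayed_allocation(RANDOM_BLOCK) -> true
    // need_delayed_allocation(PMEM)         -> false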
diff --git a/src/crimson/os/seastore/transaction.h b/src/crimson/os/seastore/transaction.h
index 2bde9b11b8e2aba6df2cc2ee2a0ef58f1996f84f..7fda438bba8e2410e467d3f5ccb67ccd4188d85b 100644
@@ -83,11 +83,35 @@ public:
     ceph_assert(inserted);
   }
 
-  void add_fresh_extent(CachedExtentRef ref) {
+  void add_fresh_extent(
+    CachedExtentRef ref,
+    bool delayed = false) {
     ceph_assert(!is_weak());
-    fresh_block_list.push_back(ref);
+    if (delayed) {
+      assert(ref->is_logical());
+      delayed_alloc_list.emplace_back(ref->cast<LogicalCachedExtent>());
+      delayed_set.insert(*ref);
+    } else {
+      ref->set_paddr(make_record_relative_paddr(offset));
+      offset += ref->get_length();
+      fresh_block_list.push_back(ref);
+      write_set.insert(*ref);
+    }
+  }
+
+  void mark_delayed_extent_inline(LogicalCachedExtentRef& ref) {
     ref->set_paddr(make_record_relative_paddr(offset));
     offset += ref->get_length();
+    delayed_set.erase(*ref);
+    fresh_block_list.push_back(ref);
+    write_set.insert(*ref);
+  }
+
+  void mark_delayed_extent_ool(LogicalCachedExtentRef& ref) {
+    assert(!ref->get_paddr().is_null());
+    assert(!ref->is_inline());
+    delayed_set.erase(*ref);
+    fresh_block_list.push_back(ref);
     write_set.insert(*ref);
   }
 
@@ -133,6 +157,10 @@ public:
     return fresh_block_list;
   }
 
+  auto& get_delayed_alloc_list() {
+    return delayed_alloc_list;
+  }
+
   const auto &get_mutated_block_list() {
     return mutated_block_list;
   }
@@ -200,6 +228,7 @@ public:
     write_set.clear();
     fresh_block_list.clear();
     mutated_block_list.clear();
+    delayed_alloc_list.clear();
     retired_set.clear();
     onode_tree_stats = {};
     lba_tree_stats = {};
@@ -251,6 +280,9 @@ private:
 
   std::list<CachedExtentRef> fresh_block_list;   ///< list of fresh blocks
   std::list<CachedExtentRef> mutated_block_list; ///< list of mutated blocks
+  /// list of ool extents whose addresses are not
+  /// determined until transaction submission
+  std::list<LogicalCachedExtentRef> delayed_alloc_list;
 
   pextent_set_t retired_set; ///< list of extents mutated by this transaction
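A sketch of the delayed-extent lifecycle across a transaction (method names
all from this patch; ref is a LogicalCachedExtentRef):

    t.add_fresh_extent(ref, true);     // -> delayed_alloc_list + delayed_set
    // at submit time, per surviving extent, exactly one of:
    t.mark_delayed_extent_inline(ref); // record-relative paddr -> write_set
    t.mark_delayed_extent_ool(ref);    // ool paddr from a Writer -> write_set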