git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: add basic pipeline phases to TransactionManager
author: Samuel Just <sjust@redhat.com>
Mon, 11 Jan 2021 23:03:31 +0000 (15:03 -0800)
committer: Samuel Just <sjust@redhat.com>
Mon, 1 Feb 2021 21:29:19 +0000 (13:29 -0800)
We need to ensure that the metadata preparation and completions happen
in order.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/ordering_handle.h [new file with mode: 0644]
src/crimson/os/seastore/transaction.h
src/crimson/os/seastore/transaction_manager.cc
src/crimson/os/seastore/transaction_manager.h
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_seastore_journal.cc

index 21b918e5474aff6b0fc7f2e7d2075deb224f044c..a866058c5a63ca7775d5fd7fca0c3ac8b9c14c8c 100644 (file)
@@ -178,7 +178,8 @@ Journal::read_validate_data_ret Journal::read_validate_data(
 
 Journal::write_record_ret Journal::write_record(
   record_size_t rsize,
-  record_t &&record)
+  record_t &&record,
+  OrderingHandle &handle)
 {
   ceph::bufferlist to_write = encode_record(
     rsize, std::move(record));
@@ -190,10 +191,20 @@ Journal::write_record_ret Journal::write_record(
     rsize.mdlength,
     rsize.dlength,
     target);
-  return current_journal_segment->write(target, to_write).handle_error(
-    write_record_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in Journal::write_record"
+  // Start write under the current exclusive stage, but wait for it
+  // in the device_submission concurrent stage to permit multiple
+  // overlapping writes.
+  auto write_fut = current_journal_segment->write(target, to_write);
+  return handle.enter(write_pipeline->device_submission
+  ).then([write_fut = std::move(write_fut)]() mutable {
+    return std::move(write_fut
+    ).handle_error(
+      write_record_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+       "Invalid error in Journal::write_record"
+      }
+    ).safe_then([this, &handle] {
+      return handle.enter(write_pipeline->finalize);
     }).safe_then([this, target] {
       committed_to = target;
       return write_record_ret(
@@ -202,6 +213,7 @@ Journal::write_record_ret Journal::write_record(
          current_journal_segment->get_segment_id(),
          target});
     });
+  });
 }
 
 Journal::record_size_t Journal::get_encoded_record_length(
index 7424d78b31f14b810f34021dcae6261f9a1e5d4a..ecdd64d316fc5ceeec070f13df7c74fa1807432e 100644 (file)
@@ -14,6 +14,7 @@
 #include "include/denc.h"
 
 #include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/osd/exceptions.h"
 
@@ -170,7 +171,11 @@ public:
   using submit_record_ret = submit_record_ertr::future<
     std::pair<paddr_t, journal_seq_t>
     >;
-  submit_record_ret submit_record(record_t &&record) {
+  submit_record_ret submit_record(
+    record_t &&record,
+    OrderingHandle &handle
+  ) {
+    assert(write_pipeline);
     auto rsize = get_encoded_record_length(record);
     auto total = rsize.mdlength + rsize.dlength;
     if (total > max_record_length) {
@@ -180,8 +185,10 @@ public:
       ? roll_journal_segment().safe_then([](auto){})
       : roll_journal_segment_ertr::now();
     return roll.safe_then(
-      [this, rsize, record=std::move(record)]() mutable {
-       return write_record(rsize, std::move(record)
+      [this, rsize, record=std::move(record), &handle]() mutable {
+       return write_record(
+         rsize, std::move(record),
+         handle
        ).safe_then([this, rsize](auto addr) {
          return std::make_pair(
            addr.add_offset(rsize.mdlength),
@@ -223,6 +230,9 @@ public:
     extent_len_t bytes_to_read
   );
 
+  void set_write_pipeline(WritePipeline *_write_pipeline) {
+    write_pipeline = _write_pipeline;
+  }
 
 private:
   const extent_len_t block_size;
@@ -238,6 +248,8 @@ private:
   segment_off_t written_to = 0;
   segment_off_t committed_to = 0;
 
+  WritePipeline *write_pipeline = nullptr;
+
   journal_seq_t get_journal_seq(paddr_t addr) {
     return journal_seq_t{next_journal_segment_seq-1, addr};
   }
@@ -289,7 +301,8 @@ private:
   using write_record_ret = write_record_ertr::future<paddr_t>;
   write_record_ret write_record(
     record_size_t rsize,
-    record_t &&record);
+    record_t &&record,
+    OrderingHandle &handle);
 
   /// close current segment and initialize next one
   using roll_journal_segment_ertr = crimson::errorator<
diff --git a/src/crimson/os/seastore/ordering_handle.h b/src/crimson/os/seastore/ordering_handle.h
new file mode 100644 (file)
index 0000000..0f6a77a
--- /dev/null
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/operation.h"
+
+namespace crimson::os::seastore {
+
+/**
+ * PlaceholderOperation
+ *
+ * Once seastore is more complete, I expect to update the externally
+ * facing interfaces to permit passing the osd level operation through.
+ * Until then (and for tests likely permanently) we'll use this unregistered
+ * placeholder for the pipeline phases necessary for journal correctness.
+ */
+class PlaceholderOperation : public Operation {
+public:
+  using IRef = boost::intrusive_ptr<PlaceholderOperation>;
+
+  unsigned get_type() const final {
+    return 0;
+  }
+
+  const char *get_type_name() const final {
+    return "crimson::os::seastore::PlaceholderOperation";
+  }
+
+private:
+  void dump_detail(ceph::Formatter *f) const final {}
+  void print(std::ostream &) const final {}
+};
+
+struct OrderingHandle {
+  OperationRef op;
+  PipelineHandle phase_handle;
+
+  template <typename T>
+  seastar::future<> enter(T &t) {
+    return op->with_blocking_future(phase_handle.enter(t));
+  }
+
+  void exit() {
+    return phase_handle.exit();
+  }
+
+  seastar::future<> complete() {
+    return phase_handle.complete();
+  }
+};
+
+inline OrderingHandle get_dummy_ordering_handle() {
+  return OrderingHandle{new PlaceholderOperation, {}};
+}
+
+struct WritePipeline {
+  OrderedExclusivePhase prepare{
+    "TransactionManager::prepare_phase"
+  };
+  OrderedConcurrentPhase device_submission{
+    "TransactionManager::journal_phase"
+  };
+  OrderedExclusivePhase finalize{
+    "TransactionManager::finalize_phase"
+  };
+};
+
+}
index e189d1d32da03dfb550358d8d1b532ebf13835d2..a2ad59825ccac3a69d6226c3260c9b6145e94543 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <iostream>
 
+#include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/os/seastore/cached_extent.h"
 #include "crimson/os/seastore/root_block.h"
@@ -18,6 +19,8 @@ namespace crimson::os::seastore {
  */
 class Transaction {
 public:
+  OrderingHandle handle;
+
   using Ref = std::unique_ptr<Transaction>;
   enum class get_extent_ret {
     PRESENT,
@@ -130,16 +133,33 @@ private:
   ///< if != NULL_SEG_ID, release this segment after completion
   segment_id_t to_release = NULL_SEG_ID;
 
-  Transaction(bool weak) : weak(weak) {}
+public:
+  Transaction(
+    OrderingHandle &&handle,
+    bool weak
+  ) : handle(std::move(handle)), weak(weak) {}
+
+  ~Transaction() {
+    for (auto i = write_set.begin();
+        i != write_set.end();) {
+      i->state = CachedExtent::extent_state_t::INVALID;
+      write_set.erase(*i++);
+    }
+  }
 };
 using TransactionRef = Transaction::Ref;
 
 inline TransactionRef make_transaction() {
-  return std::unique_ptr<Transaction>(new Transaction(false));
+  return std::make_unique<Transaction>(
+    get_dummy_ordering_handle(),
+    false
+  );
 }
 
 inline TransactionRef make_weak_transaction() {
-  return std::unique_ptr<Transaction>(new Transaction(true));
+  return std::make_unique<Transaction>(
+    get_dummy_ordering_handle(),
+    true);
 }
 
 }
index f4cb6da58885fa2a5c9e173ff317fc1de2aea8da..ae23b4ba0df541de646506f89e6374cb10b99093 100644 (file)
@@ -29,7 +29,9 @@ TransactionManager::TransactionManager(
     cache(cache),
     lba_manager(lba_manager),
     journal(journal)
-{}
+{
+  journal.set_write_pipeline(&write_pipeline);
+}
 
 TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
 {
@@ -190,32 +192,41 @@ TransactionManager::submit_transaction(
   TransactionRef t)
 {
   logger().debug("TransactionManager::submit_transaction");
-  return segment_cleaner.do_immediate_work(*t
-  ).safe_then([this, t=std::move(t)]() mutable -> submit_transaction_ertr::future<> {
-    auto record = cache.try_construct_record(*t);
+  auto &tref = *t;
+  return tref.handle.enter(write_pipeline.prepare
+  ).then([this, &tref]() mutable {
+    return segment_cleaner.do_immediate_work(tref);
+  }).safe_then([this, &tref]() mutable
+              -> submit_transaction_ertr::future<> {
+    logger().debug("TransactionManager::submit_transaction after do_immediate");
+    auto record = cache.try_construct_record(tref);
     if (!record) {
       return crimson::ct_error::eagain::make();
     }
 
-    return journal.submit_record(std::move(*record)
-    ).safe_then([this, t=std::move(t)](auto p) mutable {
+    return journal.submit_record(std::move(*record), tref.handle
+    ).safe_then([this, &tref](auto p) mutable {
       auto [addr, journal_seq] = p;
       segment_cleaner.set_journal_head(journal_seq);
-      cache.complete_commit(*t, addr, journal_seq, &segment_cleaner);
-      lba_manager.complete_transaction(*t);
-      auto to_release = t->get_segment_to_release();
+      cache.complete_commit(tref, addr, journal_seq, &segment_cleaner);
+      lba_manager.complete_transaction(tref);
+      auto to_release = tref.get_segment_to_release();
       if (to_release != NULL_SEG_ID) {
        segment_cleaner.mark_segment_released(to_release);
        return segment_manager.release(to_release);
       } else {
        return SegmentManager::release_ertr::now();
       }
+    }).safe_then([&tref] {
+      return tref.handle.complete();
     }).handle_error(
       submit_transaction_ertr::pass_further{},
       crimson::ct_error::all_same_way([](auto e) {
        ceph_assert(0 == "Hit error submitting to journal");
       }));
-  });
+    }).finally([t=std::move(t)]() mutable {
+      t->handle.exit();
+    });
 }
 
 TransactionManager::get_next_dirty_extents_ret
index e105b87e1f68bc36d6fc71ef857bdf5bf7ba3a46..9e08e66437a58d8a1aa4d39de6fc25e916a71ed0 100644 (file)
@@ -332,6 +332,8 @@ private:
   Cache &cache;
   LBAManager &lba_manager;
   Journal &journal;
+
+  WritePipeline write_pipeline;
 };
 using TransactionManagerRef = std::unique_ptr<TransactionManager>;
 
index 60d5c3497ee71655d9cd62133183c37c61426b4d..544d074e81644f28bfcc43477820e63c0b363abc 100644 (file)
@@ -33,6 +33,8 @@ struct btree_lba_manager_test :
 
   const size_t block_size;
 
+  WritePipeline pipeline;
+
   btree_lba_manager_test()
     : segment_manager(segment_manager::create_test_ephemeral()),
       journal(*segment_manager),
@@ -41,6 +43,7 @@ struct btree_lba_manager_test :
       block_size(segment_manager->get_block_size())
   {
     journal.set_segment_provider(this);
+    journal.set_write_pipeline(&pipeline);
   }
 
   segment_id_t next = 0;
@@ -60,7 +63,7 @@ struct btree_lba_manager_test :
       ceph_assert(0 == "cannot fail");
     }
 
-    return journal.submit_record(std::move(*record)).safe_then(
+    return journal.submit_record(std::move(*record), t->handle).safe_then(
       [this, t=std::move(t)](auto p) mutable {
        auto [addr, seq] = p;
        cache.complete_commit(*t, addr, seq);
index 0bed505ffa1d1c7ed10b6e2146bedfbd3e84feba..fe824061552c74f44178f0993fd57a26a409f1d7 100644 (file)
@@ -64,6 +64,7 @@ struct record_validator_t {
 
 struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
   segment_manager::EphemeralSegmentManagerRef segment_manager;
+  WritePipeline pipeline;
   std::unique_ptr<Journal> journal;
 
   std::vector<record_validator_t> records;
@@ -91,6 +92,7 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
   seastar::future<> set_up_fut() final {
     journal.reset(new Journal(*segment_manager));
     journal->set_segment_provider(this);
+    journal->set_write_pipeline(&pipeline);
     return segment_manager->init(
     ).safe_then([this] {
       return journal->open_for_write();
@@ -107,6 +109,7 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
     ).safe_then([this, f=std::move(f)]() mutable {
       journal.reset(new Journal(*segment_manager));
       journal->set_segment_provider(this);
+      journal->set_write_pipeline(&pipeline);
       return journal->replay(std::forward<T>(std::move(f)));
     }).safe_then([this] {
       return journal->open_for_write();
@@ -151,7 +154,10 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
   auto submit_record(T&&... _record) {
     auto record{std::forward<T>(_record)...};
     records.push_back(record);
-    auto [addr, _] = journal->submit_record(std::move(record)).unsafe_get0();
+    OrderingHandle handle = get_dummy_ordering_handle();
+    auto [addr, _] = journal->submit_record(
+      std::move(record),
+      handle).unsafe_get0();
     records.back().record_final_offset = addr;
     return addr;
   }