]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: add collection ordering
authorSamuel Just <sjust@redhat.com>
Tue, 29 Jun 2021 17:06:45 +0000 (10:06 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 29 Jun 2021 17:41:15 +0000 (10:41 -0700)
Adds a mutex to SeastoreCollection which ensures that transactions are
ordered on a per-collection basis.  Future optimizations could enable
the transaction construction phase to be pipelined for a single collection,
but would require correctly handling the case where a more recently
submitted transaction encounters a conflict.

Fixes: https://tracker.ceph.com/issues/51358
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/ordering_handle.h
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore.h
src/crimson/os/seastore/transaction_manager.cc

index 7afb22bb7d51b27e601552cfe430bf47c816562e..6e519114ca6de42b2d9e49b8294c0890ba2db672 100644 (file)
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <seastar/core/shared_mutex.hh>
+
 #include "crimson/common/operation.h"
 
 namespace crimson::os::seastore {
@@ -35,6 +37,27 @@ private:
 struct OrderingHandle {
   OperationRef op;
   PipelineHandle phase_handle;
+  seastar::shared_mutex *collection_ordering_lock = nullptr;
+
+  OrderingHandle(OperationRef &&op) : op(std::move(op)) {}
+  OrderingHandle(OrderingHandle &&other)
+    : op(std::move(other.op)), phase_handle(std::move(other.phase_handle)),
+      collection_ordering_lock(other.collection_ordering_lock) {
+    other.collection_ordering_lock = nullptr;
+  }
+
+  seastar::future<> take_collection_lock(seastar::shared_mutex &mutex) {
+    ceph_assert(!collection_ordering_lock);
+    collection_ordering_lock = &mutex;
+    return collection_ordering_lock->lock();
+  }
+
+  void maybe_release_collection_lock() {
+    if (collection_ordering_lock) {
+      collection_ordering_lock->unlock();
+      collection_ordering_lock = nullptr;
+    }
+  }
 
   template <typename T>
   seastar::future<> enter(T &t) {
@@ -48,16 +71,17 @@ struct OrderingHandle {
   seastar::future<> complete() {
     return phase_handle.complete();
   }
+
+  ~OrderingHandle() {
+    maybe_release_collection_lock();
+  }
 };
 
 inline OrderingHandle get_dummy_ordering_handle() {
-  return OrderingHandle{new PlaceholderOperation, {}};
+  return OrderingHandle{new PlaceholderOperation};
 }
 
 struct WritePipeline {
-  OrderedExclusivePhase wait_throttle{
-    "WritePipeline::wait_throttle_phase"
-  };
   OrderedExclusivePhase prepare{
     "WritePipeline::prepare_phase"
   };
index 20f36667dde908c5a3631371e9c255da04822566..8fad148b59a00b85edcd6ce94e4c5899b0793b04 100644 (file)
@@ -9,6 +9,8 @@
 #include <fmt/format.h>
 #include <fmt/ostream.h>
 
+#include <seastar/core/shared_mutex.hh>
+
 #include "common/safe_io.h"
 #include "os/Transaction.h"
 
@@ -42,13 +44,6 @@ SeaStore::SeaStore(
 
 SeaStore::~SeaStore() = default;
 
-class SeastoreCollection final : public FuturizedCollection {
-public:
-  template <typename... T>
-  SeastoreCollection(T&&... args) :
-    FuturizedCollection(std::forward<T>(args)...) {}
-};
-
 seastar::future<> SeaStore::stop()
 {
   return seastar::now();
@@ -599,15 +594,7 @@ seastar::future<> SeaStore::do_transaction(
   CollectionRef _ch,
   ceph::os::Transaction&& _t)
 {
-  /* TODO: add ordering to Collection
-   *
-   * TransactionManager::submit_transction will ensure that
-   * beginning at that point operations remain ordered through
-   * to the jorunal.  We still need a pipeline stage associated
-   * with each collection to ensure that this portion in
-   * SeaStore::do_transaction remains correctly ordered for operations
-   * submitted on the same collection. TODO
-   */
+  // repeat_with_internal_context ensures ordering via collection lock
   return repeat_with_internal_context(
     _ch,
     std::move(_t),
index c745daf13b3470e20b4e6c8276bcd1f651091724..682c5626f764e78888dea78305ab9252bd72ad4b 100644 (file)
 
 namespace crimson::os::seastore {
 
-class SeastoreCollection;
 class Onode;
 using OnodeRef = boost::intrusive_ptr<Onode>;
 class TransactionManager;
 
+class SeastoreCollection final : public FuturizedCollection {
+public:
+  template <typename... T>
+  SeastoreCollection(T&&... args) :
+    FuturizedCollection(std::forward<T>(args)...) {}
+
+  seastar::shared_mutex ordering_lock;
+};
+
 class SeaStore final : public FuturizedStore {
 public:
 
@@ -128,8 +136,10 @@ private:
 
     internal_context_t(
       CollectionRef ch,
-      ceph::os::Transaction &&_ext_transaction)
+      ceph::os::Transaction &&_ext_transaction,
+      TransactionRef &&transaction)
       : ch(ch), ext_transaction(std::move(_ext_transaction)),
+       transaction(std::move(transaction)),
        iter(ext_transaction.begin()) {}
 
     TransactionRef transaction;
@@ -158,15 +168,19 @@ private:
        transaction_manager->create_transaction()),
       std::forward<F>(f),
       [this](auto &ctx, auto &f) {
-       return repeat_eagain([&]() {
-         ctx.reset_preserve_handle(transaction_manager);
-         return std::invoke(f, ctx);
-       }).handle_error(
-         crimson::ct_error::eagain::pass_further{},
-         crimson::ct_error::all_same_way([&ctx](auto e) {
-           on_error(ctx.ext_transaction);
-         })
-       );
+       return ctx.transaction->get_handle().take_collection_lock(
+         static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
+       ).then([&, this] {
+         return repeat_eagain([&, this] {
+           ctx.reset_preserve_handle(transaction_manager);
+           return std::invoke(f, ctx);
+         }).handle_error(
+           crimson::ct_error::eagain::pass_further{},
+           crimson::ct_error::all_same_way([&ctx](auto e) {
+             on_error(ctx.ext_transaction);
+           })
+         );
+       });
       });
   }
 
index 5cdb498bc9f3848b50da3c350ba05ef9e8b6dc2e..8f34581eb4445ebe019f745e5dc86a3f1001f92d 100644 (file)
@@ -223,11 +223,8 @@ TransactionManager::submit_transaction(
 {
   LOG_PREFIX(TransactionManager::submit_transaction);
   DEBUGT("about to await throttle", t);
-  return trans_intr::make_interruptible(
-    t.get_handle().enter(write_pipeline.wait_throttle)
-  ).then_interruptible([this] {
-    return trans_intr::make_interruptible(segment_cleaner->await_hard_limits());
-  }).then_interruptible([this, &t]() {
+  return trans_intr::make_interruptible(segment_cleaner->await_hard_limits()
+  ).then_interruptible([this, &t]() {
     return submit_transaction_direct(t);
   });
 }
@@ -245,6 +242,8 @@ TransactionManager::submit_transaction_direct(
     auto record = cache->try_construct_record(tref);
     assert(record); // interruptible future would have already failed
 
+    tref.get_handle().maybe_release_collection_lock();
+
     DEBUGT("about to submit to journal", tref);
 
     return journal->submit_record(std::move(*record), tref.get_handle()