#pragma once
 
+#include <seastar/core/shared_mutex.hh>
+
 #include "crimson/common/operation.h"
 
 namespace crimson::os::seastore {
 struct OrderingHandle {
   OperationRef op;
   PipelineHandle phase_handle;
+  seastar::shared_mutex *collection_ordering_lock = nullptr;
+
+  OrderingHandle(OperationRef &&op) : op(std::move(op)) {}
+  OrderingHandle(OrderingHandle &&other)
+    : op(std::move(other.op)), phase_handle(std::move(other.phase_handle)),
+      collection_ordering_lock(other.collection_ordering_lock) {
+    other.collection_ordering_lock = nullptr;
+  }
+
+  seastar::future<> take_collection_lock(seastar::shared_mutex &mutex) {
+    ceph_assert(!collection_ordering_lock);
+    collection_ordering_lock = &mutex;
+    return collection_ordering_lock->lock();
+  }
+
+  void maybe_release_collection_lock() {
+    if (collection_ordering_lock) {
+      collection_ordering_lock->unlock();
+      collection_ordering_lock = nullptr;
+    }
+  }
 
   template <typename T>
   seastar::future<> enter(T &t) {
   seastar::future<> complete() {
     return phase_handle.complete();
   }
+
+  ~OrderingHandle() {
+    maybe_release_collection_lock();
+  }
 };
 
 inline OrderingHandle get_dummy_ordering_handle() {
-  return OrderingHandle{new PlaceholderOperation, {}};
+  return OrderingHandle{new PlaceholderOperation};
 }
 
 struct WritePipeline {
-  OrderedExclusivePhase wait_throttle{
-    "WritePipeline::wait_throttle_phase"
-  };
   OrderedExclusivePhase prepare{
     "WritePipeline::prepare_phase"
   };
 
 #include <fmt/format.h>
 #include <fmt/ostream.h>
 
+#include <seastar/core/shared_mutex.hh>
+
 #include "common/safe_io.h"
 #include "os/Transaction.h"
 
 
 SeaStore::~SeaStore() = default;
 
-class SeastoreCollection final : public FuturizedCollection {
-public:
-  template <typename... T>
-  SeastoreCollection(T&&... args) :
-    FuturizedCollection(std::forward<T>(args)...) {}
-};
-
 seastar::future<> SeaStore::stop()
 {
   return seastar::now();
   CollectionRef _ch,
   ceph::os::Transaction&& _t)
 {
-  /* TODO: add ordering to Collection
-   *
-   * TransactionManager::submit_transction will ensure that
-   * beginning at that point operations remain ordered through
-   * to the jorunal.  We still need a pipeline stage associated
-   * with each collection to ensure that this portion in
-   * SeaStore::do_transaction remains correctly ordered for operations
-   * submitted on the same collection. TODO
-   */
+  // repeat_with_internal_context ensures ordering via collection lock
   return repeat_with_internal_context(
     _ch,
     std::move(_t),
 
 
 namespace crimson::os::seastore {
 
-class SeastoreCollection;
 class Onode;
 using OnodeRef = boost::intrusive_ptr<Onode>;
 class TransactionManager;
 
+class SeastoreCollection final : public FuturizedCollection {
+public:
+  template <typename... T>
+  SeastoreCollection(T&&... args) :
+    FuturizedCollection(std::forward<T>(args)...) {}
+
+  seastar::shared_mutex ordering_lock;
+};
+
 class SeaStore final : public FuturizedStore {
 public:
 
 
     internal_context_t(
       CollectionRef ch,
-      ceph::os::Transaction &&_ext_transaction)
+      ceph::os::Transaction &&_ext_transaction,
+      TransactionRef &&transaction)
       : ch(ch), ext_transaction(std::move(_ext_transaction)),
+       transaction(std::move(transaction)),
        iter(ext_transaction.begin()) {}
 
     TransactionRef transaction;
        transaction_manager->create_transaction()),
       std::forward<F>(f),
       [this](auto &ctx, auto &f) {
-       return repeat_eagain([&]() {
-         ctx.reset_preserve_handle(transaction_manager);
-         return std::invoke(f, ctx);
-       }).handle_error(
-         crimson::ct_error::eagain::pass_further{},
-         crimson::ct_error::all_same_way([&ctx](auto e) {
-           on_error(ctx.ext_transaction);
-         })
-       );
+       return ctx.transaction->get_handle().take_collection_lock(
+         static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
+       ).then([&, this] {
+         return repeat_eagain([&, this] {
+           ctx.reset_preserve_handle(transaction_manager);
+           return std::invoke(f, ctx);
+         }).handle_error(
+           crimson::ct_error::eagain::pass_further{},
+           crimson::ct_error::all_same_way([&ctx](auto e) {
+             on_error(ctx.ext_transaction);
+           })
+         );
+       });
       });
   }
 
 
 {
   LOG_PREFIX(TransactionManager::submit_transaction);
   DEBUGT("about to await throttle", t);
-  return trans_intr::make_interruptible(
-    t.get_handle().enter(write_pipeline.wait_throttle)
-  ).then_interruptible([this] {
-    return trans_intr::make_interruptible(segment_cleaner->await_hard_limits());
-  }).then_interruptible([this, &t]() {
+  return trans_intr::make_interruptible(segment_cleaner->await_hard_limits()
+  ).then_interruptible([this, &t]() {
     return submit_transaction_direct(t);
   });
 }
     auto record = cache->try_construct_record(tref);
     assert(record); // interruptible future would have already failed
 
+    tref.get_handle().maybe_release_collection_lock();
+
     DEBUGT("about to submit to journal", tref);
 
     return journal->submit_record(std::move(*record), tref.get_handle()