#pragma once
+#include <seastar/core/shared_mutex.hh>
+
#include "crimson/common/operation.h"
namespace crimson::os::seastore {
struct OrderingHandle {
OperationRef op;
PipelineHandle phase_handle;
+ seastar::shared_mutex *collection_ordering_lock = nullptr;
+
+ OrderingHandle(OperationRef &&op) : op(std::move(op)) {} // Moves op into the handle; phase_handle is default-constructed and no collection lock is recorded.
+ OrderingHandle(OrderingHandle &&other) // Move constructor: transfers op, phase_handle and the recorded collection lock pointer from other.
+ : op(std::move(other.op)), phase_handle(std::move(other.phase_handle)),
+ collection_ordering_lock(other.collection_ordering_lock) {
+ other.collection_ordering_lock = nullptr; // so other's destructor does not unlock the mutex now tracked by *this
+ }
+
+ seastar::future<> take_collection_lock(seastar::shared_mutex &mutex) { // Records mutex as this handle's collection lock and returns the future produced by mutex.lock().
+ ceph_assert(!collection_ordering_lock); // a handle tracks at most one collection lock at a time
+ collection_ordering_lock = &mutex; // NOTE(review): recorded before the returned future resolves; if the handle were destroyed while still waiting, unlock() would run without the lock held - confirm callers always await the future
+ return collection_ordering_lock->lock();
+ }
+
+ void maybe_release_collection_lock() { // Unlocks the recorded collection lock, if any, and forgets it; does nothing when no lock is recorded.
+ if (collection_ordering_lock) {
+ collection_ordering_lock->unlock();
+ collection_ordering_lock = nullptr; // cleared so a later call (including the one from the destructor) does not unlock again
+ }
+ }
template <typename T>
seastar::future<> enter(T &t) {
seastar::future<> complete() {
return phase_handle.complete();
}
+
+ ~OrderingHandle() { // Releases any collection lock still recorded when the handle is destroyed.
+ maybe_release_collection_lock();
+ }
};
inline OrderingHandle get_dummy_ordering_handle() {
- return OrderingHandle{new PlaceholderOperation, {}};
+ return OrderingHandle{new PlaceholderOperation}; // uses the single-argument constructor; phase_handle and the lock pointer keep their defaults
}
struct WritePipeline {
- OrderedExclusivePhase wait_throttle{
- "WritePipeline::wait_throttle_phase"
- };
OrderedExclusivePhase prepare{
"WritePipeline::prepare_phase"
};
#include <fmt/format.h>
#include <fmt/ostream.h>
+#include <seastar/core/shared_mutex.hh>
+
#include "common/safe_io.h"
#include "os/Transaction.h"
SeaStore::~SeaStore() = default;
-class SeastoreCollection final : public FuturizedCollection {
-public:
- template <typename... T>
- SeastoreCollection(T&&... args) :
- FuturizedCollection(std::forward<T>(args)...) {}
-};
-
seastar::future<> SeaStore::stop()
{
return seastar::now();
CollectionRef _ch,
ceph::os::Transaction&& _t)
{
- /* TODO: add ordering to Collection
- *
- * TransactionManager::submit_transction will ensure that
- * beginning at that point operations remain ordered through
- * to the jorunal. We still need a pipeline stage associated
- * with each collection to ensure that this portion in
- * SeaStore::do_transaction remains correctly ordered for operations
- * submitted on the same collection. TODO
- */
+ // repeat_with_internal_context takes the collection's ordering_lock before running the transaction, which is what orders transactions submitted on the same collection
return repeat_with_internal_context(
_ch,
std::move(_t),
namespace crimson::os::seastore {
-class SeastoreCollection;
class Onode;
using OnodeRef = boost::intrusive_ptr<Onode>;
class TransactionManager;
+class SeastoreCollection final : public FuturizedCollection { // FuturizedCollection subclass that additionally carries a per-collection ordering lock.
+public:
+ template <typename... T>
+ SeastoreCollection(T&&... args) : // forwards every constructor argument to FuturizedCollection
+ FuturizedCollection(std::forward<T>(args)...) {}
+
+ seastar::shared_mutex ordering_lock; // locked through OrderingHandle::take_collection_lock before a transaction on this collection is run
+};
+
class SeaStore final : public FuturizedStore {
public:
internal_context_t(
CollectionRef ch,
- ceph::os::Transaction &&_ext_transaction)
+ ceph::os::Transaction &&_ext_transaction,
+ TransactionRef &&transaction)
: ch(ch), ext_transaction(std::move(_ext_transaction)),
+ transaction(std::move(transaction)),
iter(ext_transaction.begin()) {}
TransactionRef transaction;
transaction_manager->create_transaction()),
std::forward<F>(f),
[this](auto &ctx, auto &f) {
- return repeat_eagain([&]() {
- ctx.reset_preserve_handle(transaction_manager);
- return std::invoke(f, ctx);
- }).handle_error(
- crimson::ct_error::eagain::pass_further{},
- crimson::ct_error::all_same_way([&ctx](auto e) {
- on_error(ctx.ext_transaction);
- })
- );
+ return ctx.transaction->get_handle().take_collection_lock(
+ static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
+ ).then([&, this] {
+ return repeat_eagain([&, this] {
+ ctx.reset_preserve_handle(transaction_manager);
+ return std::invoke(f, ctx);
+ }).handle_error(
+ crimson::ct_error::eagain::pass_further{},
+ crimson::ct_error::all_same_way([&ctx](auto e) {
+ on_error(ctx.ext_transaction);
+ })
+ );
+ });
});
}
{
LOG_PREFIX(TransactionManager::submit_transaction);
DEBUGT("about to await throttle", t);
- return trans_intr::make_interruptible(
- t.get_handle().enter(write_pipeline.wait_throttle)
- ).then_interruptible([this] {
- return trans_intr::make_interruptible(segment_cleaner->await_hard_limits());
- }).then_interruptible([this, &t]() {
+ return trans_intr::make_interruptible(segment_cleaner->await_hard_limits()
+ ).then_interruptible([this, &t]() {
return submit_transaction_direct(t);
});
}
auto record = cache->try_construct_record(tref);
assert(record); // interruptible future would have already failed
+ tref.get_handle().maybe_release_collection_lock();
+
DEBUGT("about to submit to journal", tref);
return journal->submit_record(std::move(*record), tref.get_handle()