Journal::write_record_ret Journal::write_record(
record_size_t rsize,
- record_t &&record)
+ record_t &&record,
+ OrderingHandle &handle)
{
ceph::bufferlist to_write = encode_record(
rsize, std::move(record));
rsize.mdlength,
rsize.dlength,
target);
- return current_journal_segment->write(target, to_write).handle_error(
- write_record_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Journal::write_record"
+ // Start write under the current exclusive stage, but wait for it
+ // in the device_submission concurrent stage to permit multiple
+ // overlapping writes.
+ auto write_fut = current_journal_segment->write(target, to_write);
+ return handle.enter(write_pipeline->device_submission
+ ).then([write_fut = std::move(write_fut)]() mutable {
+ return std::move(write_fut
+ ).handle_error(
+ write_record_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in Journal::write_record"
+ }
+ ).safe_then([this, &handle] {
+ return handle.enter(write_pipeline->finalize);
}).safe_then([this, target] {
committed_to = target;
return write_record_ret(
current_journal_segment->get_segment_id(),
target});
});
+ });
}
Journal::record_size_t Journal::get_encoded_record_length(
#include "include/denc.h"
#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"
using submit_record_ret = submit_record_ertr::future<
std::pair<paddr_t, journal_seq_t>
>;
- submit_record_ret submit_record(record_t &&record) {
+ submit_record_ret submit_record(
+ record_t &&record,
+ OrderingHandle &handle
+ ) {
+ assert(write_pipeline);
auto rsize = get_encoded_record_length(record);
auto total = rsize.mdlength + rsize.dlength;
if (total > max_record_length) {
? roll_journal_segment().safe_then([](auto){})
: roll_journal_segment_ertr::now();
return roll.safe_then(
- [this, rsize, record=std::move(record)]() mutable {
- return write_record(rsize, std::move(record)
+ [this, rsize, record=std::move(record), &handle]() mutable {
+ return write_record(
+ rsize, std::move(record),
+ handle
).safe_then([this, rsize](auto addr) {
return std::make_pair(
addr.add_offset(rsize.mdlength),
extent_len_t bytes_to_read
);
+ void set_write_pipeline(WritePipeline *_write_pipeline) {
+ write_pipeline = _write_pipeline;
+ }
private:
const extent_len_t block_size;
segment_off_t written_to = 0;
segment_off_t committed_to = 0;
+ WritePipeline *write_pipeline = nullptr;
+
journal_seq_t get_journal_seq(paddr_t addr) {
return journal_seq_t{next_journal_segment_seq-1, addr};
}
using write_record_ret = write_record_ertr::future<paddr_t>;
write_record_ret write_record(
record_size_t rsize,
- record_t &&record);
+ record_t &&record,
+ OrderingHandle &handle);
/// close current segment and initialize next one
using roll_journal_segment_ertr = crimson::errorator<
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/operation.h"
+
+namespace crimson::os::seastore {
+
+/**
+ * PlaceholderOperation
+ *
+ * Once seastore is more complete, I expect to update the externally
+ * facing interfaces to permit passing the osd level operation through.
+ * Until then (and for tests likely permanently) we'll use this unregistered
+ * placeholder for the pipeline phases necessary for journal correctness.
+ */
+class PlaceholderOperation : public Operation {
+public:
+  using IRef = boost::intrusive_ptr<PlaceholderOperation>;
+
+  // Placeholder type id; 0 since this operation type is not registered
+  // anywhere (see class comment above).
+  unsigned get_type() const final {
+    return 0;
+  }
+
+  const char *get_type_name() const final {
+    return "crimson::os::seastore::PlaceholderOperation";
+  }
+
+private:
+  // A placeholder carries no state worth dumping or printing.
+  void dump_detail(ceph::Formatter *f) const final {}
+  void print(std::ostream &) const final {}
+};
+
+/**
+ * OrderingHandle
+ *
+ * Pairs an Operation with its PipelineHandle so a unit of work can be
+ * moved through pipeline stages via enter()/exit()/complete().
+ */
+struct OrderingHandle {
+  // Operation being ordered; may be a PlaceholderOperation when no real
+  // osd-level operation is available (see get_dummy_ordering_handle()).
+  OperationRef op;
+  // Tracks this operation's position in the pipeline.
+  PipelineHandle phase_handle;
+
+  /// Enter stage t; wrapped in op->with_blocking_future so any wait is
+  /// attributed to the operation.
+  template <typename T>
+  seastar::future<> enter(T &t) {
+    return op->with_blocking_future(phase_handle.enter(t));
+  }
+
+  /// Leave the current stage (delegates to phase_handle.exit()).
+  void exit() {
+    return phase_handle.exit();
+  }
+
+  /// Finish traversal of the pipeline (delegates to phase_handle.complete()).
+  seastar::future<> complete() {
+    return phase_handle.complete();
+  }
+};
+
+/// Build an OrderingHandle backed by a fresh PlaceholderOperation and a
+/// default-constructed PipelineHandle -- for callers (notably tests) that
+/// have no real operation to thread through the write pipeline.
+inline OrderingHandle get_dummy_ordering_handle() {
+  return OrderingHandle{new PlaceholderOperation, {}};
+}
+
+/**
+ * WritePipeline
+ *
+ * Stages a journal write passes through in order: exclusive prepare,
+ * concurrent device_submission (multiple overlapping device writes are
+ * permitted in this stage), and exclusive finalize.
+ */
+struct WritePipeline {
+  OrderedExclusivePhase prepare{
+    "TransactionManager::prepare_phase"
+  };
+  // Concurrent stage: entrants need not wait for each other's writes.
+  OrderedConcurrentPhase device_submission{
+    "TransactionManager::journal_phase"
+  };
+  OrderedExclusivePhase finalize{
+    "TransactionManager::finalize_phase"
+  };
+};
+
+}
#include <iostream>
+#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/root_block.h"
*/
class Transaction {
public:
+ OrderingHandle handle;
+
using Ref = std::unique_ptr<Transaction>;
enum class get_extent_ret {
PRESENT,
///< if != NULL_SEG_ID, release this segment after completion
segment_id_t to_release = NULL_SEG_ID;
- Transaction(bool weak) : weak(weak) {}
+public:
+ Transaction(
+ OrderingHandle &&handle,
+ bool weak
+ ) : handle(std::move(handle)), weak(weak) {}
+
+ ~Transaction() {
+ for (auto i = write_set.begin();
+ i != write_set.end();) {
+ i->state = CachedExtent::extent_state_t::INVALID;
+ write_set.erase(*i++);
+ }
+ }
};
using TransactionRef = Transaction::Ref;
inline TransactionRef make_transaction() {
- return std::unique_ptr<Transaction>(new Transaction(false));
+ return std::make_unique<Transaction>(
+ get_dummy_ordering_handle(),
+ false
+ );
}
inline TransactionRef make_weak_transaction() {
- return std::unique_ptr<Transaction>(new Transaction(true));
+ return std::make_unique<Transaction>(
+ get_dummy_ordering_handle(),
+ true);
}
}
cache(cache),
lba_manager(lba_manager),
journal(journal)
-{}
+{
+ journal.set_write_pipeline(&write_pipeline);
+}
TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
{
TransactionRef t)
{
logger().debug("TransactionManager::submit_transaction");
- return segment_cleaner.do_immediate_work(*t
- ).safe_then([this, t=std::move(t)]() mutable -> submit_transaction_ertr::future<> {
- auto record = cache.try_construct_record(*t);
+ auto &tref = *t;
+ return tref.handle.enter(write_pipeline.prepare
+ ).then([this, &tref]() mutable {
+ return segment_cleaner.do_immediate_work(tref);
+ }).safe_then([this, &tref]() mutable
+ -> submit_transaction_ertr::future<> {
+ logger().debug("TransactionManager::submit_transaction after do_immediate");
+ auto record = cache.try_construct_record(tref);
if (!record) {
return crimson::ct_error::eagain::make();
}
- return journal.submit_record(std::move(*record)
- ).safe_then([this, t=std::move(t)](auto p) mutable {
+ return journal.submit_record(std::move(*record), tref.handle
+ ).safe_then([this, &tref](auto p) mutable {
auto [addr, journal_seq] = p;
segment_cleaner.set_journal_head(journal_seq);
- cache.complete_commit(*t, addr, journal_seq, &segment_cleaner);
- lba_manager.complete_transaction(*t);
- auto to_release = t->get_segment_to_release();
+ cache.complete_commit(tref, addr, journal_seq, &segment_cleaner);
+ lba_manager.complete_transaction(tref);
+ auto to_release = tref.get_segment_to_release();
if (to_release != NULL_SEG_ID) {
segment_cleaner.mark_segment_released(to_release);
return segment_manager.release(to_release);
} else {
return SegmentManager::release_ertr::now();
}
+ }).safe_then([&tref] {
+ return tref.handle.complete();
}).handle_error(
submit_transaction_ertr::pass_further{},
crimson::ct_error::all_same_way([](auto e) {
ceph_assert(0 == "Hit error submitting to journal");
}));
- });
+ }).finally([t=std::move(t)]() mutable {
+ t->handle.exit();
+ });
}
TransactionManager::get_next_dirty_extents_ret
Cache &cache;
LBAManager &lba_manager;
Journal &journal;
+
+ WritePipeline write_pipeline;
};
using TransactionManagerRef = std::unique_ptr<TransactionManager>;
const size_t block_size;
+ WritePipeline pipeline;
+
btree_lba_manager_test()
: segment_manager(segment_manager::create_test_ephemeral()),
journal(*segment_manager),
block_size(segment_manager->get_block_size())
{
journal.set_segment_provider(this);
+ journal.set_write_pipeline(&pipeline);
}
segment_id_t next = 0;
ceph_assert(0 == "cannot fail");
}
- return journal.submit_record(std::move(*record)).safe_then(
+ return journal.submit_record(std::move(*record), t->handle).safe_then(
[this, t=std::move(t)](auto p) mutable {
auto [addr, seq] = p;
cache.complete_commit(*t, addr, seq);
struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
+ WritePipeline pipeline;
std::unique_ptr<Journal> journal;
std::vector<record_validator_t> records;
seastar::future<> set_up_fut() final {
journal.reset(new Journal(*segment_manager));
journal->set_segment_provider(this);
+ journal->set_write_pipeline(&pipeline);
return segment_manager->init(
).safe_then([this] {
return journal->open_for_write();
).safe_then([this, f=std::move(f)]() mutable {
journal.reset(new Journal(*segment_manager));
journal->set_segment_provider(this);
+ journal->set_write_pipeline(&pipeline);
return journal->replay(std::forward<T>(std::move(f)));
}).safe_then([this] {
return journal->open_for_write();
auto submit_record(T&&... _record) {
auto record{std::forward<T>(_record)...};
records.push_back(record);
- auto [addr, _] = journal->submit_record(std::move(record)).unsafe_get0();
+ OrderingHandle handle = get_dummy_ordering_handle();
+ auto [addr, _] = journal->submit_record(
+ std::move(record),
+ handle).unsafe_get0();
records.back().record_final_offset = addr;
return addr;
}