#include <boost/iterator/counting_iterator.hpp>
+#include "crimson/common/config_proxy.h"
#include "crimson/os/seastore/journal.h"
#include "include/intarith.h"
SegmentManager& segment_manager,
ExtentReader& scanner)
: journal_segment_manager(segment_manager),
+ record_submitter(crimson::common::get_conf<uint64_t>(
+ "seastore_journal_iodepth_limit"),
+ crimson::common::get_conf<uint64_t>(
+ "seastore_journal_batch_capacity"),
+ crimson::common::get_conf<Option::size_t>(
+ "seastore_journal_batch_flush_size"),
+ journal_segment_manager),
scanner(scanner)
{}
-Journal::write_record_ret Journal::write_record(
- record_size_t rsize,
- record_t &&record,
- OrderingHandle &handle)
-{
- ceph::bufferlist to_write = encode_record(
- rsize,
- std::move(record),
- journal_segment_manager.get_block_size(),
- journal_segment_manager.get_committed_to(),
- journal_segment_manager.get_nonce());
- // Start write under the current exclusive stage, but wait for it
- // in the device_submission concurrent stage to permit multiple
- // overlapping writes.
- auto write_fut = journal_segment_manager.write(to_write);
- return handle.enter(write_pipeline->device_submission
- ).then([write_fut = std::move(write_fut)]() mutable {
- return std::move(write_fut);
- }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
- return handle.enter(write_pipeline->finalize
- ).then([this, write_start, rsize] {
- auto committed_to = write_start;
- committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
- journal_segment_manager.mark_committed(committed_to);
- return write_start.offset;
- });
- });
-}
-
Journal::prep_replay_segments_fut
Journal::prep_replay_segments(
std::vector<std::pair<segment_id_t, segment_header_t>> segments)
});
}
+Journal::RecordBatch::add_pending_ret
+Journal::RecordBatch::add_pending(
+ record_t&& record,
+ const record_size_t& rsize)
+{
+ logger().debug(
+ "Journal::RecordBatch::add_pending: batches={}, write_size={}",
+ records.size() + 1,
+ get_encoded_length(rsize));
+ assert(state != state_t::SUBMITTING);
+ assert(can_batch(rsize));
+
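+  // Append the record and remember its start offset within the batch, so
+  // that its own write-start can be derived from the batch's write-start.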
+ auto record_start_offset = encoded_length;
+ records.push_back(std::move(record));
+ record_sizes.push_back(rsize);
+ auto new_encoded_length = get_encoded_length(rsize);
+ assert(new_encoded_length < MAX_SEG_OFF);
+ encoded_length = new_encoded_length;
+ if (state == state_t::EMPTY) {
+ assert(!io_promise.has_value());
+ io_promise = seastar::shared_promise<std::optional<journal_seq_t> >();
+ } else {
+ assert(io_promise.has_value());
+ }
+ state = state_t::PENDING;
+
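+  // All records in a batch share io_promise; once the batch is written,
+  // translate the batch's write-start into this record's write-start.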
+ return io_promise->get_shared_future(
+ ).then([record_start_offset
+ ](auto batch_write_start) -> add_pending_ret {
+ if (!batch_write_start.has_value()) {
+ return crimson::ct_error::input_output_error::make();
+ }
+ auto record_write_start = batch_write_start.value();
+ record_write_start.offset.offset += record_start_offset;
+ return add_pending_ret(
+ add_pending_ertr::ready_future_marker{},
+ record_write_start);
+ });
+}
+
+ceph::bufferlist Journal::RecordBatch::encode_records(
+ size_t block_size,
+ segment_off_t committed_to,
+ segment_nonce_t segment_nonce)
+{
+ logger().debug(
+ "Journal::RecordBatch::encode_records: batches={}, committed_to={}",
+ records.size(),
+ committed_to);
+ assert(state == state_t::PENDING);
+ assert(records.size());
+ assert(records.size() == record_sizes.size());
+ assert(io_promise.has_value());
+
+ state = state_t::SUBMITTING;
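+  // Encode and concatenate all pending records so the whole batch can be
+  // written with a single device submission.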
+ ceph::bufferlist bl;
+  for (std::size_t i = 0; i < records.size(); ++i) {
+    auto record_bl = encode_record(
+      record_sizes[i],
+      std::move(records[i]),
+      block_size,
+      committed_to,
+      segment_nonce);
+    bl.claim_append(record_bl);
+  }
+ assert(bl.length() == (std::size_t)encoded_length);
+ return bl;
+}
+
+void Journal::RecordBatch::set_result(
+ std::optional<journal_seq_t> batch_write_start)
+{
+ if (batch_write_start.has_value()) {
+ logger().debug(
+ "Journal::RecordBatch::set_result: batches={}, write_start {} => {}",
+ records.size(),
+ *batch_write_start,
+ batch_write_start->offset.offset + encoded_length);
+ } else {
+ logger().error(
+ "Journal::RecordBatch::set_result: batches={}, write is failed!",
+ records.size());
+ }
+ assert(state == state_t::SUBMITTING);
+ assert(io_promise.has_value());
+
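+  // Reset the batch for reuse before waking up its waiters.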
+ state = state_t::EMPTY;
+ encoded_length = 0;
+ records.clear();
+ record_sizes.clear();
+ io_promise->set_value(batch_write_start);
+ io_promise.reset();
+}
+
+ceph::bufferlist Journal::RecordBatch::submit_pending_fast(
+ record_t&& record,
+ const record_size_t& rsize,
+ size_t block_size,
+ segment_off_t committed_to,
+ segment_nonce_t segment_nonce)
+{
+ logger().debug(
+ "Journal::RecordBatch::submit_pending_fast: write_size={}",
+ get_encoded_length(rsize));
+ assert(state == state_t::EMPTY);
+ assert(can_batch(rsize));
+
+ auto bl = encode_record(
+ rsize,
+ std::move(record),
+ block_size,
+ committed_to,
+ segment_nonce);
+ assert(bl.length() == get_encoded_length(rsize));
+ return bl;
+}
+
+Journal::RecordSubmitter::RecordSubmitter(
+ std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ JournalSegmentManager& jsm)
+ : io_depth_limit{io_depth},
+ journal_segment_manager{jsm},
+ batches(new RecordBatch[io_depth + 1])
+{
+ logger().info("Journal::RecordSubmitter: io_depth_limit={}, "
+ "batch_capacity={}, batch_flush_size={}",
+ io_depth, batch_capacity, batch_flush_size);
+ assert(io_depth > 0);
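+  // One batch more than the io-depth, so that a free batch is always
+  // available as the current batch while up to io_depth batches are in
+  // flight.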
+ free_batch_ptrs.reserve(io_depth + 1);
+ for (std::size_t i = 0; i <= io_depth; ++i) {
+ batches[i].initialize(i, batch_capacity, batch_flush_size);
+ free_batch_ptrs.push_back(&batches[i]);
+ }
+ pop_free_batch();
+}
+
+Journal::RecordSubmitter::submit_ret
+Journal::RecordSubmitter::submit(
+ record_t&& record,
+ OrderingHandle& handle)
+{
+ assert(write_pipeline);
+ auto rsize = get_encoded_record_length(
+ record, journal_segment_manager.get_block_size());
+ auto total = rsize.mdlength + rsize.dlength;
+ auto max_record_length = journal_segment_manager.get_max_write_length();
+ if (total > max_record_length) {
+ logger().error(
+ "Journal::RecordSubmitter::submit: record size {} exceeds max {}",
+ total,
+ max_record_length
+ );
+ return crimson::ct_error::erange::make();
+ }
+
+ return do_submit(std::move(record), rsize, handle);
+}
+
+void Journal::RecordSubmitter::update_state()
+{
+ if (num_outstanding_io == 0) {
+ state = state_t::IDLE;
+ } else if (num_outstanding_io < io_depth_limit) {
+ state = state_t::PENDING;
+ } else if (num_outstanding_io == io_depth_limit) {
+ state = state_t::FULL;
+ } else {
+ ceph_abort("fatal error: io-depth overflow");
+ }
+}
+
+void Journal::RecordSubmitter::finish_submit_batch(
+ RecordBatch* p_batch,
+ std::optional<journal_seq_t> result)
+{
+ assert(p_batch->is_submitting());
+ p_batch->set_result(result);
+ free_batch_ptrs.push_back(p_batch);
+ decrement_io_with_flush();
+}
+
+void Journal::RecordSubmitter::flush_current_batch()
+{
+ RecordBatch* p_batch = p_current_batch;
+ assert(p_batch->is_pending());
+ p_current_batch = nullptr;
+ pop_free_batch();
+
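+  // Write the batch in the background; waiters are notified through the
+  // batch's shared promise once the write completes or fails.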
+ increment_io();
+ ceph::bufferlist to_write = p_batch->encode_records(
+ journal_segment_manager.get_block_size(),
+ journal_segment_manager.get_committed_to(),
+ journal_segment_manager.get_nonce());
+ std::ignore = journal_segment_manager.write(to_write
+ ).safe_then([this, p_batch](journal_seq_t write_start) {
+ finish_submit_batch(p_batch, write_start);
+ }).handle_error(
+ crimson::ct_error::all_same_way([this, p_batch](auto e) {
+ logger().error(
+ "Journal::RecordSubmitter::flush_current_batch: got error {}",
+ e);
+ finish_submit_batch(p_batch, std::nullopt);
+ })
+ ).handle_exception([this, p_batch](auto e) {
+ logger().error(
+ "Journal::RecordSubmitter::flush_current_batch: got exception {}",
+ e);
+ finish_submit_batch(p_batch, std::nullopt);
+ });
+}
+
+seastar::future<std::pair<paddr_t, journal_seq_t>>
+Journal::RecordSubmitter::mark_record_committed_in_order(
+ OrderingHandle& handle,
+ const journal_seq_t& write_start_seq,
+ const record_size_t& rsize)
+{
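+  // Entering the finalize stage in order ensures that committed_to only
+  // advances in submission order.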
+ return handle.enter(write_pipeline->finalize
+ ).then([this, write_start_seq, rsize] {
+ auto committed_to = write_start_seq;
+ committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
+ journal_segment_manager.mark_committed(committed_to);
+ return std::make_pair(
+ write_start_seq.offset.add_offset(rsize.mdlength),
+ write_start_seq);
+ });
+}
+
+Journal::RecordSubmitter::submit_pending_ret
+Journal::RecordSubmitter::submit_pending(
+ record_t&& record,
+ const record_size_t& rsize,
+ OrderingHandle& handle,
+ bool flush)
+{
+ assert(!p_current_batch->is_submitting());
+ auto write_fut = [this, flush, record=std::move(record), &rsize]() mutable {
+ if (flush && p_current_batch->is_empty()) {
+ // fast path with direct write
+ increment_io();
+ ceph::bufferlist to_write = p_current_batch->submit_pending_fast(
+ std::move(record),
+ rsize,
+ journal_segment_manager.get_block_size(),
+ journal_segment_manager.get_committed_to(),
+ journal_segment_manager.get_nonce());
+ return journal_segment_manager.write(to_write
+ ).finally([this] {
+ decrement_io_with_flush();
+ });
+ } else {
+      // indirect write, with or without existing pending records
+ auto write_fut = p_current_batch->add_pending(
+ std::move(record), rsize);
+ if (flush) {
+ flush_current_batch();
+ }
+ return write_fut;
+ }
+ }();
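+  // Start the write under the current exclusive stage, but wait for it in the
+  // device_submission concurrent stage to permit multiple overlapping writes.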
+ return handle.enter(write_pipeline->device_submission
+ ).then([write_fut=std::move(write_fut)]() mutable {
+ return std::move(write_fut);
+ }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
+ return mark_record_committed_in_order(handle, write_start, rsize);
+ });
+}
+
+Journal::RecordSubmitter::do_submit_ret
+Journal::RecordSubmitter::do_submit(
+ record_t&& record,
+ const record_size_t& rsize,
+ OrderingHandle& handle)
+{
+ assert(!p_current_batch->is_submitting());
+ if (state <= state_t::PENDING) {
+ // can increment io depth
+ assert(!wait_submit_promise.has_value());
+ auto batched_size = p_current_batch->can_batch(rsize);
+ if (batched_size == 0 ||
+ batched_size > journal_segment_manager.get_max_write_length()) {
+ assert(p_current_batch->is_pending());
+ flush_current_batch();
+ return do_submit(std::move(record), rsize, handle);
+ } else if (journal_segment_manager.needs_roll(batched_size)) {
+ if (p_current_batch->is_pending()) {
+ flush_current_batch();
+ }
+ return journal_segment_manager.roll(
+ ).safe_then([this, record=std::move(record), rsize, &handle]() mutable {
+ return do_submit(std::move(record), rsize, handle);
+ });
+ } else {
+ return submit_pending(std::move(record), rsize, handle, true);
+ }
+ }
+
+ assert(state == state_t::FULL);
+ // cannot increment io depth
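+  // If the record cannot join the current batch, or the segment needs to
+  // roll, wait until an in-flight write completes (decrement_io_with_flush()
+  // resolves wait_submit_promise) and retry.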
+ auto batched_size = p_current_batch->can_batch(rsize);
+ if (batched_size == 0 ||
+ batched_size > journal_segment_manager.get_max_write_length() ||
+ journal_segment_manager.needs_roll(batched_size)) {
+ if (!wait_submit_promise.has_value()) {
+ wait_submit_promise = seastar::promise<>();
+ }
+ return wait_submit_promise->get_future(
+ ).then([this, record=std::move(record), rsize, &handle]() mutable {
+ return do_submit(std::move(record), rsize, handle);
+ });
+ } else {
+ return submit_pending(std::move(record), rsize, handle, false);
+ }
+}
+
}
#include <boost/intrusive_ptr.hpp>
#include <optional>
+#include <seastar/core/circular_buffer.hh>
#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
#include "include/ceph_assert.h"
#include "include/buffer.h"
record_t &&record,
OrderingHandle &handle
) {
- assert(write_pipeline);
- auto rsize = get_encoded_record_length(
- record, journal_segment_manager.get_block_size());
- auto total = rsize.mdlength + rsize.dlength;
- auto max_record_length = journal_segment_manager.get_max_write_length();
- if (total > max_record_length) {
- auto &logger = crimson::get_logger(ceph_subsys_seastore);
- logger.error(
- "Journal::submit_record: record size {} exceeds max {}",
- total,
- max_record_length
- );
- return crimson::ct_error::erange::make();
- }
- auto roll = journal_segment_manager.needs_roll(total)
- ? journal_segment_manager.roll()
- : JournalSegmentManager::roll_ertr::now();
- return roll.safe_then(
- [this, rsize, record=std::move(record), &handle]() mutable {
- auto seq = journal_segment_manager.get_segment_seq();
- return write_record(
- rsize, std::move(record),
- handle
- ).safe_then([rsize, seq](auto addr) {
- return std::make_pair(
- addr.add_offset(rsize.mdlength),
- journal_seq_t{seq, addr});
- });
- });
+ return record_submitter.submit(std::move(record), handle);
}
/**
std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
delta_handler_t &&delta_handler);
- void set_write_pipeline(WritePipeline *_write_pipeline) {
- write_pipeline = _write_pipeline;
+ void set_write_pipeline(WritePipeline* write_pipeline) {
+ record_submitter.set_write_pipeline(write_pipeline);
}
private:
journal_seq_t committed_to;
};
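+  /**
+   * RecordBatch
+   *
+   * A batch of records to be written to the device in a single submission.
+   * Life cycle: EMPTY -> PENDING -> SUBMITTING -> EMPTY (reusable).
+   */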
+ class RecordBatch {
+ enum class state_t {
+ EMPTY = 0,
+ PENDING,
+ SUBMITTING
+ };
+
+ public:
+ RecordBatch() = default;
+ RecordBatch(RecordBatch&&) = delete;
+ RecordBatch(const RecordBatch&) = delete;
+ RecordBatch& operator=(RecordBatch&&) = delete;
+ RecordBatch& operator=(const RecordBatch&) = delete;
+
+ bool is_empty() const {
+ return state == state_t::EMPTY;
+ }
+
+ bool is_pending() const {
+ return state == state_t::PENDING;
+ }
+
+ bool is_submitting() const {
+ return state == state_t::SUBMITTING;
+ }
+
+ std::size_t get_index() const {
+ return index;
+ }
+
+ std::size_t get_num_records() const {
+ return records.size();
+ }
+
+    // Return the expected write size if the record can be batched,
+    // otherwise return 0.
+ std::size_t can_batch(const record_size_t& rsize) const {
+ assert(state != state_t::SUBMITTING);
+ if (records.size() >= batch_capacity ||
+ static_cast<std::size_t>(encoded_length) > batch_flush_size) {
+ assert(state == state_t::PENDING);
+ return 0;
+ }
+ return get_encoded_length(rsize);
+ }
+
+ void initialize(std::size_t i,
+ std::size_t _batch_capacity,
+ std::size_t _batch_flush_size) {
+ ceph_assert(_batch_capacity > 0);
+ index = i;
+ batch_capacity = _batch_capacity;
+ batch_flush_size = _batch_flush_size;
+ records.reserve(batch_capacity);
+ record_sizes.reserve(batch_capacity);
+ }
+
+    // Add a record to the batch; the returned future is resolved after the
+    // batch is written.
+ using add_pending_ertr = JournalSegmentManager::write_ertr;
+ using add_pending_ret = JournalSegmentManager::write_ret;
+ add_pending_ret add_pending(record_t&&, const record_size_t&);
+
+ // Encode the batched records for write.
+ ceph::bufferlist encode_records(
+ size_t block_size,
+ segment_off_t committed_to,
+ segment_nonce_t segment_nonce);
+
+ // Set the write result and reset for reuse
+ void set_result(std::optional<journal_seq_t> batch_write_start);
+
+    // The fast path that is equivalent to submitting a single record as a batch.
+ //
+ // Essentially, equivalent to the combined logic of:
+ // add_pending(), encode_records() and set_result() above without
+ // the intervention of the shared io_promise.
+ //
+ // Note the current RecordBatch can be reused afterwards.
+ ceph::bufferlist submit_pending_fast(
+ record_t&&,
+ const record_size_t&,
+ size_t block_size,
+ segment_off_t committed_to,
+ segment_nonce_t segment_nonce);
+
+ private:
+ std::size_t get_encoded_length(const record_size_t& rsize) const {
+ auto ret = encoded_length + rsize.mdlength + rsize.dlength;
+ assert(ret > 0);
+ return ret;
+ }
+
+ state_t state = state_t::EMPTY;
+ std::size_t index = 0;
+ std::size_t batch_capacity = 0;
+ std::size_t batch_flush_size = 0;
+ segment_off_t encoded_length = 0;
+ std::vector<record_t> records;
+ std::vector<record_size_t> record_sizes;
+ std::optional<seastar::shared_promise<
+ std::optional<journal_seq_t> > > io_promise;
+ };
+
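+  /**
+   * RecordSubmitter
+   *
+   * Submits records to the JournalSegmentManager, keeping at most
+   * io_depth_limit writes in flight; records that arrive while the io-depth
+   * is saturated are accumulated into RecordBatches and written together.
+   */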
+ class RecordSubmitter {
+ enum class state_t {
+ IDLE = 0, // outstanding_io == 0
+ PENDING, // outstanding_io < io_depth_limit
+ FULL // outstanding_io == io_depth_limit
+ // OVERFLOW: outstanding_io > io_depth_limit is impossible
+ };
+
+ public:
+ RecordSubmitter(std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ JournalSegmentManager&);
+
+ void set_write_pipeline(WritePipeline *_write_pipeline) {
+ write_pipeline = _write_pipeline;
+ }
+
+ using submit_ret = Journal::submit_record_ret;
+ submit_ret submit(record_t&&, OrderingHandle&);
+
+ private:
+ void update_state();
+
+ void increment_io() {
+ ++num_outstanding_io;
+ update_state();
+ }
+
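+    // Called when an in-flight write completes: wake up any submitter
+    // blocked on a saturated io-depth, then flush the current batch if it
+    // has pending records.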
+ void decrement_io_with_flush() {
+ assert(num_outstanding_io > 0);
+ --num_outstanding_io;
+#ifndef NDEBUG
+ auto prv_state = state;
+#endif
+ update_state();
+
+ if (wait_submit_promise.has_value()) {
+ assert(prv_state == state_t::FULL);
+ wait_submit_promise->set_value();
+ wait_submit_promise.reset();
+ }
+
+ if (!p_current_batch->is_empty()) {
+ flush_current_batch();
+ }
+ }
+
+ void pop_free_batch() {
+ assert(p_current_batch == nullptr);
+ assert(!free_batch_ptrs.empty());
+ p_current_batch = free_batch_ptrs.front();
+ assert(p_current_batch->is_empty());
+ assert(p_current_batch == &batches[p_current_batch->get_index()]);
+ free_batch_ptrs.pop_front();
+ }
+
+ void finish_submit_batch(RecordBatch*, std::optional<journal_seq_t>);
+
+ void flush_current_batch();
+
+ seastar::future<std::pair<paddr_t, journal_seq_t>>
+ mark_record_committed_in_order(
+ OrderingHandle&, const journal_seq_t&, const record_size_t&);
+
+ using submit_pending_ertr = JournalSegmentManager::write_ertr;
+ using submit_pending_ret = submit_pending_ertr::future<
+ std::pair<paddr_t, journal_seq_t> >;
+ submit_pending_ret submit_pending(
+ record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
+
+ using do_submit_ret = submit_pending_ret;
+ do_submit_ret do_submit(
+ record_t&&, const record_size_t&, OrderingHandle&);
+
+ state_t state = state_t::IDLE;
+ std::size_t num_outstanding_io = 0;
+ std::size_t io_depth_limit;
+
+ WritePipeline* write_pipeline = nullptr;
+ JournalSegmentManager& journal_segment_manager;
+ std::unique_ptr<RecordBatch[]> batches;
+    // should not be nullptr after construction
+ RecordBatch* p_current_batch = nullptr;
+ seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+ std::optional<seastar::promise<> > wait_submit_promise;
+ };
+
SegmentProvider* segment_provider = nullptr;
JournalSegmentManager journal_segment_manager;
+ RecordSubmitter record_submitter;
ExtentReader& scanner;
- WritePipeline *write_pipeline = nullptr;
-
- /// do record write
- using write_record_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- using write_record_ret = write_record_ertr::future<paddr_t>;
- write_record_ret write_record(
- record_size_t rsize,
- record_t &&record,
- OrderingHandle &handle);
/// return ordered vector of segments to replay
using replay_segments_t = std::vector<