});
}
+seastar::future<> SegmentedJournal::flush(OrderingHandle &handle)
+{
+ LOG_PREFIX(SegmentedJournal::flush);
+ DEBUG("H{} flush ...", (void*)&handle);
+ assert(write_pipeline);
+ return handle.enter(write_pipeline->device_submission
+ ).then([this, &handle] {
+ return handle.enter(write_pipeline->finalize);
+ }).then([FNAME, &handle] {
+ DEBUG("H{} flush done", (void*)&handle);
+ });
+}
+
+SegmentedJournal::submit_record_ret
+SegmentedJournal::do_submit_record(
+ record_t &&record,
+ OrderingHandle &handle)
+{
+ LOG_PREFIX(SegmentedJournal::do_submit_record);
+ if (!record_submitter.is_available()) {
+ DEBUG("H{} wait ...", (void*)&handle);
+ return record_submitter.wait_available(
+ ).safe_then([this, record=std::move(record), &handle]() mutable {
+ return do_submit_record(std::move(record), handle);
+ });
+ }
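+ // available: decide whether the current segment must be rolled
+ // or whether the record can be submitted right away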
+ auto action = record_submitter.check_action(record.size);
+ if (action == RecordSubmitter::action_t::ROLL) {
+ DEBUG("H{} roll, unavailable ...", (void*)&handle);
+ return record_submitter.roll_segment(
+ ).safe_then([this, record=std::move(record), &handle]() mutable {
+ return do_submit_record(std::move(record), handle);
+ });
+ } else { // SUBMIT_FULL/NOT_FULL
+ DEBUG("H{} submit {} ...",
+ (void*)&handle,
+ action == RecordSubmitter::action_t::SUBMIT_FULL ?
+ "FULL" : "NOT_FULL");
+ auto submit_fut = record_submitter.submit(std::move(record));
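+ // enter the ordered write pipeline stages so that records are
+ // committed and finalized in submission order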
+ return handle.enter(write_pipeline->device_submission
+ ).then([submit_fut=std::move(submit_fut)]() mutable {
+ return std::move(submit_fut);
+ }).safe_then([FNAME, this, &handle](record_locator_t result) {
+ return handle.enter(write_pipeline->finalize
+ ).then([FNAME, this, result, &handle] {
+ DEBUG("H{} finish with {}", (void*)&handle, result);
+ auto new_committed_to = result.write_result.get_end_seq();
+ record_submitter.update_committed_to(new_committed_to);
+ return result;
+ });
+ });
+ }
+}
+
+SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record(
+ record_t &&record,
+ OrderingHandle &handle)
+{
+ LOG_PREFIX(SegmentedJournal::submit_record);
+ DEBUG("H{} {} start ...", (void*)&handle, record);
+ assert(write_pipeline);
+ auto expected_size = record_group_size_t(
+ record.size,
+ journal_segment_allocator.get_block_size()
+ ).get_encoded_length();
+ auto max_record_length = journal_segment_allocator.get_max_write_length();
+ if (expected_size > max_record_length) {
+ ERROR("H{} {} exceeds max record size {}",
+ (void*)&handle, record, max_record_length);
+ return crimson::ct_error::erange::make();
+ }
+
+ return do_submit_record(std::move(record), handle);
+}
+
void SegmentedJournal::register_metrics()
{
LOG_PREFIX(Journal::register_metrics);
SegmentedJournal::RecordBatch::add_pending_ret
SegmentedJournal::RecordBatch::add_pending(
+ const std::string& name,
record_t&& record,
- OrderingHandle& handle,
extent_len_t block_size)
{
LOG_PREFIX(RecordBatch::add_pending);
auto new_size = get_encoded_length_after(record, block_size);
auto dlength_offset = pending.size.dlength;
- TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
- (void*)&handle,
+ TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+ name,
pending.get_size() + 1,
new_size.get_encoded_length(),
dlength_offset);
assert(state != state_t::SUBMITTING);
- assert(can_batch(record, block_size).value() == new_size);
+ assert(evaluate_submit(record.size, block_size).submit_size == new_size);
pending.push_back(
std::move(record), block_size);
state = state_t::PENDING;
return io_promise->get_shared_future(
- ).then([dlength_offset, FNAME, &handle
+ ).then([dlength_offset, FNAME, &name
](auto maybe_promise_result) -> add_pending_ret {
if (!maybe_promise_result.has_value()) {
- ERROR("H{} write failed", (void*)&handle);
+ ERROR("{} write failed", name);
return crimson::ct_error::input_output_error::make();
}
auto write_result = maybe_promise_result->write_result;
maybe_promise_result->mdlength + dlength_offset),
write_result
};
- TRACE("H{} write finish with {}", (void*)&handle, submit_result);
+ TRACE("{} write finish with {}", name, submit_result);
return add_pending_ret(
add_pending_ertr::ready_future_marker{},
submit_result);
auto new_size = get_encoded_length_after(record, block_size);
std::ignore = new_size;
assert(state == state_t::EMPTY);
- assert(can_batch(record, block_size).value() == new_size);
+ assert(evaluate_submit(record.size, block_size).submit_size == new_size);
auto group = record_group_t(std::move(record), block_size);
auto size = group.size;
std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
- SegmentAllocator& jsa)
+ SegmentAllocator& sa)
: io_depth_limit{io_depth},
preferred_fullness{preferred_fullness},
- journal_segment_allocator{jsa},
+ segment_allocator{sa},
batches(new RecordBatch[io_depth + 1])
{
LOG_PREFIX(RecordSubmitter);
- INFO("Journal::RecordSubmitter: io_depth_limit={}, "
- "batch_capacity={}, batch_flush_size={}, "
+ INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
"preferred_fullness={}",
- io_depth, batch_capacity,
+ get_name(), io_depth, batch_capacity,
batch_flush_size, preferred_fullness);
ceph_assert(io_depth > 0);
ceph_assert(batch_capacity > 0);
pop_free_batch();
}
-SegmentedJournal::RecordSubmitter::submit_ret
-SegmentedJournal::RecordSubmitter::submit(
- record_t&& record,
- OrderingHandle& handle)
+bool SegmentedJournal::RecordSubmitter::is_available() const
{
- LOG_PREFIX(RecordSubmitter::submit);
- DEBUG("H{} {} start ...", (void*)&handle, record);
- assert(write_pipeline);
- auto expected_size = record_group_size_t(
- record.size,
- journal_segment_allocator.get_block_size()
- ).get_encoded_length();
- auto max_record_length = journal_segment_allocator.get_max_write_length();
- if (expected_size > max_record_length) {
- ERROR("H{} {} exceeds max record size {}",
- (void*)&handle, record, max_record_length);
- return crimson::ct_error::erange::make();
+ auto ret = !wait_available_promise.has_value() &&
+ !has_io_error;
+#ifndef NDEBUG
+ if (ret) {
+ // invariants when available
+ ceph_assert(segment_allocator.can_write());
+ ceph_assert(p_current_batch != nullptr);
+ ceph_assert(!p_current_batch->is_submitting());
+ ceph_assert(!p_current_batch->needs_flush());
+ if (!p_current_batch->is_empty()) {
+ auto submit_length =
+ p_current_batch->get_submit_size().get_encoded_length();
+ ceph_assert(!segment_allocator.needs_roll(submit_length));
+ }
}
+#endif
+ return ret;
+}
- return do_submit(std::move(record), handle);
+SegmentedJournal::RecordSubmitter::wa_ertr::future<>
+SegmentedJournal::RecordSubmitter::wait_available()
+{
+ LOG_PREFIX(RecordSubmitter::wait_available);
+ assert(!is_available());
+ if (has_io_error) {
+ ERROR("{} I/O is failed before wait", get_name());
+ return crimson::ct_error::input_output_error::make();
+ }
+ return wait_available_promise->get_shared_future(
+ ).then([FNAME, this]() -> wa_ertr::future<> {
+ if (has_io_error) {
+ ERROR("{} I/O is failed after wait", get_name());
+ return crimson::ct_error::input_output_error::make();
+ }
+ return wa_ertr::now();
+ });
}
-seastar::future<> SegmentedJournal::RecordSubmitter::flush(OrderingHandle &handle)
+SegmentedJournal::RecordSubmitter::action_t
+SegmentedJournal::RecordSubmitter::check_action(
+ const record_size_t& rsize) const
{
- LOG_PREFIX(RecordSubmitter::flush);
- DEBUG("H{} flush", (void*)&handle);
- return handle.enter(write_pipeline->device_submission
- ).then([this, &handle] {
- return handle.enter(write_pipeline->finalize);
- }).then([FNAME, &handle] {
- DEBUG("H{} flush done", (void*)&handle);
+ assert(is_available());
+ auto eval = p_current_batch->evaluate_submit(
+ rsize, segment_allocator.get_block_size());
+ if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+ return action_t::ROLL;
+ } else if (eval.is_full) {
+ return action_t::SUBMIT_FULL;
+ } else {
+ return action_t::SUBMIT_NOT_FULL;
+ }
+}
+
+SegmentedJournal::RecordSubmitter::roll_segment_ertr::future<>
+SegmentedJournal::RecordSubmitter::roll_segment()
+{
+ LOG_PREFIX(RecordSubmitter::roll_segment);
+ assert(is_available());
+ // #1 block concurrent submissions due to rolling
+ wait_available_promise = seastar::shared_promise<>();
+ assert(!wait_unfull_flush_promise.has_value());
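+ // flush the pending batch (or wait for io capacity to flush it)
+ // before rolling the segment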
+ return [FNAME, this] {
+ if (p_current_batch->is_pending()) {
+ if (state == state_t::FULL) {
+ DEBUG("{} wait flush ...", get_name());
+ wait_unfull_flush_promise = seastar::promise<>();
+ return wait_unfull_flush_promise->get_future();
+ } else { // IDLE/PENDING
+ DEBUG("{} flush", get_name());
+ flush_current_batch();
+ return seastar::now();
+ }
+ } else {
+ assert(p_current_batch->is_empty());
+ return seastar::now();
+ }
+ }().then_wrapped([FNAME, this](auto fut) {
+ if (fut.failed()) {
+ ERROR("{} rolling is skipped unexpectedly, available", get_name());
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ return roll_segment_ertr::now();
+ } else {
+ // start rolling in background
+ std::ignore = segment_allocator.roll(
+ ).safe_then([FNAME, this] {
+ // good
+ DEBUG("{} rolling done, available", get_name());
+ assert(!has_io_error);
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ }).handle_error(
+ crimson::ct_error::all_same_way([FNAME, this](auto e) {
+ ERROR("{} got error {}, available", get_name(), e);
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ })
+ ).handle_exception([FNAME, this](auto e) {
+ ERROR("{} got exception {}, available", get_name(), e);
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ // wait for background rolling
+ return wait_available();
+ }
});
}
+SegmentedJournal::RecordSubmitter::submit_ret
+SegmentedJournal::RecordSubmitter::submit(record_t&& record)
+{
+ LOG_PREFIX(RecordSubmitter::submit);
+ assert(is_available());
+ assert(check_action(record.size) != action_t::ROLL);
+ auto eval = p_current_batch->evaluate_submit(
+ record.size, segment_allocator.get_block_size());
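+ // flush now if the submitter is idle, if the batch would exceed the
+ // preferred fullness, or if the batch itself needs a flush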
+ bool needs_flush = (
+ state == state_t::IDLE ||
+ eval.submit_size.get_fullness() > preferred_fullness ||
+ // RecordBatch::needs_flush()
+ eval.is_full ||
+ p_current_batch->get_num_records() + 1 >=
+ p_current_batch->get_batch_capacity());
+ if (p_current_batch->is_empty() &&
+ needs_flush &&
+ state != state_t::FULL) {
+ // fast path with direct write
+ increment_io();
+ auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+ std::move(record),
+ segment_allocator.get_block_size(),
+ committed_to,
+ segment_allocator.get_nonce());
+ DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
+ get_name(), sizes, committed_to, num_outstanding_io);
+ account_submission(1, sizes);
+ return segment_allocator.write(to_write
+ ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+ return record_locator_t{
+ write_result.start_seq.offset.add_offset(mdlength),
+ write_result
+ };
+ }).finally([this] {
+ decrement_io_with_flush();
+ });
+ }
+ // indirect batched write
+ auto write_fut = p_current_batch->add_pending(
+ get_name(),
+ std::move(record),
+ segment_allocator.get_block_size());
+ if (needs_flush) {
+ if (state == state_t::FULL) {
+ // #2 block concurrent submissions due to lack of resources
+ DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
+ get_name(),
+ p_current_batch->get_num_records(),
+ num_outstanding_io);
+ wait_available_promise = seastar::shared_promise<>();
+ assert(!wait_unfull_flush_promise.has_value());
+ wait_unfull_flush_promise = seastar::promise<>();
+ // flush and mark available in background
+ std::ignore = wait_unfull_flush_promise->get_future(
+ ).finally([FNAME, this] {
+ DEBUG("{} flush done, available", get_name());
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ } else {
+ DEBUG("{} added pending, flush", get_name());
+ flush_current_batch();
+ }
+ } else {
+ // will flush later
+ DEBUG("{} added with {} pending, outstanding_io={}",
+ get_name(),
+ p_current_batch->get_num_records(),
+ num_outstanding_io);
+ assert(!p_current_batch->needs_flush());
+ }
+ return write_fut;
+}
+
void SegmentedJournal::RecordSubmitter::update_state()
{
if (num_outstanding_io == 0) {
{
LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
assert(num_outstanding_io > 0);
- --num_outstanding_io;
-#ifndef NDEBUG
auto prv_state = state;
-#endif
+ --num_outstanding_io;
update_state();
- if (wait_submit_promise.has_value()) {
- DEBUG("wait resolved");
- assert(prv_state == state_t::FULL);
- wait_submit_promise->set_value();
- wait_submit_promise.reset();
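+ // if the io depth was saturated, a blocked submission may be waiting
+ // for this io slot to flush the current batch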
+ if (prv_state == state_t::FULL) {
+ if (wait_unfull_flush_promise.has_value()) {
+ DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
+ assert(!p_current_batch->is_empty());
+ assert(wait_available_promise.has_value());
+ flush_current_batch();
+ wait_unfull_flush_promise->set_value();
+ wait_unfull_flush_promise.reset();
+ return;
+ }
+ } else {
+ assert(!wait_unfull_flush_promise.has_value());
}
- if (!p_current_batch->is_empty()) {
- TRACE("flush");
+ auto needs_flush = (
+ !p_current_batch->is_empty() && (
+ state == state_t::IDLE ||
+ p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
+ p_current_batch->needs_flush()
+ ));
+ if (needs_flush) {
+ DEBUG("{} flush", get_name());
flush_current_batch();
}
}
increment_io();
auto num = p_batch->get_num_records();
auto [to_write, sizes] = p_batch->encode_batch(
- journal_committed_to, journal_segment_allocator.get_nonce());
- DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
- num, sizes, journal_committed_to, num_outstanding_io);
+ committed_to, segment_allocator.get_nonce());
+ DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
+ get_name(), num, sizes, committed_to, num_outstanding_io);
account_submission(num, sizes);
- std::ignore = journal_segment_allocator.write(to_write
+ std::ignore = segment_allocator.write(to_write
).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
- TRACE("{} records, {}, write done with {}", num, sizes, write_result);
+ TRACE("{} {} records, {}, write done with {}",
+ get_name(), num, sizes, write_result);
finish_submit_batch(p_batch, write_result);
}).handle_error(
crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} records, {}, got error {}", num, sizes, e);
+ ERROR("{} {} records, {}, got error {}",
+ get_name(), num, sizes, e);
finish_submit_batch(p_batch, std::nullopt);
})
).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} records, {}, got exception {}", num, sizes, e);
+ ERROR("{} {} records, {}, got exception {}",
+ get_name(), num, sizes, e);
finish_submit_batch(p_batch, std::nullopt);
});
}
-SegmentedJournal::RecordSubmitter::submit_pending_ret
-SegmentedJournal::RecordSubmitter::submit_pending(
- record_t&& record,
- OrderingHandle& handle,
- bool flush)
-{
- LOG_PREFIX(RecordSubmitter::submit_pending);
- assert(!p_current_batch->is_submitting());
- stats.record_batch_stats.increment(
- p_current_batch->get_num_records() + 1);
- bool do_flush = (flush || state == state_t::IDLE);
- auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
- if (do_flush && p_current_batch->is_empty()) {
- // fast path with direct write
- increment_io();
- auto [to_write, sizes] = p_current_batch->submit_pending_fast(
- std::move(record),
- journal_segment_allocator.get_block_size(),
- journal_committed_to,
- journal_segment_allocator.get_nonce());
- DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
- (void*)&handle, sizes, journal_committed_to, num_outstanding_io);
- account_submission(1, sizes);
- return journal_segment_allocator.write(to_write
- ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
- return record_locator_t{
- write_result.start_seq.offset.add_offset(mdlength),
- write_result
- };
- }).finally([this] {
- decrement_io_with_flush();
- });
- } else {
- // indirect write with or without the existing pending records
- auto write_fut = p_current_batch->add_pending(
- std::move(record),
- handle,
- journal_segment_allocator.get_block_size());
- if (do_flush) {
- DEBUG("H{} added pending and flush", (void*)&handle);
- flush_current_batch();
- } else {
- DEBUG("H{} added with {} pending",
- (void*)&handle, p_current_batch->get_num_records());
- }
- return write_fut;
- }
- }();
- return handle.enter(write_pipeline->device_submission
- ).then([write_fut=std::move(write_fut)]() mutable {
- return std::move(write_fut);
- }).safe_then([this, FNAME, &handle](auto submit_result) {
- return handle.enter(write_pipeline->finalize
- ).then([this, FNAME, submit_result, &handle] {
- DEBUG("H{} finish with {}", (void*)&handle, submit_result);
- auto new_committed_to = submit_result.write_result.get_end_seq();
- assert(journal_committed_to == JOURNAL_SEQ_NULL ||
- journal_committed_to <= new_committed_to);
- journal_committed_to = new_committed_to;
- return submit_result;
- });
- });
-}
-
-SegmentedJournal::RecordSubmitter::do_submit_ret
-SegmentedJournal::RecordSubmitter::do_submit(
- record_t&& record,
- OrderingHandle& handle)
-{
- LOG_PREFIX(RecordSubmitter::do_submit);
- TRACE("H{} outstanding_io={}/{} ...",
- (void*)&handle, num_outstanding_io, io_depth_limit);
- assert(!p_current_batch->is_submitting());
- if (state <= state_t::PENDING) {
- // can increment io depth
- assert(!wait_submit_promise.has_value());
- auto maybe_new_size = p_current_batch->can_batch(
- record, journal_segment_allocator.get_block_size());
- if (!maybe_new_size.has_value() ||
- (maybe_new_size->get_encoded_length() >
- journal_segment_allocator.get_max_write_length())) {
- TRACE("H{} flush", (void*)&handle);
- assert(p_current_batch->is_pending());
- flush_current_batch();
- return do_submit(std::move(record), handle);
- } else if (journal_segment_allocator.needs_roll(
- maybe_new_size->get_encoded_length())) {
- if (p_current_batch->is_pending()) {
- TRACE("H{} flush and roll", (void*)&handle);
- flush_current_batch();
- } else {
- TRACE("H{} roll", (void*)&handle);
- }
- return journal_segment_allocator.roll(
- ).safe_then([this, record=std::move(record), &handle]() mutable {
- return do_submit(std::move(record), handle);
- });
- } else {
- bool flush = (maybe_new_size->get_fullness() > preferred_fullness ?
- true : false);
- return submit_pending(std::move(record), handle, flush);
- }
- }
-
- assert(state == state_t::FULL);
- // cannot increment io depth
- auto maybe_new_size = p_current_batch->can_batch(
- record, journal_segment_allocator.get_block_size());
- if (!maybe_new_size.has_value() ||
- (maybe_new_size->get_encoded_length() >
- journal_segment_allocator.get_max_write_length()) ||
- journal_segment_allocator.needs_roll(
- maybe_new_size->get_encoded_length())) {
- if (!wait_submit_promise.has_value()) {
- wait_submit_promise = seastar::promise<>();
- }
- DEBUG("H{} wait ...", (void*)&handle);
- return wait_submit_promise->get_future(
- ).then([this, record=std::move(record), &handle]() mutable {
- return do_submit(std::move(record), handle);
- });
- } else {
- return submit_pending(std::move(record), handle, false);
- }
-}
-
}
submit_record_ret submit_record(
record_t &&record,
- OrderingHandle &handle
- ) final {
- return record_submitter.submit(std::move(record), handle);
- }
+ OrderingHandle &handle) final;
- seastar::future<> flush(OrderingHandle &handle) final {
- return record_submitter.flush(handle);
- }
+ seastar::future<> flush(OrderingHandle &handle) final;
replay_ret replay(delta_handler_t &&delta_handler) final;
- void set_write_pipeline(WritePipeline* write_pipeline) final {
- record_submitter.set_write_pipeline(write_pipeline);
+ void set_write_pipeline(WritePipeline *_write_pipeline) final {
+ write_pipeline = _write_pipeline;
}
private:
+ submit_record_ret do_submit_record(
+ record_t &&record,
+ OrderingHandle &handle
+ );
+
class RecordBatch {
enum class state_t {
EMPTY = 0,
return pending.get_size();
}
- // return the expected write sizes if allows to batch,
- // otherwise, return nullopt
- std::optional<record_group_size_t> can_batch(
- const record_t& record,
- extent_len_t block_size) const {
+ std::size_t get_batch_capacity() const {
+ return batch_capacity;
+ }
+
+ const record_group_size_t& get_submit_size() const {
+ assert(state != state_t::EMPTY);
+ return pending.size;
+ }
+
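+ // whether the pending records must be flushed
+ // (the batch is full or exceeds batch_flush_size)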
+ bool needs_flush() const {
assert(state != state_t::SUBMITTING);
- if (pending.get_size() >= batch_capacity ||
- (pending.get_size() > 0 &&
- pending.size.get_encoded_length() > batch_flush_size)) {
+ assert(pending.get_size() <= batch_capacity);
+ if (state == state_t::EMPTY) {
+ return false;
+ } else {
assert(state == state_t::PENDING);
- return std::nullopt;
+ return (pending.get_size() >= batch_capacity ||
+ pending.size.get_encoded_length() > batch_flush_size);
}
- return get_encoded_length_after(record, block_size);
+ }
+
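+ // evaluate the encoded size and fullness of the batch
+ // if the given record were added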
+ struct evaluation_t {
+ record_group_size_t submit_size;
+ bool is_full;
+ };
+ evaluation_t evaluate_submit(
+ const record_size_t& rsize,
+ extent_len_t block_size) const {
+ assert(!needs_flush());
+ auto submit_size = pending.size.get_encoded_length_after(
+ rsize, block_size);
+ bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+ return {submit_size, is_full};
}
void initialize(std::size_t i,
using add_pending_ertr = SegmentAllocator::write_ertr;
using add_pending_ret = add_pending_ertr::future<record_locator_t>;
add_pending_ret add_pending(
+ const std::string& name,
record_t&&,
- OrderingHandle&,
extent_len_t block_size);
// Encode the batched records for write.
}
};
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+
public:
RecordSubmitter(std::size_t io_depth,
std::size_t batch_capacity,
double preferred_fullness,
SegmentAllocator&);
+ const std::string& get_name() const {
+ return segment_allocator.get_name();
+ }
+
grouped_io_stats get_record_batch_stats() const {
return stats.record_batch_stats;
}
}
journal_seq_t get_committed_to() const {
- return journal_committed_to;
+ return committed_to;
}
void reset_stats() {
stats = {};
}
- void set_write_pipeline(WritePipeline *_write_pipeline) {
- write_pipeline = _write_pipeline;
- }
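+ // The submission workflow (see SegmentedJournal::do_submit_record()):
+ // wait_available() until is_available() returns true, query
+ // check_action() with the record size, roll_segment() upon ROLL,
+ // otherwise submit() the record.
+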
+ // whether the submitter is available to accept a record
+ bool is_available() const;
- using submit_ret = Journal::submit_record_ret;
- submit_ret submit(record_t&&, OrderingHandle&);
- seastar::future<> flush(OrderingHandle &handle);
+ // wait until available if a submission is not currently possible;
+ // check is_available() again when the future resolves.
+ using wa_ertr = base_ertr;
+ wa_ertr::future<> wait_available();
+
+ // when available, determine the submit action
+ // according to the pending record size
+ enum class action_t {
+ ROLL,
+ SUBMIT_FULL,
+ SUBMIT_NOT_FULL
+ };
+ action_t check_action(const record_size_t&) const;
+
+ // when available, roll the segment if needed
+ using roll_segment_ertr = base_ertr;
+ roll_segment_ertr::future<> roll_segment();
+
+ // when available, submit the record if possible
+ using submit_ertr = base_ertr;
+ using submit_ret = submit_ertr::future<record_locator_t>;
+ submit_ret submit(record_t&&);
+
+ void update_committed_to(const journal_seq_t& new_committed_to) {
+ assert(new_committed_to != JOURNAL_SEQ_NULL);
+ assert(committed_to == JOURNAL_SEQ_NULL ||
+ committed_to <= new_committed_to);
+ committed_to = new_committed_to;
+ }
private:
void update_state();
void flush_current_batch();
- using submit_pending_ertr = SegmentAllocator::write_ertr;
- using submit_pending_ret = submit_pending_ertr::future<
- record_locator_t>;
- submit_pending_ret submit_pending(
- record_t&&, OrderingHandle &handle, bool flush);
-
- using do_submit_ret = submit_pending_ret;
- do_submit_ret do_submit(
- record_t&&, OrderingHandle&);
-
state_t state = state_t::IDLE;
std::size_t num_outstanding_io = 0;
std::size_t io_depth_limit;
double preferred_fullness;
- WritePipeline* write_pipeline = nullptr;
- SegmentAllocator& journal_segment_allocator;
+ SegmentAllocator& segment_allocator;
// committed_to may be in a previous journal segment
- journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
+ journal_seq_t committed_to = JOURNAL_SEQ_NULL;
std::unique_ptr<RecordBatch[]> batches;
- std::size_t current_batch_index;
// should not be nullptr after constructed
RecordBatch* p_current_batch = nullptr;
seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
- std::optional<seastar::promise<> > wait_submit_promise;
+
+ // blocked by rolling or by lack of resources
+ std::optional<seastar::shared_promise<> > wait_available_promise;
+ bool has_io_error = false;
+ // when a flush is needed but the io depth is full,
+ // wait for decrement_io_with_flush()
+ std::optional<seastar::promise<> > wait_unfull_flush_promise;
struct {
grouped_io_stats record_batch_stats;
RecordSubmitter record_submitter;
ExtentReader& scanner;
seastar::metrics::metric_group metrics;
+ WritePipeline* write_pipeline = nullptr;
/// read journal segment headers from scanner
using find_journal_segments_ertr = crimson::errorator<