write_start_seq,
write_start_seq.offset.offset + write_length,
write_length);
+ assert(write_length > 0);
assert((write_length % segment_manager.get_block_size()) == 0);
assert(!needs_roll(write_length));
auto write_start_offset = written_to;
written_to += write_length;
+ auto write_result = write_result_t{
+ write_start_seq,
+ static_cast<segment_off_t>(write_length)
+ };
return current_journal_segment->write(
write_start_offset, to_write
).handle_error(
crimson::ct_error::assert_all{
"Invalid error in JournalSegmentManager::write"
}
- ).safe_then([write_start_seq] {
- return write_start_seq;
+ ).safe_then([write_result] {
+ return write_result;
});
}
written_to = 0;
return write(bl
- ).safe_then([this, new_tail, write_size=bl.length()
- ](journal_seq_t write_start_seq) {
+ ).safe_then([this, new_tail](auto) {
segment_provider->update_journal_tail_committed(new_tail);
});
}
auto new_encoded_length = get_encoded_length(rsize);
assert(new_encoded_length < MAX_SEG_OFF);
encoded_length = new_encoded_length;
+ bool is_first;
if (state == state_t::EMPTY) {
assert(!io_promise.has_value());
- io_promise = seastar::shared_promise<std::optional<journal_seq_t> >();
+ io_promise = seastar::shared_promise<maybe_result_t>();
+ is_first = true;
} else {
assert(io_promise.has_value());
+ is_first = false;
}
state = state_t::PENDING;
return io_promise->get_shared_future(
- ).then([record_start_offset
- ](auto batch_write_start) -> add_pending_ret {
- if (!batch_write_start.has_value()) {
+ ).then([record_start_offset, is_first
+ ](auto maybe_write_result) -> add_pending_ret {
+ if (!maybe_write_result.has_value()) {
return crimson::ct_error::input_output_error::make();
}
- auto record_write_start = batch_write_start.value();
- record_write_start.offset.offset += record_start_offset;
+ auto write_result = maybe_write_result.value();
+ if (is_first) {
+ assert(record_start_offset == 0);
+ } else {
+ assert(record_start_offset > 0);
+ write_result.write_start_seq.offset.offset += record_start_offset;
+ // only the first record should update JournalSegmentManager::committed_to
+ write_result.write_length = 0;
+ }
return add_pending_ret(
add_pending_ertr::ready_future_marker{},
- record_write_start);
+ write_result);
});
}
}
void Journal::RecordBatch::set_result(
- std::optional<journal_seq_t> batch_write_start)
+ maybe_result_t maybe_write_result)
{
- if (batch_write_start.has_value()) {
+ if (maybe_write_result.has_value()) {
logger().debug(
- "Journal::RecordBatch::set_result: batches={}, write_start {} => {}",
+ "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
records.size(),
- *batch_write_start,
- batch_write_start->offset.offset + encoded_length);
+ maybe_write_result->write_start_seq,
+ maybe_write_result->write_length);
} else {
logger().error(
"Journal::RecordBatch::set_result: batches={}, write is failed!",
encoded_length = 0;
records.clear();
record_sizes.clear();
- io_promise->set_value(batch_write_start);
+ io_promise->set_value(maybe_write_result);
io_promise.reset();
}
void Journal::RecordSubmitter::finish_submit_batch(
RecordBatch* p_batch,
- std::optional<journal_seq_t> result)
+ maybe_result_t maybe_result)
{
assert(p_batch->is_submitting());
- p_batch->set_result(result);
+ p_batch->set_result(maybe_result);
free_batch_ptrs.push_back(p_batch);
decrement_io_with_flush();
}
journal_segment_manager.get_committed_to(),
journal_segment_manager.get_nonce());
std::ignore = journal_segment_manager.write(to_write
- ).safe_then([this, p_batch](journal_seq_t write_start) {
- finish_submit_batch(p_batch, write_start);
+ ).safe_then([this, p_batch](auto write_result) {
+ finish_submit_batch(p_batch, write_result);
}).handle_error(
crimson::ct_error::all_same_way([this, p_batch](auto e) {
logger().error(
});
}
-seastar::future<std::pair<paddr_t, journal_seq_t>>
-Journal::RecordSubmitter::mark_record_committed_in_order(
- OrderingHandle& handle,
- const journal_seq_t& write_start_seq,
- const record_size_t& rsize)
-{
- return handle.enter(write_pipeline->finalize
- ).then([this, write_start_seq, rsize] {
- auto committed_to = write_start_seq;
- committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
- journal_segment_manager.mark_committed(committed_to);
- return std::make_pair(
- write_start_seq.offset.add_offset(rsize.mdlength),
- write_start_seq);
- });
-}
-
Journal::RecordSubmitter::submit_pending_ret
Journal::RecordSubmitter::submit_pending(
record_t&& record,
return handle.enter(write_pipeline->device_submission
).then([write_fut=std::move(write_fut)]() mutable {
return std::move(write_fut);
- }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
- return mark_record_committed_in_order(handle, write_start, rsize);
+ }).safe_then([this, &handle, rsize](auto write_result) {
+ return handle.enter(write_pipeline->finalize
+ ).then([this, write_result, rsize] {
+ if (write_result.write_length > 0) {
+ auto committed_to = write_result.write_start_seq;
+ committed_to.offset.offset += write_result.write_length;
+ journal_segment_manager.mark_committed(committed_to);
+ }
+ return std::make_pair(
+ write_result.write_start_seq.offset.add_offset(rsize.mdlength),
+ write_result.write_start_seq);
+ });
});
}
- // write the buffer, return the write start
+ // write the buffer, return the write result (the write start and length)
// May be called concurrently, writes may complete in any order.
+ struct write_result_t {
+ journal_seq_t write_start_seq;
+ segment_off_t write_length;
+ };
using write_ertr = base_ertr;
- using write_ret = write_ertr::future<journal_seq_t>;
+ using write_ret = write_ertr::future<write_result_t>;
write_ret write(ceph::bufferlist to_write);
// mark write committed in order
// Add to the batch, the future will be resolved after the batch is
// written.
+ //
+ // write_result_t::write_length is set to 0 unless the record is the first
+ // one in the batch, so that only the first record advances
+ // JournalSegmentManager::committed_to.
+ using add_pending_result_t = JournalSegmentManager::write_result_t;
using add_pending_ertr = JournalSegmentManager::write_ertr;
using add_pending_ret = JournalSegmentManager::write_ret;
add_pending_ret add_pending(record_t&&, const record_size_t&);
segment_nonce_t segment_nonce);
// Set the write result and reset for reuse
- void set_result(std::optional<journal_seq_t> batch_write_start);
+ using maybe_result_t = std::optional<add_pending_result_t>;
+ void set_result(maybe_result_t maybe_write_result);
// The fast path that is equivalent to submit a single record as a batch.
//
segment_off_t encoded_length = 0;
std::vector<record_t> records;
std::vector<record_size_t> record_sizes;
- std::optional<seastar::shared_promise<
- std::optional<journal_seq_t> > > io_promise;
+ std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
};
class RecordSubmitter {
free_batch_ptrs.pop_front();
}
- void finish_submit_batch(RecordBatch*, std::optional<journal_seq_t>);
+ using maybe_result_t = RecordBatch::maybe_result_t;
+ void finish_submit_batch(RecordBatch*, maybe_result_t);
void flush_current_batch();
- seastar::future<std::pair<paddr_t, journal_seq_t>>
- mark_record_committed_in_order(
- OrderingHandle&, const journal_seq_t&, const record_size_t&);
-
using submit_pending_ertr = JournalSegmentManager::write_ertr;
using submit_pending_ret = submit_pending_ertr::future<
std::pair<paddr_t, journal_seq_t> >;