From: Yingxin Cheng
Date: Fri, 5 Nov 2021 08:00:16 +0000 (+0800)
Subject: crimson/os/seastore/journal: mark committed_to at the write boundary
X-Git-Tag: v17.1.0~476^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c6b3b92fcfeaf573c78703d1cc98fa9cae3923cb;p=ceph.git

crimson/os/seastore/journal: mark committed_to at the write boundary

Signed-off-by: Yingxin Cheng
---

diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc
index b2d4aaf43ea08..eb7084418a515 100644
--- a/src/crimson/os/seastore/journal.cc
+++ b/src/crimson/os/seastore/journal.cc
@@ -355,11 +355,16 @@ Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
     write_start_seq,
     write_start_seq.offset.offset + write_length,
     write_length);
+  assert(write_length > 0);
   assert((write_length % segment_manager.get_block_size()) == 0);
   assert(!needs_roll(write_length));
 
   auto write_start_offset = written_to;
   written_to += write_length;
+  auto write_result = write_result_t{
+    write_start_seq,
+    static_cast<segment_off_t>(write_length)
+  };
   return current_journal_segment->write(
     write_start_offset, to_write
   ).handle_error(
@@ -367,8 +372,8 @@ Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
     crimson::ct_error::assert_all{
       "Invalid error in JournalSegmentManager::write"
     }
-  ).safe_then([write_start_seq] {
-    return write_start_seq;
+  ).safe_then([write_result] {
+    return write_result;
   });
 }
 
@@ -418,8 +423,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
 
   written_to = 0;
   return write(bl
-  ).safe_then([this, new_tail, write_size=bl.length()
-  ](journal_seq_t write_start_seq) {
+  ).safe_then([this, new_tail](auto) {
     segment_provider->update_journal_tail_committed(new_tail);
   });
 }
@@ -442,25 +446,35 @@ Journal::RecordBatch::add_pending(
   auto new_encoded_length = get_encoded_length(rsize);
   assert(new_encoded_length < MAX_SEG_OFF);
   encoded_length = new_encoded_length;
+  bool is_first;
   if (state == state_t::EMPTY) {
     assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<std::optional<journal_seq_t> >();
+    io_promise = seastar::shared_promise<maybe_result_t>();
+    is_first = true;
   } else {
     assert(io_promise.has_value());
+    is_first = false;
   }
 
   state = state_t::PENDING;
   return io_promise->get_shared_future(
-  ).then([record_start_offset
-  ](auto batch_write_start) -> add_pending_ret {
-    if (!batch_write_start.has_value()) {
+  ).then([record_start_offset, is_first
+  ](auto maybe_write_result) -> add_pending_ret {
+    if (!maybe_write_result.has_value()) {
       return crimson::ct_error::input_output_error::make();
     }
-    auto record_write_start = batch_write_start.value();
-    record_write_start.offset.offset += record_start_offset;
+    auto write_result = maybe_write_result.value();
+    if (is_first) {
+      assert(record_start_offset == 0);
+    } else {
+      assert(record_start_offset > 0);
+      write_result.write_start_seq.offset.offset += record_start_offset;
+      // only the first record should update JournalSegmentManager::committed_to
+      write_result.write_length = 0;
+    }
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
-      record_write_start);
+      write_result);
   });
 }
 
@@ -495,14 +509,14 @@ ceph::bufferlist Journal::RecordBatch::encode_records(
 }
 
 void Journal::RecordBatch::set_result(
-  std::optional<journal_seq_t> batch_write_start)
+  maybe_result_t maybe_write_result)
 {
-  if (batch_write_start.has_value()) {
+  if (maybe_write_result.has_value()) {
     logger().debug(
-      "Journal::RecordBatch::set_result: batches={}, write_start {} => {}",
+      "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
       records.size(),
-      *batch_write_start,
-      batch_write_start->offset.offset + encoded_length);
+      maybe_write_result->write_start_seq,
+      maybe_write_result->write_length);
   } else {
     logger().error(
       "Journal::RecordBatch::set_result: batches={}, write is failed!",
@@ -515,7 +529,7 @@ void Journal::RecordBatch::set_result(
   encoded_length = 0;
   records.clear();
   record_sizes.clear();
-  io_promise->set_value(batch_write_start);
+  io_promise->set_value(maybe_write_result);
   io_promise.reset();
 }
 
@@ -600,10 +614,10 @@ void Journal::RecordSubmitter::update_state()
 
 void Journal::RecordSubmitter::finish_submit_batch(
   RecordBatch* p_batch,
-  std::optional<journal_seq_t> result)
+  maybe_result_t maybe_result)
 {
   assert(p_batch->is_submitting());
-  p_batch->set_result(result);
+  p_batch->set_result(maybe_result);
   free_batch_ptrs.push_back(p_batch);
   decrement_io_with_flush();
 }
@@ -621,8 +635,8 @@ void Journal::RecordSubmitter::flush_current_batch()
     journal_segment_manager.get_committed_to(),
     journal_segment_manager.get_nonce());
   std::ignore = journal_segment_manager.write(to_write
-  ).safe_then([this, p_batch](journal_seq_t write_start) {
-    finish_submit_batch(p_batch, write_start);
+  ).safe_then([this, p_batch](auto write_result) {
+    finish_submit_batch(p_batch, write_result);
   }).handle_error(
     crimson::ct_error::all_same_way([this, p_batch](auto e) {
       logger().error(
@@ -638,23 +652,6 @@ void Journal::RecordSubmitter::flush_current_batch()
   });
 }
 
-seastar::future<std::pair<paddr_t, journal_seq_t>>
-Journal::RecordSubmitter::mark_record_committed_in_order(
-  OrderingHandle& handle,
-  const journal_seq_t& write_start_seq,
-  const record_size_t& rsize)
-{
-  return handle.enter(write_pipeline->finalize
-  ).then([this, write_start_seq, rsize] {
-    auto committed_to = write_start_seq;
-    committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
-    journal_segment_manager.mark_committed(committed_to);
-    return std::make_pair(
-      write_start_seq.offset.add_offset(rsize.mdlength),
-      write_start_seq);
-  });
-}
-
 Journal::RecordSubmitter::submit_pending_ret
 Journal::RecordSubmitter::submit_pending(
   record_t&& record,
@@ -692,8 +689,18 @@ Journal::RecordSubmitter::submit_pending(
   return handle.enter(write_pipeline->device_submission
   ).then([write_fut=std::move(write_fut)]() mutable {
     return std::move(write_fut);
-  }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
-    return mark_record_committed_in_order(handle, write_start, rsize);
+  }).safe_then([this, &handle, rsize](auto write_result) {
+    return handle.enter(write_pipeline->finalize
+    ).then([this, write_result, rsize] {
+      if (write_result.write_length > 0) {
+        auto committed_to = write_result.write_start_seq;
+        committed_to.offset.offset += write_result.write_length;
+        journal_segment_manager.mark_committed(committed_to);
+      }
+      return std::make_pair(
+        write_result.write_start_seq.offset.add_offset(rsize.mdlength),
+        write_result.write_start_seq);
+    });
   });
 }
 
diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h
index be7c49695617a..893968822c20b 100644
--- a/src/crimson/os/seastore/journal.h
+++ b/src/crimson/os/seastore/journal.h
@@ -179,8 +179,12 @@ private:
 
     // write the buffer, return the write start
     // May be called concurrently, writes may complete in any order.
+    struct write_result_t {
+      journal_seq_t write_start_seq;
+      segment_off_t write_length;
+    };
     using write_ertr = base_ertr;
-    using write_ret = write_ertr::future<journal_seq_t>;
+    using write_ret = write_ertr::future<write_result_t>;
     write_ret write(ceph::bufferlist to_write);
 
     // mark write committed in order
@@ -278,6 +282,10 @@ private:
 
     // Add to the batch, the future will be resolved after the batch is
     // written.
+    //
+    // Set write_result_t::write_length to 0 if the record is not the first one
+    // in the batch.
+    using add_pending_result_t = JournalSegmentManager::write_result_t;
     using add_pending_ertr = JournalSegmentManager::write_ertr;
     using add_pending_ret = JournalSegmentManager::write_ret;
     add_pending_ret add_pending(record_t&&, const record_size_t&);
@@ -289,7 +297,8 @@ private:
       segment_nonce_t segment_nonce);
 
     // Set the write result and reset for reuse
-    void set_result(std::optional<journal_seq_t> batch_write_start);
+    using maybe_result_t = std::optional<JournalSegmentManager::write_result_t>;
+    void set_result(maybe_result_t maybe_write_result);
 
     // The fast path that is equivalent to submit a single record as a batch.
     //
@@ -319,8 +328,7 @@ private:
     segment_off_t encoded_length = 0;
     std::vector<record_t> records;
    std::vector<record_size_t> record_sizes;
-    std::optional<seastar::shared_promise<
-      std::optional<journal_seq_t> > > io_promise;
+    std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
   };
 
   class RecordSubmitter {
@@ -404,14 +412,11 @@ private:
      free_batch_ptrs.pop_front();
    }
 
-    void finish_submit_batch(RecordBatch*, std::optional<journal_seq_t>);
+    using maybe_result_t = RecordBatch::maybe_result_t;
+    void finish_submit_batch(RecordBatch*, maybe_result_t);
 
    void flush_current_batch();
 
-    seastar::future<std::pair<paddr_t, journal_seq_t>>
-    mark_record_committed_in_order(
-      OrderingHandle&, const journal_seq_t&, const record_size_t&);
-
    using submit_pending_ertr = JournalSegmentManager::write_ertr;
    using submit_pending_ret = submit_pending_ertr::future<
      std::pair<paddr_t, journal_seq_t> >;
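
The standalone sketch below is not part of the patch; journal_seq_t and write_result_t are reduced to minimal stand-in types. It only illustrates the rule the change introduces: every record in a batched device write receives a write_result_t, but only the first record keeps a non-zero write_length, so committed_to advances exactly once, to the write boundary write_start_seq + write_length, instead of once per record.

// Illustrative sketch only; simplified stand-ins, not the crimson/seastore types.
#include <cassert>
#include <cstdint>
#include <vector>

struct journal_seq_t { uint64_t offset = 0; };   // reduced to a single byte offset
struct write_result_t {
  journal_seq_t write_start_seq;                 // where the device write began
  int64_t write_length;                          // 0 for non-first records of a batch
};

// Mirrors the finalize step in submit_pending(): only a result with
// write_length > 0 (the whole batched write) moves committed_to forward.
void mark_committed(journal_seq_t& committed_to, const write_result_t& r) {
  if (r.write_length > 0) {
    committed_to.offset = r.write_start_seq.offset + r.write_length;
  }
}

int main() {
  journal_seq_t committed_to{};
  // One batched device write of three records, 4096 bytes total, starting at 8192.
  std::vector<write_result_t> per_record = {
    {{8192}, 4096},   // first record carries the full write boundary
    {{9216}, 0},      // later records are relocated inside the batch...
    {{10240}, 0},     // ...and must not advance committed_to again
  };
  for (const auto& r : per_record) {
    mark_committed(committed_to, r);
  }
  assert(committed_to.offset == 8192 + 4096);    // advanced once, to the write boundary
  return 0;
}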