From: Yingxin Cheng Date: Mon, 8 Nov 2021 03:52:50 +0000 (+0800) Subject: crimson/os/seastore/journal: fix updates to journal seq head and target X-Git-Tag: v17.1.0~476^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=9fbc6957af28d0521ac926322e9c96a3024a95bb;p=ceph.git crimson/os/seastore/journal: fix updates to journal seq head and target * update journal sequences at the write boundary; * update journal head to the sequence of write end; Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index eb7084418a515..408acf2774197 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -207,9 +207,15 @@ Journal::replay_segment( seq.segment_seq)) { return replay_ertr::now(); } else { - return handler( - journal_seq_t{seq.segment_seq, base}, + auto offsets = submit_result_t{ base.add_offset(header.mdlength), + write_result_t{ + journal_seq_t{seq.segment_seq, base}, + static_cast(header.mdlength + header.dlength) + } + }; + return handler( + offsets, delta); } }); @@ -440,41 +446,33 @@ Journal::RecordBatch::add_pending( assert(state != state_t::SUBMITTING); assert(can_batch(rsize)); - auto record_start_offset = encoded_length; + auto block_start_offset = encoded_length + rsize.mdlength; records.push_back(std::move(record)); record_sizes.push_back(rsize); auto new_encoded_length = get_encoded_length(rsize); assert(new_encoded_length < MAX_SEG_OFF); encoded_length = new_encoded_length; - bool is_first; if (state == state_t::EMPTY) { assert(!io_promise.has_value()); io_promise = seastar::shared_promise(); - is_first = true; } else { assert(io_promise.has_value()); - is_first = false; } state = state_t::PENDING; return io_promise->get_shared_future( - ).then([record_start_offset, is_first + ).then([block_start_offset ](auto maybe_write_result) -> add_pending_ret { if (!maybe_write_result.has_value()) { return crimson::ct_error::input_output_error::make(); } - auto write_result = maybe_write_result.value(); - if (is_first) { - assert(record_start_offset == 0); - } else { - assert(record_start_offset > 0); - write_result.write_start_seq.offset.offset += record_start_offset; - // only the first record should update JournalSegmentManager::committed_to - write_result.write_length = 0; - } + auto submit_result = submit_result_t{ + maybe_write_result->start_seq.offset.add_offset(block_start_offset), + *maybe_write_result + }; return add_pending_ret( add_pending_ertr::ready_future_marker{}, - write_result); + submit_result); }); } @@ -515,8 +513,9 @@ void Journal::RecordBatch::set_result( logger().debug( "Journal::RecordBatch::set_result: batches={}, write_start {} + {}", records.size(), - maybe_write_result->write_start_seq, - maybe_write_result->write_length); + maybe_write_result->start_seq, + maybe_write_result->length); + assert(maybe_write_result->length == encoded_length); } else { logger().error( "Journal::RecordBatch::set_result: batches={}, write is failed!", @@ -673,7 +672,12 @@ Journal::RecordSubmitter::submit_pending( journal_segment_manager.get_committed_to(), journal_segment_manager.get_nonce()); return journal_segment_manager.write(to_write - ).finally([this] { + ).safe_then([rsize](auto write_result) { + return submit_result_t{ + write_result.start_seq.offset.add_offset(rsize.mdlength), + write_result + }; + }).finally([this] { decrement_io_with_flush(); }); } else { @@ -689,17 +693,12 @@ Journal::RecordSubmitter::submit_pending( return handle.enter(write_pipeline->device_submission ).then([write_fut=std::move(write_fut)]() mutable { return std::move(write_fut); - }).safe_then([this, &handle, rsize](auto write_result) { + }).safe_then([this, &handle](auto submit_result) { return handle.enter(write_pipeline->finalize - ).then([this, write_result, rsize] { - if (write_result.write_length > 0) { - auto committed_to = write_result.write_start_seq; - committed_to.offset.offset += write_result.write_length; - journal_segment_manager.mark_committed(committed_to); - } - return std::make_pair( - write_result.write_start_seq.offset.add_offset(rsize.mdlength), - write_result.write_start_seq); + ).then([this, submit_result] { + journal_segment_manager.mark_committed( + submit_result.write_result.get_end_seq()); + return submit_result; }); }); } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 893968822c20b..dec7c89ab54f4 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -83,14 +83,26 @@ public: /** * submit_record * - * @param write record and returns offset of first block and seq + * write record with the ordering handle */ + struct write_result_t { + journal_seq_t start_seq; + segment_off_t length; + + journal_seq_t get_end_seq() const { + return start_seq.add_offset(length); + } + }; + struct submit_result_t { + paddr_t record_block_base; + write_result_t write_result; + }; using submit_record_ertr = crimson::errorator< crimson::ct_error::erange, crimson::ct_error::input_output_error >; using submit_record_ret = submit_record_ertr::future< - std::pair + submit_result_t >; submit_record_ret submit_record( record_t &&record, @@ -108,8 +120,7 @@ public: using replay_ertr = SegmentManager::read_ertr; using replay_ret = replay_ertr::future<>; using delta_handler_t = std::function< - replay_ret(journal_seq_t seq, - paddr_t record_block_base, + replay_ret(const submit_result_t&, const delta_info_t&)>; replay_ret replay( std::vector>&& segment_headers, @@ -177,12 +188,8 @@ private: using roll_ertr = base_ertr; roll_ertr::future<> roll(); - // write the buffer, return the write start + // write the buffer, return the write result // May be called concurrently, writes may complete in any order. - struct write_result_t { - journal_seq_t write_start_seq; - segment_off_t write_length; - }; using write_ertr = base_ertr; using write_ret = write_ertr::future; write_ret write(ceph::bufferlist to_write); @@ -285,9 +292,8 @@ private: // // Set write_result_t::write_length to 0 if the record is not the first one // in the batch. - using add_pending_result_t = JournalSegmentManager::write_result_t; using add_pending_ertr = JournalSegmentManager::write_ertr; - using add_pending_ret = JournalSegmentManager::write_ret; + using add_pending_ret = add_pending_ertr::future; add_pending_ret add_pending(record_t&&, const record_size_t&); // Encode the batched records for write. @@ -297,8 +303,8 @@ private: segment_nonce_t segment_nonce); // Set the write result and reset for reuse - using maybe_result_t = std::optional; - void set_result(maybe_result_t maybe_write_result); + using maybe_result_t = std::optional; + void set_result(maybe_result_t maybe_write_end_seq); // The fast path that is equivalent to submit a single record as a batch. // @@ -419,7 +425,7 @@ private: using submit_pending_ertr = JournalSegmentManager::write_ertr; using submit_pending_ret = submit_pending_ertr::future< - std::pair >; + submit_result_t>; submit_pending_ret submit_pending( record_t&&, const record_size_t&, OrderingHandle &handle, bool flush); diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index a7fc29a46ffc3..8ce7e8d77b063 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -585,6 +585,10 @@ struct journal_seq_t { segment_seq_t segment_seq = 0; paddr_t offset; + journal_seq_t add_offset(segment_off_t o) const { + return {segment_seq, offset.add_offset(o)}; + } + DENC(journal_seq_t, v, p) { DENC_START(1, 1, p); denc(v.segment_seq, p); diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index 5d52fa83d2c17..643becf009b1d 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -75,11 +75,14 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount() [this](auto&& segments) { return journal->replay( std::move(segments), - [this](auto seq, auto paddr, const auto &e) { - auto fut = cache->replay_delta(seq, paddr, e); + [this](const auto &offsets, const auto &e) { + auto start_seq = offsets.write_result.start_seq; segment_cleaner->update_journal_tail_target( - cache->get_oldest_dirty_from().value_or(seq)); - return fut; + cache->get_oldest_dirty_from().value_or(start_seq)); + return cache->replay_delta( + start_seq, + offsets.record_block_base, + e); }); }).safe_then([this] { return journal->open_for_write(); @@ -273,14 +276,23 @@ TransactionManager::submit_transaction_direct( DEBUGT("about to submit to journal", tref); return journal->submit_record(std::move(record), tref.get_handle() - ).safe_then([this, FNAME, &tref](auto p) mutable { - auto [addr, journal_seq] = p; - DEBUGT("journal commit to {} seq {}", tref, addr, journal_seq); - segment_cleaner->set_journal_head(journal_seq); - cache->complete_commit(tref, addr, journal_seq, segment_cleaner.get()); + ).safe_then([this, FNAME, &tref](auto submit_result) mutable { + auto start_seq = submit_result.write_result.start_seq; + auto end_seq = submit_result.write_result.get_end_seq(); + DEBUGT("journal commit to record_block_base={}, start_seq={}, end_seq={}", + tref, + submit_result.record_block_base, + start_seq, + end_seq); + segment_cleaner->set_journal_head(end_seq); + cache->complete_commit( + tref, + submit_result.record_block_base, + start_seq, + segment_cleaner.get()); lba_manager->complete_transaction(tref); segment_cleaner->update_journal_tail_target( - cache->get_oldest_dirty_from().value_or(journal_seq)); + cache->get_oldest_dirty_from().value_or(start_seq)); auto to_release = tref.get_segment_to_release(); if (to_release != NULL_SEG_ID) { return segment_manager.release(to_release diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 4c4bec1478059..9bfa40e4f3955 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -58,9 +58,11 @@ struct btree_test_base : { auto record = cache->prepare_record(*t); return journal->submit_record(std::move(record), t->get_handle()).safe_then( - [this, t=std::move(t)](auto p) mutable { - auto [addr, seq] = p; - cache->complete_commit(*t, addr, seq); + [this, t=std::move(t)](auto submit_result) mutable { + cache->complete_commit( + *t, + submit_result.record_block_base, + submit_result.write_result.start_seq); complete_commit(*t); }).handle_error(crimson::ct_error::assert_all{}); } diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index bb69c81750259..366f3f3b9b8a1 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -193,11 +193,11 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { replay( [&advance, &delta_checker] - (auto seq, auto base, const auto &di) mutable { + (const auto &offsets, const auto &di) mutable { if (!delta_checker) { EXPECT_FALSE("No Deltas Left"); } - if (!(*delta_checker)(base, di)) { + if (!(*delta_checker)(offsets.record_block_base, di)) { delta_checker = std::nullopt; advance(); }