From: Yingxin Cheng Date: Thu, 21 Oct 2021 07:31:47 +0000 (+0800) Subject: crimson/os/seastore/journal: refactor, introduce JournalSegmentManager X-Git-Tag: v17.1.0~519^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a6d91c3f971ce2c564fe8ad82a66ab6451207cd4;p=ceph.git crimson/os/seastore/journal: refactor, introduce JournalSegmentManager In preparation for the follow-up batching control. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index cdda5c50747..f4b7d4ef032 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -49,55 +49,12 @@ segment_nonce_t generate_nonce( sizeof(meta.seastore_id.uuid)); } -Journal::Journal(SegmentManager &segment_manager, ExtentReader& scanner) - : segment_manager(segment_manager), scanner(scanner) {} - - -Journal::initialize_segment_ertr::future -Journal::initialize_segment(Segment &segment) -{ - auto new_tail = segment_provider->get_journal_tail_target(); - // write out header - ceph_assert(segment.get_write_ptr() == 0); - bufferlist bl; - - segment_seq_t seq = next_journal_segment_seq++; - current_segment_nonce = generate_nonce( - seq, segment_manager.get_meta()); - auto header = segment_header_t{ - seq, - segment.get_segment_id(), - segment_provider->get_journal_tail_target(), - current_segment_nonce, - false}; - logger().debug( - "initialize_segment {} journal_tail_target {}, header {}", - segment.get_segment_id(), - new_tail, - header); - encode(header, bl); - - bufferptr bp( - ceph::buffer::create_page_aligned( - segment_manager.get_block_size())); - bp.zero(); - auto iter = bl.cbegin(); - iter.copy(bl.length(), bp.c_str()); - bl.clear(); - bl.append(bp); - - written_to = segment_manager.get_block_size(); - committed_to = 0; - return segment.write(0, bl).safe_then( - [=] { - segment_provider->update_journal_tail_committed(new_tail); - return seq; - }, - initialize_segment_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Journal::initialize_segment" - }); -} +Journal::Journal( + SegmentManager& segment_manager, + ExtentReader& scanner) + : journal_segment_manager(segment_manager), + scanner(scanner) +{} Journal::write_record_ret Journal::write_record( record_size_t rsize, @@ -105,98 +62,26 @@ Journal::write_record_ret Journal::write_record( OrderingHandle &handle) { ceph::bufferlist to_write = encode_record( - rsize, std::move(record), segment_manager.get_block_size(), - committed_to, current_segment_nonce); - auto target = written_to; - assert((to_write.length() % segment_manager.get_block_size()) == 0); - written_to += to_write.length(); - logger().debug( - "write_record, mdlength {}, dlength {}, target {}", - rsize.mdlength, - rsize.dlength, - target); - - auto segment_id = current_journal_segment->get_segment_id(); - + rsize, + std::move(record), + journal_segment_manager.get_block_size(), + journal_segment_manager.get_committed_to(), + journal_segment_manager.get_nonce()); // Start write under the current exclusive stage, but wait for it // in the device_submission concurrent stage to permit multiple // overlapping writes. - auto write_fut = current_journal_segment->write(target, to_write); + auto write_fut = journal_segment_manager.write(to_write); return handle.enter(write_pipeline->device_submission ).then([write_fut = std::move(write_fut)]() mutable { - return std::move(write_fut - ).handle_error( - write_record_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Journal::write_record" - } - ); - }).safe_then([this, &handle] { - return handle.enter(write_pipeline->finalize); - }).safe_then([this, target, segment_id] { - logger().debug( - "write_record: commit target {}", - target); - if (segment_id == current_journal_segment->get_segment_id()) { - assert(committed_to < target); - committed_to = target; - } - return write_record_ret( - write_record_ertr::ready_future_marker{}, - paddr_t{ - segment_id, - target}); - }); -} - -bool Journal::needs_roll(segment_off_t length) const -{ - return length + written_to > - current_journal_segment->get_write_capacity(); -} - -Journal::roll_journal_segment_ertr::future -Journal::roll_journal_segment() -{ - auto old_segment_id = current_journal_segment ? - current_journal_segment->get_segment_id() : - NULL_SEG_ID; - - return (current_journal_segment ? - current_journal_segment->close() : - Segment::close_ertr::now()).safe_then([this] { - return segment_provider->get_segment(segment_manager.get_device_id()); - }).safe_then([this](auto segment) { - return segment_manager.open(segment); - }).safe_then([this](auto sref) { - current_journal_segment = sref; - written_to = 0; - return initialize_segment(*current_journal_segment); - }).safe_then([=](auto seq) { - if (old_segment_id != NULL_SEG_ID) { - segment_provider->close_segment(old_segment_id); - } - segment_provider->set_journal_segment( - current_journal_segment->get_segment_id(), - seq); - return seq; - }).handle_error( - roll_journal_segment_ertr::pass_further{}, - crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }) - ); -} - -Journal::open_for_write_ret Journal::open_for_write() -{ - return roll_journal_segment().safe_then([this](auto seq) { - return open_for_write_ret( - open_for_write_ertr::ready_future_marker{}, - journal_seq_t{ - seq, - paddr_t{ - current_journal_segment->get_segment_id(), - static_cast(segment_manager.get_block_size())} - }); + return std::move(write_fut); + }).safe_then([this, &handle, rsize](journal_seq_t write_start) { + return handle.enter(write_pipeline->finalize + ).then([this, write_start, rsize] { + auto committed_to = write_start; + committed_to.offset.offset += (rsize.mdlength + rsize.dlength); + journal_segment_manager.mark_committed(committed_to); + return write_start.offset; + }); }); } @@ -218,8 +103,8 @@ Journal::prep_replay_segments( rt.second.journal_segment_seq; }); - next_journal_segment_seq = - segments.rbegin()->second.journal_segment_seq + 1; + journal_segment_manager.set_segment_seq( + segments.rbegin()->second.journal_segment_seq); std::for_each( segments.begin(), segments.end(), @@ -254,7 +139,7 @@ Journal::prep_replay_segments( } else { replay_from = paddr_t{ from->first, - (segment_off_t)segment_manager.get_block_size()}; + (segment_off_t)journal_segment_manager.get_block_size()}; } auto ret = replay_segments_t(segments.end() - from); std::transform( @@ -264,7 +149,7 @@ Journal::prep_replay_segments( p.second.journal_segment_seq, paddr_t{ p.first, - (segment_off_t)segment_manager.get_block_size()}}; + (segment_off_t)journal_segment_manager.get_block_size()}}; logger().debug( "Journal::prep_replay_segments: replaying from {}", ret); @@ -383,4 +268,149 @@ Journal::replay_ret Journal::replay( }); } +Journal::JournalSegmentManager::JournalSegmentManager( + SegmentManager& segment_manager) + : segment_manager{segment_manager} +{ + reset(); +} + +Journal::JournalSegmentManager::close_ertr::future<> +Journal::JournalSegmentManager::close() +{ + return ( + current_journal_segment ? + current_journal_segment->close() : + Segment::close_ertr::now() + ).handle_error( + close_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in JournalSegmentManager::close()" + } + ).finally([this] { + reset(); + }); +} + +Journal::JournalSegmentManager::roll_ertr::future<> +Journal::JournalSegmentManager::roll() +{ + auto old_segment_id = current_journal_segment ? + current_journal_segment->get_segment_id() : + NULL_SEG_ID; + + return ( + current_journal_segment ? + current_journal_segment->close() : + Segment::close_ertr::now() + ).safe_then([this] { + return segment_provider->get_segment(segment_manager.get_device_id()); + }).safe_then([this](auto segment) { + return segment_manager.open(segment); + }).safe_then([this](auto sref) { + current_journal_segment = sref; + return initialize_segment(*current_journal_segment); + }).safe_then([this, old_segment_id] { + if (old_segment_id != NULL_SEG_ID) { + segment_provider->close_segment(old_segment_id); + } + segment_provider->set_journal_segment( + current_journal_segment->get_segment_id(), + get_segment_seq()); + }).handle_error( + roll_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in JournalSegmentManager::roll" + } + ); +} + +Journal::JournalSegmentManager::write_ret +Journal::JournalSegmentManager::write(ceph::bufferlist to_write) +{ + auto write_length = to_write.length(); + auto write_start_seq = get_current_write_seq(); + logger().debug( + "JournalSegmentManager::write: write_start {} => {}, length={}", + write_start_seq, + write_start_seq.offset.offset + write_length, + write_length); + assert((write_length % segment_manager.get_block_size()) == 0); + assert(!needs_roll(write_length)); + + auto write_start_offset = written_to; + written_to += write_length; + return current_journal_segment->write( + write_start_offset, to_write + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in JournalSegmentManager::write" + } + ).safe_then([write_start_seq] { + return write_start_seq; + }); +} + +void Journal::JournalSegmentManager::mark_committed( + const journal_seq_t& new_committed_to) +{ + logger().debug( + "JournalSegmentManager::mark_committed: committed_to {} => {}", + committed_to, new_committed_to); + assert(new_committed_to.segment_seq <= + get_segment_seq()); + if (new_committed_to.segment_seq == + get_segment_seq()) { + assert(committed_to.offset.offset < new_committed_to.offset.offset); + committed_to = new_committed_to; + } +} + +Journal::JournalSegmentManager::initialize_segment_ertr::future<> +Journal::JournalSegmentManager::initialize_segment(Segment& segment) +{ + auto new_tail = segment_provider->get_journal_tail_target(); + // write out header + ceph_assert(segment.get_write_ptr() == 0); + bufferlist bl; + + segment_seq_t seq = next_journal_segment_seq++; + current_segment_nonce = generate_nonce( + seq, segment_manager.get_meta()); + auto header = segment_header_t{ + seq, + segment.get_segment_id(), + segment_provider->get_journal_tail_target(), + current_segment_nonce, + false}; + logger().debug( + "JournalSegmentManager::initialize_segment: segment_id {} journal_tail_target {}, header {}", + segment.get_segment_id(), + new_tail, + header); + encode(header, bl); + + bufferptr bp( + ceph::buffer::create_page_aligned( + segment_manager.get_block_size())); + bp.zero(); + auto iter = bl.cbegin(); + iter.copy(bl.length(), bp.c_str()); + bl.clear(); + bl.append(bp); + + written_to = 0; + // FIXME: improve committed_to to point to another segment + committed_to = get_current_write_seq(); + return write(bl + ).safe_then([this, new_tail, write_size=bl.length() + ](journal_seq_t write_start_seq) { + auto committed_to = write_start_seq; + committed_to.offset.offset += write_size; + mark_committed(committed_to); + segment_provider->update_journal_tail_committed(new_tail); + }); +} + } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 7c6eb51074c..064020dcc1c 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include @@ -34,7 +35,7 @@ public: * Gets the current journal segment sequence. */ segment_seq_t get_segment_seq() const { - return next_journal_segment_seq - 1; + return journal_segment_manager.get_segment_seq(); } /** @@ -48,6 +49,7 @@ public: */ void set_segment_provider(SegmentProvider *provider) { segment_provider = provider; + journal_segment_manager.set_segment_provider(provider); } /** @@ -59,7 +61,9 @@ public: crimson::ct_error::input_output_error >; using open_for_write_ret = open_for_write_ertr::future; - open_for_write_ret open_for_write(); + open_for_write_ret open_for_write() { + return journal_segment_manager.open(); + } /** * close journal @@ -69,19 +73,7 @@ public: using close_ertr = crimson::errorator< crimson::ct_error::input_output_error>; close_ertr::future<> close() { - return ( - current_journal_segment ? - current_journal_segment->close() : - Segment::close_ertr::now() - ).handle_error( - close_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Error during Journal::close()" - } - ).finally([this] { - current_journal_segment.reset(); - reset_soft_state(); - }); + return journal_segment_manager.close(); } /** @@ -102,23 +94,24 @@ public: ) { assert(write_pipeline); auto rsize = get_encoded_record_length( - record, segment_manager.get_block_size()); + record, journal_segment_manager.get_block_size()); auto total = rsize.mdlength + rsize.dlength; - if (total > max_record_length()) { + auto max_record_length = journal_segment_manager.get_max_write_length(); + if (total > max_record_length) { auto &logger = crimson::get_logger(ceph_subsys_seastore); logger.error( "Journal::submit_record: record size {} exceeds max {}", total, - max_record_length() + max_record_length ); return crimson::ct_error::erange::make(); } - auto roll = needs_roll(total) - ? roll_journal_segment().safe_then([](auto){}) - : roll_journal_segment_ertr::now(); + auto roll = journal_segment_manager.needs_roll(total) + ? journal_segment_manager.roll() + : JournalSegmentManager::roll_ertr::now(); return roll.safe_then( [this, rsize, record=std::move(record), &handle]() mutable { - auto seq = next_journal_segment_seq - 1; + auto seq = journal_segment_manager.get_segment_seq(); return write_record( rsize, std::move(record), handle @@ -151,32 +144,111 @@ public: } private: - SegmentProvider *segment_provider = nullptr; - SegmentManager &segment_manager; + class JournalSegmentManager { + public: + JournalSegmentManager(SegmentManager&); + + using base_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + extent_len_t get_max_write_length() const { + return segment_manager.get_segment_size() - + p2align(ceph::encoded_sizeof_bounded(), + size_t(segment_manager.get_block_size())); + } - segment_seq_t next_journal_segment_seq = 0; - segment_nonce_t current_segment_nonce = 0; + segment_off_t get_block_size() const { + return segment_manager.get_block_size(); + } - SegmentRef current_journal_segment; - segment_off_t written_to = 0; - segment_off_t committed_to = 0; + segment_nonce_t get_nonce() const { + return current_segment_nonce; + } - ExtentReader& scanner; - WritePipeline *write_pipeline = nullptr; + segment_off_t get_committed_to() const { + assert(committed_to.segment_seq == + get_segment_seq()); + return committed_to.offset.offset; + } - void reset_soft_state() { - next_journal_segment_seq = 0; - current_segment_nonce = 0; - written_to = 0; - committed_to = 0; - } + segment_seq_t get_segment_seq() const { + return next_journal_segment_seq - 1; + } - /// prepare segment for writes, writes out segment header - using initialize_segment_ertr = crimson::errorator< - crimson::ct_error::input_output_error>; - initialize_segment_ertr::future initialize_segment( - Segment &segment); + void set_segment_provider(SegmentProvider* provider) { + segment_provider = provider; + } + + void set_segment_seq(segment_seq_t current_seq) { + next_journal_segment_seq = (current_seq + 1); + } + + using open_ertr = base_ertr; + using open_ret = open_ertr::future; + open_ret open() { + return roll().safe_then([this] { + return get_current_write_seq(); + }); + } + + using close_ertr = base_ertr; + close_ertr::future<> close(); + // returns true iff the current segment has insufficient space + bool needs_roll(std::size_t length) const { + auto write_capacity = current_journal_segment->get_write_capacity(); + return length + written_to > std::size_t(write_capacity); + } + + // close the current segment and initialize next one + using roll_ertr = base_ertr; + roll_ertr::future<> roll(); + + // write the buffer, return the write start + // May be called concurrently, writes may complete in any order. + using write_ertr = base_ertr; + using write_ret = write_ertr::future; + write_ret write(ceph::bufferlist to_write); + + // mark write committed in order + void mark_committed(const journal_seq_t& new_committed_to); + + private: + journal_seq_t get_current_write_seq() const { + assert(current_journal_segment); + return journal_seq_t{ + get_segment_seq(), + {current_journal_segment->get_segment_id(), written_to} + }; + } + + void reset() { + next_journal_segment_seq = 0; + current_segment_nonce = 0; + current_journal_segment.reset(); + written_to = 0; + committed_to = {}; + } + + // prepare segment for writes, writes out segment header + using initialize_segment_ertr = base_ertr; + initialize_segment_ertr::future<> initialize_segment(Segment&); + + SegmentProvider* segment_provider; + SegmentManager& segment_manager; + + segment_seq_t next_journal_segment_seq; + segment_nonce_t current_segment_nonce; + + SegmentRef current_journal_segment; + segment_off_t written_to; + // committed_to may be in a previous journal segment + journal_seq_t committed_to; + }; + + SegmentProvider* segment_provider = nullptr; + JournalSegmentManager journal_segment_manager; + ExtentReader& scanner; + WritePipeline *write_pipeline = nullptr; /// do record write using write_record_ertr = crimson::errorator< @@ -187,14 +259,6 @@ private: record_t &&record, OrderingHandle &handle); - /// close current segment and initialize next one - using roll_journal_segment_ertr = crimson::errorator< - crimson::ct_error::input_output_error>; - roll_journal_segment_ertr::future roll_journal_segment(); - - /// returns true iff current segment has insufficient space - bool needs_roll(segment_off_t length) const; - /// return ordered vector of segments to replay using replay_segments_t = std::vector< std::pair>; @@ -219,19 +283,7 @@ private: segment_header_t header, ///< [in] segment header delta_handler_t &delta_handler ///< [in] processes deltas in order ); - - extent_len_t max_record_length() const; }; using JournalRef = std::unique_ptr; } - -namespace crimson::os::seastore { - -inline extent_len_t Journal::max_record_length() const { - return segment_manager.get_segment_size() - - p2align(ceph::encoded_sizeof_bounded(), - size_t(segment_manager.get_block_size())); -} - -}