From: Yingxin Cheng <yingxin.cheng@intel.com>
Date: Fri, 18 Feb 2022 06:47:17 +0000 (+0800)
Subject: crimson/os/seastore: introduce SegmentAllocator and integrate with Journal
X-Git-Tag: v18.0.0~1337^2~1
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b0db4be190390a1ed8325425a8669a22dc028ea4;p=ceph.git

crimson/os/seastore: introduce SegmentAllocator and integrate with Journal

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
---

diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt
index 5664bcbdb66e5..cec75b10471d4 100644
--- a/src/crimson/os/seastore/CMakeLists.txt
+++ b/src/crimson/os/seastore/CMakeLists.txt
@@ -39,6 +39,7 @@ set(crimson_seastore_srcs
   random_block_manager/nvme_manager.cc
   random_block_manager/nvmedevice.cc
   journal/segmented_journal.cc
+  journal/segment_allocator.cc
   journal.cc
   ../../../test/crimson/seastore/test_block.cc
   ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
diff --git a/src/crimson/os/seastore/journal/segment_allocator.cc b/src/crimson/os/seastore/journal/segment_allocator.cc
new file mode 100644
index 0000000000000..b1245c61a17e5
--- /dev/null
+++ b/src/crimson/os/seastore/journal/segment_allocator.cc
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "segment_allocator.h"
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/segment_cleaner.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+static segment_nonce_t generate_nonce(
+  segment_seq_t seq,
+  const seastore_meta_t &meta)
+{
+  return ceph_crc32c(
+    seq,
+    reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+    sizeof(meta.seastore_id.uuid));
+}
+
+SegmentAllocator::SegmentAllocator(
+  segment_type_t type,
+  SegmentProvider &sp,
+  SegmentManager &sm)
+  : type{type},
+    segment_provider{sp},
+    segment_manager{sm}
+{
+  ceph_assert(type != segment_type_t::NULL_SEG);
+  reset();
+}
+
+void SegmentAllocator::set_next_segment_seq(segment_seq_t seq)
+{
+  LOG_PREFIX(SegmentAllocator::set_next_segment_seq);
+  INFO("{} {} next_segment_seq={}",
+       type, get_device_id(), segment_seq_printer_t{seq});
+  assert(type == segment_seq_to_type(seq));
+  next_segment_seq = seq;
+}
+
+SegmentAllocator::open_ertr::future<journal_seq_t>
+SegmentAllocator::open()
+{
+  LOG_PREFIX(SegmentAllocator::open);
+  ceph_assert(!current_segment);
+  segment_seq_t new_segment_seq;
+  if (type == segment_type_t::JOURNAL) {
+    new_segment_seq = next_segment_seq++;
+  } else { // OOL
+    new_segment_seq = next_segment_seq;
+  }
+  assert(new_segment_seq == get_current_segment_seq());
+  ceph_assert(segment_seq_to_type(new_segment_seq) == type);
+  auto new_segment_id = segment_provider.get_segment(
+      get_device_id(), new_segment_seq);
+  return segment_manager.open(new_segment_id
+  ).handle_error(
+    open_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentAllocator::open open"
+    }
+  ).safe_then([this, FNAME, new_segment_seq](auto sref) {
+    // initialize new segment
+    journal_seq_t new_journal_tail;
+    if (type == segment_type_t::JOURNAL) {
+      new_journal_tail = segment_provider.get_journal_tail_target();
+      current_segment_nonce = generate_nonce(
+          new_segment_seq, segment_manager.get_meta());
+    } else { // OOL
+      new_journal_tail = NO_DELTAS;
+      assert(current_segment_nonce == 0);
+    }
+    segment_id_t segment_id = sref->get_segment_id();
+    auto header = segment_header_t{
+      new_segment_seq,
+      segment_id,
+      new_journal_tail,
+      current_segment_nonce};
-- {}", + type, get_device_id(), header); + + auto header_length = segment_manager.get_block_size(); + bufferlist bl; + encode(header, bl); + bufferptr bp(ceph::buffer::create_page_aligned(header_length)); + bp.zero(); + auto iter = bl.cbegin(); + iter.copy(bl.length(), bp.c_str()); + bl.clear(); + bl.append(bp); + + ceph_assert(sref->get_write_ptr() == 0); + assert((unsigned)header_length == bl.length()); + written_to = header_length; + auto new_journal_seq = journal_seq_t{ + new_segment_seq, + paddr_t::make_seg_paddr(segment_id, written_to)}; + return sref->write(0, bl + ).handle_error( + open_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::open write" + } + ).safe_then([this, + FNAME, + new_journal_seq, + new_journal_tail, + sref=std::move(sref)]() mutable { + ceph_assert(!current_segment); + current_segment = std::move(sref); + if (type == segment_type_t::JOURNAL) { + segment_provider.update_journal_tail_committed(new_journal_tail); + } + DEBUG("{} {} rolled new segment id={}", + type, get_device_id(), current_segment->get_segment_id()); + ceph_assert(new_journal_seq.segment_seq == get_current_segment_seq()); + return new_journal_seq; + }); + }); +} + +SegmentAllocator::roll_ertr::future<> +SegmentAllocator::roll() +{ + ceph_assert(can_write()); + return close_segment(true).safe_then([this] { + return open().discard_result(); + }); +} + +SegmentAllocator::write_ret +SegmentAllocator::write(ceph::bufferlist to_write) +{ + LOG_PREFIX(SegmentAllocator::write); + assert(can_write()); + auto write_length = to_write.length(); + auto write_start_offset = written_to; + auto write_start_seq = journal_seq_t{ + get_current_segment_seq(), + paddr_t::make_seg_paddr( + current_segment->get_segment_id(), write_start_offset) + }; + TRACE("{} {} {}~{}", type, get_device_id(), write_start_seq, write_length); + assert(write_length > 0); + assert((write_length % segment_manager.get_block_size()) == 0); + assert(!needs_roll(write_length)); + + auto write_result = write_result_t{ + write_start_seq, + static_cast(write_length) + }; + written_to += write_length; + return current_segment->write( + write_start_offset, to_write + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::write" + } + ).safe_then([write_result] { + return write_result; + }); +} + +SegmentAllocator::close_ertr::future<> +SegmentAllocator::close() +{ + return [this] { + LOG_PREFIX(SegmentAllocator::close); + if (current_segment) { + return close_segment(false); + } else { + INFO("{} {} no current segment", type, get_device_id()); + return close_segment_ertr::now(); + } + }().finally([this] { + reset(); + }); +} + +SegmentAllocator::close_segment_ertr::future<> +SegmentAllocator::close_segment(bool is_rolling) +{ + LOG_PREFIX(SegmentAllocator::close_segment); + assert(can_write()); + auto seg_to_close = std::move(current_segment); + auto close_segment_id = seg_to_close->get_segment_id(); + INFO("{} {} close segment id={}, seq={}, written_to={}, nonce={}", + type, get_device_id(), + close_segment_id, + segment_seq_printer_t{get_current_segment_seq()}, + written_to, + current_segment_nonce); + if (is_rolling) { + segment_provider.close_segment(close_segment_id); + } + return seg_to_close->close( + ).safe_then([seg_to_close=std::move(seg_to_close)] { + }).handle_error( + close_segment_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::close_segment" + } + ); +} + +} diff --git 
diff --git a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h
new file mode 100644
index 0000000000000..ad36cfd259352
--- /dev/null
+++ b/src/crimson/os/seastore/journal/segment_allocator.h
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+  class SegmentProvider;
+}
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * SegmentAllocator
+ *
+ * Maintain an available segment for writes.
+ */
+class SegmentAllocator {
+  using base_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+
+ public:
+  SegmentAllocator(segment_type_t type,
+                   SegmentProvider &sp,
+                   SegmentManager &sm);
+
+  device_id_t get_device_id() const {
+    return segment_manager.get_device_id();
+  }
+
+  seastore_off_t get_block_size() const {
+    return segment_manager.get_block_size();
+  }
+
+  extent_len_t get_max_write_length() const {
+    return segment_manager.get_segment_size() -
+           p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+                   size_t(segment_manager.get_block_size()));
+  }
+
+  device_segment_id_t get_num_segments() const {
+    return segment_manager.get_num_segments();
+  }
+
+  bool can_write() const {
+    return !!current_segment;
+  }
+
+  segment_nonce_t get_nonce() const {
+    assert(can_write());
+    return current_segment_nonce;
+  }
+
+  void set_next_segment_seq(segment_seq_t);
+
+  // returns true iff the current segment has insufficient space
+  bool needs_roll(std::size_t length) const {
+    assert(can_write());
+    auto write_capacity = current_segment->get_write_capacity();
+    return length + written_to > std::size_t(write_capacity);
+  }
+
+  // open for write
+  using open_ertr = base_ertr;
+  using open_ret = open_ertr::future<journal_seq_t>;
+  open_ret open();
+
+  // close the current segment and initialize next one
+  using roll_ertr = base_ertr;
+  roll_ertr::future<> roll();
+
+  // write the buffer, return the write result
+  //
+  // May be called concurrently, but writes may complete in any order.
+  // If rolling/opening, no write is allowed.
+  using write_ertr = base_ertr;
+  using write_ret = write_ertr::future<write_result_t>;
+  write_ret write(ceph::bufferlist to_write);
+
+  using close_ertr = base_ertr;
+  close_ertr::future<> close();
+
+ private:
+  void reset() {
+    current_segment.reset();
+    if (type == segment_type_t::JOURNAL) {
+      next_segment_seq = 0;
+    } else { // OOL
+      next_segment_seq = OOL_SEG_SEQ;
+    }
+    current_segment_nonce = 0;
+    written_to = 0;
+  }
+
+  // FIXME: remove the unnecessary is_rolling
+  using close_segment_ertr = base_ertr;
+  close_segment_ertr::future<> close_segment(bool is_rolling);
+
+  segment_seq_t get_current_segment_seq() const {
+    segment_seq_t ret;
+    if (type == segment_type_t::JOURNAL) {
+      assert(next_segment_seq != 0);
+      ret = next_segment_seq - 1;
+    } else { // OOL
+      ret = next_segment_seq;
+    }
+    assert(segment_seq_to_type(ret) == type);
+    return ret;
+  }
+
+  const segment_type_t type; // JOURNAL or OOL
+  SegmentProvider &segment_provider;
+  SegmentManager &segment_manager;
+  SegmentRef current_segment;
+  segment_seq_t next_segment_seq;
+  segment_nonce_t current_segment_nonce;
+  seastore_off_t written_to;
+};
+
+}
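The JOURNAL/OOL asymmetry in the sequence bookkeeping above is easy to misread: reset() seeds next_segment_seq with 0 for JOURNAL but with OOL_SEG_SEQ for OOL, open() post-increments only in the JOURNAL case, and get_current_segment_seq() compensates by subtracting one. A standalone toy model of just that arithmetic, using plain integers and hypothetical names (OOL_SEQ is a stand-in for OOL_SEG_SEQ):

    #include <cassert>
    #include <cstdint>

    // Toy model of next_segment_seq bookkeeping; not seastore code.
    enum class type_t { JOURNAL, OOL };
    constexpr uint64_t OOL_SEQ = ~0ull;  // stand-in for OOL_SEG_SEQ

    struct seq_model {
      type_t type;
      uint64_t next = 0;

      void reset() { next = (type == type_t::JOURNAL ? 0 : OOL_SEQ); }
      uint64_t open() {           // mirrors SegmentAllocator::open()
        return type == type_t::JOURNAL ? next++ : next;
      }
      uint64_t current() const {  // mirrors get_current_segment_seq()
        return type == type_t::JOURNAL ? next - 1 : next;
      }
    };

    int main() {
      seq_model j{type_t::JOURNAL};
      j.reset();
      assert(j.open() == 0 && j.current() == 0);
      assert(j.open() == 1 && j.current() == 1);  // advances per segment

      seq_model o{type_t::OOL};
      o.reset();
      assert(o.open() == OOL_SEQ && o.current() == OOL_SEQ);
      assert(o.open() == OOL_SEQ);                // never advances
    }
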
diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc
index 851e6cc2182cf..ce677b55c9d8d 100644
--- a/src/crimson/os/seastore/journal/segmented_journal.cc
+++ b/src/crimson/os/seastore/journal/segmented_journal.cc
@@ -26,22 +26,14 @@ SET_SUBSYS(seastore_journal);
 
 namespace crimson::os::seastore::journal {
 
-segment_nonce_t generate_nonce(
-  segment_seq_t seq,
-  const seastore_meta_t &meta)
-{
-  return ceph_crc32c(
-    seq,
-    reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
-    sizeof(meta.seastore_id.uuid));
-}
-
 SegmentedJournal::SegmentedJournal(
   SegmentManager &segment_manager,
   ExtentReader &scanner,
   SegmentProvider &segment_provider)
   : segment_provider(segment_provider),
-    journal_segment_manager(segment_manager, segment_provider),
+    journal_segment_allocator(segment_type_t::JOURNAL,
+                              segment_provider,
+                              segment_manager),
     record_submitter(crimson::common::get_conf<uint64_t>(
                        "seastore_journal_iodepth_limit"),
                      crimson::common::get_conf<uint64_t>(
@@ -50,7 +42,7 @@ SegmentedJournal::SegmentedJournal(
                        "seastore_journal_batch_flush_size"),
                      crimson::common::get_conf<double>(
                        "seastore_journal_batch_preferred_fullness"),
-                     journal_segment_manager),
+                     journal_segment_allocator),
     scanner(scanner)
 {
   register_metrics();
@@ -59,8 +51,8 @@
 SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
 {
   LOG_PREFIX(Journal::open_for_write);
-  INFO("device_id={}", journal_segment_manager.get_device_id());
-  return journal_segment_manager.open();
+  INFO("device_id={}", journal_segment_allocator.get_device_id());
+  return journal_segment_allocator.open();
 }
 
 SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
@@ -69,7 +61,7 @@
   INFO("closing, committed_to={}", record_submitter.get_committed_to());
   metrics.clear();
-  return journal_segment_manager.close();
+  return journal_segment_allocator.close();
 }
 
 SegmentedJournal::prep_replay_segments_fut
 SegmentedJournal::prep_replay_segments(
@@ -89,15 +81,15 @@ SegmentedJournal::prep_replay_segments(
       rt.second.journal_segment_seq;
     });
 
-  journal_segment_manager.set_segment_seq(
-    segments.rbegin()->second.journal_segment_seq);
+  journal_segment_allocator.set_next_segment_seq(
+    segments.rbegin()->second.journal_segment_seq + 1);
   std::for_each(
     segments.begin(),
     segments.end(),
     [this, FNAME](auto &seg)
     {
       if (seg.first != seg.second.physical_segment_id ||
-          seg.first.device_id() != journal_segment_manager.get_device_id() ||
+          seg.first.device_id() != journal_segment_allocator.get_device_id() ||
           seg.second.get_type() != segment_type_t::JOURNAL) {
         ERROR("illegal journal segment for replay -- {}", seg.second);
         ceph_abort();
@@ -124,7 +116,7 @@ SegmentedJournal::prep_replay_segments(
   } else {
     replay_from = paddr_t::make_seg_paddr(
       from->first,
-      journal_segment_manager.get_block_size());
+      journal_segment_allocator.get_block_size());
   }
 
   auto num_segments = segments.end() - from;
@@ -138,7 +130,7 @@ SegmentedJournal::prep_replay_segments(
         p.second.journal_segment_seq,
         paddr_t::make_seg_paddr(
           p.first,
-          journal_segment_manager.get_block_size())
+          journal_segment_allocator.get_block_size())
       };
       return std::make_pair(ret, p.second);
     });
@@ -254,10 +246,10 @@ SegmentedJournal::find_journal_segments()
     return crimson::do_for_each(
       boost::counting_iterator<device_segment_id_t>(0),
       boost::counting_iterator<device_segment_id_t>(
-        journal_segment_manager.get_num_segments()),
+        journal_segment_allocator.get_num_segments()),
       [this, &ret](device_segment_id_t d_segment_id) {
         segment_id_t segment_id{
-          journal_segment_manager.get_device_id(),
+          journal_segment_allocator.get_device_id(),
           d_segment_id};
         return scanner.read_segment_header(
           segment_id
@@ -367,154 +359,6 @@ void SegmentedJournal::register_metrics()
   );
 }
 
-SegmentedJournal::JournalSegmentManager::JournalSegmentManager(
-  SegmentManager& segment_manager,
-  SegmentProvider& segment_provider)
-  : segment_provider{segment_provider}, segment_manager{segment_manager}
-{
-  reset();
-}
-
-SegmentedJournal::JournalSegmentManager::open_ret
-SegmentedJournal::JournalSegmentManager::open()
-{
-  return roll().safe_then([this] {
-    return get_current_write_seq();
-  });
-}
-
-SegmentedJournal::JournalSegmentManager::close_ertr::future<>
-SegmentedJournal::JournalSegmentManager::close()
-{
-  LOG_PREFIX(JournalSegmentManager::close);
-  if (current_journal_segment) {
-    INFO("segment_id={}, seq={}, written_to={}, nonce={}",
-         current_journal_segment->get_segment_id(),
-         get_segment_seq(),
-         written_to,
-         current_segment_nonce);
-  } else {
-    INFO("no current journal segment");
-  }
-
-  return (
-    current_journal_segment ?
-    current_journal_segment->close() :
-    Segment::close_ertr::now()
-  ).handle_error(
-    close_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::close()"
-    }
-  ).finally([this] {
-    reset();
-  });
-}
-
-SegmentedJournal::JournalSegmentManager::roll_ertr::future<>
-SegmentedJournal::JournalSegmentManager::roll()
-{
-  LOG_PREFIX(JournalSegmentManager::roll);
-  auto old_segment_id = current_journal_segment ?
-    current_journal_segment->get_segment_id() :
-    NULL_SEG_ID;
-  if (current_journal_segment) {
-    INFO("closing segment {}, seq={}, written_to={}, nonce={}",
-         old_segment_id,
-         get_segment_seq(),
-         written_to,
-         current_segment_nonce);
-  }
-
-  return (
-    current_journal_segment ?
-    current_journal_segment->close() :
-    Segment::close_ertr::now()
-  ).safe_then([this] {
-    auto new_segment_id = segment_provider.get_segment(
-      get_device_id(), next_journal_segment_seq);
-    return segment_manager.open(new_segment_id);
-  }).safe_then([this](auto sref) {
-    current_journal_segment = sref;
-    return initialize_segment(*current_journal_segment);
-  }).safe_then([this, old_segment_id] {
-    if (old_segment_id != NULL_SEG_ID) {
-      segment_provider.close_segment(old_segment_id);
-    }
-  }).handle_error(
-    roll_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::roll"
-    }
-  );
-}
-
-SegmentedJournal::JournalSegmentManager::write_ret
-SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write)
-{
-  LOG_PREFIX(JournalSegmentManager::write);
-  auto write_length = to_write.length();
-  auto write_start_seq = get_current_write_seq();
-  TRACE("{}~{}", write_start_seq, write_length);
-  assert(write_length > 0);
-  assert((write_length % segment_manager.get_block_size()) == 0);
-  assert(!needs_roll(write_length));
-
-  auto write_start_offset = written_to;
-  written_to += write_length;
-  auto write_result = write_result_t{
-    write_start_seq,
-    static_cast<seastore_off_t>(write_length)
-  };
-  return current_journal_segment->write(
-    write_start_offset, to_write
-  ).handle_error(
-    write_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::write"
-    }
-  ).safe_then([write_result] {
-    return write_result;
-  });
-}
-
-SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
-SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
-{
-  LOG_PREFIX(JournalSegmentManager::initialize_segment);
-  auto new_tail = segment_provider.get_journal_tail_target();
-  // write out header
-  ceph_assert(segment.get_write_ptr() == 0);
-  bufferlist bl;
-
-  segment_seq_t seq = next_journal_segment_seq++;
-  current_segment_nonce = generate_nonce(
-    seq, segment_manager.get_meta());
-  auto header = segment_header_t{
-    seq,
-    segment.get_segment_id(),
-    new_tail,
-    current_segment_nonce};
-  INFO("writing {} ...", header);
-  ceph_assert(header.get_type() == segment_type_t::JOURNAL);
-  encode(header, bl);
-
-  bufferptr bp(
-    ceph::buffer::create_page_aligned(
-      segment_manager.get_block_size()));
-  bp.zero();
-  auto iter = bl.cbegin();
-  iter.copy(bl.length(), bp.c_str());
-  bl.clear();
-  bl.append(bp);
-
-  written_to = 0;
-  return write(bl
-  ).safe_then([this, new_tail](auto) {
-    segment_provider.update_journal_tail_committed(new_tail);
-  });
-}
-
 SegmentedJournal::RecordBatch::add_pending_ret
 SegmentedJournal::RecordBatch::add_pending(
   record_t&& record,
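Note that the removed initialize_segment() above and the new SegmentAllocator::open() share the same header-write technique: encode the header, then copy it into a zeroed, page-aligned buffer of exactly one block, so the first device write covers a whole block and written_to can start right past it. A standalone sketch of that padding step, with std::vector standing in for ceph's bufferptr/bufferlist:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Pad an encoded segment header out to exactly one device block,
    // as initialize_segment()/open() do; std::vector stands in for
    // ceph buffers here.
    std::vector<char> pad_to_block(const std::vector<char> &encoded,
                                   std::size_t block_size)
    {
      assert(encoded.size() <= block_size);
      std::vector<char> block(block_size, 0);    // bp.zero()
      std::copy(encoded.begin(), encoded.end(),  // iter.copy(...)
                block.begin());
      return block;  // written at offset 0; payload writes start after it
    }
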
{ ERROR("H{} {} exceeds max record size {}", (void*)&handle, record, max_record_length); @@ -755,11 +599,11 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch() increment_io(); auto num = p_batch->get_num_records(); auto [to_write, sizes] = p_batch->encode_batch( - journal_committed_to, journal_segment_manager.get_nonce()); + journal_committed_to, journal_segment_allocator.get_nonce()); DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...", num, sizes, journal_committed_to, num_outstanding_io); account_submission(num, sizes); - std::ignore = journal_segment_manager.write(to_write + std::ignore = journal_segment_allocator.write(to_write ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) { TRACE("{} records, {}, write done with {}", num, sizes, write_result); finish_submit_batch(p_batch, write_result); @@ -791,13 +635,13 @@ SegmentedJournal::RecordSubmitter::submit_pending( increment_io(); auto [to_write, sizes] = p_current_batch->submit_pending_fast( std::move(record), - journal_segment_manager.get_block_size(), + journal_segment_allocator.get_block_size(), journal_committed_to, - journal_segment_manager.get_nonce()); + journal_segment_allocator.get_nonce()); DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...", (void*)&handle, sizes, journal_committed_to, num_outstanding_io); account_submission(1, sizes); - return journal_segment_manager.write(to_write + return journal_segment_allocator.write(to_write ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { return record_locator_t{ write_result.start_seq.offset.add_offset(mdlength), @@ -811,7 +655,7 @@ SegmentedJournal::RecordSubmitter::submit_pending( auto write_fut = p_current_batch->add_pending( std::move(record), handle, - journal_segment_manager.get_block_size()); + journal_segment_allocator.get_block_size()); if (do_flush) { DEBUG("H{} added pending and flush", (void*)&handle); flush_current_batch(); @@ -851,15 +695,15 @@ SegmentedJournal::RecordSubmitter::do_submit( // can increment io depth assert(!wait_submit_promise.has_value()); auto maybe_new_size = p_current_batch->can_batch( - record, journal_segment_manager.get_block_size()); + record, journal_segment_allocator.get_block_size()); if (!maybe_new_size.has_value() || (maybe_new_size->get_encoded_length() > - journal_segment_manager.get_max_write_length())) { + journal_segment_allocator.get_max_write_length())) { TRACE("H{} flush", (void*)&handle); assert(p_current_batch->is_pending()); flush_current_batch(); return do_submit(std::move(record), handle); - } else if (journal_segment_manager.needs_roll( + } else if (journal_segment_allocator.needs_roll( maybe_new_size->get_encoded_length())) { if (p_current_batch->is_pending()) { TRACE("H{} flush and roll", (void*)&handle); @@ -867,7 +711,7 @@ SegmentedJournal::RecordSubmitter::do_submit( } else { TRACE("H{} roll", (void*)&handle); } - return journal_segment_manager.roll( + return journal_segment_allocator.roll( ).safe_then([this, record=std::move(record), &handle]() mutable { return do_submit(std::move(record), handle); }); @@ -881,11 +725,11 @@ SegmentedJournal::RecordSubmitter::do_submit( assert(state == state_t::FULL); // cannot increment io depth auto maybe_new_size = p_current_batch->can_batch( - record, journal_segment_manager.get_block_size()); + record, journal_segment_allocator.get_block_size()); if (!maybe_new_size.has_value() || (maybe_new_size->get_encoded_length() > - journal_segment_manager.get_max_write_length()) || - 
diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h
index 23e8dbb04719b..17e4056e71569 100644
--- a/src/crimson/os/seastore/journal/segmented_journal.h
+++ b/src/crimson/os/seastore/journal/segmented_journal.h
@@ -18,10 +18,10 @@
 #include "crimson/os/seastore/segment_cleaner.h"
 #include "crimson/os/seastore/journal.h"
 #include "crimson/os/seastore/extent_reader.h"
-#include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/osd/exceptions.h"
+#include "segment_allocator.h"
 
 namespace crimson::os::seastore::journal {
 
@@ -58,96 +58,6 @@ public:
   }
 
 private:
-  class JournalSegmentManager {
-  public:
-    JournalSegmentManager(SegmentManager&, SegmentProvider&);
-
-    using base_ertr = crimson::errorator<
-      crimson::ct_error::input_output_error>;
-    extent_len_t get_max_write_length() const {
-      return segment_manager.get_segment_size() -
-             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
-                     size_t(segment_manager.get_block_size()));
-    }
-
-    device_id_t get_device_id() const {
-      return segment_manager.get_device_id();
-    }
-
-    device_segment_id_t get_num_segments() const {
-      return segment_manager.get_num_segments();
-    }
-
-    seastore_off_t get_block_size() const {
-      return segment_manager.get_block_size();
-    }
-
-    segment_nonce_t get_nonce() const {
-      return current_segment_nonce;
-    }
-
-    segment_seq_t get_segment_seq() const {
-      return next_journal_segment_seq - 1;
-    }
-
-    void set_segment_seq(segment_seq_t current_seq) {
-      next_journal_segment_seq = (current_seq + 1);
-    }
-
-    using open_ertr = base_ertr;
-    using open_ret = open_ertr::future<journal_seq_t>;
-    open_ret open();
-
-    using close_ertr = base_ertr;
-    close_ertr::future<> close();
-
-    // returns true iff the current segment has insufficient space
-    bool needs_roll(std::size_t length) const {
-      auto write_capacity = current_journal_segment->get_write_capacity();
-      return length + written_to > std::size_t(write_capacity);
-    }
-
-    // close the current segment and initialize next one
-    using roll_ertr = base_ertr;
-    roll_ertr::future<> roll();
-
-    // write the buffer, return the write result
-    // May be called concurrently, writes may complete in any order.
-    using write_ertr = base_ertr;
-    using write_ret = write_ertr::future<write_result_t>;
-    write_ret write(ceph::bufferlist to_write);
-
-  private:
-    journal_seq_t get_current_write_seq() const {
-      assert(current_journal_segment);
-      return journal_seq_t{
-        get_segment_seq(),
-        paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
-                                written_to)
-      };
-    }
-
-    void reset() {
-      next_journal_segment_seq = 0;
-      current_segment_nonce = 0;
-      current_journal_segment.reset();
-      written_to = 0;
-    }
-
-    // prepare segment for writes, writes out segment header
-    using initialize_segment_ertr = base_ertr;
-    initialize_segment_ertr::future<> initialize_segment(Segment&);
-
-    SegmentProvider& segment_provider;
-    SegmentManager& segment_manager;
-
-    segment_seq_t next_journal_segment_seq;
-    segment_nonce_t current_segment_nonce;
-
-    SegmentRef current_journal_segment;
-    seastore_off_t written_to;
-  };
-
   class RecordBatch {
     enum class state_t {
       EMPTY = 0,
@@ -212,7 +122,7 @@ private:
     //
     // Set write_result_t::write_length to 0 if the record is not the first one
     // in the batch.
-    using add_pending_ertr = JournalSegmentManager::write_ertr;
+    using add_pending_ertr = SegmentAllocator::write_ertr;
     using add_pending_ret = add_pending_ertr::future<record_locator_t>;
     add_pending_ret add_pending(
       record_t&&,
@@ -290,7 +200,7 @@ private:
       std::size_t batch_capacity,
       std::size_t batch_flush_size,
       double preferred_fullness,
-      JournalSegmentManager&);
+      SegmentAllocator&);
 
     grouped_io_stats get_record_batch_stats() const {
       return stats.record_batch_stats;
@@ -355,7 +265,7 @@ private:
 
     void flush_current_batch();
 
-    using submit_pending_ertr = JournalSegmentManager::write_ertr;
+    using submit_pending_ertr = SegmentAllocator::write_ertr;
     using submit_pending_ret = submit_pending_ertr::future<
       record_locator_t>;
     submit_pending_ret submit_pending(
@@ -371,7 +281,7 @@ private:
     double preferred_fullness;
     WritePipeline* write_pipeline = nullptr;
-    JournalSegmentManager& journal_segment_manager;
+    SegmentAllocator& journal_segment_allocator;
 
     // committed_to may be in a previous journal segment
     journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
@@ -392,7 +302,7 @@ private:
   };
 
   SegmentProvider& segment_provider;
-  JournalSegmentManager journal_segment_manager;
+  SegmentAllocator journal_segment_allocator;
   RecordSubmitter record_submitter;
   ExtentReader& scanner;
   seastar::metrics::metric_group metrics;
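One subtlety of the switch from set_segment_seq() to set_next_segment_seq(): the removed setter took the current (latest replayed) sequence and added one internally, while the new one stores the next sequence directly, which is why prep_replay_segments() now passes journal_segment_seq + 1. A toy check of the equivalence, with plain integers and hypothetical names:

    #include <cassert>
    #include <cstdint>

    // Old JournalSegmentManager convention vs. new SegmentAllocator
    // convention; toy model, not seastore code.
    struct old_style {
      uint64_t next;
      void set_segment_seq(uint64_t cur) { next = cur + 1; }
    };
    struct new_style {
      uint64_t next;
      void set_next_segment_seq(uint64_t n) { next = n; }
    };

    int main() {
      uint64_t latest_replayed = 41;  // highest journal_segment_seq found
      old_style a{};
      new_style b{};
      a.set_segment_seq(latest_replayed);           // old call site
      b.set_next_segment_seq(latest_replayed + 1);  // new call site
      assert(a.next == b.next);  // both resume with segment seq 42
    }
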