From 5586d07be7421a71629b0696d264f3550f87a23a Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 12 Jan 2022 13:42:11 +0800 Subject: [PATCH] crimson/os/seastore: consolidate seastore_journal logs with structured level and format Signed-off-by: Yingxin Cheng --- src/crimson/os/seastore/extent_reader.cc | 61 ++++--- src/crimson/os/seastore/journal.cc | 211 +++++++++++++++------- src/crimson/os/seastore/journal.h | 39 +--- src/crimson/os/seastore/seastore_types.cc | 4 - 4 files changed, 195 insertions(+), 120 deletions(-) diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc index 9ae0fcebf989..8b66ffffd316 100644 --- a/src/crimson/os/seastore/extent_reader.cc +++ b/src/crimson/os/seastore/extent_reader.cc @@ -110,11 +110,13 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( size_t budget, found_record_handler_t &handler) { + LOG_PREFIX(ExtentReader::scan_valid_records); auto& segment_manager = *segment_managers[cursor.get_segment_id().device_id()]; if (cursor.get_segment_offset() == 0) { cursor.increment_seq(segment_manager.get_block_size()); } + DEBUG("starting at {}, budget={}", cursor, budget); auto retref = std::make_unique(0); auto &budget_used = *retref; return crimson::repeat( @@ -122,26 +124,32 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( -> scan_valid_records_ertr::future { return [=, &handler, &cursor, &budget_used] { if (!cursor.last_valid_header_found) { - LOG_PREFIX(ExtentReader::scan_valid_records); return read_validate_record_metadata(cursor.seq.offset, nonce ).safe_then([=, &cursor](auto md) { - DEBUG("read complete {}", cursor.seq); if (!md) { - DEBUG("found invalid header at {}, presumably at end", cursor.seq); cursor.last_valid_header_found = true; + if (cursor.is_complete()) { + INFO("complete at {}, invalid record group metadata", + cursor); + } else { + DEBUG("found invalid record group metadata at {}, " + "processing {} pending record groups", + cursor.seq, + cursor.pending_record_groups.size()); + } return scan_valid_records_ertr::now(); } else { auto& [header, md_bl] = *md; - auto new_committed_to = header.committed_to; - DEBUG("valid record read at {}, now committed at {}", - cursor.seq, new_committed_to); + DEBUG("found valid {} at {}", header, cursor.seq); cursor.emplace_record_group(header, std::move(md_bl)); return scan_valid_records_ertr::now(); } }).safe_then([=, &cursor, &budget_used, &handler] { + DEBUG("processing committed record groups until {}, {} pending", + cursor.last_committed, + cursor.pending_record_groups.size()); return crimson::repeat( [=, &budget_used, &cursor, &handler] { - DEBUG("valid record read, processing queue"); if (cursor.pending_record_groups.empty()) { /* This is only possible if the segment is empty. * A record's last_commited must be prior to its own @@ -169,8 +177,10 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( assert(!cursor.pending_record_groups.empty()); auto &next = cursor.pending_record_groups.front(); return read_validate_data(next.offset, next.header - ).safe_then([this, &budget_used, &cursor, &handler](auto valid) { + ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) { if (!valid) { + INFO("complete at {}, invalid record group data at {}, {}", + cursor, next.offset, next.header); cursor.pending_record_groups.clear(); return scan_valid_records_ertr::now(); } @@ -179,6 +189,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( } }().safe_then([=, &budget_used, &cursor] { if (cursor.is_complete() || budget_used >= budget) { + DEBUG("finish at {}, budget_used={}, budget={}", + cursor, budget_used, budget); return seastar::stop_iteration::yes; } else { return seastar::stop_iteration::no; @@ -200,14 +212,14 @@ ExtentReader::read_validate_record_metadata( auto& seg_addr = start.as_seg_paddr(); auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()]; auto block_size = segment_manager.get_block_size(); - if (seg_addr.get_segment_off() + block_size > - (int64_t)segment_manager.get_segment_size()) { - DEBUG("failed, reach segment end"); + auto segment_size = static_cast(segment_manager.get_segment_size()); + if (seg_addr.get_segment_off() + block_size > segment_size) { + DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size); return read_validate_record_metadata_ret( read_validate_record_metadata_ertr::ready_future_marker{}, std::nullopt); } - DEBUG("reading header block {}...", start); + TRACE("reading record group header block {}~4096", start); return segment_manager.read(start, block_size ).safe_then([=, &segment_manager](bufferptr bptr) mutable -> read_validate_record_metadata_ret { @@ -228,9 +240,8 @@ ExtentReader::read_validate_record_metadata( header.dlength % block_size != 0 || (header.committed_to != journal_seq_t() && header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) || - (seg_addr.get_segment_off() + header.mdlength + header.dlength > - (int64_t)segment_manager.get_segment_size())) { - ERROR("failed, invalid header"); + (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) { + ERROR("failed, invalid record group header {}", start); return crimson::ct_error::input_output_error::make(); } if (header.mdlength == block_size) { @@ -239,12 +250,14 @@ ExtentReader::read_validate_record_metadata( std::make_pair(std::move(header), std::move(bl)) ); } - return segment_manager.read( - paddr_t::make_seg_paddr( + + auto rest_start = paddr_t::make_seg_paddr( seg_addr.get_segment_id(), seg_addr.get_segment_off() + (segment_off_t)block_size - ), - header.mdlength - block_size + ); + auto rest_len = header.mdlength - block_size; + TRACE("reading record group header rest {}~{}", rest_start, rest_len); + return segment_manager.read(rest_start, rest_len ).safe_then([header=std::move(header), bl=std::move(bl) ](auto&& bptail) mutable { bl.push_back(bptail); @@ -274,7 +287,7 @@ ExtentReader::read_validate_data( LOG_PREFIX(ExtentReader::read_validate_data); auto& segment_manager = *segment_managers[record_base.get_device_id()]; auto data_addr = record_base.add_offset(header.mdlength); - DEBUG("reading data blocks {}+{}...", data_addr, header.dlength); + TRACE("reading record group data blocks {}~{}", data_addr, header.dlength); return segment_manager.read( data_addr, header.dlength @@ -291,6 +304,7 @@ ExtentReader::consume_next_records( found_record_handler_t& handler, std::size_t& budget_used) { + LOG_PREFIX(ExtentReader::consume_next_records); auto& next = cursor.pending_record_groups.front(); auto total_length = next.header.dlength + next.header.mdlength; budget_used += total_length; @@ -304,12 +318,17 @@ ExtentReader::consume_next_records( static_cast(total_length) } }; + DEBUG("processing {} at {}, budget_used={}", + next.header, locator, budget_used); return handler( locator, next.header, next.mdbuffer - ).safe_then([&cursor] { + ).safe_then([FNAME, &cursor] { cursor.pop_record_group(); + if (cursor.is_complete()) { + INFO("complete at {}, no more record group", cursor); + } }); } diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index f24adb689047..ebcc366c3e95 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -13,6 +13,15 @@ #include "crimson/os/seastore/segment_cleaner.h" SET_SUBSYS(seastore_journal); +/* + * format: + * - H information + * + * levels: + * - INFO: major initiation, closing, rolling and replay operations + * - DEBUG: INFO details, major submit operations + * - TRACE: DEBUG details + */ namespace crimson::os::seastore { @@ -44,13 +53,28 @@ Journal::Journal( register_metrics(); } +Journal::open_for_write_ret Journal::open_for_write() +{ + LOG_PREFIX(Journal::open_for_write); + INFO("device_id={}", journal_segment_manager.get_device_id()); + return journal_segment_manager.open(); +} + +Journal::close_ertr::future<> Journal::close() +{ + LOG_PREFIX(Journal::close); + INFO("closing"); + metrics.clear(); + return journal_segment_manager.close(); +} + Journal::prep_replay_segments_fut Journal::prep_replay_segments( std::vector> segments) { LOG_PREFIX(Journal::prep_replay_segments); - DEBUG("{} segments", segments.size()); if (segments.empty()) { + ERROR("no journal segments for replay"); return crimson::ct_error::input_output_error::make(); } std::sort( @@ -83,7 +107,6 @@ Journal::prep_replay_segments( auto journal_tail = segments.rbegin()->second.journal_tail; segment_provider->update_journal_tail_committed(journal_tail); auto replay_from = journal_tail.offset; - DEBUG("journal_tail={}", journal_tail); auto from = segments.begin(); if (replay_from != P_ADDR_NULL) { from = std::find_if( @@ -103,17 +126,20 @@ Journal::prep_replay_segments( from->first, journal_segment_manager.get_block_size()); } - auto ret = replay_segments_t(segments.end() - from); + + auto num_segments = segments.end() - from; + INFO("{} segments to replay, from {}", + num_segments, replay_from); + auto ret = replay_segments_t(num_segments); std::transform( from, segments.end(), ret.begin(), - [this, FNAME](const auto &p) { + [this](const auto &p) { auto ret = journal_seq_t{ p.second.journal_segment_seq, paddr_t::make_seg_paddr( p.first, journal_segment_manager.get_block_size()) }; - DEBUG("replaying from {}", ret); return std::make_pair(ret, p.second); }); ret[0].first.offset = replay_from; @@ -129,7 +155,7 @@ Journal::replay_segment( delta_handler_t &handler) { LOG_PREFIX(Journal::replay_segment); - DEBUG("starting at {}", seq); + INFO("starting at {} -- {}", seq, header); return seastar::do_with( scan_valid_records_cursor(seq), ExtentReader::found_record_handler_t([=, &handler]( @@ -138,12 +164,12 @@ Journal::replay_segment( const bufferlist& mdbuf) -> ExtentReader::scan_valid_records_ertr::future<> { - DEBUG("decoding {} records", header.records); auto maybe_record_deltas_list = try_decode_deltas( header, mdbuf, locator.record_block_base); if (!maybe_record_deltas_list) { // This should be impossible, we did check the crc on the mdbuf - ERROR("unable to decode deltas for record {}", locator.record_block_base); + ERROR("unable to decode deltas for record {} at {}", + header, locator); return crimson::ct_error::input_output_error::make(); } @@ -161,13 +187,13 @@ Journal::replay_segment( FNAME, &handler](record_deltas_t& record_deltas) { - DEBUG("decoded {} deltas at block_base {}", - record_deltas.deltas.size(), - record_deltas.record_block_base); auto locator = record_locator_t{ record_deltas.record_block_base, write_result }; + DEBUG("processing {} deltas at block_base {}", + record_deltas.deltas.size(), + locator); return crimson::do_for_each( record_deltas.deltas, [locator, @@ -216,6 +242,8 @@ Journal::replay_ret Journal::replay( std::vector>&& segment_headers, delta_handler_t &&delta_handler) { + LOG_PREFIX(Journal::replay); + INFO("got {} segments", segment_headers.size()); return seastar::do_with( std::move(delta_handler), replay_segments_t(), [this, segment_headers=std::move(segment_headers)] @@ -223,8 +251,6 @@ Journal::replay_ret Journal::replay( { return prep_replay_segments(std::move(segment_headers) ).safe_then([this, &handler, &segments](auto replay_segs) mutable { - LOG_PREFIX(Journal::replay); - DEBUG("found {} segments", replay_segs.size()); segments = std::move(replay_segs); return crimson::do_for_each(segments, [this, &handler](auto i) mutable { return replay_segment(i.first, i.second, handler); @@ -235,6 +261,8 @@ Journal::replay_ret Journal::replay( void Journal::register_metrics() { + LOG_PREFIX(Journal::register_metrics); + DEBUG(""); record_submitter.reset_stats(); namespace sm = seastar::metrics; metrics.add_group( @@ -300,9 +328,30 @@ Journal::JournalSegmentManager::JournalSegmentManager( reset(); } +Journal::JournalSegmentManager::open_ret +Journal::JournalSegmentManager::open() +{ + return roll().safe_then([this] { + return get_current_write_seq(); + }); +} + Journal::JournalSegmentManager::close_ertr::future<> Journal::JournalSegmentManager::close() { + LOG_PREFIX(JournalSegmentManager::close); + if (current_journal_segment) { + INFO("segment_id={}, seq={}, " + "written_to={}, committed_to={}, nonce={}", + current_journal_segment->get_segment_id(), + get_segment_seq(), + written_to, + committed_to, + current_segment_nonce); + } else { + INFO("no current journal segment"); + } + return ( current_journal_segment ? current_journal_segment->close() : @@ -320,16 +369,26 @@ Journal::JournalSegmentManager::close() Journal::JournalSegmentManager::roll_ertr::future<> Journal::JournalSegmentManager::roll() { + LOG_PREFIX(JournalSegmentManager::roll); auto old_segment_id = current_journal_segment ? current_journal_segment->get_segment_id() : NULL_SEG_ID; + if (current_journal_segment) { + INFO("closing segment {}, seq={}, " + "written_to={}, committed_to={}, nonce={}", + old_segment_id, + get_segment_seq(), + written_to, + committed_to, + current_segment_nonce); + } return ( current_journal_segment ? current_journal_segment->close() : Segment::close_ertr::now() ).safe_then([this] { - return segment_provider->get_segment(segment_manager.get_device_id()); + return segment_provider->get_segment(get_device_id()); }).safe_then([this](auto segment) { return segment_manager.open(segment); }).safe_then([this](auto sref) { @@ -356,10 +415,7 @@ Journal::JournalSegmentManager::write(ceph::bufferlist to_write) LOG_PREFIX(JournalSegmentManager::write); auto write_length = to_write.length(); auto write_start_seq = get_current_write_seq(); - DEBUG("write_start {} => {}, length={}", - write_start_seq, - write_start_seq.offset.as_seg_paddr().get_segment_off() + write_length, - write_length); + TRACE("{}~{}", write_start_seq, write_length); assert(write_length > 0); assert((write_length % segment_manager.get_block_size()) == 0); assert(!needs_roll(write_length)); @@ -386,8 +442,7 @@ void Journal::JournalSegmentManager::mark_committed( const journal_seq_t& new_committed_to) { LOG_PREFIX(JournalSegmentManager::mark_committed); - DEBUG("committed_to {} => {}", - committed_to, new_committed_to); + TRACE("{} => {}", committed_to, new_committed_to); assert(committed_to == journal_seq_t() || committed_to <= new_committed_to); committed_to = new_committed_to; @@ -411,10 +466,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment) new_tail, current_segment_nonce, false}; - DEBUG("segment_id {} journal_tail_target {}, header {}", - segment.get_segment_id(), - new_tail, - header); + INFO("writing {} ...", header); encode(header, bl); bufferptr bp( @@ -436,17 +488,20 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment) Journal::RecordBatch::add_pending_ret Journal::RecordBatch::add_pending( record_t&& record, + OrderingHandle& handle, extent_len_t block_size) { LOG_PREFIX(RecordBatch::add_pending); auto new_size = get_encoded_length_after(record, block_size); - DEBUG("batches={}, write_size={}", + auto dlength_offset = pending.size.dlength; + TRACE("H{} batches={}, write_size={}, dlength_offset={} ...", + (void*)&handle, pending.get_size() + 1, - new_size.get_encoded_length()); + new_size.get_encoded_length(), + dlength_offset); assert(state != state_t::SUBMITTING); assert(can_batch(record, block_size).value() == new_size); - auto dlength_offset = pending.size.dlength; pending.push_back( std::move(record), block_size); assert(pending.size == new_size); @@ -459,9 +514,10 @@ Journal::RecordBatch::add_pending( state = state_t::PENDING; return io_promise->get_shared_future( - ).then([dlength_offset + ).then([dlength_offset, FNAME, &handle ](auto maybe_promise_result) -> add_pending_ret { if (!maybe_promise_result.has_value()) { + ERROR("H{} write failed", (void*)&handle); return crimson::ct_error::input_output_error::make(); } auto write_result = maybe_promise_result->write_result; @@ -470,6 +526,7 @@ Journal::RecordBatch::add_pending( maybe_promise_result->mdlength + dlength_offset), write_result }; + TRACE("H{} write finish with {}", (void*)&handle, submit_result); return add_pending_ret( add_pending_ertr::ready_future_marker{}, submit_result); @@ -481,10 +538,6 @@ Journal::RecordBatch::encode_batch( const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { - LOG_PREFIX(RecordBatch::encode_batch); - DEBUG("batches={}, committed_to={}", - pending.get_size(), - committed_to); assert(state == state_t::PENDING); assert(pending.get_size() > 0); assert(io_promise.has_value()); @@ -503,20 +556,13 @@ Journal::RecordBatch::encode_batch( void Journal::RecordBatch::set_result( maybe_result_t maybe_write_result) { - LOG_PREFIX(RecordBatch::set_result); maybe_promise_result_t result; if (maybe_write_result.has_value()) { - DEBUG("batches={}, write_start {} + {}", - submitting_size, - maybe_write_result->start_seq, - maybe_write_result->length); assert(maybe_write_result->length == submitting_length); result = promise_result_t{ *maybe_write_result, submitting_mdlength }; - } else { - ERROR("batches={}, write is failed!", submitting_size); } assert(state == state_t::SUBMITTING); assert(io_promise.has_value()); @@ -536,9 +582,8 @@ Journal::RecordBatch::submit_pending_fast( const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { - LOG_PREFIX(RecordBatch::submit_pending_fast); auto new_size = get_encoded_length_after(record, block_size); - DEBUG("write_size={}", new_size.get_encoded_length()); + std::ignore = new_size; assert(state == state_t::EMPTY); assert(can_batch(record, block_size).value() == new_size); @@ -585,6 +630,7 @@ Journal::RecordSubmitter::submit( OrderingHandle& handle) { LOG_PREFIX(RecordSubmitter::submit); + DEBUG("H{} {} start ...", (void*)&handle, record); assert(write_pipeline); auto expected_size = record_group_size_t( record.size, @@ -592,10 +638,8 @@ Journal::RecordSubmitter::submit( ).get_encoded_length(); auto max_record_length = journal_segment_manager.get_max_write_length(); if (expected_size > max_record_length) { - ERROR("record size {} exceeds max {}", - expected_size, - max_record_length - ); + ERROR("H{} {} exceeds max record size {}", + (void*)&handle, record, max_record_length); return crimson::ct_error::erange::make(); } @@ -615,18 +659,33 @@ void Journal::RecordSubmitter::update_state() } } +void Journal::RecordSubmitter::decrement_io_with_flush() +{ + LOG_PREFIX(RecordSubmitter::decrement_io_with_flush); + assert(num_outstanding_io > 0); + --num_outstanding_io; +#ifndef NDEBUG + auto prv_state = state; +#endif + update_state(); + + if (wait_submit_promise.has_value()) { + DEBUG("wait resolved"); + assert(prv_state == state_t::FULL); + wait_submit_promise->set_value(); + wait_submit_promise.reset(); + } + + if (!p_current_batch->is_empty()) { + TRACE("flush"); + flush_current_batch(); + } +} + void Journal::RecordSubmitter::account_submission( std::size_t num, const record_group_size_t& size) { - LOG_PREFIX(RecordSubmitter::account_submission); - DEBUG("Journal::RecordSubmitter: submitting {} records, " - "mdsize={}, dsize={}, fillness={}", - num, - size.get_raw_mdlength(), - size.dlength, - ((double)(size.get_raw_mdlength() + size.dlength) / - (size.get_mdlength() + size.dlength))); stats.record_group_padding_bytes += (size.get_mdlength() - size.get_raw_mdlength()); stats.record_group_metadata_bytes += size.get_raw_mdlength(); @@ -653,20 +712,23 @@ void Journal::RecordSubmitter::flush_current_batch() increment_io(); auto num = p_batch->get_num_records(); + auto committed_to = journal_segment_manager.get_committed_to(); auto [to_write, sizes] = p_batch->encode_batch( - journal_segment_manager.get_committed_to(), - journal_segment_manager.get_nonce()); + committed_to, journal_segment_manager.get_nonce()); + DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...", + num, sizes, committed_to, num_outstanding_io); account_submission(num, sizes); std::ignore = journal_segment_manager.write(to_write - ).safe_then([this, p_batch](auto write_result) { + ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) { + TRACE("{} records, {}, write done with {}", num, sizes, write_result); finish_submit_batch(p_batch, write_result); }).handle_error( - crimson::ct_error::all_same_way([this, p_batch, FNAME](auto e) { - ERROR("got error {}", e); + crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) { + ERROR("{} records, {}, got error {}", num, sizes, e); finish_submit_batch(p_batch, std::nullopt); }) - ).handle_exception([this, p_batch, FNAME](auto e) { - ERROR("got exception {}", e); + ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) { + ERROR("{} records, {}, got exception {}", num, sizes, e); finish_submit_batch(p_batch, std::nullopt); }); } @@ -677,19 +739,23 @@ Journal::RecordSubmitter::submit_pending( OrderingHandle& handle, bool flush) { + LOG_PREFIX(RecordSubmitter::submit_pending); assert(!p_current_batch->is_submitting()); stats.record_batch_stats.increment( p_current_batch->get_num_records() + 1); bool do_flush = (flush || state == state_t::IDLE); - auto write_fut = [this, do_flush, record=std::move(record)]() mutable { + auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable { if (do_flush && p_current_batch->is_empty()) { // fast path with direct write increment_io(); + auto committed_to = journal_segment_manager.get_committed_to(); auto [to_write, sizes] = p_current_batch->submit_pending_fast( std::move(record), journal_segment_manager.get_block_size(), - journal_segment_manager.get_committed_to(), + committed_to, journal_segment_manager.get_nonce()); + DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...", + (void*)&handle, sizes, committed_to, num_outstanding_io); account_submission(1, sizes); return journal_segment_manager.write(to_write ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { @@ -703,9 +769,15 @@ Journal::RecordSubmitter::submit_pending( } else { // indirect write with or without the existing pending records auto write_fut = p_current_batch->add_pending( - std::move(record), journal_segment_manager.get_block_size()); + std::move(record), + handle, + journal_segment_manager.get_block_size()); if (do_flush) { + DEBUG("H{} added pending and flush", (void*)&handle); flush_current_batch(); + } else { + DEBUG("H{} added with {} pending", + (void*)&handle, p_current_batch->get_num_records()); } return write_fut; } @@ -713,9 +785,10 @@ Journal::RecordSubmitter::submit_pending( return handle.enter(write_pipeline->device_submission ).then([write_fut=std::move(write_fut)]() mutable { return std::move(write_fut); - }).safe_then([this, &handle](auto submit_result) { + }).safe_then([this, FNAME, &handle](auto submit_result) { return handle.enter(write_pipeline->finalize - ).then([this, submit_result] { + ).then([this, FNAME, submit_result, &handle] { + DEBUG("H{} finish with {}", (void*)&handle, submit_result); journal_segment_manager.mark_committed( submit_result.write_result.get_end_seq()); return submit_result; @@ -728,6 +801,9 @@ Journal::RecordSubmitter::do_submit( record_t&& record, OrderingHandle& handle) { + LOG_PREFIX(RecordSubmitter::do_submit); + TRACE("H{} outstanding_io={}/{} ...", + (void*)&handle, num_outstanding_io, io_depth_limit); assert(!p_current_batch->is_submitting()); if (state <= state_t::PENDING) { // can increment io depth @@ -737,13 +813,17 @@ Journal::RecordSubmitter::do_submit( if (!maybe_new_size.has_value() || (maybe_new_size->get_encoded_length() > journal_segment_manager.get_max_write_length())) { + TRACE("H{} flush", (void*)&handle); assert(p_current_batch->is_pending()); flush_current_batch(); return do_submit(std::move(record), handle); } else if (journal_segment_manager.needs_roll( maybe_new_size->get_encoded_length())) { if (p_current_batch->is_pending()) { + TRACE("H{} flush and roll", (void*)&handle); flush_current_batch(); + } else { + TRACE("H{} roll", (void*)&handle); } return journal_segment_manager.roll( ).safe_then([this, record=std::move(record), &handle]() mutable { @@ -768,6 +848,7 @@ Journal::RecordSubmitter::do_submit( if (!wait_submit_promise.has_value()) { wait_submit_promise = seastar::promise<>(); } + DEBUG("H{} wait ...", (void*)&handle); return wait_submit_promise->get_future( ).then([this, record=std::move(record), &handle]() mutable { return do_submit(std::move(record), handle); diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index f64472b963ea..c7be7cbab2bb 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -63,9 +63,7 @@ public: crimson::ct_error::input_output_error >; using open_for_write_ret = open_for_write_ertr::future; - open_for_write_ret open_for_write() { - return journal_segment_manager.open(); - } + open_for_write_ret open_for_write(); /** * close journal @@ -74,10 +72,7 @@ public: */ using close_ertr = crimson::errorator< crimson::ct_error::input_output_error>; - close_ertr::future<> close() { - metrics.clear(); - return journal_segment_manager.close(); - } + close_ertr::future<> close(); /** * submit_record @@ -130,6 +125,10 @@ private: size_t(segment_manager.get_block_size())); } + device_id_t get_device_id() const { + return segment_manager.get_device_id(); + } + segment_off_t get_block_size() const { return segment_manager.get_block_size(); } @@ -156,11 +155,7 @@ private: using open_ertr = base_ertr; using open_ret = open_ertr::future; - open_ret open() { - return roll().safe_then([this] { - return get_current_write_seq(); - }); - } + open_ret open(); using close_ertr = base_ertr; close_ertr::future<> close(); @@ -286,6 +281,7 @@ private: using add_pending_ret = add_pending_ertr::future; add_pending_ret add_pending( record_t&&, + OrderingHandle&, extent_len_t block_size); // Encode the batched records for write. @@ -401,24 +397,7 @@ private: update_state(); } - void decrement_io_with_flush() { - assert(num_outstanding_io > 0); - --num_outstanding_io; -#ifndef NDEBUG - auto prv_state = state; -#endif - update_state(); - - if (wait_submit_promise.has_value()) { - assert(prv_state == state_t::FULL); - wait_submit_promise->set_value(); - wait_submit_promise.reset(); - } - - if (!p_current_batch->is_empty()) { - flush_current_batch(); - } - } + void decrement_io_with_flush(); void pop_free_batch() { assert(p_current_batch == nullptr); diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc index 5846fa8b5858..8befb9f1eed4 100644 --- a/src/crimson/os/seastore/seastore_types.cc +++ b/src/crimson/os/seastore/seastore_types.cc @@ -427,8 +427,6 @@ try_decode_extent_infos( { auto maybe_headers = try_decode_record_headers(header, md_bl); if (!maybe_headers) { - journal_logger().debug( - "try_decode_extent_infos: failed, cannot decode record headers."); return std::nullopt; } @@ -468,8 +466,6 @@ try_decode_deltas( { auto maybe_record_extent_infos = try_decode_extent_infos(header, md_bl); if (!maybe_record_extent_infos) { - journal_logger().debug( - "try_decode_deltas: failed, cannot decode extent_infos."); return std::nullopt; } -- 2.47.3