size_t budget,
found_record_handler_t &handler)
{
+ LOG_PREFIX(ExtentReader::scan_valid_records);
auto& segment_manager =
*segment_managers[cursor.get_segment_id().device_id()];
if (cursor.get_segment_offset() == 0) {
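+    // an offset of 0 points at the segment header block; start scanning after it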
cursor.increment_seq(segment_manager.get_block_size());
}
+ DEBUG("starting at {}, budget={}", cursor, budget);
auto retref = std::make_unique<size_t>(0);
auto &budget_used = *retref;
return crimson::repeat(
-> scan_valid_records_ertr::future<seastar::stop_iteration> {
return [=, &handler, &cursor, &budget_used] {
if (!cursor.last_valid_header_found) {
- LOG_PREFIX(ExtentReader::scan_valid_records);
return read_validate_record_metadata(cursor.seq.offset, nonce
).safe_then([=, &cursor](auto md) {
- DEBUG("read complete {}", cursor.seq);
if (!md) {
- DEBUG("found invalid header at {}, presumably at end", cursor.seq);
cursor.last_valid_header_found = true;
+ if (cursor.is_complete()) {
+ INFO("complete at {}, invalid record group metadata",
+ cursor);
+ } else {
+ DEBUG("found invalid record group metadata at {}, "
+ "processing {} pending record groups",
+ cursor.seq,
+ cursor.pending_record_groups.size());
+ }
return scan_valid_records_ertr::now();
} else {
auto& [header, md_bl] = *md;
- auto new_committed_to = header.committed_to;
- DEBUG("valid record read at {}, now committed at {}",
- cursor.seq, new_committed_to);
+ DEBUG("found valid {} at {}", header, cursor.seq);
cursor.emplace_record_group(header, std::move(md_bl));
return scan_valid_records_ertr::now();
}
}).safe_then([=, &cursor, &budget_used, &handler] {
+ DEBUG("processing committed record groups until {}, {} pending",
+ cursor.last_committed,
+ cursor.pending_record_groups.size());
return crimson::repeat(
[=, &budget_used, &cursor, &handler] {
- DEBUG("valid record read, processing queue");
if (cursor.pending_record_groups.empty()) {
/* This is only possible if the segment is empty.
 * A record's last_committed must be prior to its own
 * location since it cannot yet have been committed at
 * its own time of submission. */
assert(!cursor.pending_record_groups.empty());
auto &next = cursor.pending_record_groups.front();
return read_validate_data(next.offset, next.header
- ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
+ ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
if (!valid) {
+ INFO("complete at {}, invalid record group data at {}, {}",
+ cursor, next.offset, next.header);
cursor.pending_record_groups.clear();
return scan_valid_records_ertr::now();
}
}
}().safe_then([=, &budget_used, &cursor] {
if (cursor.is_complete() || budget_used >= budget) {
+ DEBUG("finish at {}, budget_used={}, budget={}",
+ cursor, budget_used, budget);
return seastar::stop_iteration::yes;
} else {
return seastar::stop_iteration::no;
auto& seg_addr = start.as_seg_paddr();
auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
auto block_size = segment_manager.get_block_size();
- if (seg_addr.get_segment_off() + block_size >
- (int64_t)segment_manager.get_segment_size()) {
- DEBUG("failed, reach segment end");
+ auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
+ if (seg_addr.get_segment_off() + block_size > segment_size) {
+ DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
- DEBUG("reading header block {}...", start);
+ TRACE("reading record group header block {}~4096", start);
return segment_manager.read(start, block_size
).safe_then([=, &segment_manager](bufferptr bptr) mutable
-> read_validate_record_metadata_ret {
header.dlength % block_size != 0 ||
(header.committed_to != journal_seq_t() &&
header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
- (seg_addr.get_segment_off() + header.mdlength + header.dlength >
- (int64_t)segment_manager.get_segment_size())) {
- ERROR("failed, invalid header");
+ (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
+ ERROR("failed, invalid record group header {}", start);
return crimson::ct_error::input_output_error::make();
}
if (header.mdlength == block_size) {
std::make_pair(std::move(header), std::move(bl))
);
}
- return segment_manager.read(
- paddr_t::make_seg_paddr(
+
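+  // the group header spans more than one block; read the remainder beyond the first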
+ auto rest_start = paddr_t::make_seg_paddr(
seg_addr.get_segment_id(),
seg_addr.get_segment_off() + (segment_off_t)block_size
- ),
- header.mdlength - block_size
+ );
+ auto rest_len = header.mdlength - block_size;
+ TRACE("reading record group header rest {}~{}", rest_start, rest_len);
+ return segment_manager.read(rest_start, rest_len
).safe_then([header=std::move(header), bl=std::move(bl)
](auto&& bptail) mutable {
bl.push_back(bptail);
LOG_PREFIX(ExtentReader::read_validate_data);
auto& segment_manager = *segment_managers[record_base.get_device_id()];
auto data_addr = record_base.add_offset(header.mdlength);
- DEBUG("reading data blocks {}+{}...", data_addr, header.dlength);
+ TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
return segment_manager.read(
data_addr,
header.dlength
found_record_handler_t& handler,
std::size_t& budget_used)
{
+ LOG_PREFIX(ExtentReader::consume_next_records);
auto& next = cursor.pending_record_groups.front();
auto total_length = next.header.dlength + next.header.mdlength;
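+  // the whole record group (metadata and data) counts against the scan budget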
budget_used += total_length;
static_cast<segment_off_t>(total_length)
}
};
+ DEBUG("processing {} at {}, budget_used={}",
+ next.header, locator, budget_used);
return handler(
locator,
next.header,
next.mdbuffer
- ).safe_then([&cursor] {
+ ).safe_then([FNAME, &cursor] {
cursor.pop_record_group();
+ if (cursor.is_complete()) {
+ INFO("complete at {}, no more record group", cursor);
+ }
});
}
#include "crimson/os/seastore/segment_cleaner.h"
SET_SUBSYS(seastore_journal);
+/*
+ * format:
+ * - H<handle-addr> information
+ *
+ * levels:
+ * - INFO: major initiation, closing, rolling and replay operations
+ * - DEBUG: INFO details, major submit operations
+ * - TRACE: DEBUG details
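+ *
+ * e.g. a submission through a handle at (illustrative) address 0x7000 is
+ *   logged with the prefix "H0x7000", so all messages belonging to one
+ *   submission can be correlated across interleaved log lines.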
+ */
namespace crimson::os::seastore {
register_metrics();
}
+Journal::open_for_write_ret Journal::open_for_write()
+{
+ LOG_PREFIX(Journal::open_for_write);
+ INFO("device_id={}", journal_segment_manager.get_device_id());
+ return journal_segment_manager.open();
+}
+
+Journal::close_ertr::future<> Journal::close()
+{
+ LOG_PREFIX(Journal::close);
+ INFO("closing");
+ metrics.clear();
+ return journal_segment_manager.close();
+}
+
Journal::prep_replay_segments_fut
Journal::prep_replay_segments(
std::vector<std::pair<segment_id_t, segment_header_t>> segments)
{
LOG_PREFIX(Journal::prep_replay_segments);
- DEBUG("{} segments", segments.size());
if (segments.empty()) {
+ ERROR("no journal segments for replay");
return crimson::ct_error::input_output_error::make();
}
std::sort(
auto journal_tail = segments.rbegin()->second.journal_tail;
segment_provider->update_journal_tail_committed(journal_tail);
auto replay_from = journal_tail.offset;
- DEBUG("journal_tail={}", journal_tail);
auto from = segments.begin();
if (replay_from != P_ADDR_NULL) {
from = std::find_if(
from->first,
journal_segment_manager.get_block_size());
}
- auto ret = replay_segments_t(segments.end() - from);
+
+ auto num_segments = segments.end() - from;
+ INFO("{} segments to replay, from {}",
+ num_segments, replay_from);
+ auto ret = replay_segments_t(num_segments);
std::transform(
from, segments.end(), ret.begin(),
- [this, FNAME](const auto &p) {
+ [this](const auto &p) {
auto ret = journal_seq_t{
p.second.journal_segment_seq,
paddr_t::make_seg_paddr(
p.first,
journal_segment_manager.get_block_size())
};
- DEBUG("replaying from {}", ret);
return std::make_pair(ret, p.second);
});
ret[0].first.offset = replay_from;
delta_handler_t &handler)
{
LOG_PREFIX(Journal::replay_segment);
- DEBUG("starting at {}", seq);
+ INFO("starting at {} -- {}", seq, header);
return seastar::do_with(
scan_valid_records_cursor(seq),
ExtentReader::found_record_handler_t([=, &handler](
const bufferlist& mdbuf)
-> ExtentReader::scan_valid_records_ertr::future<>
{
- DEBUG("decoding {} records", header.records);
auto maybe_record_deltas_list = try_decode_deltas(
header, mdbuf, locator.record_block_base);
if (!maybe_record_deltas_list) {
// This should be impossible, we did check the crc on the mdbuf
- ERROR("unable to decode deltas for record {}", locator.record_block_base);
+ ERROR("unable to decode deltas for record {} at {}",
+ header, locator);
return crimson::ct_error::input_output_error::make();
}
FNAME,
&handler](record_deltas_t& record_deltas)
{
- DEBUG("decoded {} deltas at block_base {}",
- record_deltas.deltas.size(),
- record_deltas.record_block_base);
auto locator = record_locator_t{
record_deltas.record_block_base,
write_result
};
+ DEBUG("processing {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ locator);
return crimson::do_for_each(
record_deltas.deltas,
[locator,
std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
delta_handler_t &&delta_handler)
{
+ LOG_PREFIX(Journal::replay);
+ INFO("got {} segments", segment_headers.size());
return seastar::do_with(
std::move(delta_handler), replay_segments_t(),
[this, segment_headers=std::move(segment_headers)]
{
return prep_replay_segments(std::move(segment_headers)
).safe_then([this, &handler, &segments](auto replay_segs) mutable {
- LOG_PREFIX(Journal::replay);
- DEBUG("found {} segments", replay_segs.size());
segments = std::move(replay_segs);
return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
return replay_segment(i.first, i.second, handler);
void Journal::register_metrics()
{
+ LOG_PREFIX(Journal::register_metrics);
+ DEBUG("");
record_submitter.reset_stats();
namespace sm = seastar::metrics;
metrics.add_group(
reset();
}
+Journal::JournalSegmentManager::open_ret
+Journal::JournalSegmentManager::open()
+{
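+  // rolls to a fresh journal segment and reports the resulting write position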
+ return roll().safe_then([this] {
+ return get_current_write_seq();
+ });
+}
+
Journal::JournalSegmentManager::close_ertr::future<>
Journal::JournalSegmentManager::close()
{
+ LOG_PREFIX(JournalSegmentManager::close);
+ if (current_journal_segment) {
+ INFO("segment_id={}, seq={}, "
+ "written_to={}, committed_to={}, nonce={}",
+ current_journal_segment->get_segment_id(),
+ get_segment_seq(),
+ written_to,
+ committed_to,
+ current_segment_nonce);
+ } else {
+ INFO("no current journal segment");
+ }
+
return (
current_journal_segment ?
current_journal_segment->close() :
Journal::JournalSegmentManager::roll_ertr::future<>
Journal::JournalSegmentManager::roll()
{
+ LOG_PREFIX(JournalSegmentManager::roll);
auto old_segment_id = current_journal_segment ?
current_journal_segment->get_segment_id() :
NULL_SEG_ID;
+ if (current_journal_segment) {
+ INFO("closing segment {}, seq={}, "
+ "written_to={}, committed_to={}, nonce={}",
+ old_segment_id,
+ get_segment_seq(),
+ written_to,
+ committed_to,
+ current_segment_nonce);
+ }
return (
current_journal_segment ?
current_journal_segment->close() :
Segment::close_ertr::now()
).safe_then([this] {
- return segment_provider->get_segment(segment_manager.get_device_id());
+ return segment_provider->get_segment(get_device_id());
}).safe_then([this](auto segment) {
return segment_manager.open(segment);
}).safe_then([this](auto sref) {
LOG_PREFIX(JournalSegmentManager::write);
auto write_length = to_write.length();
auto write_start_seq = get_current_write_seq();
- DEBUG("write_start {} => {}, length={}",
- write_start_seq,
- write_start_seq.offset.as_seg_paddr().get_segment_off() + write_length,
- write_length);
+ TRACE("{}~{}", write_start_seq, write_length);
assert(write_length > 0);
assert((write_length % segment_manager.get_block_size()) == 0);
assert(!needs_roll(write_length));
const journal_seq_t& new_committed_to)
{
LOG_PREFIX(JournalSegmentManager::mark_committed);
- DEBUG("committed_to {} => {}",
- committed_to, new_committed_to);
+ TRACE("{} => {}", committed_to, new_committed_to);
assert(committed_to == journal_seq_t() ||
committed_to <= new_committed_to);
committed_to = new_committed_to;
new_tail,
current_segment_nonce,
false};
- DEBUG("segment_id {} journal_tail_target {}, header {}",
- segment.get_segment_id(),
- new_tail,
- header);
+ INFO("writing {} ...", header);
encode(header, bl);
bufferptr bp(
Journal::RecordBatch::add_pending_ret
Journal::RecordBatch::add_pending(
record_t&& record,
+ OrderingHandle& handle,
extent_len_t block_size)
{
LOG_PREFIX(RecordBatch::add_pending);
auto new_size = get_encoded_length_after(record, block_size);
- DEBUG("batches={}, write_size={}",
+ auto dlength_offset = pending.size.dlength;
+ TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
+ (void*)&handle,
pending.get_size() + 1,
- new_size.get_encoded_length());
+ new_size.get_encoded_length(),
+ dlength_offset);
assert(state != state_t::SUBMITTING);
assert(can_batch(record, block_size).value() == new_size);
- auto dlength_offset = pending.size.dlength;
pending.push_back(
std::move(record), block_size);
assert(pending.size == new_size);
state = state_t::PENDING;
return io_promise->get_shared_future(
- ).then([dlength_offset
+ ).then([dlength_offset, FNAME, &handle
](auto maybe_promise_result) -> add_pending_ret {
if (!maybe_promise_result.has_value()) {
+ ERROR("H{} write failed", (void*)&handle);
return crimson::ct_error::input_output_error::make();
}
auto write_result = maybe_promise_result->write_result;
maybe_promise_result->mdlength + dlength_offset),
write_result
};
+ TRACE("H{} write finish with {}", (void*)&handle, submit_result);
return add_pending_ret(
add_pending_ertr::ready_future_marker{},
submit_result);
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
- LOG_PREFIX(RecordBatch::encode_batch);
- DEBUG("batches={}, committed_to={}",
- pending.get_size(),
- committed_to);
assert(state == state_t::PENDING);
assert(pending.get_size() > 0);
assert(io_promise.has_value());
void Journal::RecordBatch::set_result(
maybe_result_t maybe_write_result)
{
- LOG_PREFIX(RecordBatch::set_result);
maybe_promise_result_t result;
if (maybe_write_result.has_value()) {
- DEBUG("batches={}, write_start {} + {}",
- submitting_size,
- maybe_write_result->start_seq,
- maybe_write_result->length);
assert(maybe_write_result->length == submitting_length);
result = promise_result_t{
*maybe_write_result,
submitting_mdlength
};
- } else {
- ERROR("batches={}, write is failed!", submitting_size);
}
assert(state == state_t::SUBMITTING);
assert(io_promise.has_value());
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
- LOG_PREFIX(RecordBatch::submit_pending_fast);
auto new_size = get_encoded_length_after(record, block_size);
- DEBUG("write_size={}", new_size.get_encoded_length());
+ std::ignore = new_size;
assert(state == state_t::EMPTY);
assert(can_batch(record, block_size).value() == new_size);
OrderingHandle& handle)
{
LOG_PREFIX(RecordSubmitter::submit);
+ DEBUG("H{} {} start ...", (void*)&handle, record);
assert(write_pipeline);
auto expected_size = record_group_size_t(
record.size,
).get_encoded_length();
auto max_record_length = journal_segment_manager.get_max_write_length();
if (expected_size > max_record_length) {
- ERROR("record size {} exceeds max {}",
- expected_size,
- max_record_length
- );
+ ERROR("H{} {} exceeds max record size {}",
+ (void*)&handle, record, max_record_length);
return crimson::ct_error::erange::make();
}
}
}
+void Journal::RecordSubmitter::decrement_io_with_flush()
+{
+ LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+ assert(num_outstanding_io > 0);
+ --num_outstanding_io;
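+  // capture the pre-update state; only used by the assertion below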
+#ifndef NDEBUG
+ auto prv_state = state;
+#endif
+ update_state();
+
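+  // a submission can only have blocked waiting for an io slot while FULL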
+ if (wait_submit_promise.has_value()) {
+ DEBUG("wait resolved");
+ assert(prv_state == state_t::FULL);
+ wait_submit_promise->set_value();
+ wait_submit_promise.reset();
+ }
+
+ if (!p_current_batch->is_empty()) {
+ TRACE("flush");
+ flush_current_batch();
+ }
+}
+
void Journal::RecordSubmitter::account_submission(
std::size_t num,
const record_group_size_t& size)
{
- LOG_PREFIX(RecordSubmitter::account_submission);
- DEBUG("Journal::RecordSubmitter: submitting {} records, "
- "mdsize={}, dsize={}, fillness={}",
- num,
- size.get_raw_mdlength(),
- size.dlength,
- ((double)(size.get_raw_mdlength() + size.dlength) /
- (size.get_mdlength() + size.dlength)));
stats.record_group_padding_bytes +=
(size.get_mdlength() - size.get_raw_mdlength());
stats.record_group_metadata_bytes += size.get_raw_mdlength();
increment_io();
auto num = p_batch->get_num_records();
+ auto committed_to = journal_segment_manager.get_committed_to();
auto [to_write, sizes] = p_batch->encode_batch(
- journal_segment_manager.get_committed_to(),
- journal_segment_manager.get_nonce());
+ committed_to, journal_segment_manager.get_nonce());
+ DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
+ num, sizes, committed_to, num_outstanding_io);
account_submission(num, sizes);
std::ignore = journal_segment_manager.write(to_write
- ).safe_then([this, p_batch](auto write_result) {
+ ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+ TRACE("{} records, {}, write done with {}", num, sizes, write_result);
finish_submit_batch(p_batch, write_result);
}).handle_error(
- crimson::ct_error::all_same_way([this, p_batch, FNAME](auto e) {
- ERROR("got error {}", e);
+ crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} records, {}, got error {}", num, sizes, e);
finish_submit_batch(p_batch, std::nullopt);
})
- ).handle_exception([this, p_batch, FNAME](auto e) {
- ERROR("got exception {}", e);
+ ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} records, {}, got exception {}", num, sizes, e);
finish_submit_batch(p_batch, std::nullopt);
});
}
OrderingHandle& handle,
bool flush)
{
+ LOG_PREFIX(RecordSubmitter::submit_pending);
assert(!p_current_batch->is_submitting());
stats.record_batch_stats.increment(
p_current_batch->get_num_records() + 1);
bool do_flush = (flush || state == state_t::IDLE);
- auto write_fut = [this, do_flush, record=std::move(record)]() mutable {
+ auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
if (do_flush && p_current_batch->is_empty()) {
// fast path with direct write
increment_io();
+ auto committed_to = journal_segment_manager.get_committed_to();
auto [to_write, sizes] = p_current_batch->submit_pending_fast(
std::move(record),
journal_segment_manager.get_block_size(),
- journal_segment_manager.get_committed_to(),
+ committed_to,
journal_segment_manager.get_nonce());
+ DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
+ (void*)&handle, sizes, committed_to, num_outstanding_io);
account_submission(1, sizes);
return journal_segment_manager.write(to_write
).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
} else {
// indirect write with or without the existing pending records
auto write_fut = p_current_batch->add_pending(
- std::move(record), journal_segment_manager.get_block_size());
+ std::move(record),
+ handle,
+ journal_segment_manager.get_block_size());
if (do_flush) {
+ DEBUG("H{} added pending and flush", (void*)&handle);
flush_current_batch();
+ } else {
+ DEBUG("H{} added with {} pending",
+ (void*)&handle, p_current_batch->get_num_records());
}
return write_fut;
}
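+  // ordered pipeline stages: device_submission before issuing the write, finalize before marking committed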
return handle.enter(write_pipeline->device_submission
).then([write_fut=std::move(write_fut)]() mutable {
return std::move(write_fut);
- }).safe_then([this, &handle](auto submit_result) {
+ }).safe_then([this, FNAME, &handle](auto submit_result) {
return handle.enter(write_pipeline->finalize
- ).then([this, submit_result] {
+ ).then([this, FNAME, submit_result, &handle] {
+ DEBUG("H{} finish with {}", (void*)&handle, submit_result);
journal_segment_manager.mark_committed(
submit_result.write_result.get_end_seq());
return submit_result;
record_t&& record,
OrderingHandle& handle)
{
+ LOG_PREFIX(RecordSubmitter::do_submit);
+ TRACE("H{} outstanding_io={}/{} ...",
+ (void*)&handle, num_outstanding_io, io_depth_limit);
assert(!p_current_batch->is_submitting());
if (state <= state_t::PENDING) {
// can increment io depth
if (!maybe_new_size.has_value() ||
(maybe_new_size->get_encoded_length() >
journal_segment_manager.get_max_write_length())) {
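+    // the current batch cannot accept this record; flush it and retry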
+ TRACE("H{} flush", (void*)&handle);
assert(p_current_batch->is_pending());
flush_current_batch();
return do_submit(std::move(record), handle);
} else if (journal_segment_manager.needs_roll(
maybe_new_size->get_encoded_length())) {
if (p_current_batch->is_pending()) {
+ TRACE("H{} flush and roll", (void*)&handle);
flush_current_batch();
+ } else {
+ TRACE("H{} roll", (void*)&handle);
}
return journal_segment_manager.roll(
).safe_then([this, record=std::move(record), &handle]() mutable {
if (!wait_submit_promise.has_value()) {
wait_submit_promise = seastar::promise<>();
}
+ DEBUG("H{} wait ...", (void*)&handle);
return wait_submit_promise->get_future(
).then([this, record=std::move(record), &handle]() mutable {
return do_submit(std::move(record), handle);