#include <boost/iterator/counting_iterator.hpp>
+#include "include/intarith.h"
+
#include "crimson/common/config_proxy.h"
#include "crimson/os/seastore/journal.h"
-
-#include "include/intarith.h"
+#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/segment_cleaner.h"
-namespace {
- seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_seastore_journal);
- }
-}
+SET_SUBSYS(seastore_journal);
namespace crimson::os::seastore {
Journal::prep_replay_segments(
std::vector<std::pair<segment_id_t, segment_header_t>> segments)
{
- logger().debug(
- "Journal::prep_replay_segments: have {} segments",
- segments.size());
+ LOG_PREFIX(Journal::prep_replay_segments);
+ DEBUG("{} segments", segments.size());
if (segments.empty()) {
return crimson::ct_error::input_output_error::make();
}
auto journal_tail = segments.rbegin()->second.journal_tail;
segment_provider->update_journal_tail_committed(journal_tail);
auto replay_from = journal_tail.offset;
- logger().debug(
- "Journal::prep_replay_segments: journal_tail={}",
- journal_tail);
+ DEBUG("journal_tail={}", journal_tail);
auto from = segments.begin();
if (replay_from != P_ADDR_NULL) {
from = std::find_if(
return seg.first == seg_addr.get_segment_id();
});
if (from->second.journal_segment_seq != journal_tail.segment_seq) {
- logger().error(
- "Journal::prep_replay_segments: journal_tail {} does not match {}",
- journal_tail,
- from->second);
- assert(0 == "invalid");
+ ERROR("journal_tail {} does not match {}",
+ journal_tail, from->second);
+ ceph_abort();
}
} else {
replay_from = paddr_t::make_seg_paddr(
auto ret = replay_segments_t(segments.end() - from);
std::transform(
from, segments.end(), ret.begin(),
- [this](const auto &p) {
+ [this, FNAME](const auto &p) {
auto ret = journal_seq_t{
p.second.journal_segment_seq,
paddr_t::make_seg_paddr(
p.first,
journal_segment_manager.get_block_size())
};
- logger().debug(
- "Journal::prep_replay_segments: replaying from {}",
- ret);
+ DEBUG("replaying from {}", ret);
return std::make_pair(ret, p.second);
});
ret[0].first.offset = replay_from;
segment_header_t header,
delta_handler_t &handler)
{
- logger().debug("Journal::replay_segment: starting at {}", seq);
+ LOG_PREFIX(Journal::replay_segment);
+ DEBUG("starting at {}", seq);
return seastar::do_with(
scan_valid_records_cursor(seq),
ExtentReader::found_record_handler_t([=, &handler](
const bufferlist& mdbuf)
-> ExtentReader::scan_valid_records_ertr::future<>
{
- logger().debug("Journal::replay_segment: decoding {} records",
- header.records);
+ DEBUG("decoding {} records", header.records);
auto maybe_record_deltas_list = try_decode_deltas(
header, mdbuf, locator.record_block_base);
if (!maybe_record_deltas_list) {
// This should be impossible, we did check the crc on the mdbuf
- logger().error(
- "Journal::replay_segment: unable to decode deltas for record {}",
- locator.record_block_base);
+ ERROR("unable to decode deltas for record {}", locator.record_block_base);
return crimson::ct_error::input_output_error::make();
}
std::move(*maybe_record_deltas_list),
[write_result=locator.write_result,
this,
+ FNAME,
&handler](auto& record_deltas_list)
{
return crimson::do_for_each(
record_deltas_list,
[write_result,
this,
+ FNAME,
&handler](record_deltas_t& record_deltas)
{
- logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
- record_deltas.deltas.size(),
- record_deltas.record_block_base);
+ DEBUG("decoded {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ record_deltas.record_block_base);
auto locator = record_locator_t{
record_deltas.record_block_base,
write_result
return seastar::do_with(
std::move(delta_handler), replay_segments_t(),
[this, segment_headers=std::move(segment_headers)]
- (auto &handler, auto &segments) mutable -> replay_ret {
- return prep_replay_segments(std::move(segment_headers)).safe_then(
- [this, &handler, &segments](auto replay_segs) mutable {
- logger().debug("Journal::replay: found {} segments", replay_segs.size());
- segments = std::move(replay_segs);
- return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
- return replay_segment(i.first, i.second, handler);
- });
- });
+ (auto &handler, auto &segments) mutable -> replay_ret
+ {
+ return prep_replay_segments(std::move(segment_headers)
+ ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
+ LOG_PREFIX(Journal::replay);
+ DEBUG("found {} segments", replay_segs.size());
+ segments = std::move(replay_segs);
+ return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
+ return replay_segment(i.first, i.second, handler);
+ });
});
+ });
}
void Journal::register_metrics()
Journal::JournalSegmentManager::write_ret
Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
{
+ LOG_PREFIX(JournalSegmentManager::write);
auto write_length = to_write.length();
auto write_start_seq = get_current_write_seq();
- logger().debug(
- "JournalSegmentManager::write: write_start {} => {}, length={}",
- write_start_seq,
- write_start_seq.offset.as_seg_paddr().get_segment_off() + write_length,
- write_length);
+ DEBUG("write_start {} => {}, length={}",
+ write_start_seq,
+ write_start_seq.offset.as_seg_paddr().get_segment_off() + write_length,
+ write_length);
assert(write_length > 0);
assert((write_length % segment_manager.get_block_size()) == 0);
assert(!needs_roll(write_length));
void Journal::JournalSegmentManager::mark_committed(
const journal_seq_t& new_committed_to)
{
- logger().debug(
- "JournalSegmentManager::mark_committed: committed_to {} => {}",
- committed_to, new_committed_to);
+ LOG_PREFIX(JournalSegmentManager::mark_committed);
+ DEBUG("committed_to {} => {}",
+ committed_to, new_committed_to);
assert(committed_to == journal_seq_t() ||
committed_to <= new_committed_to);
committed_to = new_committed_to;
Journal::JournalSegmentManager::initialize_segment_ertr::future<>
Journal::JournalSegmentManager::initialize_segment(Segment& segment)
{
+ LOG_PREFIX(JournalSegmentManager::initialize_segment);
auto new_tail = segment_provider->get_journal_tail_target();
// write out header
ceph_assert(segment.get_write_ptr() == 0);
new_tail,
current_segment_nonce,
false};
- logger().debug(
- "JournalSegmentManager::initialize_segment: segment_id {} journal_tail_target {}, header {}",
- segment.get_segment_id(),
- new_tail,
- header);
+ DEBUG("segment_id {} journal_tail_target {}, header {}",
+ segment.get_segment_id(),
+ new_tail,
+ header);
encode(header, bl);
bufferptr bp(
record_t&& record,
extent_len_t block_size)
{
+ LOG_PREFIX(RecordBatch::add_pending);
auto new_size = get_encoded_length_after(record, block_size);
- logger().debug(
- "Journal::RecordBatch::add_pending: batches={}, write_size={}",
- pending.get_size() + 1,
- new_size.get_encoded_length());
+ DEBUG("batches={}, write_size={}",
+ pending.get_size() + 1,
+ new_size.get_encoded_length());
assert(state != state_t::SUBMITTING);
assert(can_batch(record, block_size).value() == new_size);
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
- logger().debug(
- "Journal::RecordBatch::encode_batch: batches={}, committed_to={}",
- pending.get_size(),
- committed_to);
+ LOG_PREFIX(RecordBatch::encode_batch);
+ DEBUG("batches={}, committed_to={}",
+ pending.get_size(),
+ committed_to);
assert(state == state_t::PENDING);
assert(pending.get_size() > 0);
assert(io_promise.has_value());
void Journal::RecordBatch::set_result(
maybe_result_t maybe_write_result)
{
+ LOG_PREFIX(RecordBatch::set_result);
maybe_promise_result_t result;
if (maybe_write_result.has_value()) {
- logger().debug(
- "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
- submitting_size,
- maybe_write_result->start_seq,
- maybe_write_result->length);
+ DEBUG("batches={}, write_start {} + {}",
+ submitting_size,
+ maybe_write_result->start_seq,
+ maybe_write_result->length);
assert(maybe_write_result->length == submitting_length);
result = promise_result_t{
*maybe_write_result,
submitting_mdlength
};
} else {
- logger().error(
- "Journal::RecordBatch::set_result: batches={}, write is failed!",
- submitting_size);
+ ERROR("batches={}, write is failed!", submitting_size);
}
assert(state == state_t::SUBMITTING);
assert(io_promise.has_value());
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
+ LOG_PREFIX(RecordBatch::submit_pending_fast);
auto new_size = get_encoded_length_after(record, block_size);
- logger().debug(
- "Journal::RecordBatch::submit_pending_fast: write_size={}",
- new_size.get_encoded_length());
+ DEBUG("write_size={}", new_size.get_encoded_length());
assert(state == state_t::EMPTY);
assert(can_batch(record, block_size).value() == new_size);
journal_segment_manager{jsm},
batches(new RecordBatch[io_depth + 1])
{
- logger().info("Journal::RecordSubmitter: io_depth_limit={}, "
- "batch_capacity={}, batch_flush_size={}, "
- "preferred_fullness={}",
- io_depth, batch_capacity,
- batch_flush_size, preferred_fullness);
+ LOG_PREFIX(RecordSubmitter);
+ INFO("Journal::RecordSubmitter: io_depth_limit={}, "
+ "batch_capacity={}, batch_flush_size={}, "
+ "preferred_fullness={}",
+ io_depth, batch_capacity,
+ batch_flush_size, preferred_fullness);
ceph_assert(io_depth > 0);
ceph_assert(batch_capacity > 0);
ceph_assert(preferred_fullness >= 0 &&
record_t&& record,
OrderingHandle& handle)
{
+ LOG_PREFIX(RecordSubmitter::submit);
assert(write_pipeline);
auto expected_size = record_group_size_t(
record.size,
).get_encoded_length();
auto max_record_length = journal_segment_manager.get_max_write_length();
if (expected_size > max_record_length) {
- logger().error(
- "Journal::RecordSubmitter::submit: record size {} exceeds max {}",
- expected_size,
- max_record_length
+ ERROR("record size {} exceeds max {}",
+ expected_size,
+ max_record_length
);
return crimson::ct_error::erange::make();
}
std::size_t num,
const record_group_size_t& size)
{
- logger().debug("Journal::RecordSubmitter: submitting {} records, "
- "mdsize={}, dsize={}, fillness={}",
- num,
- size.get_raw_mdlength(),
- size.dlength,
- ((double)(size.get_raw_mdlength() + size.dlength) /
- (size.get_mdlength() + size.dlength)));
+ LOG_PREFIX(RecordSubmitter::account_submission);
+ DEBUG("Journal::RecordSubmitter: submitting {} records, "
+ "mdsize={}, dsize={}, fillness={}",
+ num,
+ size.get_raw_mdlength(),
+ size.dlength,
+ ((double)(size.get_raw_mdlength() + size.dlength) /
+ (size.get_mdlength() + size.dlength)));
stats.record_group_padding_bytes +=
(size.get_mdlength() - size.get_raw_mdlength());
stats.record_group_metadata_bytes += size.get_raw_mdlength();
void Journal::RecordSubmitter::flush_current_batch()
{
+ LOG_PREFIX(RecordSubmitter::flush_current_batch);
RecordBatch* p_batch = p_current_batch;
assert(p_batch->is_pending());
p_current_batch = nullptr;
).safe_then([this, p_batch](auto write_result) {
finish_submit_batch(p_batch, write_result);
}).handle_error(
- crimson::ct_error::all_same_way([this, p_batch](auto e) {
- logger().error(
- "Journal::RecordSubmitter::flush_current_batch: got error {}",
- e);
+ crimson::ct_error::all_same_way([this, p_batch, FNAME](auto e) {
+ ERROR("got error {}", e);
finish_submit_batch(p_batch, std::nullopt);
})
- ).handle_exception([this, p_batch](auto e) {
- logger().error(
- "Journal::RecordSubmitter::flush_current_batch: got exception {}",
- e);
+ ).handle_exception([this, p_batch, FNAME](auto e) {
+ ERROR("got exception {}", e);
finish_submit_batch(p_batch, std::nullopt);
});
}