if (i->get_type() == extent_types_t::ROOT) {
root = t.root;
DEBUGT("writing out root delta for {}", t, *t.root);
- record.deltas.push_back(
+ record.push_back(
delta_info_t{
extent_types_t::ROOT,
paddr_t{},
t.root->get_delta()
});
} else {
- record.deltas.push_back(
+ record.push_back(
delta_info_t{
i->get_type(),
i->get_paddr(),
}
assert(bl.length() == i->get_length());
- record.extents.push_back(extent_t{
+ record.push_back(extent_t{
i->get_type(),
i->is_logical()
? i->cast<LogicalCachedExtent>()->get_laddr()
delta_info_t delta;
delta.type = extent_types_t::RBM_ALLOC_INFO;
delta.bl = bl;
- record.deltas.push_back(delta);
+ record.push_back(std::move(delta));
}
for (auto &i: t.ool_block_list) {
record_header_fullness.ool_stats.filled_bytes += ool_stats.header_raw_bytes;
record_header_fullness.ool_stats.total_bytes += ool_stats.header_bytes;
- auto record_size = get_encoded_record_length(
- record, reader.get_block_size());
+ auto record_size = record_group_size_t(
+ record.size, reader.get_block_size());
auto inline_overhead =
- record_size.mdlength + record_size.dlength - record.get_raw_data_size();
+ record_size.get_encoded_length() - record.get_raw_data_size();
efforts.inline_record_overhead_bytes += inline_overhead;
- record_header_fullness.inline_stats.filled_bytes += record_size.raw_mdlength;
- record_header_fullness.inline_stats.total_bytes += record_size.mdlength;
+ record_header_fullness.inline_stats.filled_bytes += record_size.get_raw_mdlength();
+ record_header_fullness.inline_stats.total_bytes += record_size.get_mdlength();
return record;
}
Transaction& t,
ool_record_t& record)
{
- record_size_t record_size = record.get_encoded_record_length();
- allocated_to += record_size.mdlength + record_size.dlength;
+ auto record_size = record.get_encoded_record_length();
+ allocated_to += record_size.get_encoded_length();
bufferlist bl = record.encode(
- record_size,
current_segment->segment->get_segment_id(),
0);
seastar::promise<> pr;
auto& stats = t.get_ool_write_stats();
stats.extents.num += record.get_num_extents();
stats.extents.bytes += record_size.dlength;
- stats.header_raw_bytes += record_size.raw_mdlength;
- stats.header_bytes += record_size.mdlength;
+ stats.header_raw_bytes += record_size.get_raw_mdlength();
+ stats.header_bytes += record_size.get_mdlength();
stats.num_records += 1;
return trans_intr::make_interruptible(
public:
ool_record_t(size_t block_size) : block_size(block_size) {}
- record_size_t get_encoded_record_length() {
+ // Size of the currently buffered record when encoded on its own as a
+ // single-record group with this writer's block size. This is the same
+ // size accounting that encode() ends up with for the same record.
+ record_group_size_t get_encoded_record_length() {
assert(extents.size() == record.extents.size());
- return crimson::os::seastore::get_encoded_record_length(record, block_size);
+ return record_group_size_t(record.size, block_size);
}
+ // Encoded length the record would have if `extent` were appended to it.
+ // The buffered record itself is not modified.
size_t get_wouldbe_encoded_record_length(LogicalCachedExtentRef& extent) {
- auto raw_mdlength = get_encoded_record_raw_mdlength(record, block_size);
- auto wouldbe_mdlength = p2roundup(
- raw_mdlength + ceph::encoded_sizeof_bounded<extent_info_t>(),
- block_size);
- return wouldbe_mdlength + extent_buf_len + extent->get_bptr().length();
+ // work on a copy of the size accounting and add the candidate extent to it
+ record_size_t rsize = record.size;
+ rsize.account_extent(extent->get_bptr().length());
+ return record_group_size_t(rsize, block_size).get_encoded_length();
}
- ceph::bufferlist encode(const record_size_t& rsize,
- segment_id_t segment,
+ // Encodes the buffered record and assigns every buffered extent its
+ // out-of-line address inside `segment`.
+ // The record is moved into a temporary record_group_t, so `record` is
+ // left in a moved-from state when this returns.
+ ceph::bufferlist encode(segment_id_t segment,
segment_nonce_t nonce) {
assert(extents.size() == record.extents.size());
- segment_off_t extent_offset = base + rsize.mdlength;
+ assert(!record.deltas.size());
+ auto record_group = record_group_t(std::move(record), block_size);
+ // extent data is laid out back-to-back after the block-aligned metadata
+ segment_off_t extent_offset = base + record_group.size.get_mdlength();
for (auto& extent : extents) {
extent.set_ool_paddr(
paddr_t::make_seg_paddr(segment, extent_offset));
extent_offset += extent.get_bptr().length();
}
- assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
- return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce);
+ assert(extent_offset ==
+ (segment_off_t)(base + record_group.size.get_encoded_length()));
+ // encode_records() clears record_group once the buffer has been built
+ return encode_records(record_group, journal_seq_t(), nonce);
}
+ // Buffers `extent`: remembers it in `extents` and appends an extent_t
+ // (type, laddr, data) built from its bptr to the record.
+ // record.push_back() also updates record.size, so no separate running
+ // data-length counter is kept here.
void add_extent(LogicalCachedExtentRef& extent) {
extents.emplace_back(extent);
ceph::bufferlist bl;
bl.append(extent->get_bptr());
- record.extents.emplace_back(extent_t{
+ record.push_back(extent_t{
extent->get_type(),
extent->get_laddr(),
std::move(bl)});
- extent_buf_len += extent->get_bptr().length();
}
std::vector<OolExtent>& get_extents() {
return extents;
return base;
}
+ // Resets the writer: replaces `record` with a default-constructed one
+ // (which also zeroes its size accounting), drops the buffered extents and
+ // sets `base` back to MAX_SEG_OFF.
void clear() {
- record.extents.clear();
+ record = {};
extents.clear();
- assert(!record.deltas.size());
- extent_buf_len = 0;
base = MAX_SEG_OFF;
}
uint64_t get_num_extents() const {
std::vector<OolExtent> extents;
record_t record;
size_t block_size;
- segment_off_t extent_buf_len = 0;
segment_off_t base = MAX_SEG_OFF;
};
const record_header_t& header,
const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
{
- auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
+ auto maybe_record_extent_infos = try_decode_extent_info(header, mdbuf);
if (!maybe_record_extent_infos) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
paddr_t extent_offset = locator.record_block_base;
logger().debug("ExtentReader::scan_extents: decoded {} extents",
- maybe_record_extent_infos->size());
- for (const auto &i : *maybe_record_extent_infos) {
+ maybe_record_extent_infos->extent_infos.size());
+ for (const auto &i : maybe_record_extent_infos->extent_infos) {
extents->emplace_back(extent_offset, i);
auto& seg_addr = extent_offset.as_seg_paddr();
seg_addr.set_segment_off(
segment_manager.get_block_size());
bufferlist bl;
bl.append(bptr);
- auto bp = bl.cbegin();
- record_header_t header;
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (header.segment_nonce != nonce) {
+ auto maybe_header = try_decode_record_header(bl, nonce);
+ if (!maybe_header.has_value()) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
auto& seg_addr = start.as_seg_paddr();
+ auto& header = *maybe_header;
if (header.mdlength < block_size ||
header.mdlength % block_size != 0 ||
header.dlength % block_size != 0 ||
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl)));
});
- }).safe_then([this](auto p) {
- if (p && validate_metadata(p->second)) {
+ }).safe_then([](auto p) {
+ if (p && validate_record_metadata(p->second)) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::move(*p)
});
}
-std::optional<std::vector<extent_info_t>>
-ExtentReader::try_decode_extent_infos(
- record_header_t header,
- const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- bliter += ceph::encoded_sizeof_bounded<record_header_t>();
- bliter += sizeof(checksum_t) /* crc */;
- logger().debug("{}: decoding {} extents", __func__, header.extents);
- std::vector<extent_info_t> extent_infos(header.extents);
- for (auto &&i : extent_infos) {
- try {
- decode(i, bliter);
- } catch (ceph::buffer::error &e) {
- return std::nullopt;
- }
- }
- return extent_infos;
-}
-
ExtentReader::read_validate_data_ret
ExtentReader::read_validate_data(
paddr_t record_base,
).safe_then([=, &header](auto bptr) {
bufferlist bl;
bl.append(bptr);
- return bl.crc32c(-1) == header.data_crc;
+ return validate_record_data(header, bl);
});
}
-bool ExtentReader::validate_metadata(const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- auto test_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<record_header_t>(),
- -1);
- ceph_le32 recorded_crc_le;
- decode(recorded_crc_le, bliter);
- uint32_t recorded_crc = recorded_crc_le;
- test_crc = bliter.crc32c(
- bliter.get_remaining(),
- test_crc);
- return test_crc == recorded_crc;
-}
-
ExtentReader::consume_record_group_ertr::future<>
ExtentReader::consume_next_records(
scan_valid_records_cursor& cursor,
paddr_t start,
segment_nonce_t nonce);
- /// attempts to decode extent infos from bl, return nullopt if unsuccessful
- std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
- record_header_t header,
- const bufferlist &bl);
-
/// read and validate data
using read_validate_data_ertr = read_ertr;
using read_validate_data_ret = read_validate_data_ertr::future<bool>;
found_record_handler_t& handler,
std::size_t& budget_used);
-
- /// validate embedded metadata checksum
- static bool validate_metadata(const bufferlist &bl);
-
friend class TransactionManager;
};
std::move(ret));
}
-std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
- record_header_t header,
- const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- bliter += ceph::encoded_sizeof_bounded<record_header_t>();
- bliter += sizeof(checksum_t) /* crc */;
- bliter += header.extents * ceph::encoded_sizeof_bounded<extent_info_t>();
- logger().debug("Journal::try_decode_deltas: decoding {} deltas", header.deltas);
- std::vector<delta_info_t> deltas(header.deltas);
- for (auto &&i : deltas) {
- try {
- decode(i, bliter);
- } catch (ceph::buffer::error &e) {
- return std::nullopt;
- }
- }
- return deltas;
-}
-
Journal::replay_ertr::future<>
Journal::replay_segment(
journal_seq_t seq,
return seastar::do_with(
std::move(*maybe_record_deltas_list),
- [=](auto &deltas)
+ [locator,
+ this,
+ &handler](record_deltas_t& record_deltas)
{
logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
- deltas.size(),
+ record_deltas.deltas.size(),
locator.record_block_base);
return crimson::do_for_each(
- deltas,
- [=](auto &delta)
+ record_deltas.deltas,
+ [locator,
+ this,
+ &handler](delta_info_t& delta)
{
/* The journal may validly contain deltas for extents in
* since released segments. We can detect those cases by
auto& seg_addr = delta.paddr.as_seg_paddr();
if (delta.paddr != P_ADDR_NULL &&
(segment_provider->get_seq(seg_addr.get_segment_id()) >
- seq.segment_seq)) {
+ locator.write_result.start_seq.segment_seq)) {
return replay_ertr::now();
} else {
return handler(locator, delta);
Journal::RecordBatch::add_pending_ret
Journal::RecordBatch::add_pending(
record_t&& record,
- const record_size_t& rsize)
+ extent_len_t block_size)
{
+ auto new_encoded_length = get_encoded_length_after(record, block_size);
logger().debug(
"Journal::RecordBatch::add_pending: batches={}, write_size={}",
- records.size() + 1,
- get_encoded_length(rsize));
+ pending.get_size() + 1,
+ new_encoded_length);
assert(state != state_t::SUBMITTING);
- assert(can_batch(rsize));
-
- auto block_start_offset = encoded_length + rsize.mdlength;
- records.push_back(std::move(record));
- record_sizes.push_back(rsize);
- auto new_encoded_length = get_encoded_length(rsize);
- assert(new_encoded_length < MAX_SEG_OFF);
- encoded_length = new_encoded_length;
+ assert(can_batch(record, block_size) == new_encoded_length);
+
+ auto block_start_offset = record.size.get_mdlength(block_size);
+ if (state != state_t::EMPTY) {
+ block_start_offset += pending.size.get_encoded_length();
+ }
+ pending.push_back(
+ std::move(record), block_size);
+ assert(pending.size.get_encoded_length() == new_encoded_length);
if (state == state_t::EMPTY) {
assert(!io_promise.has_value());
io_promise = seastar::shared_promise<maybe_result_t>();
});
}
-ceph::bufferlist Journal::RecordBatch::encode_records(
- size_t block_size,
+// Encodes every pending record into a single buffer and moves the batch
+// from PENDING to SUBMITTING. Requires at least one pending record and an
+// io_promise to be present (both asserted).
+ceph::bufferlist Journal::RecordBatch::encode_batch(
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
logger().debug(
- "Journal::RecordBatch::encode_records: batches={}, committed_to={}",
- records.size(),
+ "Journal::RecordBatch::encode_batch: batches={}, committed_to={}",
+ pending.get_size(),
committed_to);
assert(state == state_t::PENDING);
- assert(records.size());
- assert(records.size() == record_sizes.size());
+ assert(pending.get_size() > 0);
assert(io_promise.has_value());
state = state_t::SUBMITTING;
- ceph::bufferlist bl;
- std::size_t i = 0;
- do {
- auto record_bl = encode_record(
- record_sizes[i],
- std::move(records[i]),
- block_size,
- committed_to,
- segment_nonce);
- bl.claim_append(record_bl);
- } while ((++i) < records.size());
- assert(bl.length() == (std::size_t)encoded_length);
+ // Save the record count and encoded length before encoding: the free
+ // function encode_records() clears `pending`, and set_result() reads
+ // these saved values afterwards.
+ submitting_size = pending.get_size();
+ submitting_length = pending.size.get_encoded_length();
+ auto bl = encode_records(pending, committed_to, segment_nonce);
+ // Note: pending is cleared here
+ assert(bl.length() == (std::size_t)submitting_length);
return bl;
}
if (maybe_write_result.has_value()) {
logger().debug(
"Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
- records.size(),
+ submitting_size,
maybe_write_result->start_seq,
maybe_write_result->length);
- assert(maybe_write_result->length == encoded_length);
+ assert(maybe_write_result->length == submitting_length);
} else {
logger().error(
"Journal::RecordBatch::set_result: batches={}, write is failed!",
- records.size());
+ submitting_size);
}
assert(state == state_t::SUBMITTING);
assert(io_promise.has_value());
state = state_t::EMPTY;
- encoded_length = 0;
- records.clear();
- record_sizes.clear();
+ submitting_size = 0;
+ submitting_length = 0;
io_promise->set_value(maybe_write_result);
io_promise.reset();
}
-ceph::bufferlist Journal::RecordBatch::submit_pending_fast(
+// Fast path: encodes `record` on its own as a single-record group, without
+// going through `pending` or the shared io_promise.
+// Requires the batch to be EMPTY (asserted).
+// Returns the encoded buffer together with the size of the encoded group.
+std::pair<ceph::bufferlist, record_group_size_t>
+Journal::RecordBatch::submit_pending_fast(
record_t&& record,
- const record_size_t& rsize,
- size_t block_size,
+ extent_len_t block_size,
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
+ auto encoded_length = get_encoded_length_after(record, block_size);
logger().debug(
"Journal::RecordBatch::submit_pending_fast: write_size={}",
- get_encoded_length(rsize));
+ encoded_length);
assert(state == state_t::EMPTY);
- assert(can_batch(rsize));
-
- auto bl = encode_record(
- rsize,
- std::move(record),
- block_size,
- committed_to,
- segment_nonce);
- assert(bl.length() == get_encoded_length(rsize));
- return bl;
+ assert(can_batch(record, block_size) == encoded_length);
+
+ auto group = record_group_t(std::move(record), block_size);
+ // copy the size first: encode_records() clears the group, size included
+ auto size = group.size;
+ auto bl = encode_records(group, committed_to, segment_nonce);
+ assert(bl.length() == encoded_length);
+ assert(bl.length() == size.get_encoded_length());
+ // hand the buffer over to the pair instead of copying the bufferlist
+ return std::make_pair(std::move(bl), size);
}
Journal::RecordSubmitter::RecordSubmitter(
OrderingHandle& handle)
{
assert(write_pipeline);
- auto rsize = get_encoded_record_length(
- record, journal_segment_manager.get_block_size());
- auto total = rsize.mdlength + rsize.dlength;
+ auto expected_size = record_group_size_t(
+ record.size,
+ journal_segment_manager.get_block_size()
+ ).get_encoded_length();
auto max_record_length = journal_segment_manager.get_max_write_length();
- if (total > max_record_length) {
+ if (expected_size > max_record_length) {
logger().error(
"Journal::RecordSubmitter::submit: record size {} exceeds max {}",
- total,
+ expected_size,
max_record_length
);
return crimson::ct_error::erange::make();
}
- return do_submit(std::move(record), rsize, handle);
+ return do_submit(std::move(record), handle);
}
void Journal::RecordSubmitter::update_state()
pop_free_batch();
increment_io();
- ceph::bufferlist to_write = p_batch->encode_records(
- journal_segment_manager.get_block_size(),
+ ceph::bufferlist to_write = p_batch->encode_batch(
journal_segment_manager.get_committed_to(),
journal_segment_manager.get_nonce());
std::ignore = journal_segment_manager.write(to_write
Journal::RecordSubmitter::submit_pending_ret
Journal::RecordSubmitter::submit_pending(
record_t&& record,
- const record_size_t& rsize,
OrderingHandle& handle,
bool flush)
{
assert(!p_current_batch->is_submitting());
record_batch_stats.increment(
p_current_batch->get_num_records() + 1);
- auto write_fut = [this, flush, record=std::move(record), &rsize]() mutable {
+ auto write_fut = [this, flush, record=std::move(record)]() mutable {
if (flush && p_current_batch->is_empty()) {
// fast path with direct write
increment_io();
- ceph::bufferlist to_write = p_current_batch->submit_pending_fast(
+ auto [to_write, sizes] = p_current_batch->submit_pending_fast(
std::move(record),
- rsize,
journal_segment_manager.get_block_size(),
journal_segment_manager.get_committed_to(),
journal_segment_manager.get_nonce());
return journal_segment_manager.write(to_write
- ).safe_then([rsize](auto write_result) {
+ ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
return record_locator_t{
- write_result.start_seq.offset.add_offset(rsize.mdlength),
+ write_result.start_seq.offset.add_offset(mdlength),
write_result
};
}).finally([this] {
} else {
// indirect write with or without the existing pending records
auto write_fut = p_current_batch->add_pending(
- std::move(record), rsize);
+ std::move(record), journal_segment_manager.get_block_size());
if (flush) {
flush_current_batch();
}
Journal::RecordSubmitter::do_submit_ret
Journal::RecordSubmitter::do_submit(
record_t&& record,
- const record_size_t& rsize,
OrderingHandle& handle)
{
assert(!p_current_batch->is_submitting());
if (state <= state_t::PENDING) {
// can increment io depth
assert(!wait_submit_promise.has_value());
- auto batched_size = p_current_batch->can_batch(rsize);
+ auto batched_size = p_current_batch->can_batch(
+ record, journal_segment_manager.get_block_size());
if (batched_size == 0 ||
batched_size > journal_segment_manager.get_max_write_length()) {
assert(p_current_batch->is_pending());
flush_current_batch();
- return do_submit(std::move(record), rsize, handle);
+ return do_submit(std::move(record), handle);
} else if (journal_segment_manager.needs_roll(batched_size)) {
if (p_current_batch->is_pending()) {
flush_current_batch();
}
return journal_segment_manager.roll(
- ).safe_then([this, record=std::move(record), rsize, &handle]() mutable {
- return do_submit(std::move(record), rsize, handle);
+ ).safe_then([this, record=std::move(record), &handle]() mutable {
+ return do_submit(std::move(record), handle);
});
} else {
- return submit_pending(std::move(record), rsize, handle, true);
+ return submit_pending(std::move(record), handle, true);
}
}
assert(state == state_t::FULL);
// cannot increment io depth
- auto batched_size = p_current_batch->can_batch(rsize);
+ auto batched_size = p_current_batch->can_batch(
+ record, journal_segment_manager.get_block_size());
if (batched_size == 0 ||
batched_size > journal_segment_manager.get_max_write_length() ||
journal_segment_manager.needs_roll(batched_size)) {
wait_submit_promise = seastar::promise<>();
}
return wait_submit_promise->get_future(
- ).then([this, record=std::move(record), rsize, &handle]() mutable {
- return do_submit(std::move(record), rsize, handle);
+ ).then([this, record=std::move(record), &handle]() mutable {
+ return do_submit(std::move(record), handle);
});
} else {
- return submit_pending(std::move(record), rsize, handle, false);
+ return submit_pending(std::move(record), handle, false);
}
}
}
+ // Number of records currently held in `pending`.
std::size_t get_num_records() const {
- return records.size();
+ return pending.get_size();
}
// return the expected write size if allows to batch,
// otherwise, return 0
- std::size_t can_batch(const record_size_t& rsize) const {
+ std::size_t can_batch(
+ const record_t& record,
+ extent_len_t block_size) const {
assert(state != state_t::SUBMITTING);
- if (records.size() >= batch_capacity ||
- static_cast<std::size_t>(encoded_length) > batch_flush_size) {
+ // Refuse once the batch holds batch_capacity records, or once its encoded
+ // length already exceeds batch_flush_size.
+ // The encoded length of `pending` is only queried when it holds records:
+ // record_group_size_t::get_encoded_length() asserts block_size > 0, and an
+ // empty group still has block_size == 0.
+ if (pending.get_size() >= batch_capacity ||
+ (pending.get_size() > 0 &&
+ pending.size.get_encoded_length() > batch_flush_size)) {
assert(state == state_t::PENDING);
return 0;
}
- return get_encoded_length(rsize);
+ return get_encoded_length_after(record, block_size);
}
void initialize(std::size_t i,
index = i;
batch_capacity = _batch_capacity;
batch_flush_size = _batch_flush_size;
- records.reserve(batch_capacity);
- record_sizes.reserve(batch_capacity);
+ pending.reserve(batch_capacity);
}
// Add to the batch, the future will be resolved after the batch is
// in the batch.
using add_pending_ertr = JournalSegmentManager::write_ertr;
using add_pending_ret = add_pending_ertr::future<record_locator_t>;
- add_pending_ret add_pending(record_t&&, const record_size_t&);
+ add_pending_ret add_pending(
+ record_t&&,
+ extent_len_t block_size);
// Encode the batched records for write.
- ceph::bufferlist encode_records(
- size_t block_size,
+ ceph::bufferlist encode_batch(
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce);
// The fast path that is equivalent to submit a single record as a batch.
//
// Essentially, equivalent to the combined logic of:
- // add_pending(), encode_records() and set_result() above without
+ // add_pending(), encode_batch() and set_result() above without
// the intervention of the shared io_promise.
//
// Note the current RecordBatch can be reused afterwards.
- ceph::bufferlist submit_pending_fast(
+ std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
record_t&&,
- const record_size_t&,
- size_t block_size,
+ extent_len_t block_size,
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce);
private:
- std::size_t get_encoded_length(const record_size_t& rsize) const {
- auto ret = encoded_length + rsize.mdlength + rsize.dlength;
- assert(ret > 0);
- return ret;
+ // Encoded length `pending` would have after appending `record`.
+ // Delegates to record_group_size_t::get_encoded_length_after(), which
+ // works on a copy, so `pending` is not modified.
+ std::size_t get_encoded_length_after(
+ const record_t& record,
+ extent_len_t block_size) const {
+ return pending.size.get_encoded_length_after(
+ record.size, block_size);
}
state_t state = state_t::EMPTY;
std::size_t index = 0;
std::size_t batch_capacity = 0;
std::size_t batch_flush_size = 0;
- segment_off_t encoded_length = 0;
- std::vector<record_t> records;
- std::vector<record_size_t> record_sizes;
+
+ record_group_t pending;
+ std::size_t submitting_size = 0;
+ segment_off_t submitting_length = 0;
+
std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
};
using submit_pending_ret = submit_pending_ertr::future<
record_locator_t>;
submit_pending_ret submit_pending(
- record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
+ record_t&&, OrderingHandle &handle, bool flush);
using do_submit_ret = submit_pending_ret;
do_submit_ret do_submit(
- record_t&&, const record_size_t&, OrderingHandle&);
+ record_t&&, OrderingHandle&);
state_t state = state_t::IDLE;
std::size_t num_outstanding_io = 0;
prep_replay_segments_fut prep_replay_segments(
std::vector<std::pair<segment_id_t, segment_header_t>> segments);
- /// attempts to decode deltas from bl, return nullopt if unsuccessful
- std::optional<std::vector<delta_info_t>> try_decode_deltas(
- record_header_t header,
- const bufferlist &bl);
-
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
// vim: ts=8 sw=2 smarttab
#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/common/log.h"
+
+namespace {
+
+// File-local accessor for the logger of the seastore subsystem.
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_seastore);
+}
+
+}
namespace crimson::os::seastore {
<< ")";
}
-extent_len_t get_encoded_record_raw_mdlength(
- const record_t &record,
- size_t block_size) {
- extent_len_t metadata =
- (extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
- metadata += sizeof(checksum_t) /* crc */;
- metadata += record.extents.size() *
- ceph::encoded_sizeof_bounded<extent_info_t>();
- for (const auto &i: record.deltas) {
- metadata += ceph::encoded_sizeof(i);
- }
- return metadata;
+// Accounts one extent of extent_len bytes: the metadata length grows by the
+// bounded encoded size of an extent_info_t and the data length by
+// extent_len. extent_len must be non-zero (asserted).
+void record_size_t::account_extent(extent_len_t extent_len)
+{
+ assert(extent_len);
+ plain_mdlength += ceph::encoded_sizeof_bounded<extent_info_t>();
+ dlength += extent_len;
}
-record_size_t get_encoded_record_length(
- const record_t &record,
- size_t block_size) {
- extent_len_t raw_mdlength =
- get_encoded_record_raw_mdlength(record, block_size);
- extent_len_t mdlength =
- p2roundup(raw_mdlength, (extent_len_t)block_size);
- extent_len_t dlength = 0;
- for (const auto &i: record.extents) {
- dlength += i.bl.length();
- }
- return record_size_t{raw_mdlength, mdlength, dlength};
+// Accounts one delta: the metadata length grows by the delta's encoded size
+// (ceph::encoded_sizeof). A delta with an empty payload is rejected by the
+// assert.
+void record_size_t::account(const delta_info_t& delta)
+{
+  assert(delta.bl.length() != 0);
+  const auto encoded_delta_len = ceph::encoded_sizeof(delta);
+  plain_mdlength += encoded_delta_len;
+}
+
+// Unaligned metadata length of the record: the accounted extent infos and
+// deltas plus the fixed per-record overhead (metadata checksum and record
+// header).
+extent_len_t record_size_t::get_raw_mdlength() const
+{
+  const auto fixed_overhead =
+    sizeof(checksum_t) +
+    ceph::encoded_sizeof_bounded<record_header_t>();
+  return plain_mdlength + fixed_overhead;
+}
+
+// Adds one record's size to the group totals.
+// All records of a group must use the same, non-zero block size, and the
+// record's data length must be a multiple of it (asserted).
+void record_group_size_t::account(
+  const record_size_t& rsize,
+  extent_len_t _block_size)
+{
+  // preconditions
+  assert(_block_size > 0);
+  assert(rsize.dlength % _block_size == 0);
+  assert(block_size == 0 || block_size == _block_size);
+
+  // the first accounted record fixes the block size of the group
+  block_size = _block_size;
+
+  raw_mdlength += rsize.get_raw_mdlength();
+  mdlength += rsize.get_mdlength(_block_size);
+  dlength += rsize.dlength;
}
ceph::bufferlist encode_record(
- record_size_t rsize,
- record_t &&record,
- size_t block_size,
+ record_t&& record,
+ extent_len_t block_size,
const journal_seq_t& committed_to,
segment_nonce_t current_segment_nonce)
{
bufferlist bl;
record_header_t header{
- rsize.mdlength,
- rsize.dlength,
- (uint32_t)record.deltas.size(),
- (uint32_t)record.extents.size(),
+ record.size.get_mdlength(block_size),
+ record.size.dlength,
+ (extent_len_t)record.deltas.size(),
+ (extent_len_t)record.extents.size(),
current_segment_nonce,
committed_to,
data_bl.crc32c(-1)
};
encode(header, bl);
- auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t));
+ auto metadata_crc_filler = bl.append_hole(sizeof(checksum_t));
for (const auto &i: record.extents) {
encode(extent_info_t(i), bl);
for (const auto &i: record.deltas) {
encode(i, bl);
}
- ceph_assert(bl.length() == rsize.raw_mdlength);
+ ceph_assert(bl.length() == record.size.get_raw_mdlength());
- if (bl.length() % block_size != 0) {
- bl.append_zero(
- block_size - (bl.length() % block_size));
+ auto aligned_mdlength = record.size.get_mdlength(block_size);
+ if (bl.length() != aligned_mdlength) {
+ assert(bl.length() < aligned_mdlength);
+ bl.append_zero(aligned_mdlength - bl.length());
}
- ceph_assert(bl.length() == rsize.mdlength);
auto bliter = bl.cbegin();
auto metadata_crc = bliter.crc32c(
ceph::encoded_sizeof_bounded<record_header_t>(),
-1);
- bliter += sizeof(checksum_t); /* crc hole again */
+ bliter += sizeof(checksum_t); /* metadata crc hole */
metadata_crc = bliter.crc32c(
bliter.get_remaining(),
metadata_crc);
reinterpret_cast<const char *>(&metadata_crc_le));
bl.claim_append(data_bl);
- ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength));
+ ceph_assert(bl.length() == (record.size.get_encoded_length(block_size)));
+
+ return bl;
+}
+
+// Encodes every record of record_group, in order, into one contiguous
+// buffer. The group must be non-empty. Its records are moved from and the
+// group is cleared before returning.
+ceph::bufferlist encode_records(
+  record_group_t& record_group,
+  const journal_seq_t& committed_to,
+  segment_nonce_t current_segment_nonce)
+{
+  assert(record_group.size.block_size > 0);
+  assert(record_group.records.size() > 0);
+
+  // all records of a group share one block size
+  const auto group_block_size = record_group.size.block_size;
+
+  bufferlist bl;
+  for (auto& record : record_group.records) {
+    bl.claim_append(
+      encode_record(
+        std::move(record),
+        group_block_size,
+        committed_to,
+        current_segment_nonce));
+  }
+  ceph_assert(bl.length() == record_group.size.get_encoded_length());
+  record_group.clear();
return bl;
}
+// Decodes a record_header_t from the start of header_bl and checks that it
+// carries the expected segment nonce.
+// Returns std::nullopt (after a debug log) if decoding throws
+// ceph::buffer::error or if the decoded nonce differs from expected_nonce.
+std::optional<record_header_t>
+try_decode_record_header(
+  const ceph::bufferlist& header_bl,
+  segment_nonce_t expected_nonce)
+{
+  record_header_t header;
+  auto cursor = header_bl.cbegin();
+  try {
+    decode(header, cursor);
+  } catch (ceph::buffer::error &e) {
+    logger().debug(
+      "try_decode_record_header: failed, "
+      "cannot decode record_header_t, got {}.",
+      e);
+    return std::nullopt;
+  }
+
+  if (header.segment_nonce == expected_nonce) {
+    return header;
+  }
+
+  logger().debug(
+    "try_decode_record_header: failed, record_header nonce mismatch, "
+    "read {}, expected {}!",
+    header.segment_nonce,
+    expected_nonce);
+  return std::nullopt;
+}
+
+// Recomputes the metadata checksum of md_bl and compares it with the value
+// stored in the buffer. The crc covers the record header and everything
+// after the stored checksum; the stored checksum itself is not covered.
+// Returns true on match; logs at debug level and returns false otherwise.
+bool validate_record_metadata(
+  const ceph::bufferlist& md_bl)
+{
+  auto cursor = md_bl.cbegin();
+
+  // first part: the record header
+  const auto header_crc = cursor.crc32c(
+    ceph::encoded_sizeof_bounded<record_header_t>(),
+    -1);
+
+  // the stored checksum comes next
+  ceph_le32 stored_crc_le;
+  decode(stored_crc_le, cursor);
+  const uint32_t stored_crc = stored_crc_le;
+
+  // second part: the rest of the metadata, chained onto the header crc
+  const auto computed_crc = cursor.crc32c(
+    cursor.get_remaining(),
+    header_crc);
+
+  if (computed_crc == stored_crc) {
+    return true;
+  }
+  logger().debug("validate_record_metadata: failed, metadata crc mismatch.");
+  return false;
+}
+
+// Checks the crc32c of data_bl (seeded with -1) against header.data_crc.
+// Returns true on match; logs at debug level and returns false otherwise.
+bool validate_record_data(
+  const record_header_t& header,
+  const ceph::bufferlist& data_bl)
+{
+  const auto computed_crc = data_bl.crc32c(-1);
+  if (computed_crc == header.data_crc) {
+    return true;
+  }
+  logger().debug("validate_record_data: failed, data crc mismatch!");
+  return false;
+}
+
+// Decodes the header.extents extent_info_t entries of a record from its
+// metadata buffer md_bl. The entries follow the record header and the
+// metadata checksum.
+// Returns std::nullopt (after a debug log) if any entry fails to decode.
+std::optional<record_extent_infos_t>
+try_decode_extent_info(
+  const record_header_t& header,
+  const ceph::bufferlist& md_bl)
+{
+  auto bliter = md_bl.cbegin();
+  bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+  bliter += sizeof(checksum_t) /* metadata crc hole */;
+
+  record_extent_infos_t record_extent_info;
+  record_extent_info.extent_infos.resize(header.extents);
+  for (auto& i: record_extent_info.extent_infos) {
+    try {
+      decode(i, bliter);
+    } catch (ceph::buffer::error &e) {
+      // log prefix matches the function name so the message can be grepped
+      logger().debug(
+        "try_decode_extent_info: failed, "
+        "cannot decode extent_info_t, got {}.",
+        e);
+      return std::nullopt;
+    }
+  }
+  return record_extent_info;
+}
+
+// Decodes the header.deltas delta_info_t entries of a record from its
+// metadata buffer md_bl. The deltas follow the record header, the metadata
+// checksum and the header.extents extent_info_t entries.
+// Returns std::nullopt (after a debug log) if any delta fails to decode.
+std::optional<record_deltas_t>
+try_decode_deltas(
+  const record_header_t& header,
+  const ceph::bufferlist& md_bl)
+{
+  const auto header_len = ceph::encoded_sizeof_bounded<record_header_t>();
+  const auto extent_infos_len =
+    header.extents * ceph::encoded_sizeof_bounded<extent_info_t>();
+
+  auto cursor = md_bl.cbegin();
+  cursor += header_len;
+  cursor += sizeof(checksum_t) /* metadata crc hole */;
+  cursor += extent_infos_len;
+
+  record_deltas_t decoded;
+  decoded.deltas.resize(header.deltas);
+  for (auto& delta : decoded.deltas) {
+    try {
+      decode(delta, cursor);
+    } catch (ceph::buffer::error &e) {
+      logger().debug(
+        "try_decode_deltas: failed, "
+        "cannot decode delta_info_t, got {}.",
+        e);
+      return std::nullopt;
+    }
+  }
+  return decoded;
+}
+
bool can_delay_allocation(device_type_t type) {
// Some types of device may not support delayed allocation, for example PMEM.
return type <= RANDOM_BLOCK;
#include <limits>
#include <numeric>
+#include <optional>
#include <iostream>
+#include <vector>
#include "include/byteorder.h"
#include "include/denc.h"
std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs);
-struct record_t {
- std::vector<extent_t> extents;
- std::vector<delta_info_t> deltas;
-
- std::size_t get_raw_data_size() const {
- auto extent_size = std::accumulate(
- extents.begin(), extents.end(), 0,
- [](uint64_t sum, auto& extent) {
- return sum + extent.bl.length();
- }
- );
- auto delta_size = std::accumulate(
- deltas.begin(), deltas.end(), 0,
- [](uint64_t sum, auto& delta) {
- return sum + delta.bl.length();
- }
- );
- return extent_size + delta_size;
- }
-};
-
class object_data_t {
laddr_t reserved_data_base = L_ADDR_NULL;
extent_len_t reserved_data_len = 0;
DENC_FINISH(p);
}
};
+std::ostream &operator<<(std::ostream &out, const extent_info_t &header);
using segment_nonce_t = uint32_t;
};
std::ostream &operator<<(std::ostream &out, const segment_header_t &header);
+// Size accounting for a single record. The totals are kept independent of
+// the block size; the block size is supplied when an aligned length is
+// requested.
+struct record_size_t {
+ extent_len_t plain_mdlength = 0; // mdlength without the record header
+ // sum of the lengths passed to account_extent()
+ extent_len_t dlength = 0;
+
+ // account one extent of extent_len bytes (metadata entry plus data)
+ void account_extent(extent_len_t extent_len);
+
+ // convenience overload: accounts the extent by the length of its buffer
+ void account(const extent_t& extent) {
+ account_extent(extent.bl.length());
+ }
+
+ // account one delta (metadata only)
+ void account(const delta_info_t& delta);
+
+ // TODO: remove
+ // unaligned metadata length, including the record header and checksum
+ extent_len_t get_raw_mdlength() const;
+
+ // metadata length rounded up to a multiple of block_size
+ extent_len_t get_mdlength(extent_len_t block_size) const {
+ assert(block_size > 0);
+ return p2roundup(get_raw_mdlength(), block_size);
+ }
+
+ // aligned metadata length plus data length; dlength must already be a
+ // multiple of block_size (asserted)
+ extent_len_t get_encoded_length(extent_len_t block_size) const {
+ assert(block_size > 0);
+ assert(dlength % block_size == 0);
+ return get_mdlength(block_size) + dlength;
+ }
+};
+
+// A record: the extents and deltas to be written together, plus the running
+// size accounting for them.
+// Add entries through push_back() so that `size` stays in sync with the
+// two vectors.
+struct record_t {
+  std::vector<extent_t> extents;
+  std::vector<delta_info_t> deltas;
+  record_size_t size;
+
+  record_t() = default;
+
+  // Builds a record from existing vectors; every element is moved in through
+  // push_back() so that it is accounted in `size`.
+  record_t(std::vector<extent_t>&& _extents,
+           std::vector<delta_info_t>&& _deltas) {
+    // the final element counts are known, so allocate once
+    extents.reserve(_extents.size());
+    deltas.reserve(_deltas.size());
+    for (auto& e: _extents) {
+      push_back(std::move(e));
+    }
+    for (auto& d: _deltas) {
+      push_back(std::move(d));
+    }
+  }
+
+  // the size of extents and delta buffers
+  std::size_t get_raw_data_size() const {
+    // size.dlength already holds the summed extent lengths, so only the
+    // delta payloads need to be added up. The accumulator is seeded with a
+    // std::size_t: std::accumulate sums in the type of its initial value,
+    // and a plain `0` would narrow every partial sum to int.
+    const auto delta_size = std::accumulate(
+      deltas.begin(), deltas.end(), std::size_t{0},
+      [](std::size_t sum, const auto& delta) {
+        return sum + delta.bl.length();
+      }
+    );
+    return size.dlength + delta_size;
+  }
+
+  // appends an extent and accounts its metadata entry and data length
+  void push_back(extent_t&& extent) {
+    size.account(extent);
+    extents.push_back(std::move(extent));
+  }
+
+  // appends a delta and accounts its encoded size
+  void push_back(delta_info_t&& delta) {
+    size.account(delta);
+    deltas.push_back(std::move(delta));
+  }
+};
+
struct record_header_t {
// Fixed portion
extent_len_t mdlength; // block aligned, length of metadata
}
};
-std::ostream &operator<<(std::ostream &out, const extent_info_t &header);
-
-struct record_size_t {
+// Accumulated size of a group of records that share one block size.
+// The three length fields are the sums over all accounted records;
+// block_size stays 0 until the first record has been accounted, and the
+// getters assert that this has happened.
+struct record_group_size_t {
extent_len_t raw_mdlength = 0;
extent_len_t mdlength = 0;
extent_len_t dlength = 0;
+ extent_len_t block_size = 0;
+
+ record_group_size_t() = default;
+ // size of a group holding the single record described by rsize
+ record_group_size_t(
+ const record_size_t& rsize,
+ extent_len_t block_size) {
+ account(rsize, block_size);
+ }
+
+ // sum of the unaligned metadata lengths of the accounted records
+ extent_len_t get_raw_mdlength() const {
+ assert(block_size > 0);
+ return raw_mdlength;
+ }
+
+ // sum of the block-aligned metadata lengths of the accounted records
+ extent_len_t get_mdlength() const {
+ assert(block_size > 0);
+ assert(mdlength % block_size == 0);
+ return mdlength;
+ }
+
+ // aligned metadata plus data length of the whole group
+ extent_len_t get_encoded_length() const {
+ assert(block_size > 0);
+ assert(dlength % block_size == 0);
+ return get_mdlength() + dlength;
+ }
+
+ // Encoded length the group would have after accounting rsize; computed on
+ // a copy, so *this is unchanged. Note the block_size parameter shadows the
+ // member of the same name.
+ extent_len_t get_encoded_length_after(
+ const record_size_t& rsize,
+ extent_len_t block_size) const {
+ record_group_size_t tmp = *this;
+ tmp.account(rsize, block_size);
+ return tmp.get_encoded_length();
+ }
+
+ // add one record's size to the totals
+ void account(const record_size_t& rsize,
+ extent_len_t block_size);
};
-extent_len_t get_encoded_record_raw_mdlength(
- const record_t &record,
- size_t block_size);
+// A group of records together with their accumulated size.
+// Add records through push_back() so that `size` stays in sync.
+struct record_group_t {
+ std::vector<record_t> records;
+ record_group_size_t size;
-/**
- * Return <mdlength, dlength> pair denoting length of
- * metadata and blocks respectively.
- */
-record_size_t get_encoded_record_length(
- const record_t &record,
- size_t block_size);
+ record_group_t() = default;
+ // group holding the single given record
+ record_group_t(
+ record_t&& record,
+ extent_len_t block_size) {
+ push_back(std::move(record), block_size);
+ }
+
+ // number of records in the group
+ std::size_t get_size() const {
+ return records.size();
+ }
+
+ // Appends a record and accounts its size. The resulting encoded length of
+ // the group must stay below MAX_SEG_OFF (asserted).
+ void push_back(
+ record_t&& record,
+ extent_len_t block_size) {
+ size.account(record.size, block_size);
+ records.push_back(std::move(record));
+ assert(size.get_encoded_length() < MAX_SEG_OFF);
+ }
+
+ // reserve storage for up to `limit` records
+ void reserve(std::size_t limit) {
+ records.reserve(limit);
+ }
+
+ // drops all records and resets the size accounting, block_size included
+ void clear() {
+ records.clear();
+ size = {};
+ }
+};
ceph::bufferlist encode_record(
- record_size_t rsize,
- record_t &&record,
- size_t block_size,
+ record_t&& record,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t current_segment_nonce);
+
+ceph::bufferlist encode_records(
+ record_group_t& record_group,
const journal_seq_t& committed_to,
- segment_nonce_t current_segment_nonce = 0);
+ segment_nonce_t current_segment_nonce);
+
+std::optional<record_header_t>
+try_decode_record_header(
+ const ceph::bufferlist& header_bl,
+ segment_nonce_t expected_nonce);
+
+bool validate_record_metadata(
+ const ceph::bufferlist& md_bl);
+
+bool validate_record_data(
+ const record_header_t& header,
+ const ceph::bufferlist& data_bl);
+
+// The extent_info_t entries decoded from one record's metadata.
+struct record_extent_infos_t {
+ std::vector<extent_info_t> extent_infos;
+};
+// Decodes header.extents extent infos from the record metadata md_bl;
+// returns std::nullopt if any entry fails to decode.
+std::optional<record_extent_infos_t>
+try_decode_extent_info(
+ const record_header_t& header,
+ const ceph::bufferlist& md_bl);
+
+// The delta_info_t entries decoded from one record's metadata.
+struct record_deltas_t {
+ std::vector<delta_info_t> deltas;
+};
+// Decodes header.deltas deltas from the record metadata md_bl, skipping the
+// header, checksum and extent infos that precede them; returns std::nullopt
+// if any delta fails to decode.
+std::optional<record_deltas_t>
+try_decode_deltas(
+ const record_header_t& header,
+ const ceph::bufferlist& md_bl);
struct write_result_t {
journal_seq_t start_seq;