record_header_fullness.ool_stats.filled_bytes += ool_stats.header_raw_bytes;
record_header_fullness.ool_stats.total_bytes += ool_stats.header_bytes;
+ // TODO: move to Journal to get an accurate result
auto record_size = record_group_size_t(
record.size, reader.get_block_size());
auto inline_overhead =
).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
auto segment_nonce = segment_header.segment_nonce;
return seastar::do_with(
- found_record_handler_t([extents, this](
+ found_record_handler_t([extents](
record_locator_t locator,
- const record_header_t& header,
+ const record_group_header_t& header,
const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
{
- auto maybe_record_extent_infos = try_decode_extent_info(header, mdbuf);
+ logger().debug("ExtentReader::scan_extents: decoding {} records",
+ header.records);
+ auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
if (!maybe_record_extent_infos) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
}
paddr_t extent_offset = locator.record_block_base;
- logger().debug("ExtentReader::scan_extents: decoded {} extents",
- maybe_record_extent_infos->extent_infos.size());
- for (const auto &i : maybe_record_extent_infos->extent_infos) {
- extents->emplace_back(extent_offset, i);
- auto& seg_addr = extent_offset.as_seg_paddr();
- seg_addr.set_segment_off(
- seg_addr.get_segment_off() + i.len);
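+ // Data blocks of all records in the group are laid out back-to-back
+ // starting at locator.record_block_base, so walk them in decode order.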
+ for (auto& r: *maybe_record_extent_infos) {
+ logger().debug("ExtentReader::scan_extents: decoded {} extents",
+ r.extent_infos.size());
+ for (const auto &i : r.extent_infos) {
+ extents->emplace_back(extent_offset, i);
+ auto& seg_addr = extent_offset.as_seg_paddr();
+ seg_addr.set_segment_off(
+ seg_addr.get_segment_off() + i.len);
+ }
}
return scan_extents_ertr::now();
}),
- [=, &cursor](auto &dhandler) {
+ [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
return scan_valid_records(
cursor,
segment_nonce,
ceph_assert(cursor.last_committed == journal_seq_t() ||
cursor.last_committed <= new_committed_to);
cursor.last_committed = new_committed_to;
- cursor.pending_records.emplace_back(
+ cursor.pending_record_groups.emplace_back(
cursor.seq.offset,
header,
std::move(md_bl));
[=, &budget_used, &cursor, &handler] {
logger().debug(
"ExtentReader::scan_valid_records: valid record read, processing queue");
- if (cursor.pending_records.empty()) {
+ if (cursor.pending_record_groups.empty()) {
/* This is only possible if the segment is empty.
* A record's last_commited must be prior to its own
* location since it itself cannot yet have been committed
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
- auto &next = cursor.pending_records.front();
+ auto &next = cursor.pending_record_groups.front();
journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
if (cursor.last_committed == journal_seq_t() ||
next_seq > cursor.last_committed) {
});
});
} else {
- assert(!cursor.pending_records.empty());
- auto &next = cursor.pending_records.front();
+ assert(!cursor.pending_record_groups.empty());
+ auto &next = cursor.pending_record_groups.front();
return read_validate_data(next.offset, next.header
).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
if (!valid) {
- cursor.pending_records.clear();
+ cursor.pending_record_groups.clear();
return scan_valid_records_ertr::now();
}
return consume_next_records(cursor, handler, budget_used);
segment_manager.get_block_size());
bufferlist bl;
bl.append(bptr);
- auto maybe_header = try_decode_record_header(bl, nonce);
+ auto maybe_header = try_decode_records_header(bl, nonce);
if (!maybe_header.has_value()) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl)));
});
}).safe_then([](auto p) {
- if (p && validate_record_metadata(p->second)) {
+ if (p && validate_records_metadata(p->second)) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::move(*p)
ExtentReader::read_validate_data_ret
ExtentReader::read_validate_data(
paddr_t record_base,
- const record_header_t &header)
+ const record_group_header_t &header)
{
auto& segment_manager = *segment_managers[record_base.get_device_id()];
auto data_addr = record_base.add_offset(header.mdlength);
).safe_then([=, &header](auto bptr) {
bufferlist bl;
bl.append(bptr);
- return validate_record_data(header, bl);
+ return validate_records_data(header, bl);
});
}
found_record_handler_t& handler,
std::size_t& budget_used)
{
- auto& next = cursor.pending_records.front();
+ auto& next = cursor.pending_record_groups.front();
auto total_length = next.header.dlength + next.header.mdlength;
budget_used += total_length;
auto locator = record_locator_t{
next.header,
next.mdbuffer
).safe_then([&cursor] {
- cursor.pending_records.pop_front();
+ cursor.pending_record_groups.pop_front();
});
}
record_locator_t record_locator,
// callee may assume header and bl will remain valid until
// returned future resolves
- const record_header_t &header,
+ const record_group_header_t &header,
const bufferlist &mdbuf)>;
scan_valid_records_ret scan_valid_records(
scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
using read_validate_record_metadata_ertr = read_ertr;
using read_validate_record_metadata_ret =
read_validate_record_metadata_ertr::future<
- std::optional<std::pair<record_header_t, bufferlist>>
+ std::optional<std::pair<record_group_header_t, bufferlist>>
>;
read_validate_record_metadata_ret read_validate_record_metadata(
paddr_t start,
using read_validate_data_ret = read_validate_data_ertr::future<bool>;
read_validate_data_ret read_validate_data(
paddr_t record_base,
- const record_header_t &header ///< caller must ensure lifetime through
- /// future resolution
+ const record_group_header_t &header ///< caller must ensure lifetime through
+ /// future resolution
);
using consume_record_group_ertr = scan_valid_records_ertr;
scan_valid_records_cursor(seq),
ExtentReader::found_record_handler_t([=, &handler](
record_locator_t locator,
- const record_header_t& header,
+ const record_group_header_t& header,
const bufferlist& mdbuf)
-> ExtentReader::scan_valid_records_ertr::future<>
{
+ logger().debug("Journal::replay_segment: decoding {} records",
+ header.records);
auto maybe_record_deltas_list = try_decode_deltas(
- header, mdbuf);
+ header, mdbuf, locator.record_block_base);
if (!maybe_record_deltas_list) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
return seastar::do_with(
std::move(*maybe_record_deltas_list),
- [locator,
+ [write_result=locator.write_result,
this,
- &handler](record_deltas_t& record_deltas)
+ &handler](auto& record_deltas_list)
{
- logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
- record_deltas.deltas.size(),
- locator.record_block_base);
return crimson::do_for_each(
- record_deltas.deltas,
- [locator,
+ record_deltas_list,
+ [write_result,
this,
- &handler](delta_info_t& delta)
+ &handler](record_deltas_t& record_deltas)
{
- /* The journal may validly contain deltas for extents in
- * since released segments. We can detect those cases by
- * checking whether the segment in question currently has a
- * sequence number > the current journal segment seq. We can
- * safetly skip these deltas because the extent must already
- * have been rewritten.
- *
- * Note, this comparison exploits the fact that
- * SEGMENT_SEQ_NULL is a large number.
- */
- auto& seg_addr = delta.paddr.as_seg_paddr();
- if (delta.paddr != P_ADDR_NULL &&
- (segment_provider->get_seq(seg_addr.get_segment_id()) >
- locator.write_result.start_seq.segment_seq)) {
- return replay_ertr::now();
- } else {
- return handler(locator, delta);
- }
+ logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ record_deltas.record_block_base);
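+ // try_decode_deltas() already resolved the block base of each record's
+ // data, so rebuild a per-record locator from it.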
+ auto locator = record_locator_t{
+ record_deltas.record_block_base,
+ write_result
+ };
+ return crimson::do_for_each(
+ record_deltas.deltas,
+ [locator,
+ this,
+ &handler](delta_info_t& delta)
+ {
+ /* The journal may validly contain deltas for extents in
+ * since-released segments. We can detect those cases by
+ * checking whether the segment in question currently has a
+ * sequence number > the current journal segment seq. We can
+ * safely skip these deltas because the extent must already
+ * have been rewritten.
+ *
+ * Note, this comparison exploits the fact that
+ * SEGMENT_SEQ_NULL is a large number.
+ */
+ auto& seg_addr = delta.paddr.as_seg_paddr();
+ if (delta.paddr != P_ADDR_NULL &&
+ (segment_provider->get_seq(seg_addr.get_segment_id()) >
+ locator.write_result.start_seq.segment_seq)) {
+ return replay_ertr::now();
+ } else {
+ return handler(locator, delta);
+ }
+ });
});
});
}),
assert(state != state_t::SUBMITTING);
assert(can_batch(record, block_size) == new_encoded_length);
- auto block_start_offset = record.size.get_mdlength(block_size);
- if (state != state_t::EMPTY) {
- block_start_offset += pending.size.get_encoded_length();
- }
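+ // The group's final mdlength is unknown until submission, so remember only
+ // this record's data offset within the group; it is combined with the
+ // mdlength delivered through io_promise below.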
+ auto dlength_offset = pending.current_dlength;
pending.push_back(
std::move(record), block_size);
assert(pending.size.get_encoded_length() == new_encoded_length);
if (state == state_t::EMPTY) {
assert(!io_promise.has_value());
- io_promise = seastar::shared_promise<maybe_result_t>();
+ io_promise = seastar::shared_promise<maybe_promise_result_t>();
} else {
assert(io_promise.has_value());
}
state = state_t::PENDING;
return io_promise->get_shared_future(
- ).then([block_start_offset
- ](auto maybe_write_result) -> add_pending_ret {
- if (!maybe_write_result.has_value()) {
+ ).then([dlength_offset
+ ](auto maybe_promise_result) -> add_pending_ret {
+ if (!maybe_promise_result.has_value()) {
return crimson::ct_error::input_output_error::make();
}
+ auto write_result = maybe_promise_result->write_result;
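+ // This record's blocks start after the group's block-aligned metadata and
+ // after the data of the records queued ahead of it.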
auto submit_result = record_locator_t{
- maybe_write_result->start_seq.offset.add_offset(block_start_offset),
- *maybe_write_result
+ write_result.start_seq.offset.add_offset(
+ maybe_promise_result->mdlength + dlength_offset),
+ write_result
};
return add_pending_ret(
add_pending_ertr::ready_future_marker{},
state = state_t::SUBMITTING;
submitting_size = pending.get_size();
submitting_length = pending.size.get_encoded_length();
+ submitting_mdlength = pending.size.get_mdlength();
auto bl = encode_records(pending, committed_to, segment_nonce);
// Note: pending is cleared here
assert(bl.length() == (std::size_t)submitting_length);
void Journal::RecordBatch::set_result(
maybe_result_t maybe_write_result)
{
+ maybe_promise_result_t result;
if (maybe_write_result.has_value()) {
logger().debug(
"Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
maybe_write_result->start_seq,
maybe_write_result->length);
assert(maybe_write_result->length == submitting_length);
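+ // Pass the group's mdlength along with the write result so each waiting
+ // add_pending() can compute its own record locator.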
+ result = promise_result_t{
+ *maybe_write_result,
+ submitting_mdlength
+ };
} else {
logger().error(
"Journal::RecordBatch::set_result: batches={}, write is failed!",
state = state_t::EMPTY;
submitting_size = 0;
submitting_length = 0;
- io_promise->set_value(maybe_write_result);
+ submitting_mdlength = 0;
+ io_promise->set_value(result);
io_promise.reset();
}
record_group_t pending;
std::size_t submitting_size = 0;
segment_off_t submitting_length = 0;
+ segment_off_t submitting_mdlength = 0;
- std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
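+ // On success, waiters need both where the group landed and the size of its
+ // metadata in order to locate their own records.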
+ struct promise_result_t {
+ write_result_t write_result;
+ segment_off_t mdlength;
+ };
+ using maybe_promise_result_t = std::optional<promise_result_t>;
+ std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
};
class RecordSubmitter {
plain_mdlength += ceph::encoded_sizeof(delta);
}
-extent_len_t record_size_t::get_raw_mdlength() const
+extent_len_t record_group_size_t::get_raw_mdlength() const
{
return plain_mdlength +
sizeof(checksum_t) +
- ceph::encoded_sizeof_bounded<record_header_t>();
+ ceph::encoded_sizeof_bounded<record_group_header_t>();
}
void record_group_size_t::account(
assert(_block_size > 0);
assert(rsize.dlength % _block_size == 0);
assert(block_size == 0 || block_size == _block_size);
- raw_mdlength += rsize.get_raw_mdlength();
- mdlength += rsize.get_mdlength(_block_size);
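+ // Each record contributes its plain metadata plus one fixed-size
+ // record_header_t; the group header and checksum are added once in
+ // get_raw_mdlength().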
+ plain_mdlength += (
+ rsize.plain_mdlength +
+ ceph::encoded_sizeof_bounded<record_header_t>()
+ );
dlength += rsize.dlength;
block_size = _block_size;
}
const journal_seq_t& committed_to,
segment_nonce_t current_segment_nonce)
{
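+ // Keep the single-record entry point as a thin wrapper: wrap the record
+ // into a group and delegate to encode_records().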
+ record_group_t record_group(std::move(record), block_size);
+ return encode_records(
+ record_group,
+ committed_to,
+ current_segment_nonce);
+}
+
+ceph::bufferlist encode_records(
+ record_group_t& record_group,
+ const journal_seq_t& committed_to,
+ segment_nonce_t current_segment_nonce)
+{
+ assert(record_group.size.block_size > 0);
+ assert(record_group.records.size() > 0);
+
bufferlist data_bl;
- for (auto &i: record.extents) {
- data_bl.append(i.bl);
+ for (auto& r: record_group.records) {
+ for (auto& i: r.extents) {
+ assert(i.bl.length());
+ data_bl.append(i.bl);
+ }
}
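+ // Metadata layout: record_group_header_t, metadata checksum hole, one
+ // record_header_t per record, then all extent_info_t, then all
+ // delta_info_t, zero-padded up to the block-aligned mdlength; the
+ // concatenated extent data follows.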
bufferlist bl;
- record_header_t header{
- record.size.get_mdlength(block_size),
- record.size.dlength,
- (extent_len_t)record.deltas.size(),
- (extent_len_t)record.extents.size(),
+ record_group_header_t header{
+ static_cast<extent_len_t>(record_group.records.size()),
+ record_group.size.get_mdlength(),
+ record_group.size.dlength,
current_segment_nonce,
committed_to,
data_bl.crc32c(-1)
auto metadata_crc_filler = bl.append_hole(sizeof(checksum_t));
- for (const auto &i: record.extents) {
- encode(extent_info_t(i), bl);
+ for (auto& r: record_group.records) {
+ record_header_t rheader{
+ (extent_len_t)r.deltas.size(),
+ (extent_len_t)r.extents.size(),
+ };
+ encode(rheader, bl);
}
- for (const auto &i: record.deltas) {
- encode(i, bl);
+ for (auto& r: record_group.records) {
+ for (const auto& i: r.extents) {
+ encode(extent_info_t(i), bl);
+ }
}
- ceph_assert(bl.length() == record.size.get_raw_mdlength());
+ for (auto& r: record_group.records) {
+ for (const auto& i: r.deltas) {
+ encode(i, bl);
+ }
+ }
+ ceph_assert(bl.length() == record_group.size.get_raw_mdlength());
- auto aligned_mdlength = record.size.get_mdlength(block_size);
+ auto aligned_mdlength = record_group.size.get_mdlength();
if (bl.length() != aligned_mdlength) {
assert(bl.length() < aligned_mdlength);
bl.append_zero(aligned_mdlength - bl.length());
auto bliter = bl.cbegin();
auto metadata_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<record_header_t>(),
+ ceph::encoded_sizeof_bounded<record_group_header_t>(),
-1);
bliter += sizeof(checksum_t); /* metadata crc hole */
metadata_crc = bliter.crc32c(
reinterpret_cast<const char *>(&metadata_crc_le));
bl.claim_append(data_bl);
- ceph_assert(bl.length() == (record.size.get_encoded_length(block_size)));
-
- return bl;
-}
-
-ceph::bufferlist encode_records(
- record_group_t& record_group,
- const journal_seq_t& committed_to,
- segment_nonce_t current_segment_nonce)
-{
- assert(record_group.size.block_size > 0);
- assert(record_group.records.size() > 0);
-
- bufferlist bl;
- for (auto& r: record_group.records) {
- bl.claim_append(
- encode_record(
- std::move(r),
- record_group.size.block_size,
- committed_to,
- current_segment_nonce));
- }
ceph_assert(bl.length() == record_group.size.get_encoded_length());
record_group.clear();
return bl;
}
-std::optional<record_header_t>
-try_decode_record_header(
+std::optional<record_group_header_t>
+try_decode_records_header(
const ceph::bufferlist& header_bl,
segment_nonce_t expected_nonce)
{
auto bp = header_bl.cbegin();
- record_header_t header;
+ record_group_header_t header;
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
logger().debug(
- "try_decode_record_header: failed, "
- "cannot decode record_header_t, got {}.",
+ "try_decode_records_header: failed, "
+ "cannot decode record_group_header_t, got {}.",
e);
return std::nullopt;
}
if (header.segment_nonce != expected_nonce) {
logger().debug(
- "try_decode_record_header: failed, record_header nonce mismatch, "
+ "try_decode_records_header: failed, record_group_header nonce mismatch, "
"read {}, expected {}!",
header.segment_nonce,
expected_nonce);
return header;
}
-bool validate_record_metadata(
+bool validate_records_metadata(
const ceph::bufferlist& md_bl)
{
auto bliter = md_bl.cbegin();
auto test_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<record_header_t>(),
+ ceph::encoded_sizeof_bounded<record_group_header_t>(),
-1);
ceph_le32 recorded_crc_le;
decode(recorded_crc_le, bliter);
test_crc);
bool success = (test_crc == recorded_crc);
if (!success) {
- logger().debug("validate_record_metadata: failed, metadata crc mismatch.");
+ logger().debug("validate_records_metadata: failed, metadata crc mismatch.");
}
return success;
}
-bool validate_record_data(
- const record_header_t& header,
+bool validate_records_data(
+ const record_group_header_t& header,
const ceph::bufferlist& data_bl)
{
bool success = (data_bl.crc32c(-1) == header.data_crc);
if (!success) {
- logger().debug("validate_record_data: failed, data crc mismatch!");
+ logger().debug("validate_records_data: failed, data crc mismatch!");
}
return success;
}
-std::optional<record_extent_infos_t>
-try_decode_extent_info(
- const record_header_t& header,
+namespace {
+
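+// Decode the per-record headers that follow the group header and checksum;
+// shared by the group decoders below.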
+std::optional<std::vector<record_header_t>>
+try_decode_record_headers(
+ const record_group_header_t& header,
const ceph::bufferlist& md_bl)
{
auto bliter = md_bl.cbegin();
- bliter += ceph::encoded_sizeof_bounded<record_header_t>();
- bliter += sizeof(checksum_t) /* metadata crc hole */;
-
- record_extent_infos_t record_extent_info;
- record_extent_info.extent_infos.resize(header.extents);
- for (auto& i: record_extent_info.extent_infos) {
+ bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+ bliter += sizeof(checksum_t); /* metadata crc hole */
+ std::vector<record_header_t> record_headers(header.records);
+ for (auto &&i: record_headers) {
try {
decode(i, bliter);
} catch (ceph::buffer::error &e) {
logger().debug(
- "try_decode_extent_infos: failed, "
- "cannot decode extent_info_t, got {}.",
+ "try_decode_record_headers: failed, "
+ "cannot decode record_header_t, got {}.",
e);
return std::nullopt;
}
}
- return record_extent_info;
+ return record_headers;
}
-std::optional<record_deltas_t>
-try_decode_deltas(
- const record_header_t& header,
+}
+
+std::optional<std::vector<record_extent_infos_t> >
+try_decode_extent_infos(
+ const record_group_header_t& header,
const ceph::bufferlist& md_bl)
{
+ auto maybe_headers = try_decode_record_headers(header, md_bl);
+ if (!maybe_headers) {
+ logger().debug(
+ "try_decode_extent_infos: failed, cannot decode record headers.");
+ return std::nullopt;
+ }
+
auto bliter = md_bl.cbegin();
- bliter += ceph::encoded_sizeof_bounded<record_header_t>();
- bliter += sizeof(checksum_t) /* metadata crc hole */;
- bliter += header.extents * ceph::encoded_sizeof_bounded<extent_info_t>();
+ bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+ bliter += sizeof(checksum_t); /* metadata crc hole */
+ bliter += (ceph::encoded_sizeof_bounded<record_header_t>() *
+ maybe_headers->size());
+
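+ // extent_info_t entries for all records are laid out contiguously in record
+ // order immediately after the record headers.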
+ std::vector<record_extent_infos_t> record_extent_infos(
+ maybe_headers->size());
+ auto result_iter = record_extent_infos.begin();
+ for (auto& h: *maybe_headers) {
+ result_iter->header = h;
+ result_iter->extent_infos.resize(h.extents);
+ for (auto& i: result_iter->extent_infos) {
+ try {
+ decode(i, bliter);
+ } catch (ceph::buffer::error &e) {
+ logger().debug(
+ "try_decode_extent_infos: failed, "
+ "cannot decode extent_info_t, got {}.",
+ e);
+ return std::nullopt;
+ }
+ }
+ ++result_iter;
+ }
+ return record_extent_infos;
+}
- record_deltas_t record_delta;
- record_delta.deltas.resize(header.deltas);
- for (auto& i: record_delta.deltas) {
- try {
- decode(i, bliter);
- } catch (ceph::buffer::error &e) {
- logger().debug(
- "try_decode_deltas: failed, "
- "cannot decode delta_info_t, got {}.",
- e);
- return std::nullopt;
+std::optional<std::vector<record_deltas_t> >
+try_decode_deltas(
+ const record_group_header_t& header,
+ const ceph::bufferlist& md_bl,
+ paddr_t record_block_base)
+{
+ auto maybe_record_extent_infos = try_decode_extent_infos(header, md_bl);
+ if (!maybe_record_extent_infos) {
+ logger().debug(
+ "try_decode_deltas: failed, cannot decode extent_infos.");
+ return std::nullopt;
+ }
+
+ auto bliter = md_bl.cbegin();
+ bliter += ceph::encoded_sizeof_bounded<record_group_header_t>();
+ bliter += sizeof(checksum_t); /* metadata crc hole */
+ bliter += (ceph::encoded_sizeof_bounded<record_header_t>() *
+ maybe_record_extent_infos->size());
+ for (auto& r: *maybe_record_extent_infos) {
+ bliter += (ceph::encoded_sizeof_bounded<extent_info_t>() *
+ r.extent_infos.size());
+ }
+
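+ // Each record's deltas get the block base of that record's data; the base
+ // advances past the extents of every record decoded before it.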
+ std::vector<record_deltas_t> record_deltas(
+ maybe_record_extent_infos->size());
+ auto result_iter = record_deltas.begin();
+ for (auto& r: *maybe_record_extent_infos) {
+ result_iter->record_block_base = record_block_base;
+ result_iter->deltas.resize(r.header.deltas);
+ for (auto& i: result_iter->deltas) {
+ try {
+ decode(i, bliter);
+ } catch (ceph::buffer::error &e) {
+ logger().debug(
+ "try_decode_deltas: failed, "
+ "cannot decode delta_info_t, got {}.",
+ e);
+ return std::nullopt;
+ }
+ }
+ for (auto& i: r.extent_infos) {
+ auto& seg_addr = record_block_base.as_seg_paddr();
+ seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len);
}
+ ++result_iter;
}
- return record_delta;
+ return record_deltas;
}
bool can_delay_allocation(device_type_t type) {
}
void account(const delta_info_t& delta);
-
- // TODO: remove
- extent_len_t get_raw_mdlength() const;
-
- extent_len_t get_mdlength(extent_len_t block_size) const {
- assert(block_size > 0);
- return p2roundup(get_raw_mdlength(), block_size);
- }
-
- extent_len_t get_encoded_length(extent_len_t block_size) const {
- assert(block_size > 0);
- assert(dlength % block_size == 0);
- return get_mdlength(block_size) + dlength;
- }
};
struct record_t {
};
struct record_header_t {
- // Fixed portion
- extent_len_t mdlength; // block aligned, length of metadata
- extent_len_t dlength; // block aligned, length of data
uint32_t deltas; // number of deltas
uint32_t extents; // number of extents
+
+ DENC(record_header_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.deltas, p);
+ denc(v.extents, p);
+ DENC_FINISH(p);
+ }
+};
+
+struct record_group_header_t {
+ uint32_t records;
+ extent_len_t mdlength; // block aligned, length of metadata
+ extent_len_t dlength; // block aligned, length of data
segment_nonce_t segment_nonce;// nonce of containing segment
journal_seq_t committed_to; // records prior to committed_to have been
// fully written, maybe in another segment.
checksum_t data_crc; // crc of data payload
- DENC(record_header_t, v, p) {
+ DENC(record_group_header_t, v, p) {
DENC_START(1, 1, p);
+ denc(v.records, p);
denc(v.mdlength, p);
denc(v.dlength, p);
- denc(v.deltas, p);
- denc(v.extents, p);
denc(v.segment_nonce, p);
denc(v.committed_to, p);
denc(v.data_crc, p);
};
struct record_group_size_t {
- extent_len_t raw_mdlength = 0;
- extent_len_t mdlength = 0;
+ extent_len_t plain_mdlength = 0; // mdlength without the group header and its checksum
extent_len_t dlength = 0;
extent_len_t block_size = 0;
account(rsize, block_size);
}
- extent_len_t get_raw_mdlength() const {
- assert(block_size > 0);
- return raw_mdlength;
- }
+ extent_len_t get_raw_mdlength() const;
extent_len_t get_mdlength() const {
assert(block_size > 0);
- assert(mdlength % block_size == 0);
- return mdlength;
+ return p2roundup(get_raw_mdlength(), block_size);
}
extent_len_t get_encoded_length() const {
struct record_group_t {
std::vector<record_t> records;
record_group_size_t size;
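+ // total dlength of the records accumulated so far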
+ extent_len_t current_dlength = 0;
record_group_t() = default;
record_group_t(
record_t&& record,
extent_len_t block_size) {
size.account(record.size, block_size);
+ current_dlength += record.size.dlength;
records.push_back(std::move(record));
assert(size.get_encoded_length() < MAX_SEG_OFF);
}
void clear() {
records.clear();
size = {};
+ current_dlength = 0;
}
};
const journal_seq_t& committed_to,
segment_nonce_t current_segment_nonce);
-std::optional<record_header_t>
-try_decode_record_header(
+std::optional<record_group_header_t>
+try_decode_records_header(
const ceph::bufferlist& header_bl,
segment_nonce_t expected_nonce);
-bool validate_record_metadata(
+bool validate_records_metadata(
const ceph::bufferlist& md_bl);
-bool validate_record_data(
- const record_header_t& header,
+bool validate_records_data(
+ const record_group_header_t& header,
const ceph::bufferlist& data_bl);
struct record_extent_infos_t {
+ record_header_t header;
std::vector<extent_info_t> extent_infos;
};
-std::optional<record_extent_infos_t>
-try_decode_extent_info(
- const record_header_t& header,
+std::optional<std::vector<record_extent_infos_t> >
+try_decode_extent_infos(
+ const record_group_header_t& header,
const ceph::bufferlist& md_bl);
struct record_deltas_t {
+ paddr_t record_block_base;
std::vector<delta_info_t> deltas;
};
-std::optional<record_deltas_t>
+std::optional<std::vector<record_deltas_t> >
try_decode_deltas(
- const record_header_t& header,
- const ceph::bufferlist& md_bl);
+ const record_group_header_t& header,
+ const ceph::bufferlist& md_bl,
+ paddr_t record_block_base);
struct write_result_t {
journal_seq_t start_seq;
journal_seq_t seq;
journal_seq_t last_committed;
- struct found_record_t {
+ struct found_record_group_t {
paddr_t offset;
- record_header_t header;
+ record_group_header_t header;
bufferlist mdbuffer;
- found_record_t(
+ found_record_group_t(
paddr_t offset,
- const record_header_t &header,
+ const record_group_header_t &header,
const bufferlist &mdbuffer)
: offset(offset), header(header), mdbuffer(mdbuffer) {}
};
- std::deque<found_record_t> pending_records;
+ std::deque<found_record_group_t> pending_record_groups;
bool is_complete() const {
- return last_valid_header_found && pending_records.empty();
+ return last_valid_header_found && pending_record_groups.empty();
}
segment_id_t get_segment_id() const {
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal_seq_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_group_header_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::extent_info_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::rbm_alloc_delta_t)