record_size_t rsize,
record_t &&record)
{
- bufferlist metadatabl;
+ bufferlist data_bl;
+ for (auto &i: record.extents) {
+ data_bl.append(i.bl);
+ }
+
+ bufferlist bl;
record_header_t header{
rsize.mdlength,
rsize.dlength,
- 0 /* checksum, TODO */,
- record.deltas.size(),
- record.extents.size(),
- current_segment_nonce
+ (uint32_t)record.deltas.size(),
+ (uint32_t)record.extents.size(),
+ current_segment_nonce,
+ data_bl.crc32c(-1)
};
- encode(header, metadatabl);
+ encode(header, bl);
+
+ auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t));
+
for (const auto &i: record.extents) {
- encode(extent_info_t(i), metadatabl);
+ encode(extent_info_t(i), bl);
}
for (const auto &i: record.deltas) {
- encode(i, metadatabl);
+ encode(i, bl);
}
- bufferlist databl;
- for (auto &i: record.extents) {
- databl.claim_append(i.bl);
- }
- if (metadatabl.length() % block_size != 0) {
- metadatabl.append(
- ceph::bufferptr(
- block_size - (metadatabl.length() % block_size)));
+ if (bl.length() % block_size != 0) {
+ bl.append_zero(
+ block_size - (bl.length() % block_size));
}
+ ceph_assert(bl.length() == rsize.mdlength);
+
- ceph_assert(metadatabl.length() == rsize.mdlength);
- ceph_assert(databl.length() == rsize.dlength);
- metadatabl.claim_append(databl);
- ceph_assert(metadatabl.length() == (rsize.mdlength + rsize.dlength));
- return metadatabl;
+ auto bliter = bl.cbegin();
+ auto metadata_crc = bliter.crc32c(
+ ceph::encoded_sizeof_bounded<record_header_t>(),
+ -1);
+ bliter += sizeof(checksum_t); /* crc hole again */
+ metadata_crc = bliter.crc32c(
+ bliter.get_remaining(),
+ metadata_crc);
+ ceph_le32 metadata_crc_le;
+ metadata_crc_le = metadata_crc;
+ metadata_crc_filler.copy_in(
+ sizeof(checksum_t),
+ reinterpret_cast<const char *>(&metadata_crc_le));
+
+ bl.claim_append(data_bl);
+ ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength));
+
+ return bl;
+}
+
+bool Journal::validate_metadata(const bufferlist &bl)
+{
+ auto bliter = bl.cbegin();
+ auto test_crc = bliter.crc32c(
+ ceph::encoded_sizeof_bounded<record_header_t>(),
+ -1);
+ ceph_le32 recorded_crc_le;
+ ::decode(recorded_crc_le, bliter);
+ uint32_t recorded_crc = recorded_crc_le;
+ test_crc = bliter.crc32c(
+ bliter.get_remaining(),
+ test_crc);
+ return test_crc == recorded_crc;
}
Journal::write_record_ret Journal::write_record(
const record_t &record) const {
extent_len_t metadata =
(extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
+ metadata += sizeof(checksum_t) /* crc */;
metadata += record.extents.size() *
ceph::encoded_sizeof_bounded<extent_info_t>();
extent_len_t data = 0;
{
auto bliter = bl.cbegin();
bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ bliter += sizeof(checksum_t) /* crc */;
bliter += header.extents * ceph::encoded_sizeof_bounded<extent_info_t>();
logger().debug("{}: decoding {} deltas", __func__, header.deltas);
std::vector<delta_info_t> deltas(header.deltas);
{
auto bliter = bl.cbegin();
bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ bliter += sizeof(checksum_t) /* crc */;
logger().debug("{}: decoding {} extents", __func__, header.extents);
std::vector<extent_info_t> extent_infos(header.extents);
for (auto &&i : extent_infos) {
// Fixed portion
extent_len_t mdlength; // block aligned, length of metadata
extent_len_t dlength; // block aligned, length of data
- checksum_t full_checksum; // checksum for full record (TODO)
size_t deltas; // number of deltas
size_t extents; // number of extents
segment_nonce_t segment_nonce;// nonce of containing segment
+ checksum_t data_crc; // crc of data payload
+
DENC(record_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.mdlength, p);
denc(v.dlength, p);
- denc(v.full_checksum, p);
denc(v.deltas, p);
denc(v.extents, p);
denc(v.segment_nonce, p);
+ denc(v.data_crc, p);
DENC_FINISH(p);
}
};
record_size_t rsize,
record_t &&record);
+ /// validate metadata
+ static bool validate_metadata(const bufferlist &bl);
+
/// do record write
using write_record_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;