auto ret = i->duplicate_for_write();
ret->version++;
+ ret->last_committed_crc = i->last_committed_crc;
ret->state = CachedExtent::extent_state_t::MUTATION_PENDING;
if (ret->get_type() == extent_types_t::ROOT) {
add_extent(i);
i->prepare_write();
i->set_io_wait();
+ assert(i->get_version() > 0);
+ auto final_crc = i->get_crc32c();
record.deltas.push_back(
delta_info_t{
i->get_type(),
i->get_paddr(),
+ i->last_committed_crc,
+ final_crc,
(segment_off_t)i->get_length(),
- i->get_version(),
+ i->get_version() - 1,
i->get_delta()
});
+ i->last_committed_crc = final_crc;
}
record.extents.reserve(t.fresh_block_list.size());
i->get_paddr(),
0,
0,
+ 0,
+ 0,
bufferlist()
});
}
i->set_paddr(cur);
cur.offset += i->get_length();
i->state = CachedExtent::extent_state_t::CLEAN;
+ i->last_committed_crc = i->get_crc32c();
logger().debug("complete_commit: fresh {}", *i);
i->on_initial_write();
add_extent(i);
? record_base.add_record_relative(delta.paddr)
: delta.paddr;
logger().debug("replay_delta: found root addr {}", root_location);
+ root->apply_delta_and_adjust_crc(record_base, delta.bl);
return get_extent<RootBlock>(
root_location,
RootBlock::SIZE
ref->set_io_wait();
ref->set_paddr(offset);
ref->state = CachedExtent::extent_state_t::CLEAN;
+
+ /* TODO: crc should be checked against LBA manager */
+ ref->last_committed_crc = ref->get_crc32c();
+
return segment_manager.read(
offset,
length,
} state = extent_state_t::INVALID;
friend std::ostream &operator<<(std::ostream &, extent_state_t);
+ uint32_t last_committed_crc = 0;
public:
/**
* duplicate_for_write
virtual ceph::bufferlist get_delta() = 0;
/**
+ * apply_delta
+ *
* bl is a delta obtained previously from get_delta. The versions will
* match. Implementation should mutate buffer based on bl. base matches
* the address passed on_delta_write.
+ *
+ * Implementation *must* use set_last_committed_crc to update the crc to
+ * what the crc of the buffer would have been at submission. For physical
+ * extents that use base to adjust internal record-relative deltas, this
+ * means that the crc should be of the buffer after applying the delta,
+ * but before that adjustment. We do it this way because the crc in the
+ * commit path does not yet know the record base address.
+ *
+ * LogicalCachedExtent overrides this method and provides a simpler
+ * apply_delta override for LogicalCachedExtent implementers.
*/
- virtual void apply_delta(paddr_t base, const ceph::bufferlist &bl) = 0;
+ virtual void apply_delta_and_adjust_crc(
+ paddr_t base, const ceph::bufferlist &bl) = 0;
/**
* Called on dirty CachedExtent implementation after replay.
}
/// Returns crc32c of buffer
- uint32_t get_crc32c(uint32_t crc) {
+ uint32_t get_crc32c(uint32_t crc=1) {
return ceph_crc32c(
crc,
reinterpret_cast<const unsigned char *>(get_bptr().c_str()),
return new T(std::move(ptr));
}
+ /// Sets last_committed_crc
+ void set_last_committed_crc(uint32_t crc) {
+ last_committed_crc = crc;
+ }
+
void set_paddr(paddr_t offset) { poffset = offset; }
/**
return ceph::bufferlist();
}
- void apply_delta(paddr_t delta_base, const ceph::bufferlist &bl) final {
+ void apply_delta_and_adjust_crc(
+ paddr_t base, const ceph::bufferlist &_bl) final {
ceph_assert(0 == "TODO");
}
return ceph::bufferlist();
}
- void apply_delta(paddr_t delta_base, const ceph::bufferlist &bl) final {
+ void apply_delta_and_adjust_crc(
+ paddr_t base, const ceph::bufferlist &_bl) final {
ceph_assert(0 == "TODO");
}
apply_pending_changes(true);
}
-void OnodeBlock::apply_delta(paddr_t, const ceph::bufferlist &bl)
+void OnodeBlock::apply_delta(const ceph::bufferlist &bl)
{
assert(deltas.empty());
ceph::bufferlist get_delta() final;
void on_initial_write() final;
void on_delta_write(paddr_t record_block_offset) final;
- void apply_delta(paddr_t base, const ceph::bufferlist &bl) final;
+ void apply_delta(const ceph::bufferlist &bl) final;
void sync() {
apply_pending_changes(false);
return ceph::bufferlist();
}
- void apply_delta(paddr_t base, const ceph::bufferlist &bl) final {
+ void apply_delta_and_adjust_crc(paddr_t base, const ceph::bufferlist &_bl) final {
ceph_assert(0 == "TODO");
}
paddr_t paddr; ///< physical address
/* logical address -- needed for repopulating cache -- TODO don't actually need */
// laddr_t laddr = L_ADDR_NULL;
+ uint32_t prev_crc;
+ uint32_t final_crc;
segment_off_t length = NULL_SEG_OFF; ///< extent length
extent_version_t pversion; ///< prior version
ceph::bufferlist bl; ///< payload
denc(v.type, p);
denc(v.paddr, p);
//denc(v.laddr, p);
+ denc(v.prev_crc, p);
+ denc(v.final_crc, p);
denc(v.length, p);
denc(v.pversion, p);
denc(v.bl, p);
return (
type == rhs.type &&
paddr == rhs.paddr &&
+ prev_crc == rhs.prev_crc &&
+ final_crc == rhs.final_crc &&
length == rhs.length &&
pversion == rhs.pversion &&
bl == rhs.bl
return pin->get_laddr();
}
+ void apply_delta_and_adjust_crc(
+ paddr_t base, const ceph::bufferlist &bl) final {
+ apply_delta(bl);
+ set_last_committed_crc(get_crc32c());
+ }
+protected:
+ virtual void apply_delta(const ceph::bufferlist &bl) = 0;
+
private:
LBAPinRef pin;
};
return { get_length(), get_crc32c(1) };
}
- void apply_delta(paddr_t delta_base, const ceph::bufferlist &bl) final {
+ void apply_delta(const ceph::bufferlist &bl) final {
ceph_assert(0 == "TODO");
}
};
return delta_info_t{
extent_types_t::TEST_BLOCK,
paddr_t{},
+ 0, 0,
block_size,
1,
bl