laddr_t node_laddr) override {
LOG_PREFIX(OTree::Extent::Replay);
assert(is_empty());
- node_stage_t stage(reinterpret_cast<const FieldType*>(node.get_read()));
+ node_stage_t stage(reinterpret_cast<const FieldType*>(node.get_read()),
+ node.get_length());
node_delta_op_t op;
try {
ceph::decode(op, delta);
}
case node_delta_op_t::SPLIT: {
DEBUG("decoding SPLIT ...");
- auto split_at = StagedIterator::decode(stage.p_start(), delta);
+ auto split_at = StagedIterator::decode(
+ node.get_read(), node.get_length(), delta);
DEBUG("apply split_at={} ...", split_at);
layout_t::split(node, stage, split_at);
break;
}
case node_delta_op_t::SPLIT_INSERT: {
DEBUG("decoding SPLIT_INSERT ...");
- auto split_at = StagedIterator::decode(stage.p_start(), delta);
+ auto split_at = StagedIterator::decode(
+ node.get_read(), node.get_length(), delta);
auto key = key_hobj_t::decode(delta);
auto value = decode_value(delta);
auto insert_pos = position_t::decode(delta);
NodeExtentAccessorT(NodeExtentRef extent)
: extent{extent},
- node_stage{reinterpret_cast<const FieldType*>(extent->get_read())} {
+ node_stage{reinterpret_cast<const FieldType*>(extent->get_read()),
+ extent->get_length()} {
if (extent->is_initial_pending()) {
state = nextent_state_t::FRESH;
mut.emplace(extent->get_mutable());
extent = extent->mutate(c, std::move(ref_recorder));
state = nextent_state_t::MUTATION_PENDING;
assert(extent->is_mutation_pending());
- node_stage = node_stage_t(
- reinterpret_cast<const FieldType*>(extent->get_read()));
+ node_stage = node_stage_t(reinterpret_cast<const FieldType*>(extent->get_read()),
+ extent->get_length());
assert(recorder == static_cast<recorder_t*>(extent->get_recorder()));
mut.emplace(extent->get_mutable());
}
NodeExtentRef to_discard = extent;
extent = fresh_extent;
- node_stage = node_stage_t(
- reinterpret_cast<const FieldType*>(extent->get_read()));
+ node_stage = node_stage_t(reinterpret_cast<const FieldType*>(extent->get_read()),
+ extent->get_length());
state = nextent_state_t::FRESH;
mut.emplace(fresh_mut);
recorder = nullptr;
STAGE_T::template get_largest_slot<true, false, false>(
extent.read(), &cast_down_fill_0<STAGE>(last_pos), nullptr, nullptr);
} else {
- node_stage_t right_stage{reinterpret_cast<FieldType*>(right_mut.get_write())};
+ node_stage_t right_stage{reinterpret_cast<FieldType*>(right_mut.get_write()),
+ right_mut.get_length()};
STAGE_T::template get_largest_slot<true, false, false>(
right_stage, &cast_down_fill_0<STAGE>(last_pos), nullptr, nullptr);
}
}
static item_iterator_t decode(const char* p_node_start,
+ extent_len_t node_size,
ceph::bufferlist::const_iterator& delta) {
node_offset_t start_offset;
ceph::decode(start_offset, delta);
// TODO: remove
node_extent_t() = default;
- node_extent_t(const FieldType* p_fields) : p_fields{p_fields} {
+ node_extent_t(const FieldType* p_fields, extent_len_t node_size)
+ : p_fields{p_fields}, node_size{node_size} {
+ assert(node_size <= MAX_NODE_SIZE);
+ assert(node_size % DISK_BLOCK_SIZE == 0);
validate(*p_fields);
}
}
static node_extent_t decode(const char* p_node_start,
+ extent_len_t node_size,
ceph::bufferlist::const_iterator& delta) {
// nothing to decode
- return node_extent_t(reinterpret_cast<const FieldType*>(p_node_start));
+ return node_extent_t(
+ reinterpret_cast<const FieldType*>(p_node_start),
+ node_size);
}
static void validate(const FieldType& fields) {
private:
const FieldType& fields() const { return *p_fields; }
const FieldType* p_fields;
+ extent_len_t node_size;
};
template <typename FieldType, node_type_t NODE_TYPE>
* (!IS_BOTTOM) size_to_nxt_at(index_t) const -> node_offset_t
* (!IS_BOTTOM) get_nxt_container(index_t) const
* encode(p_node_start, encoded)
- * decode(p_node_start, delta) -> container_t
+ * decode(p_node_start, node_size, delta) -> container_t
* static:
* header_size() -> node_offset_t
* estimate_insert(key, value) -> node_offset_t
}
static me_t decode(const char* p_node_start,
+ extent_len_t node_size,
ceph::bufferlist::const_iterator& delta) {
- auto container = container_t::decode(p_node_start, delta);
+ auto container = container_t::decode(
+ p_node_start, node_size, delta);
auto ret = me_t(container);
index_t index;
ceph::decode(index, delta);
* get_nxt_container() const
* has_next() const -> bool
* encode(p_node_start, encoded)
- * decode(p_node_start, delta) -> container_t
+ * decode(p_node_start, node_length, delta) -> container_t
* operator++()
* static:
* header_size() -> node_offset_t
}
static me_t decode(const char* p_node_start,
+ extent_len_t node_size,
ceph::bufferlist::const_iterator& delta) {
- auto container = container_t::decode(p_node_start, delta);
+ auto container = container_t::decode(
+ p_node_start, node_size, delta);
auto ret = me_t(container);
uint8_t is_end;
ceph::decode(is_end, delta);
* (!IS_BOTTOM)get_appender_opened(p_mut) -> Appender
* denc:
* encode(p_node_start, encoded)
- * decode(p_node_start, delta) -> iterator_t
+ * decode(p_node_start, node_size, delta) -> iterator_t
* static:
* header_size() -> node_offset_t
* estimate_insert(key, value) -> node_offset_t
}
}
static StagedIterator decode(const char* p_node_start,
+ extent_len_t node_size,
ceph::bufferlist::const_iterator& delta) {
StagedIterator ret;
uint8_t present;
ceph::decode(present, delta);
if (present) {
- ret.iter = iterator_t::decode(p_node_start, delta);
+ ret.iter = iterator_t::decode(
+ p_node_start, node_size, delta);
if constexpr (!IS_BOTTOM) {
- ret._nxt = NXT_STAGE_T::StagedIterator::decode(p_node_start, delta);
+ ret._nxt = NXT_STAGE_T::StagedIterator::decode(
+ p_node_start, node_size, delta);
}
}
return ret;
}
static internal_sub_items_t decode(
- const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
+ const char* p_node_start,
+ extent_len_t node_size,
+ ceph::bufferlist::const_iterator& delta) {
node_offset_t start_offset;
ceph::decode(start_offset, delta);
node_offset_t end_offset;
}
static leaf_sub_items_t decode(
- const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
+ const char* p_node_start,
+ extent_len_t node_size,
+ ceph::bufferlist::const_iterator& delta) {
node_offset_t start_offset;
ceph::decode(start_offset, delta);
node_offset_t end_offset;