Switch to run-time node_size.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
// we support up to 64 KiB tree nodes
using node_offset_t = uint16_t;
constexpr node_offset_t DISK_BLOCK_SIZE = 1u << 12;
-constexpr node_offset_t NODE_BLOCK_SIZE = DISK_BLOCK_SIZE * 1u;
constexpr auto MAX_NODE_SIZE =
(extent_len_t)std::numeric_limits<node_offset_t>::max() + 1;
}
template <typename PtrType>
-void reset_ptr(PtrType& ptr, const char* origin_base, const char* new_base) {
+void reset_ptr(PtrType& ptr, const char* origin_base,
+ const char* new_base, extent_len_t node_size) {
assert((const char*)ptr > origin_base);
- assert((const char*)ptr - origin_base < NODE_BLOCK_SIZE);
+ assert((const char*)ptr - origin_base < (int)node_size);
ptr = reinterpret_cast<PtrType>(
(const char*)ptr - origin_base + new_base);
}
key_view = _key_view;
p_value_header = _p_value_header;
assert((const char*)p_value_header > p_node_base);
- assert((const char*)p_value_header - p_node_base < NODE_BLOCK_SIZE);
+ assert((const char*)p_value_header - p_node_base <
+ (int)ref_leaf_node->get_node_size());
value_payload_mut.reset();
p_value_recorder = nullptr;
auto current_p_node_base = ref_leaf_node->read();
assert(current_p_node_base != p_node_base);
+ auto node_size = ref_leaf_node->get_node_size();
version.state = current_version.state;
- reset_ptr(p_value_header, p_node_base, current_p_node_base);
- key_view->reset_to(p_node_base, current_p_node_base);
+ reset_ptr(p_value_header, p_node_base,
+ current_p_node_base, node_size);
+ key_view->reset_to(p_node_base, current_p_node_base, node_size);
value_payload_mut.reset();
p_value_recorder = nullptr;
return impl->read();
}
+extent_len_t LeafNode::get_node_size() const
+{
+ return impl->get_node_size();
+}
+
std::tuple<key_view_t, const value_header_t*>
LeafNode::get_kv(const search_position_t& pos) const
{
bool is_level_tail() const;
node_version_t get_version() const;
const char* read() const;
+ extent_len_t get_node_size() const;
std::tuple<key_view_t, const value_header_t*> get_kv(const search_position_t&) const;
eagain_future<Ref<tree_cursor_t>> get_next_cursor(context_t, const search_position_t&);
void encode_update_child_addr(
const laddr_t new_addr,
const laddr_packed_t* p_addr,
- const char* p_node_start) {
+ const char* p_node_start,
+ extent_len_t node_size) {
ceph::encode(node_delta_op_t::UPDATE_CHILD_ADDR, encoded);
ceph::encode(new_addr, encoded);
int node_offset = reinterpret_cast<const char*>(p_addr) - p_node_start;
- assert(node_offset > 0 && node_offset <= NODE_BLOCK_SIZE);
+ assert(node_offset > 0 && node_offset < (int)node_size);
+ assert(node_offset < (int)MAX_NODE_SIZE);
ceph::encode(static_cast<node_offset_t>(node_offset), encoded);
}
assert(extent->is_pending());
assert(state != nextent_state_t::READ_ONLY);
if (state == nextent_state_t::MUTATION_PENDING) {
- recorder->encode_update_child_addr(new_addr, p_addr, read().p_start());
+ recorder->encode_update_child_addr(
+ new_addr, p_addr, read().p_start(), get_length());
}
#ifndef NDEBUG
test_extent->prepare_replay(extent);
- test_recorder->encode_update_child_addr(new_addr, p_addr, read().p_start());
+ test_recorder->encode_update_child_addr(
+ new_addr, p_addr, read().p_start(), get_length());
#endif
layout_t::update_child_addr(*mut, new_addr, p_addr);
#ifndef NDEBUG
virtual field_type_t field_type() const = 0;
virtual laddr_t laddr() const = 0;
virtual const char* read() const = 0;
+ virtual extent_len_t get_node_size() const = 0;
virtual nextent_state_t get_extent_state() const = 0;
virtual void prepare_mutate(context_t) = 0;
virtual bool is_level_tail() const = 0;
field_type_t field_type() const override { return FIELD_TYPE; }
laddr_t laddr() const override { return extent.get_laddr(); }
const char* read() const override { return extent.read().p_start(); }
+ extent_len_t get_node_size() const override { return extent.get_length(); }
nextent_state_t get_extent_state() const override { return extent.get_state(); }
void prepare_mutate(context_t c) override { return extent.prepare_mutate(c); }
bool is_level_tail() const override { return extent.read().is_level_tail(); }
} else {
ceph_abort("impossible path");
}
+#ifndef NDEBUG
+ if (pp_value) {
+ assert((const char*)(*pp_value) - extent.read().p_start() <
+ extent.get_length());
+ }
+#endif
}
void get_prev_slot(search_position_t& pos,
{
node_offset_t offset = iter.get_back_offset();
int new_size = change + offset;
- assert(new_size > 0 && new_size < NODE_BLOCK_SIZE);
+ assert(new_size > 0 && new_size < (int)mut.get_length());
mut.copy_in_absolute(
(void*)iter.get_item_range().p_end, node_offset_t(new_size));
}
template <node_type_t NODE_TYPE>
-node_offset_t ITER_T::trim_until(NodeExtentMutable&, const ITER_T& iter)
+node_offset_t ITER_T::trim_until(NodeExtentMutable& mut, const ITER_T& iter)
{
assert(iter.index() != 0);
size_t ret = iter.p_end() - iter.p_items_start;
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < mut.get_length());
return ret;
}
NodeExtentMutable& mut, const ITER_T& iter, node_offset_t trimmed)
{
size_t trim_size = iter.p_start() - iter.p_items_start + trimmed;
- assert(trim_size < NODE_BLOCK_SIZE);
+ assert(trim_size < mut.get_length());
assert(iter.get_back_offset() > trimmed);
node_offset_t new_offset = iter.get_back_offset() - trimmed;
mut.copy_in_absolute((void*)iter.item_range.p_end, new_offset);
}
node_offset_t size() const {
size_t ret = item_range.p_end - item_range.p_start + sizeof(node_offset_t);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < node_size);
return ret;
};
node_offset_t size_to_nxt() const {
size_t ret = get_key().size() + sizeof(node_offset_t);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < node_size);
return ret;
}
node_offset_t size_overhead() const {
void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
int start_offset = p_items_start - p_node_start;
int end_offset = p_items_end - p_node_start;
- assert(start_offset > 0 && start_offset < NODE_BLOCK_SIZE);
- assert(end_offset > 0 && end_offset <= NODE_BLOCK_SIZE);
+ assert(start_offset > 0 && start_offset < (int)node_size);
+ assert(end_offset > 0 && end_offset <= (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
ceph::encode(_index, encoded);
node_offset_t end_offset;
ceph::decode(end_offset, delta);
assert(start_offset < end_offset);
- assert(end_offset <= NODE_BLOCK_SIZE);
+ assert(end_offset <= node_size);
index_t index;
ceph::decode(index, delta);
}
node_offset_t size() const {
size_t ret = length + sizeof(string_size_t);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < MAX_NODE_SIZE);
return ret;
}
node_offset_t size_logical() const {
}
bool operator!=(const string_key_view_t& x) const { return !(*this == x); }
- void reset_to(const char* origin_base, const char* new_base) {
- reset_ptr(p_key, origin_base, new_base);
- reset_ptr(p_length, origin_base, new_base);
+ void reset_to(const char* origin_base,
+ const char* new_base,
+ extent_len_t node_size) {
+ reset_ptr(p_key, origin_base, new_base, node_size);
+ reset_ptr(p_length, origin_base, new_base, node_size);
#ifndef NDEBUG
string_size_t current_length;
std::memcpy(¤t_length, p_length, sizeof(string_size_t));
node_offset_t size() const {
if (type() == Type::STR) {
size_t ret = nspace.size() + oid.size();
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < MAX_NODE_SIZE);
return ret;
} else {
return sizeof(string_size_t);
}
bool operator!=(const ns_oid_view_t& x) const { return !(*this == x); }
- void reset_to(const char* origin_base, const char* new_base) {
- nspace.reset_to(origin_base, new_base);
- oid.reset_to(origin_base, new_base);
+ void reset_to(const char* origin_base,
+ const char* new_base,
+ extent_len_t node_size) {
+ nspace.reset_to(origin_base, new_base, node_size);
+ oid.reset_to(origin_base, new_base, node_size);
}
template <KeyT KT>
replace(key);
}
- void reset_to(const char* origin_base, const char* new_base) {
+ void reset_to(const char* origin_base,
+ const char* new_base,
+ extent_len_t node_size) {
if (p_shard_pool != nullptr) {
- reset_ptr(p_shard_pool, origin_base, new_base);
+ reset_ptr(p_shard_pool, origin_base, new_base, node_size);
}
if (p_crush != nullptr) {
- reset_ptr(p_crush, origin_base, new_base);
+ reset_ptr(p_crush, origin_base, new_base, node_size);
}
if (p_ns_oid.has_value()) {
- p_ns_oid->reset_to(origin_base, new_base);
+ p_ns_oid->reset_to(origin_base, new_base, node_size);
}
if (p_snap_gen != nullptr) {
- reset_ptr(p_snap_gen, origin_base, new_base);
+ reset_ptr(p_snap_gen, origin_base, new_base, node_size);
}
}
IA_TEMPLATE(KeyT::HOBJ);
node_offset_t internal_sub_items_t::trim_until(
- NodeExtentMutable&, internal_sub_items_t& items, index_t index)
+ NodeExtentMutable& mut, internal_sub_items_t& items, index_t index)
{
assert(index != 0);
auto keys = items.keys();
assert(index <= keys);
size_t ret = sizeof(internal_sub_item_t) * (keys - index);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < mut.get_length());
return ret;
}
size_trim_offsets);
mut.copy_in_absolute((void*)items.p_num_keys, num_keys_t(index));
size_t ret = size_trim_offsets + (p_shift_start - p_items_start);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < mut.get_length());
return ret;
}
}
node_offset_t size_before(index_t index) const {
size_t ret = index * sizeof(internal_sub_item_t);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < node_size);
return ret;
}
const laddr_packed_t* get_p_value(index_t index) const {
int end_offset = p_end - p_node_start;
assert(start_offset > 0 &&
start_offset < end_offset &&
- end_offset < NODE_BLOCK_SIZE);
+ end_offset < (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
}
node_offset_t end_offset;
ceph::decode(end_offset, delta);
assert(start_offset < end_offset);
- assert(end_offset <= NODE_BLOCK_SIZE);
+ assert(end_offset <= node_size);
return internal_sub_items_t({{p_node_start + start_offset,
p_node_start + end_offset},
node_size});
(index + 1) * sizeof(node_offset_t) +
get_offset(index).value;
}
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < node_size);
return ret;
}
node_offset_t size_overhead_at(index_t index) const { return sizeof(node_offset_t); }
int end_offset = p_end - p_node_start;
assert(start_offset > 0 &&
start_offset < end_offset &&
- end_offset < NODE_BLOCK_SIZE);
+ end_offset < (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
}
node_offset_t end_offset;
ceph::decode(end_offset, delta);
assert(start_offset < end_offset);
- assert(end_offset <= NODE_BLOCK_SIZE);
+ assert(end_offset < node_size);
return leaf_sub_items_t({{p_node_start + start_offset,
p_node_start + end_offset},
node_size});
field_type_t field_type() const override { return field_type_t::N0; }
const char* read() const override {
ceph_abort("impossible path"); }
+ extent_len_t get_node_size() const override {
+ ceph_abort("impossible path"); }
nextent_state_t get_extent_state() const override {
ceph_abort("impossible path"); }
level_t level() const override { return 0u; }