value_magic_t::ONODE,
128, // max_ns_size
320, // max_oid_size
- 1200 // max_value_payload_size
+ 1200, // max_value_payload_size
+ 4096, // internal_node_size
+ 4096 // leaf_node_size
};
enum class status_t {
constexpr auto INDEX_UPPER_BOUND = INDEX_END - 0x8;
inline bool is_valid_index(index_t index) { return index < INDEX_UPPER_BOUND; }
-// TODO: decide by NODE_BLOCK_SIZE
+// we support up to 64 KiB tree nodes
using node_offset_t = uint16_t;
constexpr node_offset_t DISK_BLOCK_SIZE = 1u << 12;
constexpr node_offset_t NODE_BLOCK_SIZE = DISK_BLOCK_SIZE * 1u;
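+// node_offset_t can hold in-node offsets up to 65535, so the largest
+// supported node size is 65536 bytes (64 KiB), hence the +1 below: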
+constexpr auto MAX_NODE_SIZE =
+ (extent_len_t)std::numeric_limits<node_offset_t>::max() + 1;
using string_size_t = uint16_t;
context_t c, laddr_t addr, bool expect_is_level_tail)
{
LOG_PREFIX(OTree::Node::load);
- // NOTE:
- // *option1: all types of node have the same length;
- // option2: length is defined by node/field types;
- // option3: length is totally flexible;
- return c.nm.read_extent(c.t, addr, NODE_BLOCK_SIZE
+ return c.nm.read_extent(c.t, addr
).handle_error(
eagain_ertr::pass_further{},
crimson::ct_error::input_output_error::handle(
c.t, addr, expect_is_level_tail);
ceph_abort("fatal error");
})
- ).safe_then([expect_is_level_tail](auto extent) {
+ ).safe_then([FNAME, c, expect_is_level_tail](auto extent) {
auto [node_type, field_type] = extent->get_types();
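+    // the length of the loaded extent must match the size configured for
+    // its node type in the value builder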
if (node_type == node_type_t::LEAF) {
+ if (extent->get_length() != c.vb.get_leaf_node_size()) {
+        ERRORT("leaf node size mismatch -- {}", c.t, extent);
+ ceph_abort("fatal error");
+ }
auto impl = LeafNodeImpl::load(extent, field_type, expect_is_level_tail);
return Ref<Node>(new LeafNode(impl.get(), std::move(impl)));
} else if (node_type == node_type_t::INTERNAL) {
+ if (extent->get_length() != c.vb.get_internal_node_size()) {
+        ERRORT("internal node size mismatch -- {}", c.t, extent);
+ ceph_abort("fatal error");
+ }
auto impl = InternalNodeImpl::load(extent, field_type, expect_is_level_tail);
return Ref<Node>(new InternalNode(impl.get(), std::move(impl)));
} else {
return eagain_ertr::make_ready_future<NodeExtentMutable>(*mut);
}
assert(!extent->is_initial_pending());
- return c.nm.alloc_extent(c.t, node_stage_t::EXTENT_SIZE
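+  // a rebuilt node keeps its size: allocate the fresh extent with the same
+  // length as the extent it replaces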
+ auto alloc_size = extent->get_length();
+ return c.nm.alloc_extent(c.t, alloc_size
).handle_error(
eagain_ertr::pass_further{},
crimson::ct_error::input_output_error::handle(
- [FNAME, c, l_to_discard = extent->get_laddr()] {
+ [FNAME, c, alloc_size, l_to_discard = extent->get_laddr()] {
ERRORT("EIO during allocate -- node_size={}, to_discard={:x}",
- c.t, node_stage_t::EXTENT_SIZE, l_to_discard);
+ c.t, alloc_size, l_to_discard);
ceph_abort("fatal error");
})
).safe_then([this, c, FNAME] (auto fresh_extent) {
crimson::ct_error::enoent,
crimson::ct_error::erange>;
virtual read_ertr::future<NodeExtentRef> read_extent(
- Transaction&, laddr_t, extent_len_t) = 0;
+ Transaction&, laddr_t) = 0;
using alloc_ertr = base_ertr;
virtual alloc_ertr::future<NodeExtentRef> alloc_extent(
bool is_read_isolated() const override { return false; }
read_ertr::future<NodeExtentRef> read_extent(
- Transaction& t, laddr_t addr, extent_len_t len) override {
- TRACET("reading {}B at {:#x} ...", t, len, addr);
+ Transaction& t, laddr_t addr) override {
+ TRACET("reading at {:#x} ...", t, addr);
if constexpr (SYNC) {
- return read_extent_sync(t, addr, len);
+ return read_extent_sync(t, addr);
} else {
using namespace std::chrono_literals;
- return seastar::sleep(1us).then([this, &t, addr, len] {
- return read_extent_sync(t, addr, len);
+ return seastar::sleep(1us).then([this, &t, addr] {
+ return read_extent_sync(t, addr);
});
}
}
private:
read_ertr::future<NodeExtentRef> read_extent_sync(
- Transaction& t, laddr_t addr, extent_len_t len) {
+ Transaction& t, laddr_t addr) {
auto iter = allocate_map.find(addr);
assert(iter != allocate_map.end());
auto extent = iter->second;
TRACET("read {}B at {:#x}", t, extent->get_length(), extent->get_laddr());
assert(extent->get_laddr() == addr);
- assert(extent->get_length() == len);
return read_ertr::make_ready_future<NodeExtentRef>(extent);
}
bool is_read_isolated() const override { return true; }
read_ertr::future<NodeExtentRef> read_extent(
- Transaction& t, laddr_t addr, extent_len_t len) override {
- TRACET("reading {}B at {:#x} ...", t, len, addr);
+ Transaction& t, laddr_t addr) override {
+ TRACET("reading at {:#x} ...", t, addr);
if constexpr (INJECT_EAGAIN) {
if (trigger_eagain()) {
- DEBUGT("reading {}B at {:#x}: trigger eagain", t, len, addr);
+ DEBUGT("reading at {:#x}: trigger eagain", t, addr);
return crimson::ct_error::eagain::make();
}
}
- return tm.read_extent<SeastoreNodeExtent>(t, addr, len
- ).safe_then([addr, len, &t](auto&& e) {
+ return tm.read_extent<SeastoreNodeExtent>(t, addr
+ ).safe_then([addr, &t](auto&& e) {
TRACET("read {}B at {:#x} -- {}",
t, e->get_length(), e->get_laddr(), *e);
if (!e->is_valid()) {
ceph_abort("fatal error");
}
assert(e->get_laddr() == addr);
- assert(e->get_length() == len);
std::ignore = addr;
- std::ignore = len;
return NodeExtentRef(e);
});
}
static eagain_future<typename parent_t::fresh_impl_t> allocate(
context_t c, bool is_level_tail, level_t level) {
LOG_PREFIX(OTree::Layout::allocate);
- // NOTE: Currently, all the node types have the same size for simplicity.
- // But depending on the requirement, we may need to make node size
- // configurable by field_type_t and node_type_t, or totally flexible.
- return c.nm.alloc_extent(c.t, node_stage_t::EXTENT_SIZE
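+  // the extent size to allocate is configured per node type through the
+  // value builder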
+ extent_len_t extent_size;
+ if constexpr (NODE_TYPE == node_type_t::LEAF) {
+ extent_size = c.vb.get_leaf_node_size();
+ } else {
+ extent_size = c.vb.get_internal_node_size();
+ }
+ return c.nm.alloc_extent(c.t, extent_size
).handle_error(
eagain_ertr::pass_further{},
crimson::ct_error::input_output_error::handle(
- [FNAME, c, is_level_tail, level] {
- ERRORT("EIO -- node_size={}, is_level_tail={}, level={}",
- c.t, node_stage_t::EXTENT_SIZE, is_level_tail, level);
+ [FNAME, c, extent_size, is_level_tail, level] {
+ ERRORT("EIO -- extent_size={}, is_level_tail={}, level={}",
+ c.t, extent_size, is_level_tail, level);
ceph_abort("fatal error");
})
).safe_then([is_level_tail, level](auto extent) {
string_key_view_t::VALID_UPPER_BOUND);
ceph_assert(conf.max_oid_size <
string_key_view_t::VALID_UPPER_BOUND);
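+    // node sizes must not exceed MAX_NODE_SIZE and must be multiples of
+    // DISK_BLOCK_SIZE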
+ ceph_assert(conf.internal_node_size <= MAX_NODE_SIZE);
+ ceph_assert(conf.internal_node_size % DISK_BLOCK_SIZE == 0);
+ ceph_assert(conf.leaf_node_size <= MAX_NODE_SIZE);
+ ceph_assert(conf.leaf_node_size % DISK_BLOCK_SIZE == 0);
}
}
string_size_t max_ns_size;
string_size_t max_oid_size;
value_size_t max_value_payload_size;
+ extent_len_t internal_node_size;
+ extent_len_t leaf_node_size;
};
class tree_cursor_t;
virtual string_size_t get_max_ns_size() const = 0;
virtual string_size_t get_max_oid_size() const = 0;
virtual value_size_t get_max_value_payload_size() const = 0;
+ virtual extent_len_t get_internal_node_size() const = 0;
+ virtual extent_len_t get_leaf_node_size() const = 0;
virtual std::unique_ptr<ValueDeltaRecorder>
build_value_recorder(ceph::bufferlist&) const = 0;
};
value_size_t get_max_value_payload_size() const override {
return ValueImpl::TREE_CONF.max_value_payload_size;
}
+ extent_len_t get_internal_node_size() const override {
+ return ValueImpl::TREE_CONF.internal_node_size;
+ }
+ extent_len_t get_leaf_node_size() const override {
+ return ValueImpl::TREE_CONF.leaf_node_size;
+ }
std::unique_ptr<ValueDeltaRecorder>
build_value_recorder(ceph::bufferlist& encoded) const override {
template <value_magic_t MAGIC,
string_size_t MAX_NS_SIZE,
string_size_t MAX_OID_SIZE,
- value_size_t MAX_VALUE_PAYLOAD_SIZE>
+ value_size_t MAX_VALUE_PAYLOAD_SIZE,
+ extent_len_t INTERNAL_NODE_SIZE,
+ extent_len_t LEAF_NODE_SIZE>
class TestValue final : public Value {
public:
static constexpr tree_conf_t TREE_CONF = {
MAGIC,
MAX_NS_SIZE,
MAX_OID_SIZE,
- MAX_VALUE_PAYLOAD_SIZE
+ MAX_VALUE_PAYLOAD_SIZE,
+ INTERNAL_NODE_SIZE,
+ LEAF_NODE_SIZE
};
using id_t = test_item_t::id_t;
};
using UnboundedValue = TestValue<
- value_magic_t::TEST_UNBOUND, 4096, 4096, 4096>;
+ value_magic_t::TEST_UNBOUND, 4096, 4096, 4096, 4096, 4096>;
using BoundedValue = TestValue<
- value_magic_t::TEST_BOUNDED, 320, 320, 640>;
+ value_magic_t::TEST_BOUNDED, 320, 320, 640, 4096, 4096>;
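+// A hypothetical value type with larger 8 KiB nodes could look like the
+// following (illustration only, not part of this change; the name is made up
+// and the magic value is reused from UnboundedValue; both node sizes are
+// multiples of DISK_BLOCK_SIZE and within MAX_NODE_SIZE):
+//
+// using LargeNodeValue = TestValue<
+//     value_magic_t::TEST_UNBOUND, 4096, 4096, 4096, 8192, 8192>;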
}