tree_cursor_t::tree_cursor_t(Ref<LeafNode> node, const search_position_t& pos)
: ref_leaf_node{node}, position{pos}, cache{ref_leaf_node}
{
- assert(!is_end());
+ assert(is_tracked());
ref_leaf_node->do_track_cursor<true>(*this);
}
const key_view_t& key_view, const value_header_t* p_value_header)
: ref_leaf_node{node}, position{pos}, cache{ref_leaf_node}
{
- assert(!is_end());
+ assert(is_tracked());
update_cache_same_node(key_view, p_value_header);
ref_leaf_node->do_track_cursor<true>(*this);
}
tree_cursor_t::~tree_cursor_t()
{
- if (!is_end()) {
+ if (is_tracked()) {
ref_leaf_node->do_untrack_cursor(*this);
}
}
tree_cursor_t::future<Ref<tree_cursor_t>>
tree_cursor_t::get_next(context_t c)
{
+ assert(is_tracked());
return ref_leaf_node->get_next_cursor(c, position);
}
if (is_end()) {
assert(ref_leaf_node == prv.ref_leaf_node);
assert(ref_leaf_node->is_level_tail());
- } else {
+ } else if (is_tracked()) {
auto key = get_key_view(magic);
auto prv_key = prv.get_key_view(magic);
assert(key.compare_to(prv_key) == MatchKindCMP::GT);
assert(!prv.ref_leaf_node->is_level_tail());
assert(position == search_position_t::begin());
}
+ } else {
+ assert(is_invalid());
+ ceph_abort("impossible");
}
#endif
}
MatchKindCMP tree_cursor_t::compare_to(
const tree_cursor_t& o, value_magic_t magic) const
{
- // singleton
+ if (!is_tracked() && !o.is_tracked()) {
+ return MatchKindCMP::EQ;
+ } else if (!is_tracked()) {
+ return MatchKindCMP::GT;
+ } else if (!o.is_tracked()) {
+ return MatchKindCMP::LT;
+ }
+
+ assert(is_tracked() && o.is_tracked());
+ // all tracked cursors are singletons
if (this == &o) {
return MatchKindCMP::EQ;
}
tree_cursor_t::future<>
tree_cursor_t::extend_value(context_t c, value_size_t extend_size)
{
+ assert(is_tracked());
return ref_leaf_node->extend_value(c, position, extend_size);
}
tree_cursor_t::future<>
tree_cursor_t::trim_value(context_t c, value_size_t trim_size)
{
+ assert(is_tracked());
return ref_leaf_node->trim_value(c, position, trim_size);
}
void tree_cursor_t::update_track(
Ref<LeafNode> node, const search_position_t& pos)
{
- // the cursor must be already untracked
+ // I must be already untracked
+ assert(is_tracked());
+ assert(!ref_leaf_node->check_is_tracking(*this));
// track the new node and new pos
assert(!pos.is_end());
- assert(!is_end());
ref_leaf_node = node;
position = pos;
// we lazy update the key/value information until user asked
void tree_cursor_t::update_cache_same_node(const key_view_t& key_view,
const value_header_t* p_value_header) const
{
- assert(!is_end());
+ assert(is_tracked());
cache.update_all(ref_leaf_node->get_version(), key_view, p_value_header);
cache.validate_is_latest(position);
}
+void tree_cursor_t::invalidate()
+{
+ assert(is_tracked());
+ ref_leaf_node.reset();
+ assert(is_invalid());
+ // I must be removed from LeafNode
+}
+
/*
* tree_cursor_t::Cache
*/
template <bool VALIDATE>
void Node::as_child(const search_position_t& pos, Ref<InternalNode> parent_node)
{
+ // I must be already untracked.
assert(!super);
+#ifndef NDEBUG
+ if (_parent_info.has_value()) {
+ assert(!_parent_info->ptr->check_is_tracking(*this));
+ }
+#endif
_parent_info = parent_info_t{pos, parent_node};
parent_info().ptr->do_track_child<VALIDATE>(*this);
}
} else {
cursor = get_or_track_cursor(result.position, index_key, result.p_value);
}
- return node_ertr::make_ready_future<search_result_t>(
- search_result_t{cursor, result.mstat});
+ search_result_t ret{cursor, result.mstat};
+ ret.validate_input_key(key, c.vb.get_header_magic());
+ return node_ertr::make_ready_future<search_result_t>(ret);
}
node_future<> LeafNode::do_get_tree_stats(context_t, tree_stats_t& stats)
{
#ifndef NDEBUG
assert(this == cursor.get_leaf_node().get());
- assert(!cursor.is_end());
+ assert(cursor.is_tracked());
auto [key, p_value_header] = get_kv(cursor.get_position());
auto magic = p_value_header->magic;
assert(key.compare_to(cursor.get_key_view(magic)) == MatchKindCMP::EQ);
* pairs in the tree. An end cursor won't contain valid key-value
* information.
*/
- bool is_end() const { return position.is_end(); }
+ bool is_end() const { return !!ref_leaf_node && position.is_end(); }
+
+ /**
+ * is_tracked
+ *
+ * Represents a key-value pair stored in the tree, which is always tracked
+ * across insert/split/erase/merge operations.
+ */
+ bool is_tracked() const { return !!ref_leaf_node && !position.is_end(); }
+
+ /**
+ * is_invalid
+ *
+ * Represents an invalid cursor which was once valid and tracked by the tree
+ * but is now erased and untracked. User may still hold an invalid cursor.
+ */
+ bool is_invalid() const { return !ref_leaf_node; }
/// Returns the key view in tree if it is not an end cursor.
const key_view_t& get_key_view(value_magic_t magic) const {
- assert(!is_end());
+ assert(is_tracked());
return cache.get_key_view(magic, position);
}
/// Get the latest value_header_t pointer for read.
const value_header_t* read_value_header(value_magic_t magic) const {
- assert(!is_end());
+ assert(is_tracked());
return cache.get_p_value_header(magic, position);
}
/// Prepare the node extent to be mutable and recorded.
std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
prepare_mutate_value_payload(context_t c) {
- assert(!is_end());
+ assert(is_tracked());
return cache.prepare_mutate_value_payload(c, position);
}
/// Trim and shrink the value payload.
future<> trim_value(context_t, value_size_t);
+ static Ref<tree_cursor_t> get_invalid() {
+ static Ref<tree_cursor_t> INVALID = new tree_cursor_t();
+ return INVALID;
+ }
+
private:
tree_cursor_t(Ref<LeafNode>, const search_position_t&);
tree_cursor_t(Ref<LeafNode>, const search_position_t&,
const key_view_t&, const value_header_t*);
// lookup reaches the end, contain leaf node for further insert
tree_cursor_t(Ref<LeafNode>);
+ // create an invalid tree_cursor_t
+ tree_cursor_t() : cache{ref_leaf_node} {}
const search_position_t& get_position() const { return position; }
Ref<LeafNode> get_leaf_node() const { return ref_leaf_node; }
void update_track(Ref<LeafNode>, const search_position_t&);
void update_cache_same_node(const key_view_t&,
const value_header_t*) const;
+ void invalidate();
static Ref<tree_cursor_t> create(Ref<LeafNode> node, const search_position_t& pos) {
return new tree_cursor_t(node, pos);
assert(mstat >= MSTAT_MIN && mstat <= MSTAT_MAX);
return (mstat == MSTAT_EQ ? MatchKindBS::EQ : MatchKindBS::NE);
}
+
+ void validate_input_key(const key_hobj_t& key, value_magic_t magic) const {
+#ifndef NDEBUG
+ if (match() == MatchKindBS::EQ) {
+ assert(key.compare_to(p_cursor->get_key_view(magic)) == MatchKindCMP::EQ);
+ } else {
+ assert(match() == MatchKindBS::NE);
+ if (p_cursor->is_tracked()) {
+ assert(key.compare_to(p_cursor->get_key_view(magic)) == MatchKindCMP::LT);
+ } else if (p_cursor->is_end()) {
+ // good
+ } else {
+ assert(p_cursor->is_invalid());
+ ceph_abort("impossible");
+ }
+ }
+#endif
+ }
};
virtual ~Node();
}
void do_untrack_child(const Node& child) {
+ assert(check_is_tracking(child));
auto& child_pos = child.parent_info().position;
- assert(tracked_child_nodes.find(child_pos)->second == &child);
[[maybe_unused]] auto removed = tracked_child_nodes.erase(child_pos);
assert(removed);
}
+ bool check_is_tracking(const Node& child) const {
+ auto& child_pos = child.parent_info().position;
+ auto found = tracked_child_nodes.find(child_pos);
+ if (found != tracked_child_nodes.end() && found->second == &child) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
static node_future<Ref<InternalNode>> allocate_root(
context_t, level_t, laddr_t, Super::URef&&);
validate_cursor(cursor);
}
auto& cursor_pos = cursor.get_position();
- assert(tracked_cursors.count(cursor_pos) == 0);
+ assert(tracked_cursors.find(cursor_pos) == tracked_cursors.end());
tracked_cursors.emplace(cursor_pos, &cursor);
}
void do_untrack_cursor(const tree_cursor_t& cursor) {
validate_cursor(cursor);
auto& cursor_pos = cursor.get_position();
- assert(tracked_cursors.find(cursor_pos)->second == &cursor);
+ assert(check_is_tracking(cursor));
[[maybe_unused]] auto removed = tracked_cursors.erase(cursor_pos);
assert(removed);
}
+ bool check_is_tracking(const tree_cursor_t& cursor) const {
+ auto& cursor_pos = cursor.get_position();
+ auto found = tracked_cursors.find(cursor_pos);
+ if (found != tracked_cursors.end() && found->second == &cursor) {
+ return true;
+ } else {
+ return false;
+ }
+ }
node_future<> extend_value(context_t, const search_position_t&, value_size_t);
node_future<> trim_value(context_t, const search_position_t&, value_size_t);
~Cursor() = default;
bool is_end() const {
- if (p_cursor) {
- assert(!p_cursor->is_end());
+ if (p_cursor->is_tracked()) {
return false;
- } else {
+ } else if (p_cursor->is_invalid()) {
return true;
+ } else {
+ // we don't actually store end cursor because it will hold a reference
+ // to an end leaf node and is not kept updated.
+ assert(p_cursor->is_end());
+ ceph_abort("impossible");
}
}
private:
Cursor(Btree* p_tree, Ref<tree_cursor_t> _p_cursor) : p_tree(p_tree) {
- if (_p_cursor->is_end()) {
- // no need to hold the leaf node
+ if (_p_cursor->is_invalid()) {
+ // we don't create Cursor from an invalid tree_cursor_t.
+ ceph_abort("impossible");
+ } else if (_p_cursor->is_end()) {
+ // we don't actually store end cursor because it will hold a reference
+ // to an end leaf node and is not kept updated.
} else {
+ assert(_p_cursor->is_tracked());
p_cursor = _p_cursor;
}
}
MatchKindCMP compare_to(const Cursor& o) const {
assert(p_tree == o.p_tree);
- if (p_cursor && o.p_cursor) {
- return p_cursor->compare_to(
- *o.p_cursor, p_tree->value_builder.get_header_magic());
- } else if (!p_cursor && !o.p_cursor) {
- return MatchKindCMP::EQ;
- } else if (!p_cursor) {
- return MatchKindCMP::GT;
- } else { // !o.p_cursor
- return MatchKindCMP::LT;
- }
+ return p_cursor->compare_to(
+ *o.p_cursor, p_tree->value_builder.get_header_magic());
}
static Cursor make_end(Btree* p_tree) {
}
Btree* p_tree;
- Ref<tree_cursor_t> p_cursor;
+ Ref<tree_cursor_t> p_cursor = tree_cursor_t::get_invalid();
friend class Btree;
};
Value::~Value() {}
+bool Value::is_tracked() const
+{
+ assert(!p_cursor->is_end());
+ return p_cursor->is_tracked();
+}
+
future<> Value::extend(Transaction& t, value_size_t extend_size)
{
+ assert(is_tracked());
[[maybe_unused]] auto target_size = get_payload_size() + extend_size;
return p_cursor->extend_value(get_context(t), extend_size)
#ifndef NDEBUG
future<> Value::trim(Transaction& t, value_size_t trim_size)
{
+ assert(is_tracked());
assert(get_payload_size() > trim_size);
[[maybe_unused]] auto target_size = get_payload_size() - trim_size;
return p_cursor->trim_value(get_context(t), trim_size)
Value& operator=(const Value&) = delete;
Value& operator=(Value&&) = delete;
+ /// Returns whether the Value is still tracked in tree.
+ bool is_tracked() const;
+
/// Returns the value payload size.
value_size_t get_payload_size() const {
+ assert(is_tracked());
return read_value_header()->payload_size;
}
template <typename PayloadT, typename ValueDeltaRecorderT>
std::pair<NodeExtentMutable&, ValueDeltaRecorderT*>
prepare_mutate_payload(Transaction& t) {
+ assert(is_tracked());
assert(sizeof(PayloadT) <= get_payload_size());
auto value_mutable = do_prepare_mutate_payload(t);
/// Get the latest payload pointer for read.
template <typename PayloadT>
const PayloadT* read_payload() const {
+ assert(is_tracked());
// see Value documentation
static_assert(alignof(PayloadT) == 1);
assert(sizeof(PayloadT) <= get_payload_size());