);
}
+OMapInnerNode::merge_entry_ret
+OMapInnerNode::do_merge(
+ omap_context_t oc,
+ internal_const_iterator_t liter,
+ internal_const_iterator_t riter,
+ OMapNodeRef l,
+ OMapNodeRef r)
+{
+ LOG_PREFIX(OMapInnerNode::do_merge);
+ DEBUGT("make_full_merge l {} r {} liter {} riter {}",
+ oc.t, *l, *r, liter->get_key(), riter->get_key());
+ return l->make_full_merge(oc, r
+ ).si_then([liter=liter, riter=riter, l=l, r=r, oc, this, FNAME]
+ (auto &&replacement) {
+ DEBUGT("to update parent: {}", oc.t, *this);
+ this->update_child_ptr(
+ liter.get_offset(),
+ dynamic_cast<base_child_t*>(replacement.get()));
+ journal_inner_update(
+ liter,
+ replacement->get_laddr(),
+ maybe_get_delta_buffer());
+ this->remove_child_ptr(riter.get_offset());
+ journal_inner_remove(riter, maybe_get_delta_buffer());
+ //retire extent
+ std::vector<laddr_t> dec_laddrs {l->get_laddr(), r->get_laddr()};
+ auto next = liter + 1;
+ auto end = next == iter_cend() ? get_end() : next.get_key();
+ assert(end == r->get_end());
+ replacement->init_range(liter.get_key(), std::move(end));
+ if (get_meta().depth > 2) { // replacement is an inner node
+ auto &rep = *replacement->template cast<OMapInnerNode>();
+ rep.adjust_copy_src_dest_on_merge(
+ oc.t,
+ *l->template cast<OMapInnerNode>(),
+ *r->template cast<OMapInnerNode>());
+ }
+ return dec_ref(oc, dec_laddrs
+ ).si_then([this, oc, r=std::move(replacement)] {
+ --(oc.t.get_omap_tree_stats().extents_num_delta);
+ if (extent_is_below_min()) {
+ return merge_entry_ret(
+ interruptible::ready_future_marker{},
+ mutation_result_t(mutation_status_t::NEED_MERGE,
+ std::nullopt, this));
+ } else {
+ return merge_entry_ret(
+ interruptible::ready_future_marker{},
+ mutation_result_t(mutation_status_t::SUCCESS,
+ std::nullopt, std::nullopt));
+ }
+ });
+ });
+}
+
+OMapInnerNode::merge_entry_ret
+OMapInnerNode::do_balance(
+ omap_context_t oc,
+ internal_const_iterator_t liter,
+ internal_const_iterator_t riter,
+ OMapNodeRef l,
+ OMapNodeRef r)
+{
+ LOG_PREFIX(OMapInnerNode::do_balance);
+ DEBUGT("balanced l {} r {} liter {} riter {}",
+ oc.t, *l, *r, liter->get_key(), riter->get_key());
+ return l->make_balanced(oc, r
+ ).si_then([FNAME, liter=liter, riter=riter, l=l, r=r, oc, this](auto tuple) {
+ auto [replacement_l, replacement_r, replacement_pivot] = tuple;
+ if (!replacement_pivot) {
+ return merge_entry_ret(
+ interruptible::ready_future_marker{},
+ mutation_result_t(mutation_status_t::SUCCESS,
+ std::nullopt, std::nullopt));
+ }
+ replacement_l->init_range(l->get_begin(), *replacement_pivot);
+ replacement_r->init_range(*replacement_pivot, r->get_end());
+ DEBUGT("to update parent: {} {} {}",
+ oc.t, *this, *replacement_l, *replacement_r);
+ if (get_meta().depth > 2) { // l and r are inner nodes
+ auto &left = *l->template cast<OMapInnerNode>();
+ auto &right = *r->template cast<OMapInnerNode>();
+ auto &rep_left = *replacement_l->template cast<OMapInnerNode>();
+ auto &rep_right = *replacement_r->template cast<OMapInnerNode>();
+ this->adjust_copy_src_dest_on_balance(
+ oc.t, left, right, true, rep_left, rep_right);
+ }
+
+ //update operation will not cuase node overflow, so we can do it first
+ this->update_child_ptr(
+ liter.get_offset(),
+ dynamic_cast<base_child_t*>(replacement_l.get()));
+ journal_inner_update(
+ liter,
+ replacement_l->get_laddr(),
+ maybe_get_delta_buffer());
+ bool overflow = extent_will_overflow(replacement_pivot->size(),
+ std::nullopt);
+ if (!overflow) {
+ this->update_child_ptr(
+ riter.get_offset(),
+ dynamic_cast<base_child_t*>(replacement_r.get()));
+ journal_inner_remove(riter, maybe_get_delta_buffer());
+ journal_inner_insert(
+ riter,
+ replacement_r->get_laddr(),
+ *replacement_pivot,
+ maybe_get_delta_buffer());
+ std::vector<laddr_t> dec_laddrs{l->get_laddr(), r->get_laddr()};
+ return dec_ref(oc, dec_laddrs
+ ).si_then([] {
+ return merge_entry_ret(
+ interruptible::ready_future_marker{},
+ mutation_result_t(mutation_status_t::SUCCESS,
+ std::nullopt, std::nullopt));
+ });
+ } else {
+ DEBUGT("balanced and split {} r {} riter {}",
+ oc.t, *l, *r, riter.get_key());
+ //use remove and insert to instead of replace,
+ //remove operation will not cause node split, so we can do it first
+ this->remove_child_ptr(riter.get_offset());
+ journal_inner_remove(riter, maybe_get_delta_buffer());
+ return make_split_insert(
+ oc, riter, *replacement_pivot, replacement_r
+ ).si_then([this, oc, l = l, r = r](auto mresult) {
+ std::vector<laddr_t> dec_laddrs{
+ l->get_laddr(),
+ r->get_laddr(),
+ get_laddr()};
+ return dec_ref(oc, dec_laddrs
+ ).si_then([mresult = std::move(mresult)] {
+ return merge_entry_ret(
+ interruptible::ready_future_marker{}, mresult);
+ });
+ });
+ }
+ });
+}
+
OMapInnerNode::merge_entry_ret
OMapInnerNode::merge_entry(
omap_context_t oc,
auto [liter, riter] = is_left ?
std::make_pair(donor_iter, iter) : std::make_pair(iter, donor_iter);
if (l->can_merge(r)) {
- DEBUGT("make_full_merge l {} r {} liter {} riter {}",
- oc.t, *l, *r, liter->get_key(), riter->get_key());
assert(entry->extent_is_below_min());
- return l->make_full_merge(oc, r
- ).si_then([liter=liter, riter=riter, l=l, r=r, oc, this]
- (auto &&replacement) {
- LOG_PREFIX(OMapInnerNode::merge_entry);
- DEBUGT("to update parent: {}", oc.t, *this);
- this->update_child_ptr(
- liter.get_offset(),
- dynamic_cast<base_child_t*>(replacement.get()));
- journal_inner_update(
- liter,
- replacement->get_laddr(),
- maybe_get_delta_buffer());
- this->remove_child_ptr(riter.get_offset());
- journal_inner_remove(riter, maybe_get_delta_buffer());
- //retire extent
- std::vector<laddr_t> dec_laddrs {l->get_laddr(), r->get_laddr()};
- auto next = liter + 1;
- auto end = next == iter_cend() ? get_end() : next.get_key();
- assert(end == r->get_end());
- replacement->init_range(liter.get_key(), std::move(end));
- if (get_meta().depth > 2) { // replacement is an inner node
- auto &rep = *replacement->template cast<OMapInnerNode>();
- rep.adjust_copy_src_dest_on_merge(
- oc.t,
- *l->template cast<OMapInnerNode>(),
- *r->template cast<OMapInnerNode>());
- }
- return dec_ref(oc, dec_laddrs
- ).si_then([this, oc, r=std::move(replacement)] {
- --(oc.t.get_omap_tree_stats().extents_num_delta);
- if (extent_is_below_min()) {
- return merge_entry_ret(
- interruptible::ready_future_marker{},
- mutation_result_t(mutation_status_t::NEED_MERGE,
- std::nullopt, this));
- } else {
- return merge_entry_ret(
- interruptible::ready_future_marker{},
- mutation_result_t(mutation_status_t::SUCCESS,
- std::nullopt, std::nullopt));
- }
- });
- });
+ return do_merge(oc, liter, riter, l, r);
} else { // !l->can_merge(r)
- DEBUGT("balanced l {} r {} liter {} riter {}",
- oc.t, *l, *r, liter->get_key(), riter->get_key());
- return l->make_balanced(oc, r
- ).si_then([liter=liter, riter=riter, l=l, r=r, oc, this](auto tuple) {
- LOG_PREFIX(OMapInnerNode::merge_entry);
- auto [replacement_l, replacement_r, replacement_pivot] = tuple;
- if (!replacement_pivot) {
- return merge_entry_ret(
- interruptible::ready_future_marker{},
- mutation_result_t(mutation_status_t::SUCCESS,
- std::nullopt, std::nullopt));
- }
- replacement_l->init_range(l->get_begin(), *replacement_pivot);
- replacement_r->init_range(*replacement_pivot, r->get_end());
- DEBUGT("to update parent: {} {} {}",
- oc.t, *this, *replacement_l, *replacement_r);
- if (get_meta().depth > 2) { // l and r are inner nodes
- auto &left = *l->template cast<OMapInnerNode>();
- auto &right = *r->template cast<OMapInnerNode>();
- auto &rep_left = *replacement_l->template cast<OMapInnerNode>();
- auto &rep_right = *replacement_r->template cast<OMapInnerNode>();
- this->adjust_copy_src_dest_on_balance(
- oc.t, left, right, true, rep_left, rep_right);
- }
-
- //update operation will not cuase node overflow, so we can do it first
- this->update_child_ptr(
- liter.get_offset(),
- dynamic_cast<base_child_t*>(replacement_l.get()));
- journal_inner_update(
- liter,
- replacement_l->get_laddr(),
- maybe_get_delta_buffer());
- bool overflow = extent_will_overflow(replacement_pivot->size(),
- std::nullopt);
- if (!overflow) {
- this->update_child_ptr(
- riter.get_offset(),
- dynamic_cast<base_child_t*>(replacement_r.get()));
- journal_inner_remove(riter, maybe_get_delta_buffer());
- journal_inner_insert(
- riter,
- replacement_r->get_laddr(),
- *replacement_pivot,
- maybe_get_delta_buffer());
- std::vector<laddr_t> dec_laddrs{l->get_laddr(), r->get_laddr()};
- return dec_ref(oc, dec_laddrs
- ).si_then([] {
- return merge_entry_ret(
- interruptible::ready_future_marker{},
- mutation_result_t(mutation_status_t::SUCCESS,
- std::nullopt, std::nullopt));
- });
- } else {
- DEBUGT("balanced and split {} r {} riter {}",
- oc.t, *l, *r, riter.get_key());
- //use remove and insert to instead of replace,
- //remove operation will not cause node split, so we can do it first
- this->remove_child_ptr(riter.get_offset());
- journal_inner_remove(riter, maybe_get_delta_buffer());
- return make_split_insert(
- oc, riter, *replacement_pivot, replacement_r
- ).si_then([this, oc, l = l, r = r](auto mresult) {
- std::vector<laddr_t> dec_laddrs{
- l->get_laddr(),
- r->get_laddr(),
- get_laddr()};
- return dec_ref(oc, dec_laddrs
- ).si_then([mresult = std::move(mresult)] {
- return merge_entry_ret(
- interruptible::ready_future_marker{}, mresult);
- });
- });
- }
- });
+ return do_balance(oc, liter, riter, l, r);
}
});
-
}
OMapInnerNode::internal_const_iterator_t