}
}
}
+
std::map<std::string, ceph::bufferlist> dup_kvs;
+ if (kvs.size() > BATCH_CREATE_SIZE) {
+ auto alloc_log_node = [&](laddr_t prev_laddr)
+ -> omap_set_key_iertr::future<LogNodeRef> {
+ return tm.alloc_non_data_extent<LogNode>(
+ t, log_root.hint, LOG_NODE_BLOCK_SIZE
+ ).handle_error_interruptible(
+ crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+ omap_set_key_iertr::pass_further{}
+ ).si_then([prev_laddr](auto ext) {
+ assert(ext);
+ ext->set_prev_addr(prev_laddr);
+ return omap_set_key_iertr::make_ready_future<LogNodeRef>(ext);
+ });
+ };
+
+ LogNodeRef e = co_await alloc_log_node(ext->get_laddr());
+ LogNodeRef dup_e = co_await alloc_log_node(
+ co_await get_dup_addr_from_root(t, ext->get_laddr()));
+ for (auto &p : kvs) {
+ if (!is_log_key(p.first)) {
+ co_await remove_kv(t, log_root.addr, p.first, nullptr);
+ // reload latest log list e because e was updated if the key is in e
+ CachedExtentRef node;
+ Transaction::get_extent_ret ret;
+ ret = t.get_extent(e->get_paddr(), &node);
+ assert(ret == Transaction::get_extent_ret::PRESENT);
+ assert(node);
+ e = node->template cast<LogNode>();
+ }
+ LogNodeRef cur = e;
+ if (is_dup_log_key(p.first)) {
+ cur = dup_e;
+ }
+ if (cur->expect_overflow(p.first.size(), p.second.length())) {
+ cur = co_await alloc_log_node(cur->get_laddr());
+ if (!is_dup_log_key(p.first)) {
+ e = cur;
+ } else {
+ dup_e = cur;
+ }
+ }
+ cur->append_kv(t, p.first, p.second);
+ }
+ if (e->is_initial_pending()) {
+ e->set_dup_tail_addr(dup_e->get_laddr());
+ } else {
+ auto mut = tm.get_mutable_extent(t, e)->cast<LogNode>();
+ mut->set_dup_tail_addr(dup_e->get_laddr());
+ }
+ log_root.update(e->get_laddr(), log_root.depth,
+ log_root.hint, log_root.type);
+ co_return;
+ }
+
for (auto &p : kvs) {
if (is_ow_key(p.first) && has_ow_key) {
continue;
dup_keys.insert(std::move(nh));
}
-
// Deletion of pg_log_entry_t entries is performed by omap_rm_keys using a set.
// For example, omap_rm_keys might be called with a set containing
// pg_log_entry_t entries ranging from 0011.0001 to 0011.0010.
struct d_bitmap_t {
uint64_t bitmap[BITMAP_ARRAY_SIZE] = {0};
static constexpr size_t BITS_PER_WORD = 64;
+ static constexpr size_t MAX_ENTRY = BITS_PER_WORD * BITMAP_ARRAY_SIZE;
d_bitmap_t() = default;
void set_bitmap(size_t bit) {
int ow_gap_from_last_entry(const size_t key, const size_t val);
bool expect_overflow(const std::string &key, size_t vsize, bool can_ow);
+ bool expect_overflow(size_t ksize, size_t vsize) const {
+ if (get_size() + reserved_size + 1 > d_bitmap_t::MAX_ENTRY) {
+ return true;
+ }
+ return free_space() < get_entry_size(ksize, vsize) + reserved_len;
+ }
void update_delta() {
if (!delta_buffer.empty()) {