if (can_ow) {
ow_done = true;
}
+ // If multiple blocks are needed to store the kv pair
+ if (log_node->get_max_val_length(k.size()) < v.length()) {
+ co_await _log_set_multi_block_key(log_root, t, log_node, k, v);
+ co_return;
+ }
co_await _log_set_key(log_root, t, log_node, k, v, can_ow);
co_return;
};
if (is_dup_log_key(p.first)) {
cur = dup_e;
}
+ if (e->get_max_val_length(p.first.size()) < p.second.length()) {
+ co_await _log_set_multi_block_key(log_root, t, cur, p.first, p.second);
+ if (!is_dup_log_key(p.first)) {
+ e = co_await log_load_extent<LogNode>(
+ t, log_root.addr, BEGIN_KEY, END_KEY);
+ } else {
+ dup_e = co_await log_load_extent<LogNode>(
+ t, co_await get_dup_addr_from_root(t, log_root.addr),
+ BEGIN_KEY, END_KEY);
+ }
+ continue;
+ }
if (cur->expect_overflow(p.first.size(), p.second.length())) {
cur = co_await alloc_log_node(cur->get_laddr());
if (!is_dup_log_key(p.first)) {
co_return co_await omap_set_keys(log_root, t, std::move(kvs));
}
+/**
+ * _log_set_multi_block_key
+ *
+ * Stores a value that is too long for a single LogNode. The value is cut
+ * into chunks of at most tail->get_max_val_length(key.size()) bytes and
+ * every chunk is written, as a multi-block entry, into its own freshly
+ * allocated LogNode. Chunk indices start at 1; each new node's prev address
+ * points at the node written before it (the first one points at tail), so
+ * the node holding the last chunk heads the chain.
+ *
+ * Afterwards, for a regular key, log_root is repointed to the last node and
+ * that node inherits tail's dup tail address; for a dup key, the node loaded
+ * from log_root.addr gets its dup tail address set to the last node.
+ *
+ * @param log_root omap root, updated in place for non-dup keys
+ * @param t        current transaction
+ * @param tail     node the new chain is linked behind
+ * @param key      key stored with every chunk
+ * @param value    full value to split into chunks
+ */
+LogManager::omap_set_key_ret
+LogManager::_log_set_multi_block_key(omap_root_t &log_root,
+  Transaction &t, LogNodeRef tail,
+  const std::string &key, const ceph::bufferlist &value)
+{
+  LOG_PREFIX(LogManager::_log_set_multi_block_key);
+  DEBUGT("enter key={}", t, key);
+  const size_t length = value.length();
+  const size_t max_val_length = tail->get_max_val_length(key.size());
+  // Must hold in release builds too: with a zero chunk size the loop below
+  // would never advance offset and would keep allocating extents.
+  ceph_assert(max_val_length > 0);
+  uint16_t idx = 1;
+  size_t offset = 0;
+  LogNodeRef cur_extent = tail;
+
+  while (offset < length) {
+    // chunk_idx 0 denotes an ordinary (non multi-block) entry, so the 16-bit
+    // counter must never wrap around to it.
+    ceph_assert(idx != 0);
+    const size_t chunk_len = std::min(max_val_length, length - offset);
+    auto extent = co_await tm.alloc_non_data_extent<LogNode>(
+      t, log_root.hint, LOG_NODE_BLOCK_SIZE
+    ).handle_error_interruptible(
+      crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+      omap_set_key_iertr::pass_further{}
+    );
+    assert(extent);
+    ceph::bufferlist chunk;
+    chunk.substr_of(value, offset, chunk_len);
+    DEBUGT("offset={}, chunk_len={}, idx={}, value length={}",
+      t, offset, chunk_len, idx, value.length());
+    extent->append_multi_block_kv(t, key, chunk, idx);
+    // Link the new node behind the one written just before it.
+    extent->set_prev_addr(cur_extent->get_laddr());
+    cur_extent = extent;
+    offset += chunk_len;
+    idx++;
+  }
+  if (!is_dup_log_key(key)) {
+    // Regular key: the node holding the last chunk becomes the new root.
+    log_root.update(cur_extent->get_laddr(), log_root.depth,
+      log_root.hint, log_root.type);
+    cur_extent->set_dup_tail_addr(tail->get_dup_tail_addr());
+  } else {
+    // Dup key: record the new dup tail in the node the root points at.
+    auto ext = co_await log_load_extent<LogNode>(
+      t, log_root.addr, BEGIN_KEY, END_KEY);
+    auto mut = tm.get_mutable_extent(t, ext)->cast<LogNode>();
+    mut->set_dup_tail_addr(cur_extent->get_laddr());
+  }
+  co_return;
+}
+
LogManager::omap_set_key_ret
LogManager::_log_set_key(omap_root_t &log_root,
Transaction &t, LogNodeRef tail,
if (extent == nullptr) {
co_return;
}
+ if (extent->has_multi_block_kv()) {
+ co_await find_multi_block_kvs(t, extent, first, last, kvs);
+ co_return;
+ }
extent->list(first, last, kvs);
co_await find_kvs(t, extent->get_prev_addr(), first, last, kvs);
co_return;
}
+/**
+ * find_multi_block_kvs
+ *
+ * Lists the entries of extent into kvs and then continues with the node
+ * its prev address points at: through this function again when that node
+ * holds a multi-block entry, through find_kvs() otherwise.
+ *
+ * @param t      current transaction
+ * @param extent node to list; it is dereferenced unconditionally
+ * @param first  bound handed to LogNode::list()
+ * @param last   bound handed to LogNode::list()
+ * @param kvs    output map the listed entries are added to
+ */
+LogManager::omap_list_iertr::future<>
+LogManager::find_multi_block_kvs(Transaction &t, LogNodeRef extent,
+  const std::optional<std::string> &first,
+  const std::optional<std::string> &last,
+  std::map<std::string, bufferlist> &kvs)
+{
+  LOG_PREFIX(LogManager::find_multi_block_kvs);
+  DEBUGT("first={}, last={}, dst={}", t, first, last, extent->get_laddr());
+  extent->list(first, last, kvs);
+  if (extent->get_prev_addr() == L_ADDR_NULL) {
+    co_return;
+  }
+  auto prev_extent = co_await log_load_extent<LogNode>(
+    t, extent->get_prev_addr(), BEGIN_KEY, END_KEY);
+  // The other callers of log_load_extent() treat a null result as the end
+  // of the chain; do the same here instead of dereferencing it.
+  if (prev_extent == nullptr) {
+    co_return;
+  }
+  if (prev_extent->has_multi_block_kv()) {
+    co_await find_multi_block_kvs(t, prev_extent, first, last, kvs);
+  } else {
+    co_await find_kvs(t, prev_extent->get_laddr(), first, last, kvs);
+  }
+  co_return;
+}
LogManager::omap_get_value_ret
LogManager::find_kv(Transaction &t, laddr_t dst, const std::string &key)
if (extent == nullptr) {
co_return std::nullopt;
}
+ if (extent->has_multi_block_kv(key)) {
+ bufferlist buf;
+ co_await find_multi_block_kv(t, key, extent, buf);
+ co_return std::move(buf);
+ }
auto e = co_await extent->get_value(key);
if (e == std::nullopt) {
co_return std::move(e);
}
+/**
+ * find_multi_block_kv
+ *
+ * Reassembles the value of a multi-block key. Starting at extent, the chain
+ * is followed through the prev addresses until the node holding chunk 1 is
+ * reached; chunks are appended to buf while the recursion unwinds, so buf
+ * receives them in ascending chunk order.
+ *
+ * A chain that ends before chunk 1, or that contains a node without a chunk
+ * of key, would yield a truncated value. These conditions are therefore
+ * checked with ceph_assert, which stays active in release builds.
+ *
+ * @param t      current transaction
+ * @param key    key whose chunks are collected
+ * @param extent node holding the chunk to start from
+ * @param buf    output buffer the chunks are appended to
+ */
+LogManager::omap_get_value_iertr::future<>
+LogManager::find_multi_block_kv(Transaction &t, const std::string &key,
+  LogNodeRef extent, bufferlist &buf)
+{
+  LOG_PREFIX(LogManager::find_multi_block_kv);
+  DEBUGT("key={}, dst={}", t, key, extent->get_laddr());
+  auto e = co_await extent->get_value(key);
+  ceph_assert(e != std::nullopt);
+  if (!extent->is_first_multi_block(key)) {
+    // Earlier chunks live in the nodes behind this one; collect them first.
+    ceph_assert(extent->get_prev_addr() != L_ADDR_NULL);
+    auto prev_extent = co_await log_load_extent<LogNode>(
+      t, extent->get_prev_addr(), BEGIN_KEY, END_KEY);
+    ceph_assert(prev_extent != nullptr);
+    ceph_assert(prev_extent->has_multi_block_kv(key));
+    co_await find_multi_block_kv(t, key, prev_extent, buf);
+  }
+  buf.append(*e);
+  co_return;
+}
+
LogManager::omap_rm_key_ret
LogManager::remove_node(Transaction &t, LogNodeRef mut, LogNodeRef prev)
{
LOG_PREFIX(LogManager::remove_kv);
DEBUGT("key={}, dst={}", t, key, dst);
+ if (dst == L_ADDR_NULL) {
+ co_return;
+ }
auto extent = co_await log_load_extent<LogNode>(
t, dst, BEGIN_KEY, END_KEY);
if (extent == nullptr) {
co_return;
}
+ if (extent->has_multi_block_kv(key)) {
+ auto mut = tm.get_mutable_extent(t, extent)->template cast<LogNode>();
+ mut->remove_entry(key);
+ assert(mut->is_removable());
+ if (mut->is_removable()) {
+ co_await remove_node(t, mut, prev);
+ if (prev != nullptr && mut->get_prev_addr() != L_ADDR_NULL) {
+ mut = co_await log_load_extent<LogNode>(
+ t, prev->get_laddr(), BEGIN_KEY, END_KEY);
+ }
+ }
+ co_await remove_kv(t, mut->get_prev_addr(), key, mut);
+ co_return;
+ }
+
auto e = co_await extent->get_value(key);
if (e == std::nullopt) {
if(extent->get_prev_addr() == L_ADDR_NULL) {
base_iertr::future<laddr_t> get_dup_addr_from_root(Transaction &t, laddr_t addr);
+ /**
+ *
+ * Support for multi-block KV pairs
+ *
+ * Each log_key_t contains a chunk_idx field to manage values
+ * that span multiple LogNodes when the value size exceeds the
+ * maximum capacity of a single LogNode.
+ * For simplicity, we always create separate blocks for each chunk,
+ * even if this may introduce some internal fragmentation.
+ * TODO: Implement block packing to improve space efficiency.
+ *
+ * Layout example:
+ *
+ * log_root
+ * |
+ * v
+ * +-------------------------------+
+ * | LogNode (1 KV, chunk_idx:2) |
+ * | (later chunk) |
+ * +-------------------------------+
+ * |
+ * v
+ * +-------------------------------+
+ * | LogNode (1 KV, chunk_idx:1) |
+ * | (earlier chunk) |
+ * +-------------------------------+
+ *
+ */
+ omap_get_value_iertr::future<>
+ find_multi_block_kv(Transaction &t, const std::string &key,
+ LogNodeRef extent, bufferlist &buf);
+ omap_list_iertr::future<>
+ find_multi_block_kvs(Transaction &t, LogNodeRef extent,
+ const std::optional<std::string> &first,
+ const std::optional<std::string> &last,
+ std::map<std::string, bufferlist> &kvs);
+ omap_set_key_ret
+ _log_set_multi_block_key(omap_root_t &log_root,
+ Transaction &t, LogNodeRef tail,
+ const std::string &key, const ceph::bufferlist &value);
+
+
TransactionManager &tm;
};
struct log_key_t { // Header of one log entry: key length, value length and chunk index.
uint16_t key_len = 0; // set from key.size() by the append paths
uint16_t val_len = 0; // set from val.length() by the append paths
+ uint16_t chunk_idx = 0; // 0: ordinary entry; >= 1: chunk number of a multi-block value (numbering starts at 1)
log_key_t() = default;
- log_key_t(uint16_t k_len, uint16_t v_len)
- : key_len(k_len), val_len(v_len) {}
+ log_key_t(uint16_t k_len, uint16_t v_len, uint16_t c_idx = 0)
+ : key_len(k_len), val_len(v_len), chunk_idx(c_idx) {}
DENC(log_key_t, v, p) {
DENC_START(1, 1, p); // NOTE(review): chunk_idx extends the encoding while version/compat stay 1/1 - confirm no previously encoded log_key_t has to be decoded
denc(v.key_len, p);
denc(v.val_len, p);
+ denc(v.chunk_idx, p);
DENC_FINISH(p);
}
};
struct log_key_le_t { // Counterpart of log_key_t built from ceph_le16 fields; converts to and from log_key_t.
ceph_le16 key_len{0};
ceph_le16 val_len{0};
+ ceph_le16 chunk_idx{0}; // mirrors log_key_t::chunk_idx
log_key_le_t() = default;
log_key_le_t(const log_key_le_t &) = default;
explicit log_key_le_t(const log_key_t &key) // field-by-field copy from log_key_t
: key_len(key.key_len),
- val_len(key.val_len) {}
+ val_len(key.val_len),
+ chunk_idx(key.chunk_idx) {}
log_key_le_t& operator=(log_key_t key) { // assigns all three fields from key
key_len = key.key_len;
val_len = key.val_len;
+ chunk_idx = key.chunk_idx;
return *this;
}
operator log_key_t() const { // rebuilds a log_key_t from the three fields
return log_key_t{uint16_t(key_len),
- uint16_t(val_len)};
+ uint16_t(val_len), uint16_t(chunk_idx)};
}
};
bl.append(bptr);
return bl;
}
+
+ /// Chunk index recorded in this entry's key header, returned as uint64_t.
+ uint64_t get_chunk_idx() const {
+   const auto node_key = get_node_key();
+   return node_key.chunk_idx;
+ }
};
using const_iterator = iter_t<true>;
set_size(get_size() + 1);
}
+ /**
+  * Writes one chunk of a multi-block value as the sole entry of this node.
+  *
+  * The node must be empty. This is verified before anything is written, so
+  * a violated precondition cannot modify the node first. With an empty node
+  * the entry always goes to get_last_pos(); no offset past a previous entry
+  * has to be computed.
+  *
+  * @param key key stored with the chunk
+  * @param val chunk payload
+  * @param idx chunk index recorded in the entry's key header
+  */
+ void _append_multi_block_kv(const std::string &key, const ceph::bufferlist &val,
+   const uint16_t idx) {
+   ceph_assert(get_size() == 0);
+   iterator iter(this, get_last_pos());
+   iter.set_node_key(log_key_t(key.size(), val.length(), idx));
+   iter.set_node_val(key, val);
+   set_size(get_size() + 1);
+ }
+
void _overwrite(const std::string &key, const ceph::bufferlist &val) {
iterator iter(this, get_last_pos());
iter.set_node_key(log_key_t(key.size(), val.length()));
return is_mutation_pending() ? &delta_buffer : nullptr;
}
+ void append_multi_block_kv(Transaction &t, const std::string &key,
+ const ceph::bufferlist &val, const uint16_t idx);
+
void append_kv(Transaction &t, const std::string &key,
const ceph::bufferlist &val);
return free_space() < get_entry_size(ksize, vsize) + reserved_len;
}
+ /// capacity() minus the size of an entry whose key is ksize bytes long and
+ /// whose value is empty.
+ size_t get_max_val_length(size_t ksize) {
+   const auto key_only_entry = get_entry_size(ksize, 0);
+   return capacity() - key_only_entry;
+ }
+
+ /// True when the first entry of this node carries chunk index 1 and its
+ /// key equals key.
+ bool is_first_multi_block(const std::string &key) const {
+   auto head = iter_begin();
+   if (head->get_chunk_idx() != 1) {
+     return false;
+   }
+   return head->get_key() == key;
+ }
+
+ /// True when the first entry of this node has a chunk index of at least 1.
+ bool has_multi_block_kv() const {
+   auto head = iter_begin();
+   const bool chunked = head->get_chunk_idx() >= 1;
+   return chunked;
+ }
+
+ /// True when the first entry of this node has a chunk index of at least 1
+ /// and its key equals key.
+ bool has_multi_block_kv(const std::string &key) const {
+   auto head = iter_begin();
+   if (head->get_chunk_idx() < 1) {
+     return false;
+   }
+   return head->get_key() == key;
+ }
+
void update_delta() {
if (!delta_buffer.empty()) {
delta_buffer.replay(*this);